
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
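"""Workflows for launching Sahara EDP jobs.

A job can be launched either on an existing cluster or on a new cluster
that is created (optionally as a transient cluster) as part of the launch.
"""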
import json

from django.utils.translation import ugettext_lazy as _
import six

from horizon import exceptions
from horizon import forms
from horizon import workflows

from sahara_dashboard.api import sahara as saharaclient
import sahara_dashboard.content.data_processing. \
    cluster_templates.workflows.create as t_flows
import sahara_dashboard.content.data_processing. \
    clusters.workflows.create as c_flow
import sahara_dashboard.content.data_processing. \
    utils.workflow_helpers as whelpers


DATA_SOURCE_CREATE_URL = ("horizon:project:data_processing.data_sources"
                          ":create-data-source")


class JobExecutionGeneralConfigAction(workflows.Action):
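    """Collects the job to run plus its input and output data sources."""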
    job_input = forms.DynamicChoiceField(
        label=_("Input"),
        initial=(None, "None"),
        add_item_link=DATA_SOURCE_CREATE_URL,
        required=False)

    job_output = forms.DynamicChoiceField(
        label=_("Output"),
        initial=(None, "None"),
        add_item_link=DATA_SOURCE_CREATE_URL,
        required=False)

    def __init__(self, request, *args, **kwargs):
        super(JobExecutionGeneralConfigAction, self).__init__(request,
                                                              *args,
                                                              **kwargs)

        if request.REQUEST.get("job_id", None) is None:
            self.fields["job"] = forms.ChoiceField(
                label=_("Job"))
            self.fields["job"].choices = self.populate_job_choices(request)
        else:
            self.fields["job"] = forms.CharField(
                widget=forms.HiddenInput(),
                initial=request.REQUEST.get("job_id", None))

    def populate_job_input_choices(self, request, context):
        return self.get_data_source_choices(request, context)

    def populate_job_output_choices(self, request, context):
        return self.get_data_source_choices(request, context)

    def get_data_source_choices(self, request, context):
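        """Return (id, name) choices for the available data sources.

        A "None" entry is always prepended; on API failure the error is
        handled and only the "None" entry is offered.
        """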
        try:
            data_sources = saharaclient.data_source_list(request)
        except Exception:
            data_sources = []
            exceptions.handle(request,
                              _("Unable to fetch data sources."))

        choices = [(data_source.id, data_source.name)
                   for data_source in data_sources]
        choices.insert(0, (None, 'None'))

        return choices

    def populate_job_choices(self, request):
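        """Return (id, name) choices for the available jobs."""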
        try:
            jobs = saharaclient.job_list(request)
        except Exception:
            jobs = []
            exceptions.handle(request,
                              _("Unable to fetch jobs."))

        choices = [(job.id, job.name)
                   for job in jobs]

        return choices

    class Meta(object):
        name = _("Job")
        help_text_template = (
            "project/data_processing.jobs/_launch_job_help.html")


class JobExecutionExistingGeneralConfigAction(JobExecutionGeneralConfigAction):
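    """Adds a cluster choice so the job runs on an existing cluster."""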
    cluster = forms.ChoiceField(
        label=_("Cluster"),
        initial=(None, "None"),
        widget=forms.Select(attrs={"class": "cluster_choice"}))

    def populate_cluster_choices(self, request, context):
        try:
            clusters = saharaclient.cluster_list(request)
        except Exception:
            clusters = []
            exceptions.handle(request,
                              _("Unable to fetch clusters."))

        choices = [(cluster.id, cluster.name)
                   for cluster in clusters]

        return choices

    class Meta(object):
        name = _("Job")
        help_text_template = (
            "project/data_processing.jobs/_launch_job_help.html")


def _merge_interface_with_configs(interface, job_configs):
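    """Split stored job configs into free and interface-driven arguments.

    Entries whose (mapping_type, location) pair appears in the job's
    interface are returned separately, keyed by interface argument id;
    the rest are returned as the usual {"configs", "params", "args"}
    structure, with "args" rebuilt as a list ordered by position.
    """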
    interface_by_mapping = {(arg['mapping_type'], arg['location']): arg
                            for arg in interface}
    mapped_types = ("configs", "params")
    mapped_configs = {
        (mapping_type, key): value for mapping_type in mapped_types
        for key, value in job_configs.get(mapping_type, {}).items()
    }
    for index, arg in enumerate(job_configs.get('args', [])):
        mapped_configs['args', str(index)] = arg
    free_arguments, interface_arguments = {}, {}
    for mapping, value in mapped_configs.items():
        if mapping in interface_by_mapping:
            arg = interface_by_mapping[mapping]
            interface_arguments[arg['id']] = value
        else:
            free_arguments[mapping] = value
    configs = {"configs": {}, "params": {}, "args": {}}
    for mapping, value in free_arguments.items():
        mapping_type, location = mapping
        configs[mapping_type][location] = value
    configs["args"] = [
        value for key, value in sorted(configs["args"].items(),
                                       key=lambda x: int(x[0]))]
    return configs, interface_arguments


class JobConfigAction(workflows.Action):
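    """Collects EDP configs, params and arguments for the chosen job."""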
    MAIN_CLASS = "edp.java.main_class"
    JAVA_OPTS = "edp.java.java_opts"
    EDP_MAPPER = "edp.streaming.mapper"
    EDP_REDUCER = "edp.streaming.reducer"
    EDP_PREFIX = "edp."
    EDP_HBASE_COMMON_LIB = "edp.hbase_common_lib"
    EDP_ADAPT_FOR_OOZIE = "edp.java.adapt_for_oozie"
    EDP_ADAPT_SPARK_SWIFT = "edp.spark.adapt_for_swift"
    EDP_SUBST_DATASOURCE_NAME = "edp.substitute_data_source_for_name"
    EDP_SUBST_DATASOURCE_UUID = "edp.substitute_data_source_for_uuid"

    property_name = forms.ChoiceField(
        required=False,
    )

    job_configs = forms.CharField(
        required=False,
        widget=forms.HiddenInput())

    job_params = forms.CharField(
        required=False,
        widget=forms.HiddenInput())

    job_args_array = forms.CharField(
        required=False,
        widget=forms.HiddenInput())

    job_type = forms.CharField(
        required=False,
        widget=forms.HiddenInput())

    main_class = forms.CharField(label=_("Main Class"),
                                 required=False)

    java_opts = forms.CharField(label=_("Java Opts"),
                                required=False)

    streaming_mapper = forms.CharField(label=_("Mapper"))

    streaming_reducer = forms.CharField(label=_("Reducer"))

    hbase_common_lib = forms.BooleanField(
        label=_("Use HBase Common library"),
        help_text=_("Run HBase EDP Jobs with common HBase library on HDFS"),
        required=False, initial=True)

    adapt_oozie = forms.BooleanField(
        label=_("Adapt For Oozie"),
        help_text=_("Automatically modify the Hadoop configuration"
                    " so that job config values are set and so that"
                    " Oozie will handle exit codes correctly."),
        required=False, initial=True)

    adapt_spark_swift = forms.BooleanField(
        label=_("Enable Swift Paths"),
        help_text=_("Modify the configuration so that swift URLs can "
                    "be dereferenced through HDFS at runtime."),
        required=False, initial=True)

    datasource_substitute = forms.BooleanField(
        label=_("Use Data Source Substitution for Names and UUIDs"),
        help_text=_("Substitute data source objects for URLs of "
                    "the form datasource://name or uuid."),
        required=False, initial=True)

    def __init__(self, request, *args, **kwargs):
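        """Prefill all fields from an existing job execution, if given."""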
        super(JobConfigAction, self).__init__(request, *args, **kwargs)
        job_ex_id = request.REQUEST.get("job_execution_id")
        if job_ex_id is not None:
            job_ex = saharaclient.job_execution_get(request, job_ex_id)
            job = saharaclient.job_get(request, job_ex.job_id)
            job_configs, interface_args = _merge_interface_with_configs(
                job.interface, job_ex.job_configs)
            edp_configs = {}

            if 'configs' in job_configs:
                configs, edp_configs = (
                    self.clean_edp_configs(job_configs['configs']))
                self.fields['job_configs'].initial = (
                    json.dumps(configs))

            if 'params' in job_configs:
                self.fields['job_params'].initial = (
                    json.dumps(job_configs['params']))

            if 'args' in job_configs:
                self.fields['job_args_array'].initial = (
                    json.dumps(job_configs['args']))

            if self.MAIN_CLASS in edp_configs:
                self.fields['main_class'].initial = (
                    edp_configs[self.MAIN_CLASS])
            if self.JAVA_OPTS in edp_configs:
                self.fields['java_opts'].initial = (
                    edp_configs[self.JAVA_OPTS])

            if self.EDP_MAPPER in edp_configs:
                self.fields['streaming_mapper'].initial = (
                    edp_configs[self.EDP_MAPPER])
            if self.EDP_REDUCER in edp_configs:
                self.fields['streaming_reducer'].initial = (
                    edp_configs[self.EDP_REDUCER])
            if self.EDP_HBASE_COMMON_LIB in edp_configs:
                self.fields['hbase_common_lib'].initial = (
                    edp_configs[self.EDP_HBASE_COMMON_LIB])
            if self.EDP_ADAPT_FOR_OOZIE in edp_configs:
                self.fields['adapt_oozie'].initial = (
                    edp_configs[self.EDP_ADAPT_FOR_OOZIE])
            if self.EDP_ADAPT_SPARK_SWIFT in edp_configs:
                self.fields['adapt_spark_swift'].initial = (
                    edp_configs[self.EDP_ADAPT_SPARK_SWIFT])
            if (self.EDP_SUBST_DATASOURCE_NAME in edp_configs or
                    self.EDP_SUBST_DATASOURCE_UUID in edp_configs):
                self.fields['datasource_substitute'].initial = (
                    edp_configs.get(self.EDP_SUBST_DATASOURCE_UUID, True) or
                    edp_configs.get(self.EDP_SUBST_DATASOURCE_NAME, True))

    def clean(self):
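        """Discard errors on the streaming fields for non-streaming jobs.

        streaming_mapper and streaming_reducer are required fields, but
        they only apply to MapReduce.Streaming jobs, so their validation
        errors are dropped for every other job type.
        """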
        # NOTE: super(workflows.Action, self) starts the MRO lookup after
        # workflows.Action, so workflows.Action.clean() itself is skipped.
        cleaned_data = super(workflows.Action, self).clean()
        job_type = cleaned_data.get("job_type", None)

        if job_type != "MapReduce.Streaming":
            if "streaming_mapper" in self._errors:
                del self._errors["streaming_mapper"]
            if "streaming_reducer" in self._errors:
                del self._errors["streaming_reducer"]

        return cleaned_data

    def populate_property_name_choices(self, request, context):
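        """Return (value, name) choices from the job type's config hints."""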
        job_id = request.REQUEST.get("job_id") or request.REQUEST.get("job")
        job_type = saharaclient.job_get(request, job_id).type
        job_configs = (
            saharaclient.job_get_configs(request, job_type).job_config)
        choices = [(param['value'], param['name'])
                   for param in job_configs['configs']]
        return choices

    def clean_edp_configs(self, configs):
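        """Split configs into free-form entries and "edp."-prefixed entries.

        Keys that are handled by dedicated form controls are removed from
        the free-form dict so they are not shown twice.
        """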
        edp_configs = {}
        for key, value in six.iteritems(configs):
            if key.startswith(self.EDP_PREFIX):
                edp_configs[key] = value
        for rmkey in edp_configs.keys():
            # remove all configs handled via other controls
            # so they do not show up in the free entry inputs
            if rmkey in [self.EDP_HBASE_COMMON_LIB,
                         self.EDP_MAPPER,
                         self.EDP_REDUCER,
                         self.MAIN_CLASS,
                         self.JAVA_OPTS,
                         self.EDP_ADAPT_FOR_OOZIE,
                         self.EDP_ADAPT_SPARK_SWIFT,
                         self.EDP_SUBST_DATASOURCE_UUID,
                         self.EDP_SUBST_DATASOURCE_NAME, ]:
                del configs[rmkey]
        return (configs, edp_configs)

    class Meta(object):
        name = _("Configure")
        help_text_template = (
            "project/data_processing.jobs/_launch_job_configure_help.html")


class JobExecutionGeneralConfig(workflows.Step):
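    """Contributes the general settings under "job_general_*" context keys."""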
    action_class = JobExecutionGeneralConfigAction

    def contribute(self, data, context):
        for k, v in data.items():
            if k in ["job_input", "job_output"]:
                context["job_general_" + k] = None if (v in [None, ""]) else v
            else:
                context["job_general_" + k] = v

        return context


class JobExecutionExistingGeneralConfig(workflows.Step):
    action_class = JobExecutionExistingGeneralConfigAction

    def contribute(self, data, context):
        for k, v in data.items():
            if k in ["job_input", "job_output"]:
                context["job_general_" + k] = None if (v in [None, ""]) else v
            else:
                context["job_general_" + k] = v

        return context


class JobConfig(workflows.Step):
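    """Assembles the "job_config" context entry from the configure step."""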
    action_class = JobConfigAction
    template_name = 'project/data_processing.jobs/config_template.html'

    def contribute(self, data, context):
        job_config = self.clean_configs(
            json.loads(data.get("job_configs", '{}')))
        job_params = self.clean_configs(
            json.loads(data.get("job_params", '{}')))
        job_args_array = self.clean_configs(
            json.loads(data.get("job_args_array", '[]')))
        job_type = data.get("job_type", '')

        context["job_type"] = job_type
        context["job_config"] = {"configs": job_config}
        context["job_config"]["args"] = job_args_array

        context["job_config"]["configs"][
            JobConfigAction.EDP_SUBST_DATASOURCE_UUID] = (
            data.get("datasource_substitute", True))
        context["job_config"]["configs"][
            JobConfigAction.EDP_SUBST_DATASOURCE_NAME] = (
            data.get("datasource_substitute", True))

        if job_type in ["Java", "Spark", "Storm"]:
            context["job_config"]["configs"][JobConfigAction.MAIN_CLASS] = (
                data.get("main_class", ""))
            context["job_config"]["configs"][JobConfigAction.JAVA_OPTS] = (
                data.get("java_opts", ""))
            context["job_config"]["configs"][
                JobConfigAction.EDP_HBASE_COMMON_LIB] = (
                data.get("hbase_common_lib", True))
            if job_type == "Java":
                context["job_config"]["configs"][
                    JobConfigAction.EDP_ADAPT_FOR_OOZIE] = (
                    data.get("adapt_oozie", True))
            if job_type == "Spark":
                context["job_config"]["configs"][
                    JobConfigAction.EDP_ADAPT_SPARK_SWIFT] = (
                    data.get("adapt_spark_swift", True))
        elif job_type == "MapReduce.Streaming":
            context["job_config"]["configs"][JobConfigAction.EDP_MAPPER] = (
                data.get("streaming_mapper", ""))
            context["job_config"]["configs"][JobConfigAction.EDP_REDUCER] = (
                data.get("streaming_reducer", ""))
        else:
            context["job_config"]["params"] = job_params

        return context

    @staticmethod
    def clean_configs(configs):
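        """Strip whitespace and drop empty entries from a dict or a list."""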
        cleaned_conf = None
        if isinstance(configs, dict):
            cleaned_conf = dict([(k.strip(), v.strip())
                                 for k, v in configs.items()
                                 if len(v.strip()) > 0 and len(k.strip()) > 0])
        elif isinstance(configs, list):
            cleaned_conf = list([v.strip() for v in configs
                                 if len(v.strip()) > 0])
        return cleaned_conf


class NewClusterConfigAction(c_flow.GeneralConfigAction):
    persist_cluster = forms.BooleanField(
        label=_("Persist cluster after job exit"),
        required=False)

    class Meta(object):
        name = _("Configure Cluster")
        help_text_template = (
            "project/data_processing.clusters/_configure_general_help.html")


class ClusterGeneralConfig(workflows.Step):
    action_class = NewClusterConfigAction
    contributes = ("hidden_configure_field", )

    def contribute(self, data, context):
        for k, v in data.items():
            context["cluster_general_" + k] = v

        return context


class JobExecutionInterfaceConfigAction(workflows.Action):
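    """Builds one form field per argument declared in the job's interface."""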
    def __init__(self, request, *args, **kwargs):
        super(JobExecutionInterfaceConfigAction, self).__init__(
            request, *args, **kwargs)
        job_id = (request.GET.get("job_id")
                  or request.POST.get("job"))
        job = saharaclient.job_get(request, job_id)
        interface = job.interface or []
        interface_args = {}

        job_ex_id = request.REQUEST.get("job_execution_id")
        if job_ex_id is not None:
            job_ex = saharaclient.job_execution_get(request, job_ex_id)
            job = saharaclient.job_get(request, job_ex.job_id)
            job_configs, interface_args = _merge_interface_with_configs(
                job.interface, job_ex.job_configs)

        for argument in interface:
            field = forms.CharField(
                required=argument.get('required'),
                label=argument['name'],
                initial=(interface_args.get(argument['id']) or
                         argument.get('default')),
                help_text=argument.get('description'),
                widget=forms.TextInput()
            )
            self.fields['argument_%s' % argument['id']] = field
        self.fields['argument_ids'] = forms.CharField(
            initial=json.dumps({argument['id']: argument['name']
                                for argument in interface}),
            widget=forms.HiddenInput()
        )

    def clean(self):
        cleaned_data = super(JobExecutionInterfaceConfigAction, self).clean()
        return cleaned_data

    class Meta(object):
        name = _("Interface Arguments")


class JobExecutionInterfaceConfig(workflows.Step):
    action_class = JobExecutionInterfaceConfigAction

    def contribute(self, data, context):
        for k, v in data.items():
            context[k] = v
        return context


class LaunchJob(workflows.Workflow):
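    """Launches a job on an existing cluster."""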
    slug = "launch_job"
    name = _("Launch Job")
    finalize_button_name = _("Launch")
    success_message = _("Job launched")
    failure_message = _("Could not launch job")
    success_url = "horizon:project:data_processing.job_executions:index"
    default_steps = (JobExecutionExistingGeneralConfig, JobConfig,
                     JobExecutionInterfaceConfig)

    def handle(self, request, context):
        argument_ids = json.loads(context['argument_ids'])
        interface = {name: context["argument_" + str(arg_id)]
                     for arg_id, name in argument_ids.items()}

        saharaclient.job_execution_create(
            request,
            context["job_general_job"],
            context["job_general_cluster"],
            context["job_general_job_input"],
            context["job_general_job_output"],
            context["job_config"],
            interface)
        return True


class SelectHadoopPluginAction(t_flows.SelectPluginAction):
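    """Plugin selection action that also carries hidden job-launch state."""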
    def __init__(self, request, *args, **kwargs):
        super(SelectHadoopPluginAction, self).__init__(request,
                                                       *args,
                                                       **kwargs)
        self.fields["job_id"] = forms.ChoiceField(
            label=_("Plugin name"),
            initial=request.GET.get("job_id") or request.POST.get("job_id"),
            widget=forms.HiddenInput(attrs={"class": "hidden_create_field"}))

        self.fields["job_configs"] = forms.ChoiceField(
            label=_("Job configs"),
            widget=forms.HiddenInput(attrs={"class": "hidden_create_field"}))

        self.fields["job_args"] = forms.ChoiceField(
            label=_("Job args"),
            widget=forms.HiddenInput(attrs={"class": "hidden_create_field"}))

        self.fields["job_params"] = forms.ChoiceField(
            label=_("Job params"),
            widget=forms.HiddenInput(attrs={"class": "hidden_create_field"}))

        job_ex_id = request.REQUEST.get("job_execution_id")
        if job_ex_id is not None:
            self.fields["job_execution_id"] = forms.ChoiceField(
                label=_("Job Execution ID"),
                initial=job_ex_id,
                widget=forms.HiddenInput(
                    attrs={"class": "hidden_create_field"}))

            job_configs = (
                saharaclient.job_execution_get(request,
                                               job_ex_id).job_configs)

            if "configs" in job_configs:
                self.fields["job_configs"].initial = (
                    json.dumps(job_configs["configs"]))
            if "params" in job_configs:
                self.fields["job_params"].initial = (
                    json.dumps(job_configs["params"]))
            if "args" in job_configs:
                self.fields["job_args"].initial = (
                    json.dumps(job_configs["args"]))

    class Meta(object):
        name = _("Select plugin and hadoop version for cluster")
        help_text_template = ("project/data_processing.clusters/"
                              "_create_general_help.html")


class SelectHadoopPlugin(workflows.Step):
    action_class = SelectHadoopPluginAction


class ChosePluginVersion(workflows.Workflow):
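    """Plugin and Hadoop version selection shown before cluster creation."""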
    slug = "lunch_job"
    name = _("Launch Job")
    finalize_button_name = _("Create")
    success_message = _("Created")
    failure_message = _("Could not create")
    success_url = "horizon:project:data_processing.cluster_templates:index"
    default_steps = (SelectHadoopPlugin,)


class LaunchJobNewCluster(workflows.Workflow):
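    """Creates a cluster (optionally transient) and launches a job on it."""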
    slug = "launch_job"
    name = _("Launch Job")
    finalize_button_name = _("Launch")
    success_message = _("Job launched")
    failure_message = _("Could not launch job")
    success_url = "horizon:project:data_processing.jobs:index"
    default_steps = (ClusterGeneralConfig,
                     JobExecutionGeneralConfig,
                     JobConfig,
                     JobExecutionInterfaceConfig)

    def handle(self, request, context):
        node_groups = None

        plugin, hadoop_version = (
            whelpers.get_plugin_and_hadoop_version(request))

        ct_id = context["cluster_general_cluster_template"] or None
        user_keypair = context["cluster_general_keypair"] or None

        argument_ids = json.loads(context['argument_ids'])
        interface = {name: context["argument_" + str(arg_id)]
                     for arg_id, name in argument_ids.items()}

        try:
            cluster = saharaclient.cluster_create(
                request,
                context["cluster_general_cluster_name"],
                plugin, hadoop_version,
                cluster_template_id=ct_id,
                default_image_id=context["cluster_general_image"],
                description=context["cluster_general_description"],
                node_groups=node_groups,
                user_keypair_id=user_keypair,
                is_transient=not (context["cluster_general_persist_cluster"]),
                net_id=context.get(
                    "cluster_general_neutron_management_network",
                    None))
        except Exception:
            exceptions.handle(request,
                              _("Unable to create new cluster for job."))
            return False

        try:
            saharaclient.job_execution_create(
                request,
                context["job_general_job"],
                cluster.id,
                context["job_general_job_input"],
                context["job_general_job_output"],
                context["job_config"],
                interface)
        except Exception:
            exceptions.handle(request,
                              _("Unable to launch job."))
            return False
        return True