From 74958e10525b85ab25a40095025a09ef1f1b6ece Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Wed, 27 Mar 2013 02:06:53 +0400 Subject: [PATCH] Evironment/Units deletion, bug-fixes --- conductor/conductor/app.py | 4 +- conductor/conductor/cloud_formation.py | 49 ++++++++++-- .../conductor/commands/cloud_formation.py | 76 +++++++++++++------ conductor/conductor/workflow.py | 22 +++--- conductor/data/workflows/AD.xml | 5 +- conductor/data/workflows/Common.xml | 12 +++ 6 files changed, 124 insertions(+), 44 deletions(-) diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py index fe44f2d..11c0933 100644 --- a/conductor/conductor/app.py +++ b/conductor/conductor/app.py @@ -1,6 +1,7 @@ import datetime import glob import sys +import traceback from conductor.openstack.common import service from workflow import Workflow @@ -40,7 +41,8 @@ def task_received(task, message_id): break if not command_dispatcher.execute_pending(): break - except Exception: + except Exception as ex: + traceback.print_exc() break command_dispatcher.close() diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py index 9798832..db81b9a 100644 --- a/conductor/conductor/cloud_formation.py +++ b/conductor/conductor/cloud_formation.py @@ -4,6 +4,7 @@ import xml_code_engine import config from random import choice import time +import string def update_cf_stack(engine, context, body, template, mappings, arguments, **kwargs): @@ -14,8 +15,18 @@ def update_cf_stack(engine, context, body, template, body.find('success'), context) command_dispatcher.execute( - name='cf', template=template, mappings=mappings, - arguments=arguments, callback=callback) + name='cf', command='CreateOrUpdate', template=template, + mappings=mappings, arguments=arguments, callback=callback) + +def delete_cf_stack(engine, context, body, **kwargs): + command_dispatcher = context['/commandDispatcher'] + print "delete-cf" + + callback = lambda result: engine.evaluate_content( + body.find('success'), context) + + command_dispatcher.execute( + name='cf', command='Delete', callback=callback) def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs): @@ -31,11 +42,12 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw template_data = template_data.replace( '%RABBITMQ_INPUT_QUEUE%', '-'.join([str(context['/dataSource']['id']), - str(service), str(unit)]) + str(service), str(unit)]).lower() ) template_data = template_data.replace( '%RESULT_QUEUE%', - '-execution-results-%s' % str(context['/dataSource']['id'])) + '-execution-results-%s' % + str(context['/dataSource']['id']).lower()) init_script = init_script.replace( '%WINDOWS_AGENT_CONFIG_BASE64%', @@ -45,10 +57,30 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw return init_script +counter = 0 + +def int2base(x, base): + digs = string.digits + string.lowercase + if x < 0: sign = -1 + elif x==0: return '0' + else: sign = 1 + x *= sign + digits = [] + while x: + digits.append(digs[x % base]) + x /= base + if sign < 0: + digits.append('-') + digits.reverse() + return ''.join(digits) + def generate_hostname(**kwargs): - chars = 'abcdefghijklmnopqrstuvwxyz' - prefix = ''.join(choice(chars) for _ in range(4)) - return prefix + str(int(time.time() * 10)) + global counter + prefix = ''.join(choice(string.lowercase) for _ in range(5)) + timestamp = int2base(int(time.time() * 1000), 36)[:8] + suffix = int2base(counter, 36) + counter = (counter + 1) % 1296 + return prefix + timestamp + suffix @@ -56,6 +88,9 @@ def generate_hostname(**kwargs): xml_code_engine.XmlCodeEngine.register_function( update_cf_stack, "update-cf-stack") +xml_code_engine.XmlCodeEngine.register_function( + delete_cf_stack, "delete-cf-stack") + xml_code_engine.XmlCodeEngine.register_function( prepare_user_data, "prepare-user-data") diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py index 44f4e08..9bc822c 100644 --- a/conductor/conductor/commands/cloud_formation.py +++ b/conductor/conductor/commands/cloud_formation.py @@ -13,35 +13,59 @@ import heatclient.exc class HeatExecutor(CommandBase): def __init__(self, stack, token): - self._pending_list = [] - self._stack = stack + self._update_pending_list = [] + self._delete_pending_list = [] + self._stack = 'e' + stack settings = conductor.config.CONF.heat self._heat_client = Client('1', settings.url, token_only=True, token=token) - def execute(self, template, mappings, arguments, callback): + def execute(self, command, callback, **kwargs): + if command == 'CreateOrUpdate': + return self._execute_create_update( + kwargs['template'], + kwargs['mappings'], + kwargs['arguments'], + callback) + elif command == 'Delete': + return self._execute_delete(callback) + + + def _execute_create_update(self, template, mappings, arguments, callback): with open('data/templates/cf/%s.template' % template) as template_file: template_data = template_file.read() template_data = conductor.helpers.transform_json( anyjson.loads(template_data), mappings) - self._pending_list.append({ + self._update_pending_list.append({ 'template': template_data, 'arguments': arguments, 'callback': callback }) + def _execute_delete(self, callback): + self._delete_pending_list.append({ + 'callback': callback + }) + + def has_pending_commands(self): - return len(self._pending_list) > 0 + return len(self._update_pending_list) + \ + len(self._delete_pending_list) > 0 def execute_pending(self): - if not self.has_pending_commands(): + r1 = self._execute_pending_updates() + r2 = self._execute_pending_deletes() + return r1 or r2 + + def _execute_pending_updates(self): + if not len(self._update_pending_list): return False template = {} arguments = {} - for t in self._pending_list: + for t in self._update_pending_list: template = conductor.helpers.merge_dicts( template, t['template'], max_levels=2) arguments = conductor.helpers.merge_dicts( @@ -50,22 +74,6 @@ class HeatExecutor(CommandBase): print 'Executing heat template', anyjson.dumps(template), \ 'with arguments', arguments, 'on stack', self._stack - # if not os.path.exists("tmp"): - # os.mkdir("tmp") - # file_name = "tmp/" + str(uuid.uuid4()) - # print "Saving template to", file_name - # with open(file_name, "w") as f: - # f.write(anyjson.dumps(template)) - # - # arguments_str = ';'.join(['%s=%s' % (key, value) - # for (key, value) in arguments.items()]) - # call([ - # "./heat_run", "stack-create", - # "-f" + file_name, - # "-P" + arguments_str, - # self._stack - # ]) - try: print 'try update' self._heat_client.stacks.update( @@ -85,14 +93,32 @@ class HeatExecutor(CommandBase): self._wait_state('CREATE_COMPLETE') - pending_list = self._pending_list - self._pending_list = [] + pending_list = self._update_pending_list + self._update_pending_list = [] for item in pending_list: item['callback'](True) return True + def _execute_pending_deletes(self): + if not len(self._delete_pending_list): + return False + + try: + self._heat_client.stacks.delete( + stack_id=self._stack) + self._wait_state('DELETE_COMPLETE') + except Exception: + pass + + pending_list = self._delete_pending_list + self._delete_pending_list = [] + + for item in pending_list: + item['callback'](True) + return True + def _wait_state(self, state): while True: status = self._heat_client.stacks.get( diff --git a/conductor/conductor/workflow.py b/conductor/conductor/workflow.py index 94d357e..bdf6224 100644 --- a/conductor/conductor/workflow.py +++ b/conductor/conductor/workflow.py @@ -5,7 +5,6 @@ import re import xml_code_engine import function_context - class Workflow(object): def __init__(self, filename, data, command_dispatcher, config, reporter): self._data = data @@ -26,10 +25,6 @@ class Workflow(object): @staticmethod def _get_path(obj, path, create_non_existing=False): - # result = jsonpath.jsonpath(obj, '.'.join(path)) - # if not result or len(result) < 1: - # return None - # return result[0] current = obj for part in path: if isinstance(current, types.ListType): @@ -116,6 +111,7 @@ class Workflow(object): if Workflow._get_path(data, position) != body_data: Workflow._set_path(data, position, body_data) context['/hasSideEffects'] = True + else: data = context['/dataSource'] new_position = Workflow._correct_position(path, context) @@ -127,8 +123,6 @@ class Workflow(object): def _rule_func(match, context, body, engine, limit=0, name=None, **kwargs): position = context['__dataSource_currentPosition'] or [] - if name == 'marker': - print "!" # data = context['__dataSource_currentObj'] # if data is None: # data = context['/dataSource'] @@ -136,21 +130,29 @@ class Workflow(object): data = Workflow._get_path(context['/dataSource'], position) match = re.sub(r'@\.([\w.]+)', r"Workflow._get_path(@, '\1'.split('.'))", match) - selected = jsonpath.jsonpath(data, match, 'IPATH') or [] - + match = match.replace('$.', '$[*].') + selected = jsonpath.jsonpath([data], match, 'IPATH') or [] index = 0 for found_match in selected: if 0 < int(limit) <= index: break index += 1 - new_position = position + found_match + new_position = position + found_match[1:] context['__dataSource_currentPosition'] = new_position context['__dataSource_currentObj'] = Workflow._get_path( context['/dataSource'], new_position) for element in body: + if element.tag == 'empty': + continue engine.evaluate(element, context) if element.tag == 'rule' and context['/hasSideEffects']: break + if not index: + empty_handler = body.find('empty') + if empty_handler is not None: + + engine.evaluate_content(empty_handler, context) + @staticmethod def _workflow_func(context, body, engine, **kwargs): diff --git a/conductor/data/workflows/AD.xml b/conductor/data/workflows/AD.xml index c394fbd..b350763 100644 --- a/conductor/data/workflows/AD.xml +++ b/conductor/data/workflows/AD.xml @@ -143,6 +143,9 @@ + /$.services.activeDirectories[?(@.domain == ' - diff --git a/conductor/data/workflows/Common.xml b/conductor/data/workflows/Common.xml index bace431..76de593 100644 --- a/conductor/data/workflows/Common.xml +++ b/conductor/data/workflows/Common.xml @@ -4,4 +4,16 @@ + + + + + + + + + + + + \ No newline at end of file