diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py
index fe44f2d..11c0933 100644
--- a/conductor/conductor/app.py
+++ b/conductor/conductor/app.py
@@ -1,6 +1,7 @@
import datetime
import glob
import sys
+import traceback
from conductor.openstack.common import service
from workflow import Workflow
@@ -40,7 +41,8 @@ def task_received(task, message_id):
break
if not command_dispatcher.execute_pending():
break
- except Exception:
+ except Exception as ex:
+ traceback.print_exc()
break
command_dispatcher.close()
diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py
index 9798832..db81b9a 100644
--- a/conductor/conductor/cloud_formation.py
+++ b/conductor/conductor/cloud_formation.py
@@ -4,6 +4,7 @@ import xml_code_engine
import config
from random import choice
import time
+import string
def update_cf_stack(engine, context, body, template,
mappings, arguments, **kwargs):
@@ -14,8 +15,18 @@ def update_cf_stack(engine, context, body, template,
body.find('success'), context)
command_dispatcher.execute(
- name='cf', template=template, mappings=mappings,
- arguments=arguments, callback=callback)
+ name='cf', command='CreateOrUpdate', template=template,
+ mappings=mappings, arguments=arguments, callback=callback)
+
+def delete_cf_stack(engine, context, body, **kwargs):
+ command_dispatcher = context['/commandDispatcher']
+ print "delete-cf"
+
+ callback = lambda result: engine.evaluate_content(
+ body.find('success'), context)
+
+ command_dispatcher.execute(
+ name='cf', command='Delete', callback=callback)
def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs):
@@ -31,11 +42,12 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
template_data = template_data.replace(
'%RABBITMQ_INPUT_QUEUE%',
'-'.join([str(context['/dataSource']['id']),
- str(service), str(unit)])
+ str(service), str(unit)]).lower()
)
template_data = template_data.replace(
'%RESULT_QUEUE%',
- '-execution-results-%s' % str(context['/dataSource']['id']))
+ '-execution-results-%s' %
+ str(context['/dataSource']['id']).lower())
init_script = init_script.replace(
'%WINDOWS_AGENT_CONFIG_BASE64%',
@@ -45,10 +57,30 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
return init_script
+counter = 0
+
+def int2base(x, base):
+ digs = string.digits + string.lowercase
+ if x < 0: sign = -1
+ elif x==0: return '0'
+ else: sign = 1
+ x *= sign
+ digits = []
+ while x:
+ digits.append(digs[x % base])
+ x /= base
+ if sign < 0:
+ digits.append('-')
+ digits.reverse()
+ return ''.join(digits)
+
def generate_hostname(**kwargs):
- chars = 'abcdefghijklmnopqrstuvwxyz'
- prefix = ''.join(choice(chars) for _ in range(4))
- return prefix + str(int(time.time() * 10))
+ global counter
+ prefix = ''.join(choice(string.lowercase) for _ in range(5))
+ timestamp = int2base(int(time.time() * 1000), 36)[:8]
+ suffix = int2base(counter, 36)
+ counter = (counter + 1) % 1296
+ return prefix + timestamp + suffix
@@ -56,6 +88,9 @@ def generate_hostname(**kwargs):
xml_code_engine.XmlCodeEngine.register_function(
update_cf_stack, "update-cf-stack")
+xml_code_engine.XmlCodeEngine.register_function(
+ delete_cf_stack, "delete-cf-stack")
+
xml_code_engine.XmlCodeEngine.register_function(
prepare_user_data, "prepare-user-data")
diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py
index 44f4e08..9bc822c 100644
--- a/conductor/conductor/commands/cloud_formation.py
+++ b/conductor/conductor/commands/cloud_formation.py
@@ -13,35 +13,59 @@ import heatclient.exc
class HeatExecutor(CommandBase):
def __init__(self, stack, token):
- self._pending_list = []
- self._stack = stack
+ self._update_pending_list = []
+ self._delete_pending_list = []
+ self._stack = 'e' + stack
settings = conductor.config.CONF.heat
self._heat_client = Client('1', settings.url,
token_only=True, token=token)
- def execute(self, template, mappings, arguments, callback):
+ def execute(self, command, callback, **kwargs):
+ if command == 'CreateOrUpdate':
+ return self._execute_create_update(
+ kwargs['template'],
+ kwargs['mappings'],
+ kwargs['arguments'],
+ callback)
+ elif command == 'Delete':
+ return self._execute_delete(callback)
+
+
+ def _execute_create_update(self, template, mappings, arguments, callback):
with open('data/templates/cf/%s.template' % template) as template_file:
template_data = template_file.read()
template_data = conductor.helpers.transform_json(
anyjson.loads(template_data), mappings)
- self._pending_list.append({
+ self._update_pending_list.append({
'template': template_data,
'arguments': arguments,
'callback': callback
})
+ def _execute_delete(self, callback):
+ self._delete_pending_list.append({
+ 'callback': callback
+ })
+
+
def has_pending_commands(self):
- return len(self._pending_list) > 0
+ return len(self._update_pending_list) + \
+ len(self._delete_pending_list) > 0
def execute_pending(self):
- if not self.has_pending_commands():
+ r1 = self._execute_pending_updates()
+ r2 = self._execute_pending_deletes()
+ return r1 or r2
+
+ def _execute_pending_updates(self):
+ if not len(self._update_pending_list):
return False
template = {}
arguments = {}
- for t in self._pending_list:
+ for t in self._update_pending_list:
template = conductor.helpers.merge_dicts(
template, t['template'], max_levels=2)
arguments = conductor.helpers.merge_dicts(
@@ -50,22 +74,6 @@ class HeatExecutor(CommandBase):
print 'Executing heat template', anyjson.dumps(template), \
'with arguments', arguments, 'on stack', self._stack
- # if not os.path.exists("tmp"):
- # os.mkdir("tmp")
- # file_name = "tmp/" + str(uuid.uuid4())
- # print "Saving template to", file_name
- # with open(file_name, "w") as f:
- # f.write(anyjson.dumps(template))
- #
- # arguments_str = ';'.join(['%s=%s' % (key, value)
- # for (key, value) in arguments.items()])
- # call([
- # "./heat_run", "stack-create",
- # "-f" + file_name,
- # "-P" + arguments_str,
- # self._stack
- # ])
-
try:
print 'try update'
self._heat_client.stacks.update(
@@ -85,14 +93,32 @@ class HeatExecutor(CommandBase):
self._wait_state('CREATE_COMPLETE')
- pending_list = self._pending_list
- self._pending_list = []
+ pending_list = self._update_pending_list
+ self._update_pending_list = []
for item in pending_list:
item['callback'](True)
return True
+ def _execute_pending_deletes(self):
+ if not len(self._delete_pending_list):
+ return False
+
+ try:
+ self._heat_client.stacks.delete(
+ stack_id=self._stack)
+ self._wait_state('DELETE_COMPLETE')
+ except Exception:
+ pass
+
+ pending_list = self._delete_pending_list
+ self._delete_pending_list = []
+
+ for item in pending_list:
+ item['callback'](True)
+ return True
+
def _wait_state(self, state):
while True:
status = self._heat_client.stacks.get(
diff --git a/conductor/conductor/workflow.py b/conductor/conductor/workflow.py
index 94d357e..bdf6224 100644
--- a/conductor/conductor/workflow.py
+++ b/conductor/conductor/workflow.py
@@ -5,7 +5,6 @@ import re
import xml_code_engine
import function_context
-
class Workflow(object):
def __init__(self, filename, data, command_dispatcher, config, reporter):
self._data = data
@@ -26,10 +25,6 @@ class Workflow(object):
@staticmethod
def _get_path(obj, path, create_non_existing=False):
- # result = jsonpath.jsonpath(obj, '.'.join(path))
- # if not result or len(result) < 1:
- # return None
- # return result[0]
current = obj
for part in path:
if isinstance(current, types.ListType):
@@ -116,6 +111,7 @@ class Workflow(object):
if Workflow._get_path(data, position) != body_data:
Workflow._set_path(data, position, body_data)
context['/hasSideEffects'] = True
+
else:
data = context['/dataSource']
new_position = Workflow._correct_position(path, context)
@@ -127,8 +123,6 @@ class Workflow(object):
def _rule_func(match, context, body, engine, limit=0, name=None, **kwargs):
position = context['__dataSource_currentPosition'] or []
- if name == 'marker':
- print "!"
# data = context['__dataSource_currentObj']
# if data is None:
# data = context['/dataSource']
@@ -136,21 +130,29 @@ class Workflow(object):
data = Workflow._get_path(context['/dataSource'], position)
match = re.sub(r'@\.([\w.]+)',
r"Workflow._get_path(@, '\1'.split('.'))", match)
- selected = jsonpath.jsonpath(data, match, 'IPATH') or []
-
+ match = match.replace('$.', '$[*].')
+ selected = jsonpath.jsonpath([data], match, 'IPATH') or []
index = 0
for found_match in selected:
if 0 < int(limit) <= index:
break
index += 1
- new_position = position + found_match
+ new_position = position + found_match[1:]
context['__dataSource_currentPosition'] = new_position
context['__dataSource_currentObj'] = Workflow._get_path(
context['/dataSource'], new_position)
for element in body:
+ if element.tag == 'empty':
+ continue
engine.evaluate(element, context)
if element.tag == 'rule' and context['/hasSideEffects']:
break
+ if not index:
+ empty_handler = body.find('empty')
+ if empty_handler is not None:
+
+ engine.evaluate_content(empty_handler, context)
+
@staticmethod
def _workflow_func(context, body, engine, **kwargs):
diff --git a/conductor/data/workflows/AD.xml b/conductor/data/workflows/AD.xml
index c394fbd..b350763 100644
--- a/conductor/data/workflows/AD.xml
+++ b/conductor/data/workflows/AD.xml
@@ -143,6 +143,9 @@
+
+
+
/$.services.activeDirectories[?(@.domain == '' and @.state.primaryDcIp)]
@@ -151,7 +154,7 @@
-
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file