Evironment/Units deletion, bug-fixes

This commit is contained in:
Stan Lagun 2013-03-27 02:06:53 +04:00
parent 7be1f27f0b
commit 74958e1052
6 changed files with 124 additions and 44 deletions

View File

@ -1,6 +1,7 @@
import datetime
import glob
import sys
import traceback
from conductor.openstack.common import service
from workflow import Workflow
@ -40,7 +41,8 @@ def task_received(task, message_id):
break
if not command_dispatcher.execute_pending():
break
except Exception:
except Exception as ex:
traceback.print_exc()
break
command_dispatcher.close()

View File

@ -4,6 +4,7 @@ import xml_code_engine
import config
from random import choice
import time
import string
def update_cf_stack(engine, context, body, template,
mappings, arguments, **kwargs):
@ -14,8 +15,18 @@ def update_cf_stack(engine, context, body, template,
body.find('success'), context)
command_dispatcher.execute(
name='cf', template=template, mappings=mappings,
arguments=arguments, callback=callback)
name='cf', command='CreateOrUpdate', template=template,
mappings=mappings, arguments=arguments, callback=callback)
def delete_cf_stack(engine, context, body, **kwargs):
command_dispatcher = context['/commandDispatcher']
print "delete-cf"
callback = lambda result: engine.evaluate_content(
body.find('success'), context)
command_dispatcher.execute(
name='cf', command='Delete', callback=callback)
def prepare_user_data(context, hostname, service, unit, template='Default', **kwargs):
@ -31,11 +42,12 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
template_data = template_data.replace(
'%RABBITMQ_INPUT_QUEUE%',
'-'.join([str(context['/dataSource']['id']),
str(service), str(unit)])
str(service), str(unit)]).lower()
)
template_data = template_data.replace(
'%RESULT_QUEUE%',
'-execution-results-%s' % str(context['/dataSource']['id']))
'-execution-results-%s' %
str(context['/dataSource']['id']).lower())
init_script = init_script.replace(
'%WINDOWS_AGENT_CONFIG_BASE64%',
@ -45,10 +57,30 @@ def prepare_user_data(context, hostname, service, unit, template='Default', **kw
return init_script
counter = 0
def int2base(x, base):
digs = string.digits + string.lowercase
if x < 0: sign = -1
elif x==0: return '0'
else: sign = 1
x *= sign
digits = []
while x:
digits.append(digs[x % base])
x /= base
if sign < 0:
digits.append('-')
digits.reverse()
return ''.join(digits)
def generate_hostname(**kwargs):
chars = 'abcdefghijklmnopqrstuvwxyz'
prefix = ''.join(choice(chars) for _ in range(4))
return prefix + str(int(time.time() * 10))
global counter
prefix = ''.join(choice(string.lowercase) for _ in range(5))
timestamp = int2base(int(time.time() * 1000), 36)[:8]
suffix = int2base(counter, 36)
counter = (counter + 1) % 1296
return prefix + timestamp + suffix
@ -56,6 +88,9 @@ def generate_hostname(**kwargs):
xml_code_engine.XmlCodeEngine.register_function(
update_cf_stack, "update-cf-stack")
xml_code_engine.XmlCodeEngine.register_function(
delete_cf_stack, "delete-cf-stack")
xml_code_engine.XmlCodeEngine.register_function(
prepare_user_data, "prepare-user-data")

View File

@ -13,35 +13,59 @@ import heatclient.exc
class HeatExecutor(CommandBase):
def __init__(self, stack, token):
self._pending_list = []
self._stack = stack
self._update_pending_list = []
self._delete_pending_list = []
self._stack = 'e' + stack
settings = conductor.config.CONF.heat
self._heat_client = Client('1', settings.url,
token_only=True, token=token)
def execute(self, template, mappings, arguments, callback):
def execute(self, command, callback, **kwargs):
if command == 'CreateOrUpdate':
return self._execute_create_update(
kwargs['template'],
kwargs['mappings'],
kwargs['arguments'],
callback)
elif command == 'Delete':
return self._execute_delete(callback)
def _execute_create_update(self, template, mappings, arguments, callback):
with open('data/templates/cf/%s.template' % template) as template_file:
template_data = template_file.read()
template_data = conductor.helpers.transform_json(
anyjson.loads(template_data), mappings)
self._pending_list.append({
self._update_pending_list.append({
'template': template_data,
'arguments': arguments,
'callback': callback
})
def _execute_delete(self, callback):
self._delete_pending_list.append({
'callback': callback
})
def has_pending_commands(self):
return len(self._pending_list) > 0
return len(self._update_pending_list) + \
len(self._delete_pending_list) > 0
def execute_pending(self):
if not self.has_pending_commands():
r1 = self._execute_pending_updates()
r2 = self._execute_pending_deletes()
return r1 or r2
def _execute_pending_updates(self):
if not len(self._update_pending_list):
return False
template = {}
arguments = {}
for t in self._pending_list:
for t in self._update_pending_list:
template = conductor.helpers.merge_dicts(
template, t['template'], max_levels=2)
arguments = conductor.helpers.merge_dicts(
@ -50,22 +74,6 @@ class HeatExecutor(CommandBase):
print 'Executing heat template', anyjson.dumps(template), \
'with arguments', arguments, 'on stack', self._stack
# if not os.path.exists("tmp"):
# os.mkdir("tmp")
# file_name = "tmp/" + str(uuid.uuid4())
# print "Saving template to", file_name
# with open(file_name, "w") as f:
# f.write(anyjson.dumps(template))
#
# arguments_str = ';'.join(['%s=%s' % (key, value)
# for (key, value) in arguments.items()])
# call([
# "./heat_run", "stack-create",
# "-f" + file_name,
# "-P" + arguments_str,
# self._stack
# ])
try:
print 'try update'
self._heat_client.stacks.update(
@ -85,14 +93,32 @@ class HeatExecutor(CommandBase):
self._wait_state('CREATE_COMPLETE')
pending_list = self._pending_list
self._pending_list = []
pending_list = self._update_pending_list
self._update_pending_list = []
for item in pending_list:
item['callback'](True)
return True
def _execute_pending_deletes(self):
if not len(self._delete_pending_list):
return False
try:
self._heat_client.stacks.delete(
stack_id=self._stack)
self._wait_state('DELETE_COMPLETE')
except Exception:
pass
pending_list = self._delete_pending_list
self._delete_pending_list = []
for item in pending_list:
item['callback'](True)
return True
def _wait_state(self, state):
while True:
status = self._heat_client.stacks.get(

View File

@ -5,7 +5,6 @@ import re
import xml_code_engine
import function_context
class Workflow(object):
def __init__(self, filename, data, command_dispatcher, config, reporter):
self._data = data
@ -26,10 +25,6 @@ class Workflow(object):
@staticmethod
def _get_path(obj, path, create_non_existing=False):
# result = jsonpath.jsonpath(obj, '.'.join(path))
# if not result or len(result) < 1:
# return None
# return result[0]
current = obj
for part in path:
if isinstance(current, types.ListType):
@ -116,6 +111,7 @@ class Workflow(object):
if Workflow._get_path(data, position) != body_data:
Workflow._set_path(data, position, body_data)
context['/hasSideEffects'] = True
else:
data = context['/dataSource']
new_position = Workflow._correct_position(path, context)
@ -127,8 +123,6 @@ class Workflow(object):
def _rule_func(match, context, body, engine, limit=0, name=None, **kwargs):
position = context['__dataSource_currentPosition'] or []
if name == 'marker':
print "!"
# data = context['__dataSource_currentObj']
# if data is None:
# data = context['/dataSource']
@ -136,21 +130,29 @@ class Workflow(object):
data = Workflow._get_path(context['/dataSource'], position)
match = re.sub(r'@\.([\w.]+)',
r"Workflow._get_path(@, '\1'.split('.'))", match)
selected = jsonpath.jsonpath(data, match, 'IPATH') or []
match = match.replace('$.', '$[*].')
selected = jsonpath.jsonpath([data], match, 'IPATH') or []
index = 0
for found_match in selected:
if 0 < int(limit) <= index:
break
index += 1
new_position = position + found_match
new_position = position + found_match[1:]
context['__dataSource_currentPosition'] = new_position
context['__dataSource_currentObj'] = Workflow._get_path(
context['/dataSource'], new_position)
for element in body:
if element.tag == 'empty':
continue
engine.evaluate(element, context)
if element.tag == 'rule' and context['/hasSideEffects']:
break
if not index:
empty_handler = body.find('empty')
if empty_handler is not None:
engine.evaluate_content(empty_handler, context)
@staticmethod
def _workflow_func(context, body, engine, **kwargs):

View File

@ -143,6 +143,9 @@
<set path="#unit">
<select/>
</set>
<set path="#service">
<select path="::"/>
</set>
<rule>
<parameter name="match">/$.services.activeDirectories[?(@.domain == '<select path="domain"/>' and @.state.primaryDcIp)]</parameter>
@ -151,7 +154,7 @@
<select path="name" source="unit"/>
</parameter>
<parameter name="service">
<select path="::id"/>
<select path="id" source="service"/>
</parameter>
<parameter name="mappings">
<map>

View File

@ -4,4 +4,16 @@
<set path="state.hostname"><generate-hostname/></set>
</rule>
<rule match="$[?(not @.state.deleted)]">
<rule match="$.services[*][*].units[*]">
<empty>
<delete-cf-stack>
<success>
<set path="/state.deleted"><true/></set>
</success>
</delete-cf-stack>
</empty>
</rule>
</rule>
</workflow>