logging and bug-fixes
This commit is contained in:
parent
8ffd75cc1e
commit
dfdd144f96
@ -3,9 +3,11 @@ import glob
|
|||||||
import sys
|
import sys
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
import anyjson
|
||||||
from conductor.openstack.common import service
|
from conductor.openstack.common import service
|
||||||
from workflow import Workflow
|
from workflow import Workflow
|
||||||
from commands.dispatcher import CommandDispatcher
|
from commands.dispatcher import CommandDispatcher
|
||||||
|
from openstack.common import log as logging
|
||||||
from config import Config
|
from config import Config
|
||||||
import reporting
|
import reporting
|
||||||
import rabbitmq
|
import rabbitmq
|
||||||
@ -15,17 +17,20 @@ import cloud_formation
|
|||||||
|
|
||||||
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
|
config = Config(sys.argv[1] if len(sys.argv) > 1 else None)
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def task_received(task, message_id):
|
def task_received(task, message_id):
|
||||||
with rabbitmq.RmqClient() as rmqclient:
|
with rabbitmq.RmqClient() as rmqclient:
|
||||||
print 'Starting at', datetime.datetime.now()
|
log.info('Starting processing task {0}: {1}'.format(
|
||||||
|
message_id, anyjson.dumps(task)))
|
||||||
reporter = reporting.Reporter(rmqclient, message_id, task['id'])
|
reporter = reporting.Reporter(rmqclient, message_id, task['id'])
|
||||||
|
|
||||||
command_dispatcher = CommandDispatcher(
|
command_dispatcher = CommandDispatcher(
|
||||||
task['id'], rmqclient, task['token'])
|
task['id'], rmqclient, task['token'])
|
||||||
workflows = []
|
workflows = []
|
||||||
for path in glob.glob("data/workflows/*.xml"):
|
for path in glob.glob("data/workflows/*.xml"):
|
||||||
print "loading", path
|
log.debug('Loading XML {0}'.format(path))
|
||||||
workflow = Workflow(path, task, command_dispatcher, config,
|
workflow = Workflow(path, task, command_dispatcher, config,
|
||||||
reporter)
|
reporter)
|
||||||
workflows.append(workflow)
|
workflows.append(workflow)
|
||||||
@ -42,7 +47,7 @@ def task_received(task, message_id):
|
|||||||
if not command_dispatcher.execute_pending():
|
if not command_dispatcher.execute_pending():
|
||||||
break
|
break
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
traceback.print_exc()
|
log.exception(ex)
|
||||||
break
|
break
|
||||||
|
|
||||||
command_dispatcher.close()
|
command_dispatcher.close()
|
||||||
@ -53,7 +58,9 @@ def task_received(task, message_id):
|
|||||||
result_msg.id = message_id
|
result_msg.id = message_id
|
||||||
|
|
||||||
rmqclient.send(message=result_msg, key='task-results')
|
rmqclient.send(message=result_msg, key='task-results')
|
||||||
print 'Finished at', datetime.datetime.now()
|
log.info('Finished processing task {0}. Result = {1}'.format(
|
||||||
|
message_id, anyjson.dumps(task)))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class ConductorWorkflowService(service.Service):
|
class ConductorWorkflowService(service.Service):
|
||||||
@ -71,13 +78,13 @@ class ConductorWorkflowService(service.Service):
|
|||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
with rabbitmq.RmqClient() as rmq:
|
with rabbitmq.RmqClient() as rmq:
|
||||||
rmq.declare('tasks2', 'tasks2')
|
rmq.declare('tasks', 'tasks')
|
||||||
rmq.declare('task-results', 'tasks2')
|
rmq.declare('task-results', 'tasks')
|
||||||
with rmq.open('tasks2') as subscription:
|
with rmq.open('tasks') as subscription:
|
||||||
while True:
|
while True:
|
||||||
msg = subscription.get_message()
|
msg = subscription.get_message()
|
||||||
self.tg.add_thread(
|
self.tg.add_thread(
|
||||||
task_received, msg.body, msg.id)
|
task_received, msg.body, msg.id)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
print ex
|
log.exception(ex)
|
||||||
|
|
||||||
|
@ -9,7 +9,6 @@ import string
|
|||||||
def update_cf_stack(engine, context, body, template,
|
def update_cf_stack(engine, context, body, template,
|
||||||
mappings, arguments, **kwargs):
|
mappings, arguments, **kwargs):
|
||||||
command_dispatcher = context['/commandDispatcher']
|
command_dispatcher = context['/commandDispatcher']
|
||||||
print "update-cf", template
|
|
||||||
|
|
||||||
callback = lambda result: engine.evaluate_content(
|
callback = lambda result: engine.evaluate_content(
|
||||||
body.find('success'), context)
|
body.find('success'), context)
|
||||||
@ -20,7 +19,6 @@ def update_cf_stack(engine, context, body, template,
|
|||||||
|
|
||||||
def delete_cf_stack(engine, context, body, **kwargs):
|
def delete_cf_stack(engine, context, body, **kwargs):
|
||||||
command_dispatcher = context['/commandDispatcher']
|
command_dispatcher = context['/commandDispatcher']
|
||||||
print "delete-cf"
|
|
||||||
|
|
||||||
callback = lambda result: engine.evaluate_content(
|
callback = lambda result: engine.evaluate_content(
|
||||||
body.find('success'), context)
|
body.find('success'), context)
|
||||||
|
@ -1,14 +1,15 @@
|
|||||||
import anyjson
|
import anyjson
|
||||||
import os
|
|
||||||
import uuid
|
|
||||||
import eventlet
|
import eventlet
|
||||||
|
|
||||||
|
from conductor.openstack.common import log as logging
|
||||||
import conductor.helpers
|
import conductor.helpers
|
||||||
from command import CommandBase
|
from command import CommandBase
|
||||||
import conductor.config
|
import conductor.config
|
||||||
from heatclient.client import Client
|
from heatclient.client import Client
|
||||||
import heatclient.exc
|
import heatclient.exc
|
||||||
|
import types
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class HeatExecutor(CommandBase):
|
class HeatExecutor(CommandBase):
|
||||||
@ -21,6 +22,8 @@ class HeatExecutor(CommandBase):
|
|||||||
token_only=True, token=token)
|
token_only=True, token=token)
|
||||||
|
|
||||||
def execute(self, command, callback, **kwargs):
|
def execute(self, command, callback, **kwargs):
|
||||||
|
log.debug('Got command {0} on stack {1}'.format(command, self._stack))
|
||||||
|
|
||||||
if command == 'CreateOrUpdate':
|
if command == 'CreateOrUpdate':
|
||||||
return self._execute_create_update(
|
return self._execute_create_update(
|
||||||
kwargs['template'],
|
kwargs['template'],
|
||||||
@ -71,26 +74,27 @@ class HeatExecutor(CommandBase):
|
|||||||
arguments = conductor.helpers.merge_dicts(
|
arguments = conductor.helpers.merge_dicts(
|
||||||
arguments, t['arguments'], max_levels=1)
|
arguments, t['arguments'], max_levels=1)
|
||||||
|
|
||||||
print 'Executing heat template', anyjson.dumps(template), \
|
log.info('Executing heat template {0} with arguments {1} on stack {2}'
|
||||||
'with arguments', arguments, 'on stack', self._stack
|
.format(anyjson.dumps(template), arguments, self._stack))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print 'try update'
|
|
||||||
self._heat_client.stacks.update(
|
self._heat_client.stacks.update(
|
||||||
stack_id=self._stack,
|
stack_id=self._stack,
|
||||||
parameters=arguments,
|
parameters=arguments,
|
||||||
template=template)
|
template=template)
|
||||||
print 'wait update'
|
log.debug('Waiting for the stack {0} to be update'
|
||||||
|
.format(self._stack))
|
||||||
self._wait_state('UPDATE_COMPLETE')
|
self._wait_state('UPDATE_COMPLETE')
|
||||||
|
log.info('Stack {0} updated'.format(self._stack))
|
||||||
except heatclient.exc.HTTPNotFound:
|
except heatclient.exc.HTTPNotFound:
|
||||||
print 'try create'
|
|
||||||
|
|
||||||
self._heat_client.stacks.create(
|
self._heat_client.stacks.create(
|
||||||
stack_name=self._stack,
|
stack_name=self._stack,
|
||||||
parameters=arguments,
|
parameters=arguments,
|
||||||
template=template)
|
template=template)
|
||||||
print 'wait create'
|
log.debug('Waiting for the stack {0} to be create'
|
||||||
|
.format(self._stack))
|
||||||
self._wait_state('CREATE_COMPLETE')
|
self._wait_state('CREATE_COMPLETE')
|
||||||
|
log.info('Stack {0} created'.format(self._stack))
|
||||||
|
|
||||||
|
|
||||||
pending_list = self._update_pending_list
|
pending_list = self._update_pending_list
|
||||||
@ -105,12 +109,16 @@ class HeatExecutor(CommandBase):
|
|||||||
if not len(self._delete_pending_list):
|
if not len(self._delete_pending_list):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
log.debug('Deleting stack {0}'.format(self._stack))
|
||||||
try:
|
try:
|
||||||
self._heat_client.stacks.delete(
|
self._heat_client.stacks.delete(
|
||||||
stack_id=self._stack)
|
stack_id=self._stack)
|
||||||
self._wait_state('DELETE_COMPLETE')
|
log.debug('Waiting for the stack {0} to be deleted'
|
||||||
except Exception:
|
.format(self._stack))
|
||||||
pass
|
self._wait_state(['DELETE_COMPLETE', ''])
|
||||||
|
log.info('Stack {0} deleted'.format(self._stack))
|
||||||
|
except Exception as ex:
|
||||||
|
log.exception(ex)
|
||||||
|
|
||||||
pending_list = self._delete_pending_list
|
pending_list = self._delete_pending_list
|
||||||
self._delete_pending_list = []
|
self._delete_pending_list = []
|
||||||
@ -120,13 +128,23 @@ class HeatExecutor(CommandBase):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def _wait_state(self, state):
|
def _wait_state(self, state):
|
||||||
|
if isinstance(state, types.ListType):
|
||||||
|
states = state
|
||||||
|
else:
|
||||||
|
states = [state]
|
||||||
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
status = self._heat_client.stacks.get(
|
try:
|
||||||
stack_id=self._stack).stack_status
|
status = self._heat_client.stacks.get(
|
||||||
|
stack_id=self._stack).stack_status
|
||||||
|
except heatclient.exc.HTTPNotFound:
|
||||||
|
status = ''
|
||||||
|
|
||||||
if 'IN_PROGRESS' in status:
|
if 'IN_PROGRESS' in status:
|
||||||
eventlet.sleep(1)
|
eventlet.sleep(1)
|
||||||
continue
|
continue
|
||||||
if status != state:
|
if status not in states:
|
||||||
raise EnvironmentError()
|
raise EnvironmentError()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
@ -1,17 +1,20 @@
|
|||||||
import json
|
import json
|
||||||
import uuid
|
import uuid
|
||||||
from conductor.rabbitmq import Message
|
|
||||||
|
|
||||||
|
from conductor.openstack.common import log as logging
|
||||||
|
from conductor.rabbitmq import Message
|
||||||
import conductor.helpers
|
import conductor.helpers
|
||||||
from command import CommandBase
|
from command import CommandBase
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class WindowsAgentExecutor(CommandBase):
|
class WindowsAgentExecutor(CommandBase):
|
||||||
def __init__(self, stack, rmqclient):
|
def __init__(self, stack, rmqclient):
|
||||||
self._stack = stack
|
self._stack = stack
|
||||||
self._rmqclient = rmqclient
|
self._rmqclient = rmqclient
|
||||||
self._pending_list = []
|
self._pending_list = []
|
||||||
self._results_queue = '-execution-results-%s' % str(stack)
|
self._results_queue = '-execution-results-%s' % str(stack).lower()
|
||||||
rmqclient.declare(self._results_queue)
|
rmqclient.declare(self._results_queue)
|
||||||
|
|
||||||
def execute(self, template, mappings, host, service, callback):
|
def execute(self, template, mappings, host, service, callback):
|
||||||
@ -34,8 +37,8 @@ class WindowsAgentExecutor(CommandBase):
|
|||||||
msg.id = id
|
msg.id = id
|
||||||
self._rmqclient.declare(host)
|
self._rmqclient.declare(host)
|
||||||
self._rmqclient.send(message=msg, key=host)
|
self._rmqclient.send(message=msg, key=host)
|
||||||
print 'Sending RMQ message %s to %s' % (
|
log.info('Sending RMQ message {0} to {1} with id {2}'.format(
|
||||||
template_data, host)
|
template_data, host, id))
|
||||||
|
|
||||||
def has_pending_commands(self):
|
def has_pending_commands(self):
|
||||||
return len(self._pending_list) > 0
|
return len(self._pending_list) > 0
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
import xml_code_engine
|
import xml_code_engine
|
||||||
|
|
||||||
|
from openstack.common import log as logging
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def send_command(engine, context, body, template, service, host, mappings=None,
|
def send_command(engine, context, body, template, service, host, mappings=None,
|
||||||
result=None, **kwargs):
|
result=None, **kwargs):
|
||||||
@ -7,8 +10,8 @@ def send_command(engine, context, body, template, service, host, mappings=None,
|
|||||||
command_dispatcher = context['/commandDispatcher']
|
command_dispatcher = context['/commandDispatcher']
|
||||||
|
|
||||||
def callback(result_value):
|
def callback(result_value):
|
||||||
print "Received result for %s: %s. Body is %s" % \
|
log.info('Received result from {3} for {0}: {1}. Body is {2}'.format(
|
||||||
(template, result_value, body)
|
template, result_value, body, host))
|
||||||
if result is not None:
|
if result is not None:
|
||||||
context[result] = result_value['Result']
|
context[result] = result_value['Result']
|
||||||
|
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
log_file = logs/conductor.log
|
log_file = logs/conductor.log
|
||||||
|
debug=True
|
||||||
|
verbose=True
|
||||||
|
|
||||||
[heat]
|
[heat]
|
||||||
url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50
|
url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50
|
||||||
|
Loading…
x
Reference in New Issue
Block a user