From dfdd144f96956aee2b0baf84429ec9b27f76aa35 Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Wed, 27 Mar 2013 02:45:02 +0400 Subject: [PATCH] logging and bug-fixes --- conductor/conductor/app.py | 23 +++++---- conductor/conductor/cloud_formation.py | 2 - .../conductor/commands/cloud_formation.py | 48 +++++++++++++------ conductor/conductor/commands/windows_agent.py | 11 +++-- conductor/conductor/windows_agent.py | 7 ++- conductor/etc/conductor.conf | 2 + 6 files changed, 62 insertions(+), 31 deletions(-) diff --git a/conductor/conductor/app.py b/conductor/conductor/app.py index 11c0933..41f4ef0 100644 --- a/conductor/conductor/app.py +++ b/conductor/conductor/app.py @@ -3,9 +3,11 @@ import glob import sys import traceback +import anyjson from conductor.openstack.common import service from workflow import Workflow from commands.dispatcher import CommandDispatcher +from openstack.common import log as logging from config import Config import reporting import rabbitmq @@ -15,17 +17,20 @@ import cloud_formation config = Config(sys.argv[1] if len(sys.argv) > 1 else None) +log = logging.getLogger(__name__) + def task_received(task, message_id): with rabbitmq.RmqClient() as rmqclient: - print 'Starting at', datetime.datetime.now() + log.info('Starting processing task {0}: {1}'.format( + message_id, anyjson.dumps(task))) reporter = reporting.Reporter(rmqclient, message_id, task['id']) command_dispatcher = CommandDispatcher( task['id'], rmqclient, task['token']) workflows = [] for path in glob.glob("data/workflows/*.xml"): - print "loading", path + log.debug('Loading XML {0}'.format(path)) workflow = Workflow(path, task, command_dispatcher, config, reporter) workflows.append(workflow) @@ -42,7 +47,7 @@ def task_received(task, message_id): if not command_dispatcher.execute_pending(): break except Exception as ex: - traceback.print_exc() + log.exception(ex) break command_dispatcher.close() @@ -53,7 +58,9 @@ def task_received(task, message_id): result_msg.id = message_id rmqclient.send(message=result_msg, key='task-results') - print 'Finished at', datetime.datetime.now() + log.info('Finished processing task {0}. Result = {1}'.format( + message_id, anyjson.dumps(task))) + class ConductorWorkflowService(service.Service): @@ -71,13 +78,13 @@ class ConductorWorkflowService(service.Service): while True: try: with rabbitmq.RmqClient() as rmq: - rmq.declare('tasks2', 'tasks2') - rmq.declare('task-results', 'tasks2') - with rmq.open('tasks2') as subscription: + rmq.declare('tasks', 'tasks') + rmq.declare('task-results', 'tasks') + with rmq.open('tasks') as subscription: while True: msg = subscription.get_message() self.tg.add_thread( task_received, msg.body, msg.id) except Exception as ex: - print ex + log.exception(ex) diff --git a/conductor/conductor/cloud_formation.py b/conductor/conductor/cloud_formation.py index db81b9a..ba33c8e 100644 --- a/conductor/conductor/cloud_formation.py +++ b/conductor/conductor/cloud_formation.py @@ -9,7 +9,6 @@ import string def update_cf_stack(engine, context, body, template, mappings, arguments, **kwargs): command_dispatcher = context['/commandDispatcher'] - print "update-cf", template callback = lambda result: engine.evaluate_content( body.find('success'), context) @@ -20,7 +19,6 @@ def update_cf_stack(engine, context, body, template, def delete_cf_stack(engine, context, body, **kwargs): command_dispatcher = context['/commandDispatcher'] - print "delete-cf" callback = lambda result: engine.evaluate_content( body.find('success'), context) diff --git a/conductor/conductor/commands/cloud_formation.py b/conductor/conductor/commands/cloud_formation.py index 9bc822c..cf0c2f0 100644 --- a/conductor/conductor/commands/cloud_formation.py +++ b/conductor/conductor/commands/cloud_formation.py @@ -1,14 +1,15 @@ import anyjson -import os -import uuid import eventlet +from conductor.openstack.common import log as logging import conductor.helpers from command import CommandBase import conductor.config from heatclient.client import Client import heatclient.exc +import types +log = logging.getLogger(__name__) class HeatExecutor(CommandBase): @@ -21,6 +22,8 @@ class HeatExecutor(CommandBase): token_only=True, token=token) def execute(self, command, callback, **kwargs): + log.debug('Got command {0} on stack {1}'.format(command, self._stack)) + if command == 'CreateOrUpdate': return self._execute_create_update( kwargs['template'], @@ -71,26 +74,27 @@ class HeatExecutor(CommandBase): arguments = conductor.helpers.merge_dicts( arguments, t['arguments'], max_levels=1) - print 'Executing heat template', anyjson.dumps(template), \ - 'with arguments', arguments, 'on stack', self._stack + log.info('Executing heat template {0} with arguments {1} on stack {2}' + .format(anyjson.dumps(template), arguments, self._stack)) try: - print 'try update' self._heat_client.stacks.update( stack_id=self._stack, parameters=arguments, template=template) - print 'wait update' + log.debug('Waiting for the stack {0} to be update' + .format(self._stack)) self._wait_state('UPDATE_COMPLETE') + log.info('Stack {0} updated'.format(self._stack)) except heatclient.exc.HTTPNotFound: - print 'try create' - self._heat_client.stacks.create( stack_name=self._stack, parameters=arguments, template=template) - print 'wait create' + log.debug('Waiting for the stack {0} to be create' + .format(self._stack)) self._wait_state('CREATE_COMPLETE') + log.info('Stack {0} created'.format(self._stack)) pending_list = self._update_pending_list @@ -105,12 +109,16 @@ class HeatExecutor(CommandBase): if not len(self._delete_pending_list): return False + log.debug('Deleting stack {0}'.format(self._stack)) try: self._heat_client.stacks.delete( stack_id=self._stack) - self._wait_state('DELETE_COMPLETE') - except Exception: - pass + log.debug('Waiting for the stack {0} to be deleted' + .format(self._stack)) + self._wait_state(['DELETE_COMPLETE', '']) + log.info('Stack {0} deleted'.format(self._stack)) + except Exception as ex: + log.exception(ex) pending_list = self._delete_pending_list self._delete_pending_list = [] @@ -120,13 +128,23 @@ class HeatExecutor(CommandBase): return True def _wait_state(self, state): + if isinstance(state, types.ListType): + states = state + else: + states = [state] + + while True: - status = self._heat_client.stacks.get( - stack_id=self._stack).stack_status + try: + status = self._heat_client.stacks.get( + stack_id=self._stack).stack_status + except heatclient.exc.HTTPNotFound: + status = '' + if 'IN_PROGRESS' in status: eventlet.sleep(1) continue - if status != state: + if status not in states: raise EnvironmentError() return diff --git a/conductor/conductor/commands/windows_agent.py b/conductor/conductor/commands/windows_agent.py index 8573489..c4d8086 100644 --- a/conductor/conductor/commands/windows_agent.py +++ b/conductor/conductor/commands/windows_agent.py @@ -1,17 +1,20 @@ import json import uuid -from conductor.rabbitmq import Message +from conductor.openstack.common import log as logging +from conductor.rabbitmq import Message import conductor.helpers from command import CommandBase +log = logging.getLogger(__name__) + class WindowsAgentExecutor(CommandBase): def __init__(self, stack, rmqclient): self._stack = stack self._rmqclient = rmqclient self._pending_list = [] - self._results_queue = '-execution-results-%s' % str(stack) + self._results_queue = '-execution-results-%s' % str(stack).lower() rmqclient.declare(self._results_queue) def execute(self, template, mappings, host, service, callback): @@ -34,8 +37,8 @@ class WindowsAgentExecutor(CommandBase): msg.id = id self._rmqclient.declare(host) self._rmqclient.send(message=msg, key=host) - print 'Sending RMQ message %s to %s' % ( - template_data, host) + log.info('Sending RMQ message {0} to {1} with id {2}'.format( + template_data, host, id)) def has_pending_commands(self): return len(self._pending_list) > 0 diff --git a/conductor/conductor/windows_agent.py b/conductor/conductor/windows_agent.py index 3a4fda7..2778a0d 100644 --- a/conductor/conductor/windows_agent.py +++ b/conductor/conductor/windows_agent.py @@ -1,5 +1,8 @@ import xml_code_engine +from openstack.common import log as logging +log = logging.getLogger(__name__) + def send_command(engine, context, body, template, service, host, mappings=None, result=None, **kwargs): @@ -7,8 +10,8 @@ def send_command(engine, context, body, template, service, host, mappings=None, command_dispatcher = context['/commandDispatcher'] def callback(result_value): - print "Received result for %s: %s. Body is %s" % \ - (template, result_value, body) + log.info('Received result from {3} for {0}: {1}. Body is {2}'.format( + template, result_value, body, host)) if result is not None: context[result] = result_value['Result'] diff --git a/conductor/etc/conductor.conf b/conductor/etc/conductor.conf index 24930b1..03f9913 100644 --- a/conductor/etc/conductor.conf +++ b/conductor/etc/conductor.conf @@ -1,5 +1,7 @@ [DEFAULT] log_file = logs/conductor.log +debug=True +verbose=True [heat] url = http://172.18.124.101:8004/v1/16eb78cbb688459c8308d89678bcef50