Merge "Enhance Watcher Applier Engine"
This commit is contained in:
commit
b45e8a1464
@ -76,6 +76,7 @@ Watcher Applier
|
||||
This component is in charge of executing the
|
||||
:ref:`Action Plan <action_plan_definition>` built by the
|
||||
:ref:`Watcher Decision Engine <watcher_decision_engine_definition>`.
|
||||
Taskflow is the default workflow engine for Watcher.
|
||||
|
||||
It connects to the :ref:`message bus <amqp_bus_definition>` and launches the
|
||||
:ref:`Action Plan <action_plan_definition>` whenever a triggering message is
|
||||
@ -110,6 +111,23 @@ If the :ref:`Action <action_definition>` fails, the
|
||||
previous state of the :ref:`Managed resource <managed_resource_definition>`
|
||||
(i.e. before the command was sent to the underlying OpenStack service).
|
||||
|
||||
In Stein, added a new config option 'action_execution_rule' which is a
|
||||
dict type. Its key field is strategy name and the value is 'ALWAYS' or 'ANY'.
|
||||
'ALWAYS' means the callback function returns True as usual.
|
||||
'ANY' means the return depends on the result of previous action execution.
|
||||
The callback returns True if previous action gets failed, and the engine
|
||||
continues to run the next action. If previous action executes success,
|
||||
the callback returns False then the next action will be ignored.
|
||||
For strategies that aren't in 'action_execution_rule', the callback always
|
||||
returns True.
|
||||
Please add the next section in the watcher.conf file
|
||||
if your strategy needs this feature.
|
||||
|
||||
::
|
||||
|
||||
[watcher_workflow_engines.taskflow]
|
||||
action_execution_rule = {'your strategy name': 'ANY'}
|
||||
|
||||
.. _archi_watcher_cli_definition:
|
||||
|
||||
Watcher CLI
|
||||
|
@ -0,0 +1,16 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Added a new config option 'action_execution_rule' which is a dict type.
|
||||
Its key field is strategy name and the value is 'ALWAYS' or 'ANY'.
|
||||
'ALWAYS' means the callback function returns True as usual.
|
||||
'ANY' means the return depends on the result of previous action execution.
|
||||
The callback returns True if previous action gets failed, and the engine
|
||||
continues to run the next action. If previous action executes success,
|
||||
the callback returns False then the next action will be ignored.
|
||||
For strategies that aren't in 'action_execution_rule', the callback always
|
||||
returns True.
|
||||
Please add the next section in the watcher.conf file
|
||||
if your strategy needs this feature.
|
||||
[watcher_workflow_engines.taskflow]
|
||||
action_execution_rule = {'your strategy name': 'ANY'}
|
@ -58,6 +58,7 @@ class BaseWorkFlowEngine(loadable.Loadable):
|
||||
self._action_factory = factory.ActionFactory()
|
||||
self._osc = None
|
||||
self._is_notified = False
|
||||
self.execution_rule = None
|
||||
|
||||
@classmethod
|
||||
def get_config_opts(cls):
|
||||
@ -206,11 +207,14 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
et = eventlet.spawn(_do_execute_action, *args, **kwargs)
|
||||
# NOTE: check for the state of action plan periodically,so that if
|
||||
# action is finished or action plan is cancelled we can exit from here.
|
||||
result = False
|
||||
while True:
|
||||
action_object = objects.Action.get_by_uuid(
|
||||
self.engine.context, self._db_action.uuid, eager=True)
|
||||
action_plan_object = objects.ActionPlan.get_by_id(
|
||||
self.engine.context, action_object.action_plan_id)
|
||||
if action_object.state == objects.action.State.SUCCEEDED:
|
||||
result = True
|
||||
if (action_object.state in [objects.action.State.SUCCEEDED,
|
||||
objects.action.State.FAILED] or
|
||||
action_plan_object.state in CANCEL_STATE):
|
||||
@ -226,6 +230,7 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
if (action_plan_object.state in CANCEL_STATE and abort):
|
||||
et.kill()
|
||||
et.wait()
|
||||
return result
|
||||
|
||||
# NOTE: catch the greenlet exit exception due to thread kill,
|
||||
# taskflow will call revert for the action,
|
||||
@ -236,7 +241,8 @@ class BaseTaskFlowActionContainer(flow_task.Task):
|
||||
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
raise
|
||||
# return False instead of raising an exception
|
||||
return False
|
||||
|
||||
def post_execute(self):
|
||||
try:
|
||||
|
@ -38,8 +38,6 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
"""
|
||||
|
||||
def decider(self, history):
|
||||
# FIXME(jed) not possible with the current Watcher Planner
|
||||
#
|
||||
# decider – A callback function that will be expected to
|
||||
# decide at runtime whether v should be allowed to execute
|
||||
# (or whether the execution of v should be ignored,
|
||||
@ -48,7 +46,11 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
# all u decidable links that have v as a target. It is expected
|
||||
# to return a single boolean
|
||||
# (True to allow v execution or False to not).
|
||||
return True
|
||||
LOG.info("decider history: %s", history)
|
||||
if history and self.execution_rule == 'ANY':
|
||||
return not list(history.values())[0]
|
||||
else:
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def get_config_opts(cls):
|
||||
@ -59,9 +61,27 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
min=1,
|
||||
required=True,
|
||||
help='Number of workers for taskflow engine '
|
||||
'to execute actions.')
|
||||
'to execute actions.'),
|
||||
cfg.DictOpt(
|
||||
'action_execution_rule',
|
||||
default={},
|
||||
help='The execution rule for linked actions,'
|
||||
'the key is strategy name and '
|
||||
'value ALWAYS means all actions will be executed,'
|
||||
'value ANY means if previous action executes '
|
||||
'success, the next action will be ignored.'
|
||||
'None means ALWAYS.')
|
||||
]
|
||||
|
||||
def get_execution_rule(self, actions):
|
||||
if actions:
|
||||
actionplan_object = objects.ActionPlan.get_by_id(
|
||||
self.context, actions[0].action_plan_id)
|
||||
strategy_object = objects.Strategy.get_by_id(
|
||||
self.context, actionplan_object.strategy_id)
|
||||
return self.config.action_execution_rule.get(
|
||||
strategy_object.name)
|
||||
|
||||
def execute(self, actions):
|
||||
try:
|
||||
# NOTE(jed) We want to have a strong separation of concern
|
||||
@ -72,6 +92,7 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
|
||||
# the users to change it.
|
||||
# The current implementation uses graph with linked actions.
|
||||
# todo(jed) add olso conf for retry and name
|
||||
self.execution_rule = self.get_execution_rule(actions)
|
||||
flow = gf.Flow("watcher_flow")
|
||||
actions_uuid = {}
|
||||
for a in actions:
|
||||
|
@ -66,9 +66,11 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
applier_manager=mock.MagicMock())
|
||||
self.engine.config.max_workers = 2
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch('taskflow.engines.load')
|
||||
@mock.patch('taskflow.patterns.graph_flow.Flow.link')
|
||||
def test_execute(self, graph_flow, engines):
|
||||
def test_execute(self, graph_flow, engines, m_actionplan, m_strategy):
|
||||
actions = mock.MagicMock()
|
||||
try:
|
||||
self.engine.execute(actions)
|
||||
@ -111,14 +113,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_one_action(self, mock_send_update,
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = [self.create_action("nop", {'message': 'test'})]
|
||||
try:
|
||||
self.engine.execute(actions)
|
||||
@ -127,14 +132,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_nop_sleep(self, mock_send_update,
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = []
|
||||
first_nop = self.create_action("nop", {'message': 'test'})
|
||||
second_nop = self.create_action("nop", {'message': 'second test'})
|
||||
@ -149,14 +157,17 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_parents(self, mock_send_update,
|
||||
mock_execution_notification,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = []
|
||||
first_nop = self.create_action(
|
||||
"nop", {'message': 'test'},
|
||||
@ -221,13 +232,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_two_actions(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = []
|
||||
second = self.create_action("sleep", {'duration': 0.0})
|
||||
first = self.create_action("nop", {'message': 'test'})
|
||||
@ -242,13 +256,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_three_actions(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = []
|
||||
third = self.create_action("nop", {'message': 'next'})
|
||||
second = self.create_action("sleep", {'duration': 0.0})
|
||||
@ -269,13 +286,16 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
def test_execute_with_exception(self, m_send_update, m_execution,
|
||||
m_get_actionplan):
|
||||
m_get_actionplan, m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = []
|
||||
|
||||
third = self.create_action("no_exist", {'message': 'next'})
|
||||
@ -290,29 +310,28 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
actions.append(second)
|
||||
actions.append(third)
|
||||
|
||||
self.assertRaises(exception.WorkflowExecutionException,
|
||||
self.engine.execute, actions)
|
||||
self.engine.execute(actions)
|
||||
|
||||
self.check_action_state(first, objects.action.State.SUCCEEDED)
|
||||
self.check_action_state(second, objects.action.State.SUCCEEDED)
|
||||
self.check_action_state(third, objects.action.State.FAILED)
|
||||
|
||||
@mock.patch.object(objects.Strategy, "get_by_id")
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_id")
|
||||
@mock.patch.object(notifications.action, 'send_execution_notification')
|
||||
@mock.patch.object(notifications.action, 'send_update')
|
||||
@mock.patch.object(factory.ActionFactory, "make_action")
|
||||
def test_execute_with_action_exception(self, m_make_action, m_send_update,
|
||||
m_send_execution, m_get_actionplan):
|
||||
def test_execute_with_action_failed(self, m_make_action, m_send_update,
|
||||
m_send_execution, m_get_actionplan,
|
||||
m_get_strategy):
|
||||
m_get_actionplan.return_value = obj_utils.get_test_action_plan(
|
||||
self.context, id=0)
|
||||
m_get_strategy.return_value = obj_utils.get_test_strategy(
|
||||
self.context, id=1)
|
||||
actions = [self.create_action("fake_action", {})]
|
||||
m_make_action.return_value = FakeAction(mock.Mock())
|
||||
|
||||
exc = self.assertRaises(exception.WorkflowExecutionException,
|
||||
self.engine.execute, actions)
|
||||
|
||||
self.assertIsInstance(exc.kwargs['error'],
|
||||
exception.ActionExecutionFailure)
|
||||
self.engine.execute(actions)
|
||||
self.check_action_state(actions[0], objects.action.State.FAILED)
|
||||
|
||||
@mock.patch.object(objects.ActionPlan, "get_by_uuid")
|
||||
@ -353,3 +372,20 @@ class TestDefaultWorkFlowEngine(base.DbTestCase):
|
||||
|
||||
except Exception as exc:
|
||||
self.fail(exc)
|
||||
|
||||
def test_decider(self):
|
||||
# execution_rule is ALWAYS
|
||||
self.engine.execution_rule = 'ALWAYS'
|
||||
history = {'action1': True}
|
||||
self.assertTrue(self.engine.decider(history))
|
||||
|
||||
history = {'action1': False}
|
||||
self.assertTrue(self.engine.decider(history))
|
||||
|
||||
# execution_rule is ANY
|
||||
self.engine.execution_rule = 'ANY'
|
||||
history = {'action1': True}
|
||||
self.assertFalse(self.engine.decider(history))
|
||||
|
||||
history = {'action1': False}
|
||||
self.assertTrue(self.engine.decider(history))
|
||||
|
@ -21,7 +21,6 @@ import mock
|
||||
|
||||
from watcher.applier.workflow_engine import default as tflow
|
||||
from watcher.common import clients
|
||||
from watcher.common import exception
|
||||
from watcher.common import nova_helper
|
||||
from watcher import objects
|
||||
from watcher.tests.db import base
|
||||
@ -81,8 +80,8 @@ class TestTaskFlowActionContainer(base.DbTestCase):
|
||||
db_action=action,
|
||||
engine=self.engine)
|
||||
|
||||
self.assertRaises(exception.ActionExecutionFailure,
|
||||
action_container.execute, action_id=action.uuid)
|
||||
result = action_container.execute()
|
||||
self.assertFalse(result)
|
||||
|
||||
self.assertTrue(action.state, objects.action.State.FAILED)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user