From e420357dcd64a84e4172eb6f0b5529bf43e89ac7 Mon Sep 17 00:00:00 2001 From: Adriano Petrich Date: Thu, 15 Jun 2017 16:17:54 +0100 Subject: [PATCH] Add more use of mistral-lib in mistral This is a middle step to move serveral parts into mistral-lib Fixes a typo on the deprecation message Changes the last actions to use mistral-lib actions Convert the serialization to use mistral-lib serialization Use mistral-lib results as the standard. (the serialization mixin change is required for the results to work) Next steps are: change mistral-lib serialization to take care of all serialization change dependent libraries to use mistral-lib directly Change-Id: I4eacf5ce2e72916b700e8bc77ac9d95859131931 --- mistral/actions/base.py | 2 +- mistral/actions/std_actions.py | 3 +- .../api/controllers/v2/action_execution.py | 8 +- mistral/engine/actions.py | 4 +- mistral/engine/workflows.py | 8 +- mistral/executors/base.py | 4 +- mistral/executors/default_executor.py | 7 +- mistral/serialization.py | 83 ++----------------- .../unit/actions/test_std_http_action.py | 4 +- .../unit/api/v2/test_action_executions.py | 12 +-- .../tests/unit/engine/test_default_engine.py | 6 +- .../tests/unit/engine/test_direct_workflow.py | 6 +- .../test_direct_workflow_rerun_cancelled.py | 16 ++-- .../tests/unit/engine/test_error_handling.py | 4 +- .../tests/unit/engine/test_error_result.py | 8 +- .../test_execution_fields_size_limitation.py | 8 +- .../tests/unit/engine/test_race_condition.py | 6 +- .../test_reverse_workflow_rerun_cancelled.py | 6 +- mistral/tests/unit/engine/test_run_action.py | 6 +- mistral/tests/unit/engine/test_task_cancel.py | 10 +-- mistral/tests/unit/engine/test_with_items.py | 39 +++++---- .../tests/unit/engine/test_workflow_resume.py | 4 +- mistral/tests/unit/services/test_scheduler.py | 6 +- mistral/workflow/utils.py | 64 +------------- 24 files changed, 101 insertions(+), 223 deletions(-) diff --git a/mistral/actions/base.py b/mistral/actions/base.py index 139f47ac6..1154a0d86 100644 --- a/mistral/actions/base.py +++ b/mistral/actions/base.py @@ -16,7 +16,7 @@ import abc import warnings warnings.warn( - "mistral.actions.Action is deprecated as of the 5.0.0 release in favor of" + "mistral.actions.Action is deprecated as of the 5.0.0 release in favor of " "mistral_lib. It will be removed in a future release.", DeprecationWarning ) diff --git a/mistral/actions/std_actions.py b/mistral/actions/std_actions.py index a89b48449..ef9e41b67 100644 --- a/mistral/actions/std_actions.py +++ b/mistral/actions/std_actions.py @@ -25,7 +25,6 @@ import time from mistral import exceptions as exc from mistral.utils import javascript from mistral.utils import ssh_utils -from mistral.workflow import utils as wf_utils from mistral_lib import actions from oslo_log import log as logging @@ -223,7 +222,7 @@ class HTTPAction(actions.Action): } if resp.status_code not in range(200, 307): - return wf_utils.Result(error=_result) + return actions.Result(error=_result) return _result diff --git a/mistral/api/controllers/v2/action_execution.py b/mistral/api/controllers/v2/action_execution.py index 081dc9928..26169373c 100644 --- a/mistral/api/controllers/v2/action_execution.py +++ b/mistral/api/controllers/v2/action_execution.py @@ -29,7 +29,7 @@ from mistral.rpc import clients as rpc from mistral.utils import filter_utils from mistral.utils import rest_utils from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions LOG = logging.getLogger(__name__) @@ -169,13 +169,13 @@ class ActionExecutionsController(rest.RestController): output = action_ex.output if action_ex.state == states.SUCCESS: - result = wf_utils.Result(data=output) + result = ml_actions.Result(data=output) elif action_ex.state == states.ERROR: if not output: output = 'Unknown error' - result = wf_utils.Result(error=output) + result = ml_actions.Result(error=output) elif action_ex.state == states.CANCELLED: - result = wf_utils.Result(cancel=True) + result = ml_actions.Result(cancel=True) else: raise exc.InvalidResultException( "Error. Expected one of %s, actual: %s" % ( diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 582448188..28414927d 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -33,7 +33,7 @@ from mistral import utils from mistral.utils import wf_trace from mistral.workflow import data_flow from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions LOG = logging.getLogger(__name__) @@ -410,7 +410,7 @@ class AdHocAction(PythonAction): transformer = adhoc_action_spec.get_output() if transformer is not None: - result = wf_utils.Result( + result = ml_actions.Result( data=expr.evaluate_recursively(transformer, result.data), error=result.error ) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index b64f4644f..eba20a063 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -36,7 +36,7 @@ from mistral.workflow import commands from mistral.workflow import data_flow from mistral.workflow import lookup_utils from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions LOG = logging.getLogger(__name__) @@ -448,21 +448,21 @@ def _send_result_to_parent_workflow(wf_ex_id): wf_output = wf_ex.output if wf_ex.state == states.SUCCESS: - result = wf_utils.Result(data=wf_output) + result = ml_actions.Result(data=wf_output) elif wf_ex.state == states.ERROR: err_msg = ( wf_ex.state_info or 'Failed subworkflow [execution_id=%s]' % wf_ex.id ) - result = wf_utils.Result(error=err_msg) + result = ml_actions.Result(error=err_msg) elif wf_ex.state == states.CANCELLED: err_msg = ( wf_ex.state_info or 'Cancelled subworkflow [execution_id=%s]' % wf_ex.id ) - result = wf_utils.Result(error=err_msg, cancel=True) + result = ml_actions.Result(error=err_msg, cancel=True) else: raise RuntimeError( "Method _send_result_to_parent_workflow() must never be called" diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 81ebd5ca6..9789a8d46 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -15,10 +15,12 @@ import abc import six +from mistral import serialization +from mistral_lib.actions import types from stevedore import driver - _EXECUTORS = {} +serialization.register_serializer(types.Result, types.ResultSerializer()) def cleanup(): diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index 6d09c6df6..8b39f219c 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -24,7 +24,6 @@ from mistral import exceptions as exc from mistral.executors import base from mistral.rpc import clients as rpc from mistral.utils import inspect_utils as i_u -from mistral.workflow import utils as wf_utils LOG = logging.getLogger(__name__) @@ -55,7 +54,7 @@ class DefaultExecutor(base.Executor): """ def send_error_back(error_msg): - error_result = wf_utils.Result(error=error_msg) + error_result = mistral_lib.Result(error=error_msg) if action_ex_id: self._engine_client.on_action_complete( @@ -114,8 +113,8 @@ class DefaultExecutor(base.Executor): # Note: it's made for backwards compatibility with already # existing Mistral actions which don't return result as # instance of workflow.utils.Result. - if not isinstance(result, wf_utils.Result): - result = wf_utils.Result(data=result) + if not isinstance(result, mistral_lib.Result): + result = mistral_lib.Result(data=result) except Exception as e: msg = ( diff --git a/mistral/serialization.py b/mistral/serialization.py index 6cd6330e8..8bfe5b68e 100644 --- a/mistral/serialization.py +++ b/mistral/serialization.py @@ -12,87 +12,20 @@ # License for the specific language governing permissions and limitations # under the License. -import abc - +from mistral_lib import serialization as ml_serialization from oslo_serialization import jsonutils - _SERIALIZER = None - -class Serializer(object): - """Base interface for entity serializers. - - A particular serializer knows how to convert a certain object - into a string and back from that string into an object whose - state is equivalent to the initial object. - """ - - @abc.abstractmethod - def serialize(self, entity): - """Converts the given object into a string. - - :param entity: A object to be serialized. - :return String containing the state of the object in serialized form. - """ - raise NotImplementedError - - @abc.abstractmethod - def deserialize(self, data_str): - """Converts the given string into an object. - - :param data_str: String containing the state of the object in - serialized form. - :return: An object. - """ - raise NotImplementedError +# Backwards compatibility +Serializer = ml_serialization.Serializer +DictBasedSerializer = ml_serialization.DictBasedSerializer +MistralSerializable = ml_serialization.MistralSerializable -class DictBasedSerializer(Serializer): - """Dictionary-based serializer. - - It slightly simplifies implementing custom serializers by introducing - a contract based on dictionary. A serializer class extending this class - just needs to implement conversion from object into dict and from dict - to object. It doesn't need to convert into string and back as required - bye the base serializer contract. Conversion into string is implemented - once with regard to possible problems that may occur for collection and - primitive types as circular dependencies, correct date format etc. - """ - - def serialize(self, entity): - if entity is None: - return None - - entity_dict = self.serialize_to_dict(entity) - - return jsonutils.dumps( - jsonutils.to_primitive(entity_dict, convert_instances=True) - ) - - def deserialize(self, data_str): - if data_str is None: - return None - - entity_dict = jsonutils.loads(data_str) - - return self.deserialize_from_dict(entity_dict) - - @abc.abstractmethod - def serialize_to_dict(self, entity): - raise NotImplementedError - - @abc.abstractmethod - def deserialize_from_dict(self, entity_dict): - raise NotImplementedError - - -class MistralSerializable(object): - """A mixin to generate a serialization key for a custom object.""" - - @classmethod - def get_serialization_key(cls): - return "%s.%s" % (cls.__module__, cls.__name__) +# PolymorphicSerializer cannot be used from mistral-lib yet +# as mistral-lib does not have the unregister method. +# Once it does this will be removed also in favor of mistral-lib class PolymorphicSerializer(Serializer): diff --git a/mistral/tests/unit/actions/test_std_http_action.py b/mistral/tests/unit/actions/test_std_http_action.py index 4bb159529..f45569a8f 100644 --- a/mistral/tests/unit/actions/test_std_http_action.py +++ b/mistral/tests/unit/actions/test_std_http_action.py @@ -19,7 +19,7 @@ import requests from mistral.actions import std_actions as std from mistral.tests.unit import base -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions URL = 'http://some_url' @@ -109,7 +109,7 @@ class HTTPActionTest(base.BaseTest): result = action.run(mock_ctx) - self.assertIsInstance(result, wf_utils.Result) + self.assertIsInstance(result, ml_actions.Result) self.assertEqual(401, result.error['status']) mocked_method.assert_called_with( diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index 69a843aaa..a0314426f 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -31,7 +31,7 @@ from mistral.rpc import clients as rpc_clients from mistral.tests.unit.api import base from mistral.utils import rest_utils from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # This line is needed for correct initialization of messaging config. oslo_messaging.get_rpc_transport(cfg.CONF) @@ -332,7 +332,7 @@ class TestActionExecutionsController(base.APITest): f.assert_called_once_with( UPDATED_ACTION['id'], - wf_utils.Result(data=ACTION_EX_DB.output) + ml_actions.Result(data=ACTION_EX_DB.output) ) @mock.patch.object(rpc_clients.EngineClient, 'on_action_complete') @@ -349,7 +349,7 @@ class TestActionExecutionsController(base.APITest): f.assert_called_once_with( ERROR_ACTION_WITH_OUTPUT['id'], - wf_utils.Result(error=ERROR_ACTION_RES_WITH_OUTPUT) + ml_actions.Result(error=ERROR_ACTION_RES_WITH_OUTPUT) ) @mock.patch.object(rpc_clients.EngineClient, 'on_action_complete') @@ -362,7 +362,7 @@ class TestActionExecutionsController(base.APITest): f.assert_called_once_with( ERROR_ACTION_FOR_EMPTY_OUTPUT['id'], - wf_utils.Result(error=DEFAULT_ERROR_OUTPUT) + ml_actions.Result(error=DEFAULT_ERROR_OUTPUT) ) @mock.patch.object(rpc_clients.EngineClient, 'on_action_complete') @@ -378,7 +378,7 @@ class TestActionExecutionsController(base.APITest): f.assert_called_once_with( ERROR_ACTION_FOR_EMPTY_OUTPUT['id'], - wf_utils.Result(error=DEFAULT_ERROR_OUTPUT) + ml_actions.Result(error=DEFAULT_ERROR_OUTPUT) ) @mock.patch.object(rpc_clients.EngineClient, 'on_action_complete') @@ -392,7 +392,7 @@ class TestActionExecutionsController(base.APITest): on_action_complete_mock_func.assert_called_once_with( CANCELLED_ACTION['id'], - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) @mock.patch.object( diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 7df297d1c..20331a921 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -29,7 +29,7 @@ from mistral.services import workbooks as wb_service from mistral.tests.unit import base from mistral.tests.unit.engine import base as eng_test_base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases @@ -337,7 +337,7 @@ class DefaultEngineTest(base.DbTestCase): # Finish action of 'task1'. task1_action_ex = self.engine.on_action_complete( task1_action_ex.id, - wf_utils.Result(data='Hey') + ml_actions.Result(data='Hey') ) self.assertIsInstance(task1_action_ex, models.ActionExecution) @@ -379,7 +379,7 @@ class DefaultEngineTest(base.DbTestCase): # Finish 'task2'. task2_action_ex = self.engine.on_action_complete( task2_action_ex.id, - wf_utils.Result(data='Hi') + ml_actions.Result(data='Hi') ) with db_api.transaction(): diff --git a/mistral/tests/unit/engine/test_direct_workflow.py b/mistral/tests/unit/engine/test_direct_workflow.py index bb1c2e1f8..f223b2669 100644 --- a/mistral/tests/unit/engine/test_direct_workflow.py +++ b/mistral/tests/unit/engine/test_direct_workflow.py @@ -20,7 +20,7 @@ from mistral import exceptions as exc from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases @@ -352,7 +352,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): # Update async action execution result. self.engine.on_action_complete( task_1_action_exs[0].id, - wf_utils.Result(data='foobar') + ml_actions.Result(data='foobar') ) with db_api.transaction(): @@ -554,7 +554,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase): # Update async action execution result. self.engine.on_action_complete( task_1_action_exs[0].id, - wf_utils.Result(data='foobar') + ml_actions.Result(data='foobar') ) # Assert that task1 is SUCCESS and workflow is ERROR. diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py b/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py index 2a750edba..5240c6d59 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py @@ -21,7 +21,7 @@ from mistral.db.v2 import api as db_api from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. @@ -88,7 +88,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): # Cancel action execution for task. self.engine.on_action_complete( wf1_t1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_task_cancelled(wf1_t1_ex.id) @@ -134,7 +134,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): self.engine.on_action_complete( wf1_t1_action_exs[1].id, - wf_utils.Result(data={'foo': 'bar'}) + ml_actions.Result(data={'foo': 'bar'}) ) # Wait for the workflow to succeed. @@ -333,7 +333,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): # Mark async action execution complete. self.engine.on_action_complete( wf2_t1_action_exs[0].id, - wf_utils.Result(data={'foo': 'bar'}) + ml_actions.Result(data={'foo': 'bar'}) ) # Wait for the workflows to succeed. @@ -457,7 +457,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): # Cancel action execution for task. self.engine.on_action_complete( wf2_t1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(wf2_ex.id) @@ -507,7 +507,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): # Mark async action execution complete. self.engine.on_action_complete( wf2_t1_action_exs[1].id, - wf_utils.Result(data={'foo': 'bar'}) + ml_actions.Result(data={'foo': 'bar'}) ) # Wait for the workflows to succeed. @@ -587,7 +587,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): for wf1_t1_action_ex in wf1_t1_action_exs: self.engine.on_action_complete( wf1_t1_action_ex.id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(wf1_ex.id) @@ -633,7 +633,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase): for i in range(3, 6): self.engine.on_action_complete( wf1_t1_action_exs[i].id, - wf_utils.Result(data={'foo': 'bar'}) + ml_actions.Result(data={'foo': 'bar'}) ) # Wait for the workflows to succeed. diff --git a/mistral/tests/unit/engine/test_error_handling.py b/mistral/tests/unit/engine/test_error_handling.py index bb3afe81f..46ec536f3 100644 --- a/mistral/tests/unit/engine/test_error_handling.py +++ b/mistral/tests/unit/engine/test_error_handling.py @@ -14,13 +14,13 @@ from oslo_config import cfg -from mistral.actions import base as actions_base from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states +from mistral_lib import actions as actions_base # Use the set_default method to set value otherwise in certain test cases @@ -32,7 +32,7 @@ class InvalidUnicodeAction(actions_base.Action): def __init__(self): pass - def run(self): + def run(self, context): return b'\xf8' def test(self): diff --git a/mistral/tests/unit/engine/test_error_result.py b/mistral/tests/unit/engine/test_error_result.py index d56b74999..de4c7cf87 100644 --- a/mistral/tests/unit/engine/test_error_result.py +++ b/mistral/tests/unit/engine/test_error_result.py @@ -14,15 +14,13 @@ from oslo_config import cfg -from mistral.actions import base as actions_base from mistral.db.v2 import api as db_api from mistral.services import workflows as wf_service from mistral.tests.unit import base as test_base from mistral.tests.unit.engine import base from mistral.workflow import data_flow from mistral.workflow import states -from mistral.workflow import utils as wf_utils - +from mistral_lib import actions as actions_base # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. @@ -63,8 +61,8 @@ class MyAction(actions_base.Action): self.success_result = success_result self.error_result = error_result - def run(self): - return wf_utils.Result( + def run(self, context): + return actions_base.Result( data=self.success_result, error=self.error_result ) diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index 269faea82..0d01a56d8 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -14,7 +14,7 @@ from oslo_config import cfg -from mistral_lib.actions import base as actions_base +from mistral_lib import actions as actions_base from mistral.db.v2 import api as db_api from mistral import exceptions as exc @@ -22,8 +22,6 @@ from mistral.services import workflows as wf_service from mistral.tests.unit import base as test_base from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils - # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. @@ -70,9 +68,9 @@ class MyAction(actions_base.Action): result[i] = 'A' if not self.error: - return wf_utils.Result(data=result) + return actions_base.Result(data=result) else: - return wf_utils.Result(error=result) + return actions_base.Result(error=result) def test(self): raise NotImplementedError diff --git a/mistral/tests/unit/engine/test_race_condition.py b/mistral/tests/unit/engine/test_race_condition.py index 9f79266e0..70e7d0180 100644 --- a/mistral/tests/unit/engine/test_race_condition.py +++ b/mistral/tests/unit/engine/test_race_condition.py @@ -17,12 +17,12 @@ from eventlet import semaphore from oslo_config import cfg import testtools -from mistral.actions import base as action_base from mistral.db.v2 import api as db_api from mistral.services import workflows as wf_service from mistral.tests.unit import base as test_base from mistral.tests.unit.engine import base from mistral.workflow import states +from mistral_lib import actions as actions_base # Use the set_default method to set value otherwise in certain test cases @@ -88,7 +88,7 @@ ACTION_SEMAPHORE = None TEST_SEMAPHORE = None -class BlockingAction(action_base.Action): +class BlockingAction(actions_base.Action): def __init__(self): pass @@ -100,7 +100,7 @@ class BlockingAction(action_base.Action): def wait_for_test(): ACTION_SEMAPHORE.acquire() - def run(self): + def run(self, context): self.unblock_test() self.wait_for_test() diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py index 476f732cf..49002a303 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py @@ -21,7 +21,7 @@ from mistral.db.v2 import api as db_api from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases @@ -86,7 +86,7 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase): # Cancel action execution for task. self.engine.on_action_complete( wf1_t1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(wf1_ex.id) @@ -142,7 +142,7 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase): self.engine.on_action_complete( wf1_t1_action_exs[1].id, - wf_utils.Result(data={'foo': 'bar'}) + ml_actions.Result(data={'foo': 'bar'}) ) # Wait for the workflow to succeed. diff --git a/mistral/tests/unit/engine/test_run_action.py b/mistral/tests/unit/engine/test_run_action.py index 2314e8869..a0e3e9338 100644 --- a/mistral/tests/unit/engine/test_run_action.py +++ b/mistral/tests/unit/engine/test_run_action.py @@ -22,7 +22,7 @@ from mistral import exceptions as exc from mistral.services import actions from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases # the change in value is not permanent. @@ -188,7 +188,7 @@ class RunActionEngineTest(base.EngineTestCase): @mock.patch.object( std_actions.AsyncNoOpAction, 'run', - mock.MagicMock(return_value=wf_utils.Result(error='Invoke erred.'))) + mock.MagicMock(return_value=ml_actions.Result(error='Invoke erred.'))) def test_run_action_async_invoke_with_error(self): action_ex = self.engine.start_action('std.async_noop', {}) @@ -303,7 +303,7 @@ class RunActionEngineTest(base.EngineTestCase): 'scope': 'public' }) def_mock.return_value = action_def - run_mock.return_value = wf_utils.Result(data='Hello') + run_mock.return_value = ml_actions.Result(data='Hello') class_ret = mock.MagicMock() class_mock.return_value = class_ret diff --git a/mistral/tests/unit/engine/test_task_cancel.py b/mistral/tests/unit/engine/test_task_cancel.py index e89801669..edf9a9eef 100644 --- a/mistral/tests/unit/engine/test_task_cancel.py +++ b/mistral/tests/unit/engine/test_task_cancel.py @@ -22,7 +22,7 @@ from mistral.services import workbooks as wb_service from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions class TaskCancelTest(base.EngineTestCase): @@ -75,7 +75,7 @@ class TaskCancelTest(base.EngineTestCase): self.engine.on_action_complete( task_1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(wf_ex.id) @@ -171,7 +171,7 @@ class TaskCancelTest(base.EngineTestCase): self.engine.on_action_complete( task_1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(subwf_ex.id) @@ -242,7 +242,7 @@ class TaskCancelTest(base.EngineTestCase): self.engine.on_action_complete( task_1_action_exs[0].id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_workflow_cancelled(wf_ex.id) @@ -333,7 +333,7 @@ class TaskCancelTest(base.EngineTestCase): for wf1_t1_action_ex in wf1_t1_action_exs: self.engine.on_action_complete( wf1_t1_action_ex.id, - wf_utils.Result(cancel=True) + ml_actions.Result(cancel=True) ) self.await_task_cancelled(wf1_t1_ex.id) diff --git a/mistral/tests/unit/engine/test_with_items.py b/mistral/tests/unit/engine/test_with_items.py index 85209bc50..7107cd045 100644 --- a/mistral/tests/unit/engine/test_with_items.py +++ b/mistral/tests/unit/engine/test_with_items.py @@ -16,7 +16,6 @@ import copy import mock from oslo_config import cfg -from mistral.actions import base as action_base from mistral.actions import std_actions from mistral.db.v2 import api as db_api from mistral import exceptions as exc @@ -27,7 +26,7 @@ from mistral.tests.unit.engine import base from mistral import utils from mistral.workflow import data_flow from mistral.workflow import states -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as actions_base # TODO(nmakhotkin) Need to write more tests. @@ -152,11 +151,11 @@ WF_INPUT_ONE_ITEM = { } -class RandomSleepEchoAction(action_base.Action): +class RandomSleepEchoAction(actions_base.Action): def __init__(self, output): self.output = output - def run(self): + def run(self, context): utils.random_sleep(1) return self.output @@ -399,11 +398,17 @@ class WithItemsEngineTest(base.EngineTestCase): act_exs = task_ex.executions - self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan")) - self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John")) + self.engine.on_action_complete( + act_exs[0].id, + actions_base.Result("Ivan") + ) + self.engine.on_action_complete( + act_exs[1].id, + actions_base.Result("John") + ) self.engine.on_action_complete( act_exs[2].id, - wf_utils.Result("Mistral") + actions_base.Result("Mistral") ) self.await_workflow_success(wf_ex.id) @@ -636,7 +641,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 1st iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("John") + actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. @@ -652,7 +657,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 2nd iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("Ivan") + actions_base.Result("Ivan") ) self._await(lambda: len(db_api.get_delayed_calls()) == 1) @@ -666,7 +671,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 3rd iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("Mistral") + actions_base.Result("Mistral") ) self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) @@ -800,7 +805,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 1st iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("John") + actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. @@ -818,7 +823,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 2nd iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("Ivan") + actions_base.Result("Ivan") ) self._await(lambda: len(db_api.get_delayed_calls()) == 1) @@ -834,7 +839,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 3rd iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("Mistral") + actions_base.Result("Mistral") ) self._await(lambda: len(db_api.get_delayed_calls()) == 1) @@ -849,7 +854,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 4th iteration complete. self.engine.on_action_complete( incomplete_action.id, - wf_utils.Result("Hello") + actions_base.Result("Hello") ) self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) @@ -952,7 +957,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 1st iteration complete. self.engine.on_action_complete( self._get_incomplete_action(task_ex).id, - wf_utils.Result("John") + actions_base.Result("John") ) # Wait till the delayed on_action_complete is processed. @@ -969,7 +974,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 2nd iteration complete. self.engine.on_action_complete( incomplete_action.id, - wf_utils.Result("Ivan") + actions_base.Result("Ivan") ) self._await(lambda: len(db_api.get_delayed_calls()) == 1) @@ -984,7 +989,7 @@ class WithItemsEngineTest(base.EngineTestCase): # 3rd iteration complete. self.engine.on_action_complete( incomplete_action.id, - wf_utils.Result("Mistral") + actions_base.Result("Mistral") ) self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1)) diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index 0cb4c32ae..4c8a860b5 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -22,7 +22,7 @@ from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base from mistral.workflow import data_flow from mistral.workflow import states -from mistral.workflow import utils +from mistral_lib import actions as ml_actions # Use the set_default method to set value otherwise in certain test cases @@ -365,7 +365,7 @@ class WorkflowResumeTest(base.EngineTestCase): task_execution_id=task2_ex.id )[0] - self.engine.on_action_complete(task2_action_ex.id, utils.Result()) + self.engine.on_action_complete(task2_action_ex.id, ml_actions.Result()) self.await_workflow_success(wf_ex.id) diff --git a/mistral/tests/unit/services/test_scheduler.py b/mistral/tests/unit/services/test_scheduler.py index ac5197ed5..18c38f80f 100644 --- a/mistral/tests/unit/services/test_scheduler.py +++ b/mistral/tests/unit/services/test_scheduler.py @@ -22,7 +22,7 @@ from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.services import scheduler from mistral.tests.unit import base -from mistral.workflow import utils as wf_utils +from mistral_lib import actions as ml_actions FACTORY_METHOD_PATH = ( @@ -195,7 +195,7 @@ class SchedulerServiceTest(base.DbTestCase): def test_scheduler_with_serializer(self, factory): target_method = 'run_something' - task_result = wf_utils.Result('data', 'error') + task_result = ml_actions.Result('data', 'error') method_args = { 'name': 'task', @@ -230,7 +230,7 @@ class SchedulerServiceTest(base.DbTestCase): result = factory().run_something.call_args[1].get('result') - self.assertIsInstance(result, wf_utils.Result) + self.assertIsInstance(result, ml_actions.Result) self.assertEqual('data', result.data) self.assertEqual('error', result.error) diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index 02d006913..79d4bac45 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -14,65 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral import serialization -from mistral import utils +from mistral_lib.actions import types +# For backwards compatibility -class Result(serialization.MistralSerializable): - """Explicit data structure containing a result of task execution.""" - - def __init__(self, data=None, error=None, cancel=False): - self.data = data - self.error = error - self.cancel = cancel - - def __repr__(self): - return 'Result [data=%s, error=%s, cancel=%s]' % ( - repr(self.data), repr(self.error), str(self.cancel) - ) - - def cut_repr(self): - return 'Result [data=%s, error=%s, cancel=%s]' % ( - utils.cut(self.data), utils.cut(self.error), str(self.cancel) - ) - - def is_cancel(self): - return self.cancel - - def is_error(self): - return self.error is not None and not self.is_cancel() - - def is_success(self): - return not self.is_error() and not self.is_cancel() - - def __eq__(self, other): - return ( - self.data == other.data and - self.error == other.error and - self.cancel == other.cancel - ) - - def __ne__(self, other): - return not self.__eq__(other) - - def to_dict(self): - return ({'result': self.data} - if self.is_success() else {'result': self.error}) - - -class ResultSerializer(serialization.DictBasedSerializer): - def serialize_to_dict(self, entity): - return { - 'data': entity.data, - 'error': entity.error, - 'cancel': entity.cancel - } - - def deserialize_from_dict(self, entity_dict): - return Result( - entity_dict['data'], - entity_dict['error'], - entity_dict.get('cancel', False) - ) - -serialization.register_serializer(Result, ResultSerializer()) +Result = types.Result +ResultSerializer = types.ResultSerializer