Add more use of mistral-lib in mistral
This is a middle step to move serveral parts into mistral-lib Fixes a typo on the deprecation message Changes the last actions to use mistral-lib actions Convert the serialization to use mistral-lib serialization Use mistral-lib results as the standard. (the serialization mixin change is required for the results to work) Next steps are: change mistral-lib serialization to take care of all serialization change dependent libraries to use mistral-lib directly Change-Id: I4eacf5ce2e72916b700e8bc77ac9d95859131931
This commit is contained in:
parent
0c26db68e7
commit
e420357dcd
@ -16,7 +16,7 @@ import abc
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"mistral.actions.Action is deprecated as of the 5.0.0 release in favor of"
|
||||
"mistral.actions.Action is deprecated as of the 5.0.0 release in favor of "
|
||||
"mistral_lib. It will be removed in a future release.", DeprecationWarning
|
||||
)
|
||||
|
||||
|
@ -25,7 +25,6 @@ import time
|
||||
from mistral import exceptions as exc
|
||||
from mistral.utils import javascript
|
||||
from mistral.utils import ssh_utils
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -223,7 +222,7 @@ class HTTPAction(actions.Action):
|
||||
}
|
||||
|
||||
if resp.status_code not in range(200, 307):
|
||||
return wf_utils.Result(error=_result)
|
||||
return actions.Result(error=_result)
|
||||
|
||||
return _result
|
||||
|
||||
|
@ -29,7 +29,7 @@ from mistral.rpc import clients as rpc
|
||||
from mistral.utils import filter_utils
|
||||
from mistral.utils import rest_utils
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -169,13 +169,13 @@ class ActionExecutionsController(rest.RestController):
|
||||
output = action_ex.output
|
||||
|
||||
if action_ex.state == states.SUCCESS:
|
||||
result = wf_utils.Result(data=output)
|
||||
result = ml_actions.Result(data=output)
|
||||
elif action_ex.state == states.ERROR:
|
||||
if not output:
|
||||
output = 'Unknown error'
|
||||
result = wf_utils.Result(error=output)
|
||||
result = ml_actions.Result(error=output)
|
||||
elif action_ex.state == states.CANCELLED:
|
||||
result = wf_utils.Result(cancel=True)
|
||||
result = ml_actions.Result(cancel=True)
|
||||
else:
|
||||
raise exc.InvalidResultException(
|
||||
"Error. Expected one of %s, actual: %s" % (
|
||||
|
@ -33,7 +33,7 @@ from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -410,7 +410,7 @@ class AdHocAction(PythonAction):
|
||||
transformer = adhoc_action_spec.get_output()
|
||||
|
||||
if transformer is not None:
|
||||
result = wf_utils.Result(
|
||||
result = ml_actions.Result(
|
||||
data=expr.evaluate_recursively(transformer, result.data),
|
||||
error=result.error
|
||||
)
|
||||
|
@ -36,7 +36,7 @@ from mistral.workflow import commands
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import lookup_utils
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -448,21 +448,21 @@ def _send_result_to_parent_workflow(wf_ex_id):
|
||||
wf_output = wf_ex.output
|
||||
|
||||
if wf_ex.state == states.SUCCESS:
|
||||
result = wf_utils.Result(data=wf_output)
|
||||
result = ml_actions.Result(data=wf_output)
|
||||
elif wf_ex.state == states.ERROR:
|
||||
err_msg = (
|
||||
wf_ex.state_info or
|
||||
'Failed subworkflow [execution_id=%s]' % wf_ex.id
|
||||
)
|
||||
|
||||
result = wf_utils.Result(error=err_msg)
|
||||
result = ml_actions.Result(error=err_msg)
|
||||
elif wf_ex.state == states.CANCELLED:
|
||||
err_msg = (
|
||||
wf_ex.state_info or
|
||||
'Cancelled subworkflow [execution_id=%s]' % wf_ex.id
|
||||
)
|
||||
|
||||
result = wf_utils.Result(error=err_msg, cancel=True)
|
||||
result = ml_actions.Result(error=err_msg, cancel=True)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
"Method _send_result_to_parent_workflow() must never be called"
|
||||
|
@ -15,10 +15,12 @@
|
||||
import abc
|
||||
import six
|
||||
|
||||
from mistral import serialization
|
||||
from mistral_lib.actions import types
|
||||
from stevedore import driver
|
||||
|
||||
|
||||
_EXECUTORS = {}
|
||||
serialization.register_serializer(types.Result, types.ResultSerializer())
|
||||
|
||||
|
||||
def cleanup():
|
||||
|
@ -24,7 +24,6 @@ from mistral import exceptions as exc
|
||||
from mistral.executors import base
|
||||
from mistral.rpc import clients as rpc
|
||||
from mistral.utils import inspect_utils as i_u
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -55,7 +54,7 @@ class DefaultExecutor(base.Executor):
|
||||
"""
|
||||
|
||||
def send_error_back(error_msg):
|
||||
error_result = wf_utils.Result(error=error_msg)
|
||||
error_result = mistral_lib.Result(error=error_msg)
|
||||
|
||||
if action_ex_id:
|
||||
self._engine_client.on_action_complete(
|
||||
@ -114,8 +113,8 @@ class DefaultExecutor(base.Executor):
|
||||
# Note: it's made for backwards compatibility with already
|
||||
# existing Mistral actions which don't return result as
|
||||
# instance of workflow.utils.Result.
|
||||
if not isinstance(result, wf_utils.Result):
|
||||
result = wf_utils.Result(data=result)
|
||||
if not isinstance(result, mistral_lib.Result):
|
||||
result = mistral_lib.Result(data=result)
|
||||
|
||||
except Exception as e:
|
||||
msg = (
|
||||
|
@ -12,87 +12,20 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
from mistral_lib import serialization as ml_serialization
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
||||
_SERIALIZER = None
|
||||
|
||||
|
||||
class Serializer(object):
|
||||
"""Base interface for entity serializers.
|
||||
|
||||
A particular serializer knows how to convert a certain object
|
||||
into a string and back from that string into an object whose
|
||||
state is equivalent to the initial object.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize(self, entity):
|
||||
"""Converts the given object into a string.
|
||||
|
||||
:param entity: A object to be serialized.
|
||||
:return String containing the state of the object in serialized form.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def deserialize(self, data_str):
|
||||
"""Converts the given string into an object.
|
||||
|
||||
:param data_str: String containing the state of the object in
|
||||
serialized form.
|
||||
:return: An object.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
# Backwards compatibility
|
||||
Serializer = ml_serialization.Serializer
|
||||
DictBasedSerializer = ml_serialization.DictBasedSerializer
|
||||
MistralSerializable = ml_serialization.MistralSerializable
|
||||
|
||||
|
||||
class DictBasedSerializer(Serializer):
|
||||
"""Dictionary-based serializer.
|
||||
|
||||
It slightly simplifies implementing custom serializers by introducing
|
||||
a contract based on dictionary. A serializer class extending this class
|
||||
just needs to implement conversion from object into dict and from dict
|
||||
to object. It doesn't need to convert into string and back as required
|
||||
bye the base serializer contract. Conversion into string is implemented
|
||||
once with regard to possible problems that may occur for collection and
|
||||
primitive types as circular dependencies, correct date format etc.
|
||||
"""
|
||||
|
||||
def serialize(self, entity):
|
||||
if entity is None:
|
||||
return None
|
||||
|
||||
entity_dict = self.serialize_to_dict(entity)
|
||||
|
||||
return jsonutils.dumps(
|
||||
jsonutils.to_primitive(entity_dict, convert_instances=True)
|
||||
)
|
||||
|
||||
def deserialize(self, data_str):
|
||||
if data_str is None:
|
||||
return None
|
||||
|
||||
entity_dict = jsonutils.loads(data_str)
|
||||
|
||||
return self.deserialize_from_dict(entity_dict)
|
||||
|
||||
@abc.abstractmethod
|
||||
def serialize_to_dict(self, entity):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def deserialize_from_dict(self, entity_dict):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class MistralSerializable(object):
|
||||
"""A mixin to generate a serialization key for a custom object."""
|
||||
|
||||
@classmethod
|
||||
def get_serialization_key(cls):
|
||||
return "%s.%s" % (cls.__module__, cls.__name__)
|
||||
# PolymorphicSerializer cannot be used from mistral-lib yet
|
||||
# as mistral-lib does not have the unregister method.
|
||||
# Once it does this will be removed also in favor of mistral-lib
|
||||
|
||||
|
||||
class PolymorphicSerializer(Serializer):
|
||||
|
@ -19,7 +19,7 @@ import requests
|
||||
|
||||
from mistral.actions import std_actions as std
|
||||
from mistral.tests.unit import base
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
URL = 'http://some_url'
|
||||
@ -109,7 +109,7 @@ class HTTPActionTest(base.BaseTest):
|
||||
|
||||
result = action.run(mock_ctx)
|
||||
|
||||
self.assertIsInstance(result, wf_utils.Result)
|
||||
self.assertIsInstance(result, ml_actions.Result)
|
||||
self.assertEqual(401, result.error['status'])
|
||||
|
||||
mocked_method.assert_called_with(
|
||||
|
@ -31,7 +31,7 @@ from mistral.rpc import clients as rpc_clients
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.utils import rest_utils
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
# This line is needed for correct initialization of messaging config.
|
||||
oslo_messaging.get_rpc_transport(cfg.CONF)
|
||||
@ -332,7 +332,7 @@ class TestActionExecutionsController(base.APITest):
|
||||
|
||||
f.assert_called_once_with(
|
||||
UPDATED_ACTION['id'],
|
||||
wf_utils.Result(data=ACTION_EX_DB.output)
|
||||
ml_actions.Result(data=ACTION_EX_DB.output)
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
|
||||
@ -349,7 +349,7 @@ class TestActionExecutionsController(base.APITest):
|
||||
|
||||
f.assert_called_once_with(
|
||||
ERROR_ACTION_WITH_OUTPUT['id'],
|
||||
wf_utils.Result(error=ERROR_ACTION_RES_WITH_OUTPUT)
|
||||
ml_actions.Result(error=ERROR_ACTION_RES_WITH_OUTPUT)
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
|
||||
@ -362,7 +362,7 @@ class TestActionExecutionsController(base.APITest):
|
||||
|
||||
f.assert_called_once_with(
|
||||
ERROR_ACTION_FOR_EMPTY_OUTPUT['id'],
|
||||
wf_utils.Result(error=DEFAULT_ERROR_OUTPUT)
|
||||
ml_actions.Result(error=DEFAULT_ERROR_OUTPUT)
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
|
||||
@ -378,7 +378,7 @@ class TestActionExecutionsController(base.APITest):
|
||||
|
||||
f.assert_called_once_with(
|
||||
ERROR_ACTION_FOR_EMPTY_OUTPUT['id'],
|
||||
wf_utils.Result(error=DEFAULT_ERROR_OUTPUT)
|
||||
ml_actions.Result(error=DEFAULT_ERROR_OUTPUT)
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'on_action_complete')
|
||||
@ -392,7 +392,7 @@ class TestActionExecutionsController(base.APITest):
|
||||
|
||||
on_action_complete_mock_func.assert_called_once_with(
|
||||
CANCELLED_ACTION['id'],
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
@mock.patch.object(
|
||||
|
@ -29,7 +29,7 @@ from mistral.services import workbooks as wb_service
|
||||
from mistral.tests.unit import base
|
||||
from mistral.tests.unit.engine import base as eng_test_base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -337,7 +337,7 @@ class DefaultEngineTest(base.DbTestCase):
|
||||
# Finish action of 'task1'.
|
||||
task1_action_ex = self.engine.on_action_complete(
|
||||
task1_action_ex.id,
|
||||
wf_utils.Result(data='Hey')
|
||||
ml_actions.Result(data='Hey')
|
||||
)
|
||||
|
||||
self.assertIsInstance(task1_action_ex, models.ActionExecution)
|
||||
@ -379,7 +379,7 @@ class DefaultEngineTest(base.DbTestCase):
|
||||
# Finish 'task2'.
|
||||
task2_action_ex = self.engine.on_action_complete(
|
||||
task2_action_ex.id,
|
||||
wf_utils.Result(data='Hi')
|
||||
ml_actions.Result(data='Hi')
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
|
@ -20,7 +20,7 @@ from mistral import exceptions as exc
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -352,7 +352,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
# Update async action execution result.
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
wf_utils.Result(data='foobar')
|
||||
ml_actions.Result(data='foobar')
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
@ -554,7 +554,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
# Update async action execution result.
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
wf_utils.Result(data='foobar')
|
||||
ml_actions.Result(data='foobar')
|
||||
)
|
||||
|
||||
# Assert that task1 is SUCCESS and workflow is ERROR.
|
||||
|
@ -21,7 +21,7 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
@ -88,7 +88,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
# Cancel action execution for task.
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_task_cancelled(wf1_t1_ex.id)
|
||||
@ -134,7 +134,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_exs[1].id,
|
||||
wf_utils.Result(data={'foo': 'bar'})
|
||||
ml_actions.Result(data={'foo': 'bar'})
|
||||
)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
@ -333,7 +333,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
# Mark async action execution complete.
|
||||
self.engine.on_action_complete(
|
||||
wf2_t1_action_exs[0].id,
|
||||
wf_utils.Result(data={'foo': 'bar'})
|
||||
ml_actions.Result(data={'foo': 'bar'})
|
||||
)
|
||||
|
||||
# Wait for the workflows to succeed.
|
||||
@ -457,7 +457,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
# Cancel action execution for task.
|
||||
self.engine.on_action_complete(
|
||||
wf2_t1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(wf2_ex.id)
|
||||
@ -507,7 +507,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
# Mark async action execution complete.
|
||||
self.engine.on_action_complete(
|
||||
wf2_t1_action_exs[1].id,
|
||||
wf_utils.Result(data={'foo': 'bar'})
|
||||
ml_actions.Result(data={'foo': 'bar'})
|
||||
)
|
||||
|
||||
# Wait for the workflows to succeed.
|
||||
@ -587,7 +587,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
for wf1_t1_action_ex in wf1_t1_action_exs:
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_ex.id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(wf1_ex.id)
|
||||
@ -633,7 +633,7 @@ class DirectWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
for i in range(3, 6):
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_exs[i].id,
|
||||
wf_utils.Result(data={'foo': 'bar'})
|
||||
ml_actions.Result(data={'foo': 'bar'})
|
||||
)
|
||||
|
||||
# Wait for the workflows to succeed.
|
||||
|
@ -14,13 +14,13 @@
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import base as actions_base
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -32,7 +32,7 @@ class InvalidUnicodeAction(actions_base.Action):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
def run(self, context):
|
||||
return b'\xf8'
|
||||
|
||||
def test(self):
|
||||
|
@ -14,15 +14,13 @@
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import base as actions_base
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
@ -63,8 +61,8 @@ class MyAction(actions_base.Action):
|
||||
self.success_result = success_result
|
||||
self.error_result = error_result
|
||||
|
||||
def run(self):
|
||||
return wf_utils.Result(
|
||||
def run(self, context):
|
||||
return actions_base.Result(
|
||||
data=self.success_result,
|
||||
error=self.error_result
|
||||
)
|
||||
|
@ -14,7 +14,7 @@
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral_lib.actions import base as actions_base
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
@ -22,8 +22,6 @@ from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
@ -70,9 +68,9 @@ class MyAction(actions_base.Action):
|
||||
result[i] = 'A'
|
||||
|
||||
if not self.error:
|
||||
return wf_utils.Result(data=result)
|
||||
return actions_base.Result(data=result)
|
||||
else:
|
||||
return wf_utils.Result(error=result)
|
||||
return actions_base.Result(error=result)
|
||||
|
||||
def test(self):
|
||||
raise NotImplementedError
|
||||
|
@ -17,12 +17,12 @@ from eventlet import semaphore
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
|
||||
from mistral.actions import base as action_base
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit import base as test_base
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -88,7 +88,7 @@ ACTION_SEMAPHORE = None
|
||||
TEST_SEMAPHORE = None
|
||||
|
||||
|
||||
class BlockingAction(action_base.Action):
|
||||
class BlockingAction(actions_base.Action):
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@ -100,7 +100,7 @@ class BlockingAction(action_base.Action):
|
||||
def wait_for_test():
|
||||
ACTION_SEMAPHORE.acquire()
|
||||
|
||||
def run(self):
|
||||
def run(self, context):
|
||||
self.unblock_test()
|
||||
self.wait_for_test()
|
||||
|
||||
|
@ -21,7 +21,7 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.services import workbooks as wb_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -86,7 +86,7 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
# Cancel action execution for task.
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(wf1_ex.id)
|
||||
@ -142,7 +142,7 @@ class ReverseWorkflowRerunCancelledTest(base.EngineTestCase):
|
||||
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_exs[1].id,
|
||||
wf_utils.Result(data={'foo': 'bar'})
|
||||
ml_actions.Result(data={'foo': 'bar'})
|
||||
)
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
|
@ -22,7 +22,7 @@ from mistral import exceptions as exc
|
||||
from mistral.services import actions
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
@ -188,7 +188,7 @@ class RunActionEngineTest(base.EngineTestCase):
|
||||
|
||||
@mock.patch.object(
|
||||
std_actions.AsyncNoOpAction, 'run',
|
||||
mock.MagicMock(return_value=wf_utils.Result(error='Invoke erred.')))
|
||||
mock.MagicMock(return_value=ml_actions.Result(error='Invoke erred.')))
|
||||
def test_run_action_async_invoke_with_error(self):
|
||||
action_ex = self.engine.start_action('std.async_noop', {})
|
||||
|
||||
@ -303,7 +303,7 @@ class RunActionEngineTest(base.EngineTestCase):
|
||||
'scope': 'public'
|
||||
})
|
||||
def_mock.return_value = action_def
|
||||
run_mock.return_value = wf_utils.Result(data='Hello')
|
||||
run_mock.return_value = ml_actions.Result(data='Hello')
|
||||
|
||||
class_ret = mock.MagicMock()
|
||||
class_mock.return_value = class_ret
|
||||
|
@ -22,7 +22,7 @@ from mistral.services import workbooks as wb_service
|
||||
from mistral.services import workflows as wf_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
class TaskCancelTest(base.EngineTestCase):
|
||||
@ -75,7 +75,7 @@ class TaskCancelTest(base.EngineTestCase):
|
||||
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(wf_ex.id)
|
||||
@ -171,7 +171,7 @@ class TaskCancelTest(base.EngineTestCase):
|
||||
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(subwf_ex.id)
|
||||
@ -242,7 +242,7 @@ class TaskCancelTest(base.EngineTestCase):
|
||||
|
||||
self.engine.on_action_complete(
|
||||
task_1_action_exs[0].id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_workflow_cancelled(wf_ex.id)
|
||||
@ -333,7 +333,7 @@ class TaskCancelTest(base.EngineTestCase):
|
||||
for wf1_t1_action_ex in wf1_t1_action_exs:
|
||||
self.engine.on_action_complete(
|
||||
wf1_t1_action_ex.id,
|
||||
wf_utils.Result(cancel=True)
|
||||
ml_actions.Result(cancel=True)
|
||||
)
|
||||
|
||||
self.await_task_cancelled(wf1_t1_ex.id)
|
||||
|
@ -16,7 +16,6 @@ import copy
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import base as action_base
|
||||
from mistral.actions import std_actions
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
@ -27,7 +26,7 @@ from mistral.tests.unit.engine import base
|
||||
from mistral import utils
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as actions_base
|
||||
|
||||
# TODO(nmakhotkin) Need to write more tests.
|
||||
|
||||
@ -152,11 +151,11 @@ WF_INPUT_ONE_ITEM = {
|
||||
}
|
||||
|
||||
|
||||
class RandomSleepEchoAction(action_base.Action):
|
||||
class RandomSleepEchoAction(actions_base.Action):
|
||||
def __init__(self, output):
|
||||
self.output = output
|
||||
|
||||
def run(self):
|
||||
def run(self, context):
|
||||
utils.random_sleep(1)
|
||||
|
||||
return self.output
|
||||
@ -399,11 +398,17 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
|
||||
act_exs = task_ex.executions
|
||||
|
||||
self.engine.on_action_complete(act_exs[0].id, wf_utils.Result("Ivan"))
|
||||
self.engine.on_action_complete(act_exs[1].id, wf_utils.Result("John"))
|
||||
self.engine.on_action_complete(
|
||||
act_exs[0].id,
|
||||
actions_base.Result("Ivan")
|
||||
)
|
||||
self.engine.on_action_complete(
|
||||
act_exs[1].id,
|
||||
actions_base.Result("John")
|
||||
)
|
||||
self.engine.on_action_complete(
|
||||
act_exs[2].id,
|
||||
wf_utils.Result("Mistral")
|
||||
actions_base.Result("Mistral")
|
||||
)
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
@ -636,7 +641,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 1st iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("John")
|
||||
actions_base.Result("John")
|
||||
)
|
||||
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
@ -652,7 +657,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 2nd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("Ivan")
|
||||
actions_base.Result("Ivan")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
@ -666,7 +671,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 3rd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("Mistral")
|
||||
actions_base.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1))
|
||||
@ -800,7 +805,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 1st iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("John")
|
||||
actions_base.Result("John")
|
||||
)
|
||||
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
@ -818,7 +823,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 2nd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("Ivan")
|
||||
actions_base.Result("Ivan")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
@ -834,7 +839,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 3rd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("Mistral")
|
||||
actions_base.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
@ -849,7 +854,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 4th iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Hello")
|
||||
actions_base.Result("Hello")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1))
|
||||
@ -952,7 +957,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 1st iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
self._get_incomplete_action(task_ex).id,
|
||||
wf_utils.Result("John")
|
||||
actions_base.Result("John")
|
||||
)
|
||||
|
||||
# Wait till the delayed on_action_complete is processed.
|
||||
@ -969,7 +974,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 2nd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Ivan")
|
||||
actions_base.Result("Ivan")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) == 1)
|
||||
@ -984,7 +989,7 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
# 3rd iteration complete.
|
||||
self.engine.on_action_complete(
|
||||
incomplete_action.id,
|
||||
wf_utils.Result("Mistral")
|
||||
actions_base.Result("Mistral")
|
||||
)
|
||||
|
||||
self._await(lambda: len(db_api.get_delayed_calls()) in (0, 1))
|
||||
|
@ -22,7 +22,7 @@ from mistral.services import workbooks as wb_service
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
@ -365,7 +365,7 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
task_execution_id=task2_ex.id
|
||||
)[0]
|
||||
|
||||
self.engine.on_action_complete(task2_action_ex.id, utils.Result())
|
||||
self.engine.on_action_complete(task2_action_ex.id, ml_actions.Result())
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
|
@ -22,7 +22,7 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import scheduler
|
||||
from mistral.tests.unit import base
|
||||
from mistral.workflow import utils as wf_utils
|
||||
from mistral_lib import actions as ml_actions
|
||||
|
||||
|
||||
FACTORY_METHOD_PATH = (
|
||||
@ -195,7 +195,7 @@ class SchedulerServiceTest(base.DbTestCase):
|
||||
def test_scheduler_with_serializer(self, factory):
|
||||
target_method = 'run_something'
|
||||
|
||||
task_result = wf_utils.Result('data', 'error')
|
||||
task_result = ml_actions.Result('data', 'error')
|
||||
|
||||
method_args = {
|
||||
'name': 'task',
|
||||
@ -230,7 +230,7 @@ class SchedulerServiceTest(base.DbTestCase):
|
||||
|
||||
result = factory().run_something.call_args[1].get('result')
|
||||
|
||||
self.assertIsInstance(result, wf_utils.Result)
|
||||
self.assertIsInstance(result, ml_actions.Result)
|
||||
self.assertEqual('data', result.data)
|
||||
self.assertEqual('error', result.error)
|
||||
|
||||
|
@ -14,65 +14,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral import serialization
|
||||
from mistral import utils
|
||||
from mistral_lib.actions import types
|
||||
|
||||
# For backwards compatibility
|
||||
|
||||
class Result(serialization.MistralSerializable):
|
||||
"""Explicit data structure containing a result of task execution."""
|
||||
|
||||
def __init__(self, data=None, error=None, cancel=False):
|
||||
self.data = data
|
||||
self.error = error
|
||||
self.cancel = cancel
|
||||
|
||||
def __repr__(self):
|
||||
return 'Result [data=%s, error=%s, cancel=%s]' % (
|
||||
repr(self.data), repr(self.error), str(self.cancel)
|
||||
)
|
||||
|
||||
def cut_repr(self):
|
||||
return 'Result [data=%s, error=%s, cancel=%s]' % (
|
||||
utils.cut(self.data), utils.cut(self.error), str(self.cancel)
|
||||
)
|
||||
|
||||
def is_cancel(self):
|
||||
return self.cancel
|
||||
|
||||
def is_error(self):
|
||||
return self.error is not None and not self.is_cancel()
|
||||
|
||||
def is_success(self):
|
||||
return not self.is_error() and not self.is_cancel()
|
||||
|
||||
def __eq__(self, other):
|
||||
return (
|
||||
self.data == other.data and
|
||||
self.error == other.error and
|
||||
self.cancel == other.cancel
|
||||
)
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def to_dict(self):
|
||||
return ({'result': self.data}
|
||||
if self.is_success() else {'result': self.error})
|
||||
|
||||
|
||||
class ResultSerializer(serialization.DictBasedSerializer):
|
||||
def serialize_to_dict(self, entity):
|
||||
return {
|
||||
'data': entity.data,
|
||||
'error': entity.error,
|
||||
'cancel': entity.cancel
|
||||
}
|
||||
|
||||
def deserialize_from_dict(self, entity_dict):
|
||||
return Result(
|
||||
entity_dict['data'],
|
||||
entity_dict['error'],
|
||||
entity_dict.get('cancel', False)
|
||||
)
|
||||
|
||||
serialization.register_serializer(Result, ResultSerializer())
|
||||
Result = types.Result
|
||||
ResultSerializer = types.ResultSerializer
|
||||
|
Loading…
x
Reference in New Issue
Block a user