From 4e926a1f136a54ddf83e9eae43c33d9636d48aaf Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 7 Jun 2019 00:30:20 +0300 Subject: [PATCH] Fail-on policy Fail-on policy allows to fail success tasks by condition. It is useful in cases we have to fail task if its result is unacceptable and it makes workflow definition more readable. Change-Id: I57b4f3d1533982d3b9b7063925f8d70f044aefea Implements: blueprint fail-on-policy Signed-off-by: Oleg Ovcharuk --- doc/source/main_features.rst | 6 + doc/source/user/wf_lang_v2.rst | 9 ++ doc/source/user/wf_namespaces.rst | 8 +- mistral/engine/policies.py | 35 +++++ mistral/lang/v2/policies.py | 6 + mistral/lang/v2/task_defaults.py | 4 +- mistral/lang/v2/tasks.py | 4 +- mistral/tests/unit/engine/test_policies.py | 151 +++++++++++++++++++++ requirements.txt | 2 +- 9 files changed, 218 insertions(+), 7 deletions(-) diff --git a/doc/source/main_features.rst b/doc/source/main_features.rst index 2f21ff3e4..7f15d89fe 100644 --- a/doc/source/main_features.rst +++ b/doc/source/main_features.rst @@ -86,6 +86,7 @@ YAML example pause-before: true wait-before: 2 wait-after: 4 + fail-on: <% $.some_value < 4 %> timeout: 30 retry: count: 10 @@ -110,6 +111,11 @@ There are different types of policies in Mistral. has completed before starting the tasks specified in *'on-success'*, *'on-error'* or *'on-complete'*. +4. **fail-on** + + Specifies a condition under which the task will fail, even if + the action was completed successfully. + 4. **timeout** Specifies a period of time in seconds after which a task will be failed diff --git a/doc/source/user/wf_lang_v2.rst b/doc/source/user/wf_lang_v2.rst index 2603138aa..890f3084a 100644 --- a/doc/source/user/wf_lang_v2.rst +++ b/doc/source/user/wf_lang_v2.rst @@ -162,6 +162,7 @@ Common workflow attributes - **pause-before** - Configures pause-before policy. *Optional*. - **wait-before** - Configures wait-before policy. *Optional*. - **wait-after** - Configures wait-after policy. *Optional*. + - **fail-on** - Configures fail-on policy. *Optional*. - **timeout** - Configures timeout policy. *Optional*. - **retry** - Configures retry policy. *Optional*. - **concurrency** - Configures concurrency policy. *Optional*. @@ -270,6 +271,7 @@ attributes: - **pause-before** - Configures pause-before policy. *Optional*. - **wait-before** - Configures wait-before policy. *Optional*. - **wait-after** - Configures wait-after policy. *Optional*. +- **fail-on** - Configures fail-on policy. *Optional*. - **timeout** - Configures timeout policy. *Optional*. - **retry** - Configures retry policy. *Optional*. - **concurrency** - Configures concurrency policy. *Optional*. @@ -355,6 +357,7 @@ YAML example   pause-before: true   wait-before: 2   wait-after: 4 +   fail-on: <% $.some_value < 4 %>   timeout: 30   retry:     count: 10 @@ -381,6 +384,12 @@ has completed before starting next tasks defined in *on-success*, *on-error* or *on-complete*. +**fail-on** + +Defines a condition under which the task will fail, even if +the action was completed successfully. + + **timeout** Defines a period of time in seconds after which a task will be failed diff --git a/doc/source/user/wf_namespaces.rst b/doc/source/user/wf_namespaces.rst index 49abd697b..cf8d3680c 100644 --- a/doc/source/user/wf_namespaces.rst +++ b/doc/source/user/wf_namespaces.rst @@ -131,7 +131,7 @@ definitions: So the call chain looks like this: - .. code-block:: + .. code-block:: console wf1 -> wf2 -> wf3 @@ -154,7 +154,7 @@ these namespaces: And we create a workflow execution like this via API: - .. code-block:: + .. code-block:: console POST /v2/executions @@ -175,7 +175,7 @@ In this case, Mistral will: However, if we launch a workflow like this: - .. code-block:: + .. code-block:: console POST /v2/executions @@ -186,7 +186,7 @@ However, if we launch a workflow like this: We'll get the call chain - .. code-block:: + .. code-block:: console wf2 -> wf3 diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 4682f4d31..5722839b7 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -56,6 +56,7 @@ def get_policy_factories(): build_pause_before_policy, build_wait_before_policy, build_wait_after_policy, + build_fail_on_policy, build_retry_policy, build_timeout_policy, build_concurrency_policy @@ -150,6 +151,16 @@ def build_concurrency_policy(policies_spec): if concurrency_policy else None) +def build_fail_on_policy(policies_spec): + if not policies_spec: + return None + + fail_on_policy = policies_spec.get_fail_on() + + return (FailOnPolicy(fail_on_policy) + if fail_on_policy else None) + + def _ensure_context_has_key(runtime_context, key): if not runtime_context: runtime_context = {} @@ -542,6 +553,30 @@ class ConcurrencyPolicy(base.TaskPolicy): task_ex.runtime_context = runtime_context +class FailOnPolicy(base.TaskPolicy): + _schema = { + "properties": { + "fail-on": {"type": "boolean"}, + } + } + + def __init__(self, fail_on): + self.fail_on = fail_on + + def before_task_start(self, task_ex, task_spec): + pass + + def after_task_complete(self, task_ex, task_spec): + if task_ex.state != states.SUCCESS: + return + + super(FailOnPolicy, self).after_task_complete(task_ex, task_spec) + + if self.fail_on: + task_ex.state = states.ERROR + task_ex.state_info = 'Failed by fail-on policy' + + @db_utils.retry_on_db_error @post_tx_queue.run def _continue_task(task_ex_id): diff --git a/mistral/lang/v2/policies.py b/mistral/lang/v2/policies.py index a9a4c9c9a..0a13447cb 100644 --- a/mistral/lang/v2/policies.py +++ b/mistral/lang/v2/policies.py @@ -29,6 +29,7 @@ class PoliciesSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN }, "additionalProperties": False } @@ -46,6 +47,7 @@ class PoliciesSpec(base.BaseSpec): self._timeout = data.get('timeout', 0) self._pause_before = data.get('pause-before', False) self._concurrency = data.get('concurrency', 0) + self._fail_on = data.get('fail-on', False) def validate_schema(self): super(PoliciesSpec, self).validate_schema() @@ -56,6 +58,7 @@ class PoliciesSpec(base.BaseSpec): self.validate_expr(self._data.get('timeout', 0)) self.validate_expr(self._data.get('pause-before', False)) self.validate_expr(self._data.get('concurrency', 0)) + self.validate_expr(self._data.get('fail-on', False)) def get_retry(self): return self._retry @@ -74,3 +77,6 @@ class PoliciesSpec(base.BaseSpec): def get_concurrency(self): return self._concurrency + + def get_fail_on(self): + return self._fail_on diff --git a/mistral/lang/v2/task_defaults.py b/mistral/lang/v2/task_defaults.py index 7c4cc3b19..8d6d315e1 100644 --- a/mistral/lang/v2/task_defaults.py +++ b/mistral/lang/v2/task_defaults.py @@ -38,6 +38,7 @@ class TaskDefaultsSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN, "on-complete": on_clause.OnClauseSpec.get_schema(), "on-success": on_clause.OnClauseSpec.get_schema(), "on-error": on_clause.OnClauseSpec.get_schema(), @@ -63,7 +64,8 @@ class TaskDefaultsSpec(base.BaseSpec): 'wait-after', 'timeout', 'pause-before', - 'concurrency' + 'concurrency', + 'fail-on' ) on_spec_cls = on_clause.OnClauseSpec diff --git a/mistral/lang/v2/tasks.py b/mistral/lang/v2/tasks.py index 156371436..54d7d2b6e 100644 --- a/mistral/lang/v2/tasks.py +++ b/mistral/lang/v2/tasks.py @@ -75,6 +75,7 @@ class TaskSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN, "target": types.NONEMPTY_STRING, "keep-result": types.EXPRESSION_OR_BOOLEAN, "safe-rerun": types.EXPRESSION_OR_BOOLEAN @@ -120,7 +121,8 @@ class TaskSpec(base.BaseSpec): 'wait-after', 'timeout', 'pause-before', - 'concurrency' + 'concurrency', + 'fail-on' ) self._target = data.get('target') self._keep_result = data.get('keep-result', True) diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index af0c02bf7..0020803d3 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -1874,3 +1874,154 @@ class PoliciesTest(base.EngineTestCase): {}, fail_task_ex.runtime_context["retry_task_policy"] ) + + def test_fail_on_true_condition(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output=4 + fail-on: <% task(task1).result <= 4 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.ERROR, "Check task state") + + def test_fail_on_false_condition(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output=4 + fail-on: <% task(task1).result != 4 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + + def test_fail_on_true_condition_task_defaults(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + task-defaults: + fail-on: <% task().result <= 4 %> + tasks: + task1: + action: std.echo output=4 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.ERROR, "Check task state") + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.Mock(side_effect=[1, 2, 3, 4]) + ) + def test_fail_on_with_retry(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output="mocked" + fail-on: <% task(task1).result <= 2 %> + retry: + count: 3 + delay: 0 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + self.assertEqual( + 2, + task_ex.runtime_context['retry_task_policy']['retry_no'] + ) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.Mock(side_effect=[1, 2, 3, 4]) + ) + def test_fail_on_with_retry_and_with_items(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + with-items: x in [1, 2] + action: std.echo output="mocked" + fail-on: <% not task(task1).result.contains(4) %> + retry: + count: 3 + delay: 0 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + self.assertEqual( + 1, + task_ex.runtime_context['retry_task_policy']['retry_no'] + ) diff --git a/requirements.txt b/requirements.txt index 1e1accf4e..8a2f089f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,6 +34,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 pecan>=1.2.1 # BSD python-barbicanclient>=4.5.2 # Apache-2.0 python-cinderclient!=4.0.0,>=3.3.0 # Apache-2.0 +python-zaqarclient>=1.0.0 # Apache-2.0 python-designateclient>=2.7.0 # Apache-2.0 python-glanceclient>=2.8.0 # Apache-2.0 python-glareclient>=0.3.0 # Apache-2.0 @@ -52,7 +53,6 @@ python-troveclient>=2.2.0 # Apache-2.0 python-ironicclient!=2.7.1,>=2.7.0 # Apache-2.0 python-ironic-inspector-client>=1.5.0 # Apache-2.0 python-vitrageclient>=2.0.0 # Apache-2.0 -python-zaqarclient>=1.0.0 # Apache-2.0 python-zunclient>=1.0.0 # Apache-2.0 python-qinlingclient>=1.0.0 # Apache-2.0 PyJWT>=1.0.1 # MIT