diff --git a/doc/source/main_features.rst b/doc/source/main_features.rst index 2f21ff3e4..7f15d89fe 100644 --- a/doc/source/main_features.rst +++ b/doc/source/main_features.rst @@ -86,6 +86,7 @@ YAML example pause-before: true wait-before: 2 wait-after: 4 + fail-on: <% $.some_value < 4 %> timeout: 30 retry: count: 10 @@ -110,6 +111,11 @@ There are different types of policies in Mistral. has completed before starting the tasks specified in *'on-success'*, *'on-error'* or *'on-complete'*. +4. **fail-on** + + Specifies a condition under which the task will fail, even if + the action was completed successfully. + 4. **timeout** Specifies a period of time in seconds after which a task will be failed diff --git a/doc/source/user/wf_lang_v2.rst b/doc/source/user/wf_lang_v2.rst index 2603138aa..890f3084a 100644 --- a/doc/source/user/wf_lang_v2.rst +++ b/doc/source/user/wf_lang_v2.rst @@ -162,6 +162,7 @@ Common workflow attributes - **pause-before** - Configures pause-before policy. *Optional*. - **wait-before** - Configures wait-before policy. *Optional*. - **wait-after** - Configures wait-after policy. *Optional*. + - **fail-on** - Configures fail-on policy. *Optional*. - **timeout** - Configures timeout policy. *Optional*. - **retry** - Configures retry policy. *Optional*. - **concurrency** - Configures concurrency policy. *Optional*. @@ -270,6 +271,7 @@ attributes: - **pause-before** - Configures pause-before policy. *Optional*. - **wait-before** - Configures wait-before policy. *Optional*. - **wait-after** - Configures wait-after policy. *Optional*. +- **fail-on** - Configures fail-on policy. *Optional*. - **timeout** - Configures timeout policy. *Optional*. - **retry** - Configures retry policy. *Optional*. - **concurrency** - Configures concurrency policy. *Optional*. @@ -355,6 +357,7 @@ YAML example   pause-before: true   wait-before: 2   wait-after: 4 +   fail-on: <% $.some_value < 4 %>   timeout: 30   retry:     count: 10 @@ -381,6 +384,12 @@ has completed before starting next tasks defined in *on-success*, *on-error* or *on-complete*. +**fail-on** + +Defines a condition under which the task will fail, even if +the action was completed successfully. + + **timeout** Defines a period of time in seconds after which a task will be failed diff --git a/doc/source/user/wf_namespaces.rst b/doc/source/user/wf_namespaces.rst index 49abd697b..cf8d3680c 100644 --- a/doc/source/user/wf_namespaces.rst +++ b/doc/source/user/wf_namespaces.rst @@ -131,7 +131,7 @@ definitions: So the call chain looks like this: - .. code-block:: + .. code-block:: console wf1 -> wf2 -> wf3 @@ -154,7 +154,7 @@ these namespaces: And we create a workflow execution like this via API: - .. code-block:: + .. code-block:: console POST /v2/executions @@ -175,7 +175,7 @@ In this case, Mistral will: However, if we launch a workflow like this: - .. code-block:: + .. code-block:: console POST /v2/executions @@ -186,7 +186,7 @@ However, if we launch a workflow like this: We'll get the call chain - .. code-block:: + .. code-block:: console wf2 -> wf3 diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 4682f4d31..5722839b7 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -56,6 +56,7 @@ def get_policy_factories(): build_pause_before_policy, build_wait_before_policy, build_wait_after_policy, + build_fail_on_policy, build_retry_policy, build_timeout_policy, build_concurrency_policy @@ -150,6 +151,16 @@ def build_concurrency_policy(policies_spec): if concurrency_policy else None) +def build_fail_on_policy(policies_spec): + if not policies_spec: + return None + + fail_on_policy = policies_spec.get_fail_on() + + return (FailOnPolicy(fail_on_policy) + if fail_on_policy else None) + + def _ensure_context_has_key(runtime_context, key): if not runtime_context: runtime_context = {} @@ -542,6 +553,30 @@ class ConcurrencyPolicy(base.TaskPolicy): task_ex.runtime_context = runtime_context +class FailOnPolicy(base.TaskPolicy): + _schema = { + "properties": { + "fail-on": {"type": "boolean"}, + } + } + + def __init__(self, fail_on): + self.fail_on = fail_on + + def before_task_start(self, task_ex, task_spec): + pass + + def after_task_complete(self, task_ex, task_spec): + if task_ex.state != states.SUCCESS: + return + + super(FailOnPolicy, self).after_task_complete(task_ex, task_spec) + + if self.fail_on: + task_ex.state = states.ERROR + task_ex.state_info = 'Failed by fail-on policy' + + @db_utils.retry_on_db_error @post_tx_queue.run def _continue_task(task_ex_id): diff --git a/mistral/lang/v2/policies.py b/mistral/lang/v2/policies.py index a9a4c9c9a..0a13447cb 100644 --- a/mistral/lang/v2/policies.py +++ b/mistral/lang/v2/policies.py @@ -29,6 +29,7 @@ class PoliciesSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN }, "additionalProperties": False } @@ -46,6 +47,7 @@ class PoliciesSpec(base.BaseSpec): self._timeout = data.get('timeout', 0) self._pause_before = data.get('pause-before', False) self._concurrency = data.get('concurrency', 0) + self._fail_on = data.get('fail-on', False) def validate_schema(self): super(PoliciesSpec, self).validate_schema() @@ -56,6 +58,7 @@ class PoliciesSpec(base.BaseSpec): self.validate_expr(self._data.get('timeout', 0)) self.validate_expr(self._data.get('pause-before', False)) self.validate_expr(self._data.get('concurrency', 0)) + self.validate_expr(self._data.get('fail-on', False)) def get_retry(self): return self._retry @@ -74,3 +77,6 @@ class PoliciesSpec(base.BaseSpec): def get_concurrency(self): return self._concurrency + + def get_fail_on(self): + return self._fail_on diff --git a/mistral/lang/v2/task_defaults.py b/mistral/lang/v2/task_defaults.py index 7c4cc3b19..8d6d315e1 100644 --- a/mistral/lang/v2/task_defaults.py +++ b/mistral/lang/v2/task_defaults.py @@ -38,6 +38,7 @@ class TaskDefaultsSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN, "on-complete": on_clause.OnClauseSpec.get_schema(), "on-success": on_clause.OnClauseSpec.get_schema(), "on-error": on_clause.OnClauseSpec.get_schema(), @@ -63,7 +64,8 @@ class TaskDefaultsSpec(base.BaseSpec): 'wait-after', 'timeout', 'pause-before', - 'concurrency' + 'concurrency', + 'fail-on' ) on_spec_cls = on_clause.OnClauseSpec diff --git a/mistral/lang/v2/tasks.py b/mistral/lang/v2/tasks.py index 156371436..54d7d2b6e 100644 --- a/mistral/lang/v2/tasks.py +++ b/mistral/lang/v2/tasks.py @@ -75,6 +75,7 @@ class TaskSpec(base.BaseSpec): "timeout": types.EXPRESSION_OR_POSITIVE_INTEGER, "pause-before": types.EXPRESSION_OR_BOOLEAN, "concurrency": types.EXPRESSION_OR_POSITIVE_INTEGER, + "fail-on": types.EXPRESSION_OR_BOOLEAN, "target": types.NONEMPTY_STRING, "keep-result": types.EXPRESSION_OR_BOOLEAN, "safe-rerun": types.EXPRESSION_OR_BOOLEAN @@ -120,7 +121,8 @@ class TaskSpec(base.BaseSpec): 'wait-after', 'timeout', 'pause-before', - 'concurrency' + 'concurrency', + 'fail-on' ) self._target = data.get('target') self._keep_result = data.get('keep-result', True) diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index af0c02bf7..0020803d3 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -1874,3 +1874,154 @@ class PoliciesTest(base.EngineTestCase): {}, fail_task_ex.runtime_context["retry_task_policy"] ) + + def test_fail_on_true_condition(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output=4 + fail-on: <% task(task1).result <= 4 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.ERROR, "Check task state") + + def test_fail_on_false_condition(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output=4 + fail-on: <% task(task1).result != 4 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + + def test_fail_on_true_condition_task_defaults(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + task-defaults: + fail-on: <% task().result <= 4 %> + tasks: + task1: + action: std.echo output=4 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.ERROR, "Check task state") + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.Mock(side_effect=[1, 2, 3, 4]) + ) + def test_fail_on_with_retry(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output="mocked" + fail-on: <% task(task1).result <= 2 %> + retry: + count: 3 + delay: 0 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + self.assertEqual( + 2, + task_ex.runtime_context['retry_task_policy']['retry_no'] + ) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.Mock(side_effect=[1, 2, 3, 4]) + ) + def test_fail_on_with_retry_and_with_items(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + with-items: x in [1, 2] + action: std.echo output="mocked" + fail-on: <% not task(task1).result.contains(4) %> + retry: + count: 3 + delay: 0 + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual(task_ex.state, states.SUCCESS, "Check task state") + self.assertEqual( + 1, + task_ex.runtime_context['retry_task_policy']['retry_no'] + ) diff --git a/requirements.txt b/requirements.txt index 1e1accf4e..8a2f089f0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -34,6 +34,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0 pecan>=1.2.1 # BSD python-barbicanclient>=4.5.2 # Apache-2.0 python-cinderclient!=4.0.0,>=3.3.0 # Apache-2.0 +python-zaqarclient>=1.0.0 # Apache-2.0 python-designateclient>=2.7.0 # Apache-2.0 python-glanceclient>=2.8.0 # Apache-2.0 python-glareclient>=0.3.0 # Apache-2.0 @@ -52,7 +53,6 @@ python-troveclient>=2.2.0 # Apache-2.0 python-ironicclient!=2.7.1,>=2.7.0 # Apache-2.0 python-ironic-inspector-client>=1.5.0 # Apache-2.0 python-vitrageclient>=2.0.0 # Apache-2.0 -python-zaqarclient>=1.0.0 # Apache-2.0 python-zunclient>=1.0.0 # Apache-2.0 python-qinlingclient>=1.0.0 # Apache-2.0 PyJWT>=1.0.1 # MIT