
Depends-on: https://review.opendev.org/#/c/703296/ Depends-On: https://review.opendev.org/#/c/704280/ Change-Id: Id62fdabe7699e7c3b2977166e253cfc77779e467
997 lines
33 KiB
Python
997 lines
33 KiB
Python
# Copyright 2013 - Mirantis, Inc.
|
|
# Copyright 2015 - StackStorm, Inc.
|
|
# Copyright 2015 Huawei Technologies Co., Ltd.
|
|
# Copyright 2016 - Brocade Communications Systems, Inc.
|
|
# Copyright 2018 - Extreme Networks, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import copy
|
|
import datetime
|
|
import json
|
|
|
|
import mock
|
|
from oslo_config import cfg
|
|
import oslo_messaging
|
|
from oslo_utils import uuidutils
|
|
import sqlalchemy as sa
|
|
from webtest import app as webtest_app
|
|
|
|
from mistral.api.controllers.v2 import execution
|
|
from mistral.api.controllers.v2 import resources
|
|
from mistral.db.v2 import api as db_api
|
|
from mistral.db.v2.sqlalchemy import api as sql_db_api
|
|
from mistral.db.v2.sqlalchemy import models
|
|
from mistral import exceptions as exc
|
|
from mistral.rpc import base as rpc_base
|
|
from mistral.rpc import clients as rpc_clients
|
|
from mistral.tests.unit.api import base
|
|
from mistral.tests.unit import base as unit_base
|
|
from mistral.utils import rest_utils
|
|
from mistral.workflow import states
|
|
from mistral_lib import utils
|
|
|
|
# NOTE: The call below is needed so that the messaging configuration is
# initialized correctly. Its return value is intentionally not used.
oslo_messaging.get_rpc_transport(cfg.CONF)


# Baseline workflow execution model that most tests in this module hand
# back from the mocked DB API / engine client.
WF_EX = models.WorkflowExecution(
    id='123e4567-e89b-12d3-a456-426655440000',
    workflow_id='123e4567-e89b-12d3-a456-426655441111',
    workflow_name='some',
    description='execution description.',
    spec={'name': 'some'},
    input={'foo': 'bar'},
    output={},
    params={'env': {'k1': 'abc'}},
    context={},
    state=states.RUNNING,
    state_info=None,
    created_at=datetime.datetime(1970, 1, 1),
    updated_at=datetime.datetime(1970, 1, 1)
)

# JSON form of WF_EX without the description field. Note that "input",
# "output" and "params" are JSON-encoded strings, not nested objects.
WF_EX_JSON = dict(
    id='123e4567-e89b-12d3-a456-426655440000',
    input='{"foo": "bar"}',
    output='{}',
    params='{"env": {"k1": "abc"}}',
    state='RUNNING',
    state_info=None,
    created_at='1970-01-01 00:00:00',
    updated_at='1970-01-01 00:00:00',
    workflow_name='some',
    workflow_id='123e4567-e89b-12d3-a456-426655441111'
)
|
|
|
|
# Workflow execution that carries a task_execution_id, i.e. one that was
# started from a task. Both IDs are generated randomly on import.
SUB_WF_EX = models.WorkflowExecution(
    id=uuidutils.generate_uuid(),
    task_execution_id=uuidutils.generate_uuid(),
    workflow_id='123e4567-e89b-12d3-a456-426655441111',
    workflow_name='some',
    description='foobar',
    spec={'name': 'some'},
    input={'foo': 'bar'},
    output={},
    params={'env': {'k1': 'abc'}},
    context={},
    state=states.RUNNING,
    state_info=None,
    created_at=datetime.datetime(1970, 1, 1),
    updated_at=datetime.datetime(1970, 1, 1)
)

# JSON form of SUB_WF_EX without the description field.
SUB_WF_EX_JSON = dict(
    id=SUB_WF_EX.id,
    workflow_name='some',
    workflow_id='123e4567-e89b-12d3-a456-426655441111',
    input='{"foo": "bar"}',
    output='{}',
    params='{"env": {"k1": "abc"}}',
    state='RUNNING',
    state_info=None,
    created_at='1970-01-01 00:00:00',
    updated_at='1970-01-01 00:00:00',
    task_execution_id=SUB_WF_EX.task_execution_id
)

# Stand-in for a DB "list" call that yields only the sub-workflow execution.
MOCK_SUB_WF_EXECUTIONS = mock.MagicMock(return_value=[SUB_WF_EX])

# Same JSON as above, plus the description taken from the model.
SUB_WF_EX_JSON_WITH_DESC = dict(
    copy.deepcopy(SUB_WF_EX_JSON),
    description=SUB_WF_EX.description
)
|
|
|
|
|
|
# --- Variants of WF_EX after an update -----------------------------------

# WF_EX moved to the PAUSED state.
UPDATED_WF_EX = copy.deepcopy(WF_EX)
UPDATED_WF_EX['state'] = states.PAUSED

UPDATED_WF_EX_JSON = dict(copy.deepcopy(WF_EX_JSON), state=states.PAUSED)

# Paused execution with a changed environment.
UPDATED_WF_EX_ENV = copy.deepcopy(UPDATED_WF_EX)
UPDATED_WF_EX_ENV['params'] = {'env': {'k1': 'def'}}

# Paused execution with both a changed description and environment.
UPDATED_WF_EX_ENV_DESC = copy.deepcopy(UPDATED_WF_EX)
UPDATED_WF_EX_ENV_DESC['description'] = 'foobar'
UPDATED_WF_EX_ENV_DESC['params'] = {'env': {'k1': 'def'}}

# --- WF_EX with extra fields ---------------------------------------------

WF_EX_JSON_WITH_DESC = dict(
    copy.deepcopy(WF_EX_JSON),
    description=WF_EX.description
)

WF_EX_WITH_PROJECT_ID = WF_EX.get_clone()
WF_EX_WITH_PROJECT_ID.project_id = '<default-project>'

# --- Execution created on the basis of WF_EX -----------------------------

# The new execution gets its own random ID and points back to WF_EX.
SOURCE_WF_EX = copy.deepcopy(WF_EX)
SOURCE_WF_EX['source_execution_id'] = WF_EX.id
SOURCE_WF_EX['id'] = uuidutils.generate_uuid()

SOURCE_WF_EX_JSON_WITH_DESC = copy.deepcopy(WF_EX_JSON_WITH_DESC)
SOURCE_WF_EX_JSON_WITH_DESC['id'] = SOURCE_WF_EX.id
SOURCE_WF_EX_JSON_WITH_DESC['source_execution_id'] = (
    SOURCE_WF_EX.source_execution_id
)
|
|
|
|
# Reusable mocks that return one of the canned executions defined above.
MOCK_WF_EX = mock.MagicMock(return_value=WF_EX)
MOCK_SUB_WF_EX = mock.MagicMock(return_value=SUB_WF_EX)
MOCK_SOURCE_WF_EX = mock.MagicMock(return_value=SOURCE_WF_EX)
MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX])
MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX)

# Mocks for calls that return nothing useful.
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])

# Mocks that raise when called.
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())

# WF_EX in the ERROR state and a mock returning it.
ERROR_WF_EX = copy.deepcopy(WF_EX)
ERROR_WF_EX['state'] = states.ERROR
MOCK_ERROR_WF_EX = mock.MagicMock(return_value=ERROR_WF_EX)

# WF_EX in the SUCCESS state and a mock returning it.
SUCCESS_WF_EX = copy.deepcopy(WF_EX)
SUCCESS_WF_EX['state'] = states.SUCCESS
MOCK_SUCCESS_WF_EX = mock.MagicMock(return_value=SUCCESS_WF_EX)
|
|
|
|
|
|
@mock.patch.object(rpc_base, '_IMPL_CLIENT', mock.Mock())
|
|
class TestExecutionsController(base.APITest):
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_WF_EX)
def test_get(self):
    # Fetching a single execution returns 200 and the JSON form of WF_EX
    # extended with an empty "published_global" field.
    response = self.app.get('/v2/executions/123')

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(
        dict(WF_EX_JSON_WITH_DESC, published_global='{}'),
        response.json
    )
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution')
def test_get_operational_error(self, mocked_get):
    # The first DB call emulates an OperationalError, the second one
    # succeeds. The request is still expected to end with 200.
    db_failure = sa.exc.OperationalError('Mock', 'mock', 'mock')
    mocked_get.side_effect = [db_failure, WF_EX]

    response = self.app.get('/v2/executions/123')

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(
        dict(WF_EX_JSON_WITH_DESC, published_global='{}'),
        response.json
    )
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_SUB_WF_EX)
def test_get_sub_wf_ex(self):
    # Same as test_get, but the DB returns an execution that carries a
    # task_execution_id.
    response = self.app.get('/v2/executions/123')

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(
        dict(SUB_WF_EX_JSON_WITH_DESC, published_global='{}'),
        response.json
    )
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_NOT_FOUND)
def test_get_not_found(self):
    # A DBEntityNotFoundError raised by the DB layer maps to HTTP 404.
    response = self.app.get('/v2/executions/123', expect_errors=True)

    self.assertEqual(404, response.status_int)
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution',
                   return_value=WF_EX_WITH_PROJECT_ID)
def test_get_within_project_id(self, mock_get):
    # When the DB object carries a project_id, the field must be exposed
    # in the response body.
    resp = self.app.get('/v2/executions/123', expect_errors=True)

    self.assertEqual(200, resp.status_int)
    # assertIn (rather than assertTrue(x in y)) reports the actual
    # response payload if the key is missing.
    self.assertIn('project_id', resp.json)
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
@mock.patch.object(
    rpc_clients.EngineClient, 'pause_workflow', MOCK_UPDATED_WF_EX
)
def test_put_state_paused(self):
    # Requesting the PAUSED state returns the execution that the mocked
    # engine pause_workflow() call hands back.
    request_body = {'id': WF_EX['id'], 'state': states.PAUSED}
    expected_body = dict(
        copy.deepcopy(WF_EX_JSON_WITH_DESC),
        state=states.PAUSED
    )

    response = self.app.put_json('/v2/executions/123', request_body)

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(expected_body, response.json)
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_error(self, mock_stop_wf):
    # Forcing ERROR goes through the engine's stop_workflow(), which
    # receives the execution ID from the URL, the state and state info.
    stopped_wf_ex = copy.deepcopy(WF_EX)
    stopped_wf_ex['state'] = states.ERROR
    stopped_wf_ex['state_info'] = 'Force'
    mock_stop_wf.return_value = stopped_wf_ex

    request_body = {
        'id': WF_EX['id'],
        'state': states.ERROR,
        'state_info': 'Force'
    }

    response = self.app.put_json('/v2/executions/123', request_body)

    expected_body = dict(
        copy.deepcopy(WF_EX_JSON_WITH_DESC),
        state=states.ERROR,
        state_info='Force'
    )

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(expected_body, response.json)
    mock_stop_wf.assert_called_once_with('123', 'ERROR', 'Force')
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_cancelled(self, mock_stop_wf):
    # Cancelling an execution also goes through stop_workflow(), this
    # time with the CANCELLED state and the user supplied message.
    cancelled_wf_ex = copy.deepcopy(WF_EX)
    cancelled_wf_ex['state'] = states.CANCELLED
    cancelled_wf_ex['state_info'] = 'Cancelled by user.'
    mock_stop_wf.return_value = cancelled_wf_ex

    request_body = {
        'id': WF_EX['id'],
        'state': states.CANCELLED,
        'state_info': 'Cancelled by user.'
    }

    response = self.app.put_json('/v2/executions/123', request_body)

    expected_body = dict(
        copy.deepcopy(WF_EX_JSON_WITH_DESC),
        state=states.CANCELLED,
        state_info='Cancelled by user.'
    )

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(expected_body, response.json)
    mock_stop_wf.assert_called_once_with(
        '123', 'CANCELLED', 'Cancelled by user.'
    )
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc_clients.EngineClient, 'resume_workflow')
def test_put_state_resume(self, mock_resume_wf):
    # Requesting RUNNING is delegated to resume_workflow(); no env is
    # given in the request, so env=None is expected in the engine call.
    resumed_wf_ex = copy.deepcopy(WF_EX)
    resumed_wf_ex['state'] = states.RUNNING
    resumed_wf_ex['state_info'] = None
    mock_resume_wf.return_value = resumed_wf_ex

    request_body = {'id': WF_EX['id'], 'state': states.RUNNING}

    response = self.app.put_json('/v2/executions/123', request_body)

    expected_body = dict(
        copy.deepcopy(WF_EX_JSON_WITH_DESC),
        state=states.RUNNING,
        state_info=None
    )

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(expected_body, response.json)
    mock_resume_wf.assert_called_once_with('123', env=None)
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
def test_put_invalid_state(self):
    # Each of these states must be rejected with 400 and a fault string
    # that names the offending state.
    for state in (states.IDLE, states.WAITING, states.RUNNING_DELAYED):
        request_body = {'id': WF_EX['id'], 'state': state}

        response = self.app.put_json(
            '/v2/executions/123',
            request_body,
            expect_errors=True
        )

        self.assertEqual(400, response.status_int)

        expected_fault = 'Cannot change state to %s.' % state

        self.assertIn(expected_fault, response.json['faultstring'])
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
@mock.patch.object(rpc_clients.EngineClient, 'stop_workflow')
def test_put_state_info_unset(self, mock_stop_wf):
    # The request carries no state_info, so stop_workflow() is expected
    # to get None and the response to report state_info as None.
    stopped_wf_ex = copy.deepcopy(WF_EX)
    stopped_wf_ex['state'] = states.ERROR
    del stopped_wf_ex.state_info
    mock_stop_wf.return_value = stopped_wf_ex

    request_body = {
        'id': WF_EX['id'],
        'state': states.ERROR,
    }

    response = self.app.put_json('/v2/executions/123', request_body)

    expected_body = dict(
        copy.deepcopy(WF_EX_JSON_WITH_DESC),
        state=states.ERROR,
        state_info=None
    )

    self.assertEqual(200, response.status_int)
    self.assertDictEqual(expected_body, response.json)
    mock_stop_wf.assert_called_once_with('123', 'ERROR', None)
|
|
|
|
@mock.patch('mistral.db.v2.api.get_workflow_execution')
@mock.patch(
    'mistral.db.v2.api.update_workflow_execution', return_value=WF_EX
)
def test_put_description(self, mock_update, mock_ensure):
    # Updating only the description looks the execution up by ID
    # (requesting just the "id" field) and then updates it in the DB.
    new_values = {'description': 'execution description.'}

    response = self.app.put_json('/v2/executions/123', new_values)

    self.assertEqual(200, response.status_int)

    id_only = (models.WorkflowExecution.id,)

    mock_ensure.assert_called_once_with('123', fields=id_only)
    mock_update.assert_called_once_with('123', new_values)
|
|
|
|
@mock.patch.object(
    sql_db_api,
    'get_workflow_execution',
    mock.MagicMock(return_value=copy.deepcopy(UPDATED_WF_EX))
)
@mock.patch(
    'mistral.services.workflows.update_workflow_execution_env',
    return_value=copy.deepcopy(UPDATED_WF_EX_ENV)
)
def test_put_env(self, mock_update_env):
    # Sending new params triggers an env update with the decoded "env"
    # dictionary; the response echoes the params string that was sent.
    request_body = {'params': '{"env": {"k1": "def"}}'}

    response = self.app.put_json('/v2/executions/123', request_body)

    self.assertEqual(200, response.status_int)
    self.assertEqual(request_body['params'], response.json['params'])

    mock_update_env.assert_called_once_with(UPDATED_WF_EX, {'k1': 'def'})
|
|
|
|
@mock.patch.object(
    db_api, 'update_workflow_execution', new=MOCK_NOT_FOUND
)
def test_put_not_found(self):
    # Updating an execution that does not exist results in HTTP 404.
    request_body = {'state': states.PAUSED}

    response = self.app.put_json(
        '/v2/executions/123',
        request_body,
        expect_errors=True
    )

    self.assertEqual(404, response.status_int)
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
def test_put_empty(self):
    # An update request with an empty body is rejected with 400.
    response = self.app.put_json(
        '/v2/executions/123',
        {},
        expect_errors=True
    )

    expected_fault = 'state, description, or env is not provided for update'

    self.assertEqual(400, response.status_int)
    self.assertIn(expected_fault, response.json['faultstring'])
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
def test_put_state_and_description(self):
    # State and description cannot be changed in one request (400).
    request_body = {'description': 'foobar', 'state': states.ERROR}

    response = self.app.put_json(
        '/v2/executions/123',
        request_body,
        expect_errors=True
    )

    expected_fault = 'description must be updated separately from state'

    self.assertEqual(400, response.status_int)
    self.assertIn(expected_fault, response.json['faultstring'])
|
|
|
|
@mock.patch.object(
    sql_db_api,
    'get_workflow_execution',
    mock.MagicMock(return_value=copy.deepcopy(UPDATED_WF_EX))
)
@mock.patch(
    'mistral.db.v2.api.update_workflow_execution', return_value=WF_EX
)
@mock.patch(
    'mistral.services.workflows.update_workflow_execution_env',
    return_value=copy.deepcopy(UPDATED_WF_EX_ENV_DESC)
)
def test_put_env_and_description(self, mock_update_env, mock_update):
    # Description and env may be updated together: the description goes
    # to the DB update call and the env to the env update service.
    request_body = {
        'description': 'foobar',
        'params': '{"env": {"k1": "def"}}'
    }

    response = self.app.put_json('/v2/executions/123', request_body)
    body = response.json

    self.assertEqual(200, response.status_int)
    self.assertEqual(request_body['description'], body['description'])
    self.assertEqual(request_body['params'], body['params'])

    mock_update.assert_called_once_with('123', {'description': 'foobar'})
    mock_update_env.assert_called_once_with(UPDATED_WF_EX, {'k1': 'def'})
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_execution', mock.MagicMock(return_value=None)
)
def test_put_env_wrong_state(self):
    # Sending an env update together with the SUCCESS state is rejected
    # with 400 and an explanatory fault string.
    request_body = {
        'id': WF_EX['id'],
        'state': states.SUCCESS,
        'params': '{"env": {"k1": "def"}}'
    }
    expected_fault = (
        'env can only be updated when workflow execution '
        'is not running or on resume from pause'
    )

    response = self.app.put_json(
        '/v2/executions/123',
        request_body,
        expect_errors=True
    )

    self.assertEqual(400, response.status_int)
    self.assertIn(expected_fault, response.json['faultstring'])
|
|
|
|
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_auto_id(self, start_wf_func):
    # NOTE: This is "white box" testing: we check that the REST
    # controller calls the other APIs as expected, since that is the
    # only way of testing available with the current infrastructure.
    started_wf_ex = WF_EX.to_dict()
    start_wf_func.return_value = started_wf_ex

    expected_json = WF_EX_JSON_WITH_DESC

    response = self.app.post_json('/v2/executions', expected_json.copy())

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(expected_json, response.json)

    # Engine keyword arguments: the decoded params plus the description.
    engine_kwargs = dict(
        json.loads(expected_json['params']),
        description=expected_json['description']
    )

    start_wf_func.assert_called_once_with(
        expected_json['workflow_id'],
        '',
        started_wf_ex['id'],
        json.loads(expected_json['input']),
        **engine_kwargs
    )
|
|
|
|
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
@mock.patch.object(db_api, 'load_workflow_execution')
def test_post_with_exec_id_exec_doesnt_exist(self, load_wf_ex_func,
                                             start_wf_func):
    # NOTE: This is "white box" testing: we check that the REST
    # controller calls the other APIs as expected, since that is the
    # only way of testing available with the current infrastructure.

    # Pretend that the execution is not in the DB yet.
    load_wf_ex_func.return_value = None
    start_wf_func.return_value = WF_EX.to_dict()

    expected_json = WF_EX_JSON_WITH_DESC

    # The execution ID is sent on purpose, so 'id' stays in the body.
    response = self.app.post_json('/v2/executions', expected_json.copy())

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(expected_json, response.json)

    load_wf_ex_func.assert_called_once_with(expected_json['id'])

    engine_kwargs = dict(
        json.loads(expected_json['params']),
        description=expected_json['description']
    )

    start_wf_func.assert_called_once_with(
        expected_json['workflow_id'],
        '',
        expected_json['id'],
        json.loads(expected_json['input']),
        **engine_kwargs
    )
|
|
|
|
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
@mock.patch.object(db_api, 'load_workflow_execution')
def test_post_with_exec_id_exec_exists(self, load_wf_ex_func,
                                       start_wf_func):
    # NOTE: This is "white box" testing: we check that the REST
    # controller calls the other APIs as expected, since that is the
    # only way of testing available with the current infrastructure.

    # Pretend that the execution is already in the DB.
    load_wf_ex_func.return_value = WF_EX

    expected_json = WF_EX_JSON_WITH_DESC

    # The execution ID is sent on purpose, so 'id' stays in the body.
    response = self.app.post_json('/v2/executions', expected_json.copy())

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(expected_json, response.json)

    load_wf_ex_func.assert_called_once_with(expected_json['id'])

    # The engine's "start_workflow" must not be called here: an
    # execution ID was passed to the endpoint and the corresponding
    # object already exists.
    start_wf_func.assert_not_called()
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_WF_EX)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_with_source_execution_id(self, wf_exec_mock):
    # Starting an execution from a source execution: the response omits
    # "source_execution_id" and the description passed to the engine
    # gets a "Based on the execution" suffix.
    wf_exec_mock.return_value = SOURCE_WF_EX.to_dict()

    response = self.app.post_json(
        '/v2/executions/',
        SOURCE_WF_EX_JSON_WITH_DESC
    )

    source_id = SOURCE_WF_EX_JSON_WITH_DESC['source_execution_id']
    expected_body = {
        key: value
        for key, value in SOURCE_WF_EX_JSON_WITH_DESC.items()
        if key != 'source_execution_id'
    }

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(expected_body, response.json)

    expected_description = "{} Based on the execution '{}'".format(
        expected_body['description'],
        source_id
    )

    wf_exec_mock.assert_called_once_with(
        expected_body['workflow_id'],
        '',
        expected_body['id'],
        json.loads(expected_body['input']),
        description=expected_description,
        **json.loads(expected_body['params'])
    )
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_WF_EX)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_with_src_exec_id_without_exec_id(self, wf_exec_mock):
    # NOTE(review): despite the test name, the request body below still
    # carries an 'id', so this currently exercises the same path as
    # test_post_with_source_execution_id - confirm the intent.
    engine_result = copy.copy(SOURCE_WF_EX)
    wf_exec_mock.return_value = engine_result.to_dict()

    request_body = copy.copy(SOURCE_WF_EX_JSON_WITH_DESC)

    response = self.app.post_json('/v2/executions/', request_body)

    source_id = SOURCE_WF_EX_JSON_WITH_DESC['source_execution_id']
    expected_body = {
        key: value
        for key, value in request_body.items()
        if key != 'source_execution_id'
    }

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(expected_body, response.json)

    expected_description = "{} Based on the execution '{}'".format(
        expected_body['description'],
        source_id
    )

    wf_exec_mock.assert_called_once_with(
        expected_body['workflow_id'],
        '',
        expected_body['id'],
        json.loads(expected_body['input']),
        description=expected_description,
        **json.loads(expected_body['params'])
    )
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_execution', new=MOCK_EMPTY)
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_without_source_execution_id(self, wf_exec_mock):
    wf_exec_mock.return_value = SOURCE_WF_EX.to_dict()

    # An empty source execution ID is sent on purpose, to make sure
    # that the correct actions are taken in that case.
    request_body = dict(
        copy.copy(SOURCE_WF_EX_JSON_WITH_DESC),
        source_execution_id=""
    )

    response = self.app.post_json('/v2/executions/', request_body)

    self.assertEqual(201, response.status_int)

    # The source execution key is only used to perform a lookup, so it
    # is not part of the expected response.
    expected_body = {
        key: value
        for key, value in request_body.items()
        if key != 'source_execution_id'
    }

    self.assertDictEqual(expected_body, response.json)

    wf_exec_mock.assert_called_once_with(
        expected_body['workflow_id'],
        '',
        expected_body['id'],
        json.loads(expected_body['input']),
        description=expected_body['description'],
        **json.loads(expected_body['params'])
    )
|
|
|
|
@mock.patch.object(rpc_clients.EngineClient, 'start_workflow')
def test_post_with_params_none(self, start_wf_func):
    # Posting "params": null is accepted; the response is built from
    # what the mocked engine returns.
    start_wf_func.return_value = WF_EX.to_dict()

    request_body = dict(WF_EX_JSON_WITH_DESC, params=None)

    response = self.app.post_json('/v2/executions', request_body)

    self.assertEqual(201, response.status_int)
    self.assertDictEqual(WF_EX_JSON_WITH_DESC, response.json)
|
|
|
|
@mock.patch.object(
    rpc_clients.EngineClient, 'start_workflow', MOCK_ACTION_EXC
)
def test_post_throws_exception(self):
    # An ActionException raised by the engine surfaces as a 400
    # response, which webtest reports as an AppError.
    app_error = self.assertRaises(
        webtest_app.AppError,
        self.app.post_json,
        '/v2/executions',
        WF_EX_JSON
    )

    error_text = app_error.args[0]

    self.assertIn('Bad response: 400', error_text)
|
|
|
|
def test_post_without_workflow_id_and_name(self):
    # A request with neither workflow ID nor workflow name gets a 400
    # response, which webtest reports as an AppError.
    request_body = {'description': 'some description here.'}

    app_error = self.assertRaises(
        webtest_app.AppError,
        self.app.post_json,
        '/v2/executions',
        request_body
    )

    error_text = app_error.args[0]

    self.assertIn('Bad response: 400', error_text)
|
|
|
|
@mock.patch.object(
    db_api,
    'get_workflow_execution',
    mock.MagicMock(return_value=(states.RUNNING,))
)
def test_delete_running_execution(self):
    # The DB lookup is mocked to return a one-element tuple holding the
    # RUNNING state; deleting such an execution is refused with 403.
    expected_message = (
        "Only completed executions can be deleted. "
        "Use --force to override this. "
        "Execution 123 is in RUNNING state"
    )

    response = self.app.delete('/v2/executions/123', expect_errors=True)

    self.assertEqual(403, response.status_int)
    self.assertIn(expected_message, response.body.decode())
|
|
|
|
@mock.patch.object(
    db_api,
    'get_workflow_execution',
    mock.MagicMock(return_value=(states.ERROR,))
)
@mock.patch.object(db_api, 'delete_workflow_execution', new=MOCK_DELETE)
def test_delete_error_exec(self):
    # An execution in the ERROR state can be deleted (204).
    response = self.app.delete('/v2/executions/123')

    self.assertEqual(204, response.status_int)
|
|
|
|
@mock.patch.object(
    db_api,
    'get_workflow_execution',
    mock.MagicMock(return_value=(states.SUCCESS,))
)
@mock.patch.object(db_api, 'delete_workflow_execution', new=MOCK_DELETE)
def test_delete_success_exec(self):
    # An execution in the SUCCESS state can be deleted (204).
    response = self.app.delete('/v2/executions/123')

    self.assertEqual(204, response.status_int)
|
|
|
|
@mock.patch.object(
    db_api, 'delete_workflow_execution', new=MOCK_NOT_FOUND
)
def test_delete_not_found(self):
    # Deleting an execution that does not exist results in HTTP 404.
    response = self.app.delete('/v2/executions/123', expect_errors=True)

    self.assertEqual(404, response.status_int)
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_executions', new=MOCK_WF_EXECUTIONS
)
def test_get_all(self):
    # Listing executions returns the single mocked execution in its
    # JSON form (no "published_global" field in list responses).
    response = self.app.get('/v2/executions')

    self.assertEqual(200, response.status_int)

    executions = response.json['executions']

    self.assertEqual(1, len(executions))
    self.assertDictEqual(WF_EX_JSON_WITH_DESC, executions[0])
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_executions')
def test_get_all_operational_error(self, mocked_get_all):
    # The first DB call emulates an OperationalError, the second one
    # succeeds. The listing is still expected to end with 200.
    db_failure = sa.exc.OperationalError('Mock', 'mock', 'mock')
    mocked_get_all.side_effect = [db_failure, [WF_EX]]

    response = self.app.get('/v2/executions')

    self.assertEqual(200, response.status_int)

    executions = response.json['executions']

    self.assertEqual(1, len(executions))
    self.assertDictEqual(WF_EX_JSON_WITH_DESC, executions[0])
|
|
|
|
@mock.patch.object(db_api, 'get_workflow_executions', new=MOCK_EMPTY)
def test_get_all_empty(self):
    # An empty DB result yields 200 with an empty "executions" list.
    response = self.app.get('/v2/executions')

    self.assertEqual(200, response.status_int)

    executions = response.json['executions']

    self.assertEqual(0, len(executions))
|
|
|
|
@mock.patch.object(
    db_api, "get_workflow_executions", new=MOCK_WF_EXECUTIONS
)
def test_get_all_pagination(self):
    # With limit=1 the response contains a "next" link whose query
    # string repeats the paging parameters and uses the ID of the
    # returned execution as the marker.
    url = (
        '/v2/executions?limit=1&sort_keys=id,workflow_name'
        '&sort_dirs=asc,desc'
    )

    response = self.app.get(url)
    body = response.json

    self.assertEqual(200, response.status_int)
    self.assertIn('next', body)
    self.assertEqual(1, len(body['executions']))
    self.assertDictEqual(WF_EX_JSON_WITH_DESC, body['executions'][0])

    next_query = body['next'].split('?')[1]
    actual_params = utils.get_dict_from_string(next_query, delimiter='&')

    expected_params = {
        'marker': '123e4567-e89b-12d3-a456-426655440000',
        'limit': 1,
        'sort_keys': 'id,workflow_name',
        'sort_dirs': 'asc,desc'
    }

    self.assertDictEqual(expected_params, actual_params)
|
|
|
|
def test_get_all_pagination_limit_negative(self):
    # A negative limit is rejected with 400.
    url = '/v2/executions?limit=-1&sort_keys=id&sort_dirs=asc'

    response = self.app.get(url, expect_errors=True)

    self.assertEqual(400, response.status_int)
    self.assertIn("Limit must be positive", response.body.decode())
|
|
|
|
def test_get_all_pagination_limit_not_integer(self):
    # A non-integer limit is rejected with 400.
    url = '/v2/executions?limit=1.1&sort_keys=id&sort_dirs=asc'

    response = self.app.get(url, expect_errors=True)

    self.assertEqual(400, response.status_int)
    self.assertIn("unable to convert to int", response.body.decode())
|
|
|
|
def test_get_all_pagination_invalid_sort_dirs_length(self):
    # Passing more sort directions than sort keys is rejected with 400.
    url = '/v2/executions?limit=1&sort_keys=id&sort_dirs=asc,asc'
    expected_message = (
        "Length of sort_keys must be equal or greater than sort_dirs"
    )

    response = self.app.get(url, expect_errors=True)

    self.assertEqual(400, response.status_int)
    self.assertIn(expected_message, response.body.decode())
|
|
|
|
def test_get_all_pagination_unknown_direction(self):
    # An unsupported sort direction is rejected with 400.
    #
    # NOTE: The request targets the executions endpoint. It previously
    # hit '/v2/actions', so this test never exercised the controller
    # covered by this module.
    resp = self.app.get(
        '/v2/executions?limit=1&sort_keys=id&sort_dirs=nonexist',
        expect_errors=True
    )

    self.assertEqual(400, resp.status_int)

    self.assertIn("Unknown sort direction", resp.body.decode())
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_executions', MOCK_SUB_WF_EXECUTIONS
)
def test_get_task_workflow_executions(self):
    # Listing the workflow executions of a task returns the mocked
    # sub-workflow execution.
    url = '/v2/tasks/%s/workflow_executions' % SUB_WF_EX.task_execution_id

    response = self.app.get(url)

    self.assertEqual(200, response.status_int)

    executions = response.json['executions']

    self.assertEqual(1, len(executions))
    self.assertDictEqual(SUB_WF_EX_JSON_WITH_DESC, executions[0])
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_executions', new=MOCK_WF_EXECUTIONS
)
@mock.patch.object(
    rest_utils, 'get_all', return_value=resources.Executions()
)
def test_get_all_executions_with_output(self, mock_get_all):
    # With include_output=true the controller must pass the resource
    # function that includes the output to rest_utils.get_all().
    response = self.app.get('/v2/executions?include_output=true')

    self.assertEqual(200, response.status_int)

    call_kwargs = mock_get_all.call_args[1]

    self.assertEqual(
        execution._get_workflow_execution_resource_with_output,
        call_kwargs['resource_function']
    )
|
|
|
|
@mock.patch.object(
    db_api, 'get_workflow_executions', new=MOCK_WF_EXECUTIONS
)
@mock.patch.object(
    rest_utils, 'get_all', return_value=resources.Executions()
)
def test_get_all_executions_without_output(self, mock_get_all):
    # Without include_output the controller must pass the plain
    # resource function to rest_utils.get_all().
    response = self.app.get('/v2/executions')

    self.assertEqual(200, response.status_int)

    call_kwargs = mock_get_all.call_args[1]

    self.assertEqual(
        execution._get_workflow_execution_resource,
        call_kwargs['resource_function']
    )
|
|
|
|
@mock.patch('mistral.db.v2.api.get_workflow_executions')
@mock.patch('mistral.context.MistralContext.from_environ')
def test_get_all_projects_admin(self, mock_context, mock_get_execs):
    # With an admin context, all_projects=true makes the DB call run
    # with insecure=True.
    mock_context.return_value = unit_base.get_context(admin=True)

    response = self.app.get('/v2/executions?all_projects=true')

    self.assertEqual(200, response.status_int)

    db_kwargs = mock_get_execs.call_args[1]

    self.assertTrue(db_kwargs.get('insecure', False))
|
|
|
|
def test_get_all_projects_normal_user(self):
    # Without an admin context, all_projects=true is refused with 403.
    url = '/v2/executions?all_projects=true'

    response = self.app.get(url, expect_errors=True)

    self.assertEqual(403, response.status_int)
|
|
|
|
@mock.patch('mistral.db.v2.api.get_workflow_executions')
@mock.patch('mistral.context.MistralContext.from_environ')
def test_get_all_filter_by_project_id(self, mock_context, mock_get_execs):
    # With an admin context, filtering by project_id makes the DB call
    # run with insecure=True and a project_id filter.
    admin_ctx = unit_base.get_context(admin=True)
    mock_context.return_value = admin_ctx

    fake_project_id = uuidutils.generate_uuid()

    resp = self.app.get('/v2/executions?project_id=%s' % fake_project_id)

    self.assertEqual(200, resp.status_int)

    db_kwargs = mock_get_execs.call_args[1]

    self.assertTrue(db_kwargs.get('insecure', False))

    # The previous check, assertTrue(kwargs.get('project_id',
    # fake_project_id)), passed even when no project filter was sent,
    # because the default value is itself truthy. Only the presence of
    # the filter is asserted, since the exact shape of the filter value
    # is decided outside of this module.
    self.assertIn('project_id', db_kwargs)
|
|
|
|
def test_get_all_with_nulls_not_valid(self):
    # Two invalid uses of the "nulls" parameter; both currently result
    # in a 500 response carrying an explanatory message.
    base_url = '/v2/executions?limit=10&sort_keys=id&sort_dirs=asc'

    # Case 1: "nulls" names a field that does not exist.
    response = self.app.get(
        base_url + '&nulls=invalid',
        expect_errors=True
    )

    self.assertEqual(500, response.status_int)
    self.assertIn(
        "'invalid' is not a valid field name.",
        response.body.decode()
    )

    # Case 2: "nulls" names a field that cannot be None.
    response = self.app.get(
        base_url + '&nulls=id',
        expect_errors=True
    )

    self.assertEqual(500, response.status_int)
    self.assertIn(
        "The field 'id' can't hold None value.",
        response.body.decode()
    )
|