
* Removed extra client creation from trust * Delete trust only together with deleting trigger (so only one node deletes the trust) * Fix trust context with session Change-Id: Ic207b0364b6bc45a6e24e561ef1f540208795530
362 lines
10 KiB
Python
362 lines
10 KiB
Python
# Copyright 2014 - Mirantis, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import datetime
|
|
import eventlet
|
|
import mock
|
|
|
|
from oslo_config import cfg
|
|
|
|
from mistral import exceptions as exc
|
|
from mistral.rpc import clients as rpc
|
|
from mistral.services import periodic
|
|
from mistral.services import security
|
|
from mistral.services import triggers as t_s
|
|
from mistral.services import workflows
|
|
from mistral.tests.unit import base
|
|
from mistral import utils
|
|
|
|
# Use the set_default method to set value otherwise in certain test cases
|
|
# the change in value is not permanent.
|
|
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
|
|
|
|
|
WORKFLOW_LIST = """
|
|
---
|
|
version: '2.0'
|
|
|
|
my_wf:
|
|
type: direct
|
|
|
|
tasks:
|
|
task1:
|
|
action: std.echo output='Hi!'
|
|
"""
|
|
|
|
advance_cron_trigger_orig = periodic.advance_cron_trigger
|
|
|
|
|
|
def new_advance_cron_trigger(ct):
|
|
"""Wrap the original advance_cron_trigger method.
|
|
|
|
This method makes sure that the other coroutines will also run
|
|
while this thread is executing. Without explicitly passing control to
|
|
another coroutine the process_cron_triggers_v2 will finish looping
|
|
over all the cron triggers in one coroutine without any sharing at all.
|
|
"""
|
|
eventlet.sleep()
|
|
modified = advance_cron_trigger_orig(ct)
|
|
eventlet.sleep()
|
|
|
|
return modified
|
|
|
|
|
|
class TriggerServiceV2Test(base.DbTestCase):
|
|
def setUp(self):
|
|
super(TriggerServiceV2Test, self).setUp()
|
|
|
|
self.wf = workflows.create_workflows(WORKFLOW_LIST)[0]
|
|
|
|
def test_trigger_create(self):
|
|
trigger = t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
None,
|
|
None,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
self.assertEqual(
|
|
datetime.datetime(2010, 8, 25, 0, 5),
|
|
trigger.next_execution_time
|
|
)
|
|
|
|
next_time = t_s.get_next_execution_time(
|
|
trigger['pattern'],
|
|
trigger.next_execution_time
|
|
)
|
|
|
|
self.assertEqual(datetime.datetime(2010, 8, 25, 0, 10), next_time)
|
|
|
|
def test_trigger_create_with_wf_id(self):
|
|
trigger = t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
None,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
None,
|
|
None,
|
|
datetime.datetime(2010, 8, 25),
|
|
workflow_id=self.wf.id
|
|
)
|
|
|
|
self.assertEqual(self.wf.name, trigger.workflow_name)
|
|
|
|
def test_trigger_create_the_same_first_time_or_count(self):
|
|
t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
"4242-12-25 13:37",
|
|
2,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
"4242-12-25 13:37",
|
|
4,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
"5353-12-25 13:37",
|
|
2,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
# Creations above should be ok.
|
|
|
|
# But creation with the same count and first time
|
|
# simultaneously leads to error.
|
|
self.assertRaises(
|
|
exc.DBDuplicateEntryError,
|
|
t_s.create_cron_trigger,
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
"4242-12-25 13:37",
|
|
2,
|
|
None
|
|
)
|
|
|
|
def test_trigger_create_wrong_workflow_input(self):
|
|
wf_with_input = """---
|
|
version: '2.0'
|
|
|
|
some_wf:
|
|
input:
|
|
- some_var
|
|
tasks:
|
|
some_task:
|
|
action: std.echo output=<% $.some_var %>
|
|
"""
|
|
workflows.create_workflows(wf_with_input)
|
|
exception = self.assertRaises(
|
|
exc.InputException,
|
|
t_s.create_cron_trigger,
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
'some_wf',
|
|
{},
|
|
{},
|
|
'*/5 * * * *',
|
|
None,
|
|
None,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
self.assertIn('Invalid input', exception.message)
|
|
self.assertIn('some_wf', exception.message)
|
|
|
|
def test_oneshot_trigger_create(self):
|
|
trigger = t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
None,
|
|
"4242-12-25 13:37",
|
|
None,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
self.assertEqual(
|
|
datetime.datetime(4242, 12, 25, 13, 37),
|
|
trigger.next_execution_time
|
|
)
|
|
|
|
@mock.patch.object(security, 'create_trust',
|
|
type('trust', (object,), {'id': 'my_trust_id'}))
|
|
def test_create_trust_in_trigger(self):
|
|
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
|
self.addCleanup(
|
|
cfg.CONF.set_default, 'auth_enable',
|
|
False, group='pecan'
|
|
)
|
|
|
|
trigger = t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'*/2 * * * *',
|
|
None,
|
|
None,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
self.assertEqual('my_trust_id', trigger.trust_id)
|
|
|
|
@mock.patch.object(security, 'create_trust',
|
|
type('trust', (object,), {'id': 'my_trust_id'}))
|
|
@mock.patch.object(security, 'create_context')
|
|
@mock.patch.object(rpc.EngineClient, 'start_workflow', mock.Mock())
|
|
@mock.patch(
|
|
'mistral.services.periodic.advance_cron_trigger',
|
|
mock.MagicMock(side_effect=new_advance_cron_trigger)
|
|
)
|
|
@mock.patch.object(security, 'delete_trust')
|
|
def test_create_delete_trust_in_trigger(self, create_ctx, delete_trust):
|
|
create_ctx.return_value = self.ctx
|
|
cfg.CONF.set_default('auth_enable', True, group='pecan')
|
|
trigger_thread = periodic.setup()
|
|
self.addCleanup(trigger_thread.stop)
|
|
self.addCleanup(
|
|
cfg.CONF.set_default, 'auth_enable',
|
|
False, group='pecan'
|
|
)
|
|
|
|
t_s.create_cron_trigger(
|
|
'trigger-%s' % utils.generate_unicode_uuid(),
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'* * * * * *',
|
|
None,
|
|
1,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
self._await(
|
|
lambda: delete_trust.call_count == 1, timeout=10
|
|
)
|
|
self.assertEqual('my_trust_id', delete_trust.mock_calls[0][1][0])
|
|
|
|
def test_get_trigger_in_correct_orders(self):
|
|
t1_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
|
|
|
t_s.create_cron_trigger(
|
|
t1_name,
|
|
self.wf.name,
|
|
{},
|
|
pattern='*/5 * * * *',
|
|
start_time=datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
t2_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
|
|
|
t_s.create_cron_trigger(
|
|
t2_name,
|
|
self.wf.name,
|
|
{},
|
|
pattern='*/1 * * * *',
|
|
start_time=datetime.datetime(2010, 8, 22)
|
|
)
|
|
|
|
t3_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
|
|
|
t_s.create_cron_trigger(
|
|
t3_name,
|
|
self.wf.name,
|
|
{},
|
|
pattern='*/2 * * * *',
|
|
start_time=datetime.datetime(2010, 9, 21)
|
|
)
|
|
|
|
t4_name = 'trigger-%s' % utils.generate_unicode_uuid()
|
|
|
|
t_s.create_cron_trigger(
|
|
t4_name,
|
|
self.wf.name,
|
|
{},
|
|
pattern='*/3 * * * *',
|
|
start_time=datetime.datetime.utcnow() + datetime.timedelta(0, 50)
|
|
)
|
|
|
|
trigger_names = [t.name for t in t_s.get_next_cron_triggers()]
|
|
|
|
self.assertEqual([t2_name, t1_name, t3_name], trigger_names)
|
|
|
|
@mock.patch(
|
|
'mistral.services.periodic.advance_cron_trigger',
|
|
mock.MagicMock(side_effect=new_advance_cron_trigger)
|
|
)
|
|
@mock.patch.object(rpc.EngineClient, 'start_workflow')
|
|
def test_single_execution_with_multiple_processes(self, start_wf_mock):
|
|
def stop_thread_groups():
|
|
print('Killing cron trigger threads...')
|
|
[tg.stop() for tg in self.trigger_threads]
|
|
|
|
self.trigger_threads = [
|
|
periodic.setup(),
|
|
periodic.setup(),
|
|
periodic.setup()
|
|
]
|
|
self.addCleanup(stop_thread_groups)
|
|
|
|
trigger_count = 5
|
|
t_s.create_cron_trigger(
|
|
'ct1',
|
|
self.wf.name,
|
|
{},
|
|
{},
|
|
'* * * * * */1', # Every second
|
|
None,
|
|
trigger_count,
|
|
datetime.datetime(2010, 8, 25)
|
|
)
|
|
|
|
# Wait until there are 'trigger_count' executions.
|
|
self._await(
|
|
lambda: self._wait_for_single_execution_with_multiple_processes(
|
|
trigger_count,
|
|
start_wf_mock
|
|
)
|
|
)
|
|
|
|
# Wait some more and make sure there are no more than 'trigger_count'
|
|
# executions.
|
|
eventlet.sleep(5)
|
|
|
|
self.assertEqual(trigger_count, start_wf_mock.call_count)
|
|
|
|
def _wait_for_single_execution_with_multiple_processes(self, trigger_count,
|
|
start_wf_mock):
|
|
eventlet.sleep(1)
|
|
|
|
return trigger_count == start_wf_mock.call_count
|
|
|
|
def test_get_next_execution_time(self):
|
|
pattern = '*/20 * * * *'
|
|
start_time = datetime.datetime(2016, 3, 22, 23, 40)
|
|
result = t_s.get_next_execution_time(pattern, start_time)
|
|
self.assertEqual(result, datetime.datetime(2016, 3, 23, 0, 0))
|