Add support for driver-specific periodic tasks
Syncs openstack.common.periodic_task to commit 0848516902444ca83dd4998655cae3901d038d (https://review.openstack.org/#/c/148854/). This patch also makes service start running periodic tasks after init_host() is called. Change-Id: I63ee4ce30b3684e53158fe5f985efd04e4e88fef Implements: blueprint driver-periodic-tasks
This commit is contained in:
parent
dc5b9c65d0
commit
d3d3164a6b
@ -71,6 +71,37 @@ There are three categories of driver interfaces:
|
||||
unable to do so within the `core` or `standard` interfaces. In this case, Ironic
|
||||
will merely relay the message from the API service to the appropriate driver.
|
||||
|
||||
Driver-Specific Periodic Tasks
|
||||
------------------------------
|
||||
|
||||
Drivers may run their own periodic tasks, i.e. actions run repeatedly after
|
||||
a certain amount of time. Such task is created by decorating a method on the
|
||||
driver itself or on any interface with driver_periodic_task_ decorator, e.g.
|
||||
|
||||
::
|
||||
|
||||
class FakePower(base.PowerInterface):
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
def task(self, manager, context):
|
||||
pass # do something
|
||||
|
||||
class FakeDriver(base.BaseDriver):
|
||||
def __init__(self):
|
||||
self.power = FakePower()
|
||||
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
def task2(self, manager, context):
|
||||
pass # do something
|
||||
|
||||
|
||||
Here ``spacing`` argument is a period for a given periodic task.
|
||||
|
||||
.. note::
|
||||
By default periodic task names are derived from method names,
|
||||
so they should be unique within a Python module.
|
||||
Use ``name`` argument to driver_periodic_task_ to override
|
||||
automatically generated name.
|
||||
|
||||
|
||||
Message Routing
|
||||
===============
|
||||
@ -95,3 +126,4 @@ driver actions such as take-over or clean-up.
|
||||
.. _DB API: ../api/ironic.db.api.html
|
||||
.. _diskimage-builder: https://github.com/openstack/diskimage-builder
|
||||
.. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html
|
||||
.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task
|
||||
|
@ -65,6 +65,12 @@ def get_driver(driver_name):
|
||||
raise exception.DriverNotFound(driver_name=driver_name)
|
||||
|
||||
|
||||
def drivers():
|
||||
"""Get all drivers as a dict name -> driver object."""
|
||||
factory = DriverFactory()
|
||||
return {name: factory[name].obj for name in factory.names}
|
||||
|
||||
|
||||
class DriverFactory(object):
|
||||
"""Discover, load and manage the drivers available."""
|
||||
|
||||
|
@ -64,12 +64,12 @@ class RPCService(service.Service):
|
||||
def start(self):
|
||||
super(RPCService, self).start()
|
||||
admin_context = context.RequestContext('admin', 'admin', is_admin=True)
|
||||
self.manager.init_host()
|
||||
self.tg.add_dynamic_timer(
|
||||
self.manager.periodic_tasks,
|
||||
periodic_interval_max=cfg.CONF.periodic_interval,
|
||||
context=admin_context)
|
||||
|
||||
self.manager.init_host()
|
||||
target = messaging.Target(topic=self.topic, server=self.host)
|
||||
endpoints = [self.manager]
|
||||
serializer = objects_base.IronicObjectSerializer()
|
||||
|
@ -43,6 +43,7 @@ a change, etc.
|
||||
|
||||
import collections
|
||||
import datetime
|
||||
import inspect
|
||||
import tempfile
|
||||
import threading
|
||||
|
||||
@ -212,6 +213,16 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
||||
LOG.error(msg, self.host)
|
||||
raise exception.NoDriversLoaded(conductor=self.host)
|
||||
|
||||
# Collect driver-specific periodic tasks
|
||||
for driver_obj in driver_factory.drivers().values():
|
||||
self._collect_periodic_tasks(driver_obj)
|
||||
for iface_name in (driver_obj.core_interfaces +
|
||||
driver_obj.standard_interfaces +
|
||||
['vendor']):
|
||||
iface = getattr(driver_obj, iface_name, None)
|
||||
if iface:
|
||||
self._collect_periodic_tasks(iface)
|
||||
|
||||
# clear all locks held by this conductor before registering
|
||||
self.dbapi.clear_node_reservations_for_conductor(self.host)
|
||||
try:
|
||||
@ -248,6 +259,11 @@ class ConductorManager(periodic_task.PeriodicTasks):
|
||||
LOG.critical(_LC('Failed to start keepalive'))
|
||||
self.del_host()
|
||||
|
||||
def _collect_periodic_tasks(self, obj):
|
||||
for n, method in inspect.getmembers(obj, inspect.ismethod):
|
||||
if getattr(method, '_periodic_enabled', False):
|
||||
self.add_periodic_task(method)
|
||||
|
||||
def del_host(self):
|
||||
self._keepalive_evt.set()
|
||||
try:
|
||||
|
@ -22,12 +22,14 @@ import collections
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
import eventlet
|
||||
from oslo.utils import excutils
|
||||
import six
|
||||
|
||||
from ironic.common import exception
|
||||
from ironic.common.i18n import _LE
|
||||
from ironic.openstack.common import log as logging
|
||||
from ironic.openstack.common import periodic_task
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -603,3 +605,40 @@ class ManagementInterface(object):
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
def driver_periodic_task(parallel=True, **other):
|
||||
"""Decorator for a driver-specific periodic task.
|
||||
|
||||
Example::
|
||||
|
||||
class MyDriver(base.BaseDriver):
|
||||
@base.driver_periodic_task(spacing=42)
|
||||
def task(self, manager, context):
|
||||
# do some job
|
||||
|
||||
:param parallel: whether to run this task in a separate thread
|
||||
:param other: arguments to pass to @periodic_task.periodic_task
|
||||
"""
|
||||
# TODO(dtantsur): drop all this magic once
|
||||
# https://review.openstack.org/#/c/134303/ lands
|
||||
semaphore = eventlet.semaphore.BoundedSemaphore()
|
||||
|
||||
def decorator2(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if parallel:
|
||||
def _internal():
|
||||
with semaphore:
|
||||
func(*args, **kwargs)
|
||||
|
||||
eventlet.greenthread.spawn_n(_internal)
|
||||
else:
|
||||
func(*args, **kwargs)
|
||||
|
||||
# NOTE(dtantsur): name should be unique
|
||||
other.setdefault('name', '%s.%s' % (func.__module__, func.__name__))
|
||||
decorator = periodic_task.periodic_task(**other)
|
||||
return decorator(wrapper)
|
||||
|
||||
return decorator2
|
||||
|
@ -11,13 +11,14 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import random
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from ironic.openstack.common.gettextutils import _, _LE, _LI
|
||||
from ironic.openstack.common._i18n import _, _LE, _LI
|
||||
from ironic.openstack.common import log as logging
|
||||
|
||||
|
||||
@ -36,6 +37,11 @@ LOG = logging.getLogger(__name__)
|
||||
DEFAULT_INTERVAL = 60.0
|
||||
|
||||
|
||||
def list_opts():
|
||||
"""Entry point for oslo-config-generator."""
|
||||
return [(None, copy.deepcopy(periodic_opts))]
|
||||
|
||||
|
||||
class InvalidPeriodicTaskArg(Exception):
|
||||
message = _("Unexpected argument for periodic task creation: %(arg)s.")
|
||||
|
||||
@ -49,14 +55,15 @@ def periodic_task(*args, **kwargs):
|
||||
interval of 60 seconds.
|
||||
|
||||
2. With arguments:
|
||||
@periodic_task(spacing=N [, run_immediately=[True|False]])
|
||||
@periodic_task(spacing=N [, run_immediately=[True|False]]
|
||||
[, name=[None|"string"])
|
||||
this will be run on approximately every N seconds. If this number is
|
||||
negative the periodic task will be disabled. If the run_immediately
|
||||
argument is provided and has a value of 'True', the first run of the
|
||||
task will be shortly after task scheduler starts. If
|
||||
run_immediately is omitted or set to 'False', the first time the
|
||||
task runs will be approximately N seconds after the task scheduler
|
||||
starts.
|
||||
starts. If name is not provided, __name__ of function is used.
|
||||
"""
|
||||
def decorator(f):
|
||||
# Test for old style invocation
|
||||
@ -70,6 +77,7 @@ def periodic_task(*args, **kwargs):
|
||||
f._periodic_enabled = False
|
||||
else:
|
||||
f._periodic_enabled = kwargs.pop('enabled', True)
|
||||
f._periodic_name = kwargs.pop('name', f.__name__)
|
||||
|
||||
# Control frequency
|
||||
f._periodic_spacing = kwargs.pop('spacing', 0)
|
||||
@ -99,6 +107,36 @@ def periodic_task(*args, **kwargs):
|
||||
|
||||
|
||||
class _PeriodicTasksMeta(type):
|
||||
def _add_periodic_task(cls, task):
|
||||
"""Add a periodic task to the list of periodic tasks.
|
||||
|
||||
The task should already be decorated by @periodic_task.
|
||||
|
||||
:return: whether task was actually enabled
|
||||
"""
|
||||
name = task._periodic_name
|
||||
|
||||
if task._periodic_spacing < 0:
|
||||
LOG.info(_LI('Skipping periodic task %(task)s because '
|
||||
'its interval is negative'),
|
||||
{'task': name})
|
||||
return False
|
||||
if not task._periodic_enabled:
|
||||
LOG.info(_LI('Skipping periodic task %(task)s because '
|
||||
'it is disabled'),
|
||||
{'task': name})
|
||||
return False
|
||||
|
||||
# A periodic spacing of zero indicates that this task should
|
||||
# be run on the default interval to avoid running too
|
||||
# frequently.
|
||||
if task._periodic_spacing == 0:
|
||||
task._periodic_spacing = DEFAULT_INTERVAL
|
||||
|
||||
cls._periodic_tasks.append((name, task))
|
||||
cls._periodic_spacing[name] = task._periodic_spacing
|
||||
return True
|
||||
|
||||
def __init__(cls, names, bases, dict_):
|
||||
"""Metaclass that allows us to collect decorated periodic tasks."""
|
||||
super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
|
||||
@ -119,28 +157,7 @@ class _PeriodicTasksMeta(type):
|
||||
|
||||
for value in cls.__dict__.values():
|
||||
if getattr(value, '_periodic_task', False):
|
||||
task = value
|
||||
name = task.__name__
|
||||
|
||||
if task._periodic_spacing < 0:
|
||||
LOG.info(_LI('Skipping periodic task %(task)s because '
|
||||
'its interval is negative'),
|
||||
{'task': name})
|
||||
continue
|
||||
if not task._periodic_enabled:
|
||||
LOG.info(_LI('Skipping periodic task %(task)s because '
|
||||
'it is disabled'),
|
||||
{'task': name})
|
||||
continue
|
||||
|
||||
# A periodic spacing of zero indicates that this task should
|
||||
# be run on the default interval to avoid running too
|
||||
# frequently.
|
||||
if task._periodic_spacing == 0:
|
||||
task._periodic_spacing = DEFAULT_INTERVAL
|
||||
|
||||
cls._periodic_tasks.append((name, task))
|
||||
cls._periodic_spacing[name] = task._periodic_spacing
|
||||
cls._add_periodic_task(value)
|
||||
|
||||
|
||||
def _nearest_boundary(last_run, spacing):
|
||||
@ -172,6 +189,15 @@ class PeriodicTasks(object):
|
||||
for name, task in self._periodic_tasks:
|
||||
self._periodic_last_run[name] = task._periodic_last_run
|
||||
|
||||
def add_periodic_task(self, task):
|
||||
"""Add a periodic task to the list of periodic tasks.
|
||||
|
||||
The task should already be decorated by @periodic_task.
|
||||
"""
|
||||
if self.__class__._add_periodic_task(task):
|
||||
self._periodic_last_run[task._periodic_name] = (
|
||||
task._periodic_last_run)
|
||||
|
||||
def run_periodic_tasks(self, context, raise_on_error=False):
|
||||
"""Tasks to be run at a periodic interval."""
|
||||
idle_for = DEFAULT_INTERVAL
|
||||
|
@ -198,6 +198,8 @@ class StartStopTestCase(_ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
objects.Conductor.get_by_hostname,
|
||||
self.context, self.hostname)
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__getitem__',
|
||||
lambda *args: mock.MagicMock())
|
||||
def test_start_registers_driver_names(self):
|
||||
init_names = ['fake1', 'fake2']
|
||||
restart_names = ['fake3', 'fake4']
|
||||
@ -220,6 +222,47 @@ class StartStopTestCase(_ServiceSetUpMixin, tests_db_base.DbTestCase):
|
||||
self.hostname)
|
||||
self.assertEqual(restart_names, res['drivers'])
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__getitem__')
|
||||
def test_start_registers_driver_specific_tasks(self, get_mock):
|
||||
init_names = ['fake1']
|
||||
expected_task_name = 'ironic.tests.conductor.test_manager.task'
|
||||
expected_task_name2 = 'ironic.tests.conductor.test_manager.iface'
|
||||
self.config(enabled_drivers=init_names)
|
||||
|
||||
class TestInterface(object):
|
||||
@drivers_base.driver_periodic_task(spacing=100500)
|
||||
def iface(self):
|
||||
pass
|
||||
|
||||
class Driver(object):
|
||||
core_interfaces = []
|
||||
standard_interfaces = ['iface']
|
||||
|
||||
iface = TestInterface()
|
||||
|
||||
@drivers_base.driver_periodic_task(spacing=42)
|
||||
def task(self, context):
|
||||
pass
|
||||
|
||||
obj = Driver()
|
||||
self.assertTrue(obj.task._periodic_enabled)
|
||||
get_mock.return_value = mock.Mock(obj=obj)
|
||||
|
||||
with mock.patch.object(
|
||||
driver_factory.DriverFactory()._extension_manager,
|
||||
'names') as mock_names:
|
||||
mock_names.return_value = init_names
|
||||
self._start_service()
|
||||
tasks = dict(self.service._periodic_tasks)
|
||||
self.assertEqual(obj.task, tasks[expected_task_name])
|
||||
self.assertEqual(obj.iface.iface, tasks[expected_task_name2])
|
||||
self.assertEqual(42,
|
||||
self.service._periodic_spacing[expected_task_name])
|
||||
self.assertEqual(100500,
|
||||
self.service._periodic_spacing[expected_task_name2])
|
||||
self.assertIn(expected_task_name, self.service._periodic_last_run)
|
||||
self.assertIn(expected_task_name2, self.service._periodic_last_run)
|
||||
|
||||
@mock.patch.object(driver_factory.DriverFactory, '__init__')
|
||||
def test_start_fails_on_missing_driver(self, mock_df):
|
||||
mock_df.side_effect = exception.DriverNotFound('test')
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from ironic.common import exception
|
||||
@ -65,3 +66,35 @@ class PassthruDecoratorTestCase(base.TestCase):
|
||||
self.fvi.normalexception, mock.ANY)
|
||||
driver_base.LOG.exception.assert_called_with(
|
||||
mock.ANY, 'normalexception')
|
||||
|
||||
|
||||
@mock.patch.object(eventlet.greenthread, 'spawn_n',
|
||||
side_effect=lambda func, *args, **kw: func(*args, **kw))
|
||||
class DriverPeriodicTaskTestCase(base.TestCase):
|
||||
def test(self, spawn_mock):
|
||||
method_mock = mock.Mock()
|
||||
function_mock = mock.Mock()
|
||||
|
||||
class TestClass(object):
|
||||
@driver_base.driver_periodic_task(spacing=42)
|
||||
def method(self, foo, bar=None):
|
||||
method_mock(foo, bar=bar)
|
||||
|
||||
@driver_base.driver_periodic_task(spacing=100, parallel=False)
|
||||
def function():
|
||||
function_mock()
|
||||
|
||||
obj = TestClass()
|
||||
self.assertEqual(42, obj.method._periodic_spacing)
|
||||
self.assertTrue(obj.method._periodic_task)
|
||||
self.assertEqual('ironic.tests.drivers.test_base.method',
|
||||
obj.method._periodic_name)
|
||||
self.assertEqual('ironic.tests.drivers.test_base.function',
|
||||
function._periodic_name)
|
||||
|
||||
obj.method(1, bar=2)
|
||||
method_mock.assert_called_once_with(1, bar=2)
|
||||
self.assertEqual(1, spawn_mock.call_count)
|
||||
function()
|
||||
function_mock.assert_called_once_with()
|
||||
self.assertEqual(1, spawn_mock.call_count)
|
||||
|
Loading…
x
Reference in New Issue
Block a user