From d3d3164a6b8bc4ff5fdd02ea4ba22e411868b729 Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur Date: Thu, 15 Jan 2015 15:22:36 +0100 Subject: [PATCH] Add support for driver-specific periodic tasks Syncs openstack.common.periodic_task to commit 0848516902444ca83dd4998655cae3901d038d (https://review.openstack.org/#/c/148854/). This patch also makes service start running periodic tasks after init_host() is called. Change-Id: I63ee4ce30b3684e53158fe5f985efd04e4e88fef Implements: blueprint driver-periodic-tasks --- doc/source/dev/architecture.rst | 32 ++++++++++ ironic/common/driver_factory.py | 6 ++ ironic/common/service.py | 2 +- ironic/conductor/manager.py | 16 +++++ ironic/drivers/base.py | 39 ++++++++++++ ironic/openstack/common/periodic_task.py | 76 ++++++++++++++++-------- ironic/tests/conductor/test_manager.py | 43 ++++++++++++++ ironic/tests/drivers/test_base.py | 33 ++++++++++ 8 files changed, 221 insertions(+), 26 deletions(-) diff --git a/doc/source/dev/architecture.rst b/doc/source/dev/architecture.rst index 21fbad8962..dedcfd4328 100644 --- a/doc/source/dev/architecture.rst +++ b/doc/source/dev/architecture.rst @@ -71,6 +71,37 @@ There are three categories of driver interfaces: unable to do so within the `core` or `standard` interfaces. In this case, Ironic will merely relay the message from the API service to the appropriate driver. +Driver-Specific Periodic Tasks +------------------------------ + +Drivers may run their own periodic tasks, i.e. actions run repeatedly after +a certain amount of time. Such task is created by decorating a method on the +driver itself or on any interface with driver_periodic_task_ decorator, e.g. + +:: + + class FakePower(base.PowerInterface): + @base.driver_periodic_task(spacing=42) + def task(self, manager, context): + pass # do something + + class FakeDriver(base.BaseDriver): + def __init__(self): + self.power = FakePower() + + @base.driver_periodic_task(spacing=42) + def task2(self, manager, context): + pass # do something + + +Here ``spacing`` argument is a period for a given periodic task. + +.. note:: + By default periodic task names are derived from method names, + so they should be unique within a Python module. + Use ``name`` argument to driver_periodic_task_ to override + automatically generated name. + Message Routing =============== @@ -95,3 +126,4 @@ driver actions such as take-over or clean-up. .. _DB API: ../api/ironic.db.api.html .. _diskimage-builder: https://github.com/openstack/diskimage-builder .. _consistent hashing algorithm: ../api/ironic.common.hash_ring.html +.. _driver_periodic_task: ../api/ironic.drivers.base.html#ironic.drivers.base.driver_periodic_task diff --git a/ironic/common/driver_factory.py b/ironic/common/driver_factory.py index ce757460ef..93e163c5cb 100644 --- a/ironic/common/driver_factory.py +++ b/ironic/common/driver_factory.py @@ -65,6 +65,12 @@ def get_driver(driver_name): raise exception.DriverNotFound(driver_name=driver_name) +def drivers(): + """Get all drivers as a dict name -> driver object.""" + factory = DriverFactory() + return {name: factory[name].obj for name in factory.names} + + class DriverFactory(object): """Discover, load and manage the drivers available.""" diff --git a/ironic/common/service.py b/ironic/common/service.py index cef568650e..3a7a7b2300 100644 --- a/ironic/common/service.py +++ b/ironic/common/service.py @@ -64,12 +64,12 @@ class RPCService(service.Service): def start(self): super(RPCService, self).start() admin_context = context.RequestContext('admin', 'admin', is_admin=True) + self.manager.init_host() self.tg.add_dynamic_timer( self.manager.periodic_tasks, periodic_interval_max=cfg.CONF.periodic_interval, context=admin_context) - self.manager.init_host() target = messaging.Target(topic=self.topic, server=self.host) endpoints = [self.manager] serializer = objects_base.IronicObjectSerializer() diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index 67c3c0a98f..9bc69de1b0 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -43,6 +43,7 @@ a change, etc. import collections import datetime +import inspect import tempfile import threading @@ -212,6 +213,16 @@ class ConductorManager(periodic_task.PeriodicTasks): LOG.error(msg, self.host) raise exception.NoDriversLoaded(conductor=self.host) + # Collect driver-specific periodic tasks + for driver_obj in driver_factory.drivers().values(): + self._collect_periodic_tasks(driver_obj) + for iface_name in (driver_obj.core_interfaces + + driver_obj.standard_interfaces + + ['vendor']): + iface = getattr(driver_obj, iface_name, None) + if iface: + self._collect_periodic_tasks(iface) + # clear all locks held by this conductor before registering self.dbapi.clear_node_reservations_for_conductor(self.host) try: @@ -248,6 +259,11 @@ class ConductorManager(periodic_task.PeriodicTasks): LOG.critical(_LC('Failed to start keepalive')) self.del_host() + def _collect_periodic_tasks(self, obj): + for n, method in inspect.getmembers(obj, inspect.ismethod): + if getattr(method, '_periodic_enabled', False): + self.add_periodic_task(method) + def del_host(self): self._keepalive_evt.set() try: diff --git a/ironic/drivers/base.py b/ironic/drivers/base.py index 65033c3f6d..56d42a016e 100644 --- a/ironic/drivers/base.py +++ b/ironic/drivers/base.py @@ -22,12 +22,14 @@ import collections import functools import inspect +import eventlet from oslo.utils import excutils import six from ironic.common import exception from ironic.common.i18n import _LE from ironic.openstack.common import log as logging +from ironic.openstack.common import periodic_task LOG = logging.getLogger(__name__) @@ -603,3 +605,40 @@ class ManagementInterface(object): } } """ + + +def driver_periodic_task(parallel=True, **other): + """Decorator for a driver-specific periodic task. + + Example:: + + class MyDriver(base.BaseDriver): + @base.driver_periodic_task(spacing=42) + def task(self, manager, context): + # do some job + + :param parallel: whether to run this task in a separate thread + :param other: arguments to pass to @periodic_task.periodic_task + """ + # TODO(dtantsur): drop all this magic once + # https://review.openstack.org/#/c/134303/ lands + semaphore = eventlet.semaphore.BoundedSemaphore() + + def decorator2(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + if parallel: + def _internal(): + with semaphore: + func(*args, **kwargs) + + eventlet.greenthread.spawn_n(_internal) + else: + func(*args, **kwargs) + + # NOTE(dtantsur): name should be unique + other.setdefault('name', '%s.%s' % (func.__module__, func.__name__)) + decorator = periodic_task.periodic_task(**other) + return decorator(wrapper) + + return decorator2 diff --git a/ironic/openstack/common/periodic_task.py b/ironic/openstack/common/periodic_task.py index bb15019e43..147f3d1083 100644 --- a/ironic/openstack/common/periodic_task.py +++ b/ironic/openstack/common/periodic_task.py @@ -11,13 +11,14 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import random import time from oslo.config import cfg import six -from ironic.openstack.common.gettextutils import _, _LE, _LI +from ironic.openstack.common._i18n import _, _LE, _LI from ironic.openstack.common import log as logging @@ -36,6 +37,11 @@ LOG = logging.getLogger(__name__) DEFAULT_INTERVAL = 60.0 +def list_opts(): + """Entry point for oslo-config-generator.""" + return [(None, copy.deepcopy(periodic_opts))] + + class InvalidPeriodicTaskArg(Exception): message = _("Unexpected argument for periodic task creation: %(arg)s.") @@ -49,14 +55,15 @@ def periodic_task(*args, **kwargs): interval of 60 seconds. 2. With arguments: - @periodic_task(spacing=N [, run_immediately=[True|False]]) + @periodic_task(spacing=N [, run_immediately=[True|False]] + [, name=[None|"string"]) this will be run on approximately every N seconds. If this number is negative the periodic task will be disabled. If the run_immediately argument is provided and has a value of 'True', the first run of the task will be shortly after task scheduler starts. If run_immediately is omitted or set to 'False', the first time the task runs will be approximately N seconds after the task scheduler - starts. + starts. If name is not provided, __name__ of function is used. """ def decorator(f): # Test for old style invocation @@ -70,6 +77,7 @@ def periodic_task(*args, **kwargs): f._periodic_enabled = False else: f._periodic_enabled = kwargs.pop('enabled', True) + f._periodic_name = kwargs.pop('name', f.__name__) # Control frequency f._periodic_spacing = kwargs.pop('spacing', 0) @@ -99,6 +107,36 @@ def periodic_task(*args, **kwargs): class _PeriodicTasksMeta(type): + def _add_periodic_task(cls, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. + + :return: whether task was actually enabled + """ + name = task._periodic_name + + if task._periodic_spacing < 0: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'its interval is negative'), + {'task': name}) + return False + if not task._periodic_enabled: + LOG.info(_LI('Skipping periodic task %(task)s because ' + 'it is disabled'), + {'task': name}) + return False + + # A periodic spacing of zero indicates that this task should + # be run on the default interval to avoid running too + # frequently. + if task._periodic_spacing == 0: + task._periodic_spacing = DEFAULT_INTERVAL + + cls._periodic_tasks.append((name, task)) + cls._periodic_spacing[name] = task._periodic_spacing + return True + def __init__(cls, names, bases, dict_): """Metaclass that allows us to collect decorated periodic tasks.""" super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_) @@ -119,28 +157,7 @@ class _PeriodicTasksMeta(type): for value in cls.__dict__.values(): if getattr(value, '_periodic_task', False): - task = value - name = task.__name__ - - if task._periodic_spacing < 0: - LOG.info(_LI('Skipping periodic task %(task)s because ' - 'its interval is negative'), - {'task': name}) - continue - if not task._periodic_enabled: - LOG.info(_LI('Skipping periodic task %(task)s because ' - 'it is disabled'), - {'task': name}) - continue - - # A periodic spacing of zero indicates that this task should - # be run on the default interval to avoid running too - # frequently. - if task._periodic_spacing == 0: - task._periodic_spacing = DEFAULT_INTERVAL - - cls._periodic_tasks.append((name, task)) - cls._periodic_spacing[name] = task._periodic_spacing + cls._add_periodic_task(value) def _nearest_boundary(last_run, spacing): @@ -172,6 +189,15 @@ class PeriodicTasks(object): for name, task in self._periodic_tasks: self._periodic_last_run[name] = task._periodic_last_run + def add_periodic_task(self, task): + """Add a periodic task to the list of periodic tasks. + + The task should already be decorated by @periodic_task. + """ + if self.__class__._add_periodic_task(task): + self._periodic_last_run[task._periodic_name] = ( + task._periodic_last_run) + def run_periodic_tasks(self, context, raise_on_error=False): """Tasks to be run at a periodic interval.""" idle_for = DEFAULT_INTERVAL diff --git a/ironic/tests/conductor/test_manager.py b/ironic/tests/conductor/test_manager.py index cc8ad0a545..61d670eefc 100644 --- a/ironic/tests/conductor/test_manager.py +++ b/ironic/tests/conductor/test_manager.py @@ -198,6 +198,8 @@ class StartStopTestCase(_ServiceSetUpMixin, tests_db_base.DbTestCase): objects.Conductor.get_by_hostname, self.context, self.hostname) + @mock.patch.object(driver_factory.DriverFactory, '__getitem__', + lambda *args: mock.MagicMock()) def test_start_registers_driver_names(self): init_names = ['fake1', 'fake2'] restart_names = ['fake3', 'fake4'] @@ -220,6 +222,47 @@ class StartStopTestCase(_ServiceSetUpMixin, tests_db_base.DbTestCase): self.hostname) self.assertEqual(restart_names, res['drivers']) + @mock.patch.object(driver_factory.DriverFactory, '__getitem__') + def test_start_registers_driver_specific_tasks(self, get_mock): + init_names = ['fake1'] + expected_task_name = 'ironic.tests.conductor.test_manager.task' + expected_task_name2 = 'ironic.tests.conductor.test_manager.iface' + self.config(enabled_drivers=init_names) + + class TestInterface(object): + @drivers_base.driver_periodic_task(spacing=100500) + def iface(self): + pass + + class Driver(object): + core_interfaces = [] + standard_interfaces = ['iface'] + + iface = TestInterface() + + @drivers_base.driver_periodic_task(spacing=42) + def task(self, context): + pass + + obj = Driver() + self.assertTrue(obj.task._periodic_enabled) + get_mock.return_value = mock.Mock(obj=obj) + + with mock.patch.object( + driver_factory.DriverFactory()._extension_manager, + 'names') as mock_names: + mock_names.return_value = init_names + self._start_service() + tasks = dict(self.service._periodic_tasks) + self.assertEqual(obj.task, tasks[expected_task_name]) + self.assertEqual(obj.iface.iface, tasks[expected_task_name2]) + self.assertEqual(42, + self.service._periodic_spacing[expected_task_name]) + self.assertEqual(100500, + self.service._periodic_spacing[expected_task_name2]) + self.assertIn(expected_task_name, self.service._periodic_last_run) + self.assertIn(expected_task_name2, self.service._periodic_last_run) + @mock.patch.object(driver_factory.DriverFactory, '__init__') def test_start_fails_on_missing_driver(self, mock_df): mock_df.side_effect = exception.DriverNotFound('test') diff --git a/ironic/tests/drivers/test_base.py b/ironic/tests/drivers/test_base.py index 1f4f35f139..7aa84ca7ec 100644 --- a/ironic/tests/drivers/test_base.py +++ b/ironic/tests/drivers/test_base.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import eventlet import mock from ironic.common import exception @@ -65,3 +66,35 @@ class PassthruDecoratorTestCase(base.TestCase): self.fvi.normalexception, mock.ANY) driver_base.LOG.exception.assert_called_with( mock.ANY, 'normalexception') + + +@mock.patch.object(eventlet.greenthread, 'spawn_n', + side_effect=lambda func, *args, **kw: func(*args, **kw)) +class DriverPeriodicTaskTestCase(base.TestCase): + def test(self, spawn_mock): + method_mock = mock.Mock() + function_mock = mock.Mock() + + class TestClass(object): + @driver_base.driver_periodic_task(spacing=42) + def method(self, foo, bar=None): + method_mock(foo, bar=bar) + + @driver_base.driver_periodic_task(spacing=100, parallel=False) + def function(): + function_mock() + + obj = TestClass() + self.assertEqual(42, obj.method._periodic_spacing) + self.assertTrue(obj.method._periodic_task) + self.assertEqual('ironic.tests.drivers.test_base.method', + obj.method._periodic_name) + self.assertEqual('ironic.tests.drivers.test_base.function', + function._periodic_name) + + obj.method(1, bar=2) + method_mock.assert_called_once_with(1, bar=2) + self.assertEqual(1, spawn_mock.call_count) + function() + function_mock.assert_called_once_with() + self.assertEqual(1, spawn_mock.call_count)