Add periodic task to rebuild conductor local state

This adds a periodic task which can rebuild the conductor's local state
(PXE config files, etc.) when conductors join or leave the cluster.

For any node which is newly mapped to the conductor, this will
trigger calling prepare() and take_over() on that node's deploy
interface.

This uses the periodic_max_workers setting like other periodic jobs,
starting the take-over process in separate threads. Thus, in a large
cluster, it may take some time for all nodes to settle down.
It also adds a new CONF option, sync_local_state_interval, to control
the timing of this job.

There is a lot of room for improvement and optimization here; however,
getting a fix in place is critical for the Juno release.

NOTE: This does not re-establish any console sessions.
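
As background, a minimal sketch of the deploy-interface contract referred
to above, simplified from ironic.drivers.base.DeployInterface (the method
names match the real interface; the bodies and docstrings here are
illustrative only):

    import abc

    import six


    @six.add_metaclass(abc.ABCMeta)
    class DeployInterface(object):
        """Simplified sketch; see ironic.drivers.base for the real class."""

        @abc.abstractmethod
        def prepare(self, task):
            """Rebuild local state (e.g. PXE config files) for task.node."""

        @abc.abstractmethod
        def take_over(self, task):
            """Assume control of a node deployed by another conductor."""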

Co-Authored-By: Lucas Alvares Gomes <lucasagomes@gmail.com>
Change-Id: I0dbe9a5a98ec5fd0c69f32d7590d8141da5a23c2
Closes-bug: #1279331
Author: Devananda van der Veen
Date:   2014-09-27 18:41:46 -07:00
Parent: 6b64010f46
Commit: 552a927e56

5 changed files with 307 additions and 31 deletions

@@ -573,6 +573,14 @@
 # meaning send all the sensor data. (list value)
 #send_sensor_data_types=ALL
 
+# When conductors join or leave the cluster, existing
+# conductors may need to update any persistent local state as
+# nodes are moved around the cluster. This option controls how
+# often, in seconds, each conductor will check for nodes that
+# it should "take over". Set it to a negative value to disable
+# the check entirely. (integer value)
+#sync_local_state_interval=180
+
 [console]
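
For operators, illustrative settings for the new option (the values below
are examples, not recommendations):

    [conductor]
    # Check for nodes to take over every five minutes instead of the
    # default three:
    sync_local_state_interval=300
    # Or disable the periodic check entirely:
    #sync_local_state_interval=-1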

@@ -38,6 +38,33 @@ def _is_apiv3(auth_url, auth_version):
     return auth_version == 'v3.0' or '/v3' in parse.urlparse(auth_url).path
 
+
+def _get_ksclient():
+    auth_url = CONF.keystone_authtoken.auth_uri
+    if not auth_url:
+        raise exception.CatalogFailure(_('Keystone API endpoint is missing'))
+
+    auth_version = CONF.keystone_authtoken.auth_version
+    api_v3 = _is_apiv3(auth_url, auth_version)
+
+    if api_v3:
+        from keystoneclient.v3 import client
+    else:
+        from keystoneclient.v2_0 import client
+
+    auth_url = get_keystone_url(auth_url, auth_version)
+    try:
+        return client.Client(username=CONF.keystone_authtoken.admin_user,
+                             password=CONF.keystone_authtoken.admin_password,
+                             tenant_name=CONF.keystone_authtoken.admin_tenant_name,
+                             auth_url=auth_url)
+    except ksexception.Unauthorized:
+        raise exception.CatalogUnauthorized
+    except ksexception.AuthorizationFailure as err:
+        raise exception.CatalogFailure(_('Could not perform authorization '
+                                         'process for service catalog: %s')
+                                       % err)
+
 
 def get_keystone_url(auth_url, auth_version):
     """Gives an http/https url to contact keystone.
@@ -66,31 +93,7 @@ def get_service_url(service_type='baremetal', endpoint_type='internal'):
     :param endpoint_type: the type of endpoint for the service.
     :returns: an http/https url for the desired endpoint.
     """
-    auth_url = CONF.keystone_authtoken.auth_uri
-    if not auth_url:
-        raise exception.CatalogFailure(_('Keystone API endpoint is missing'))
-
-    auth_version = CONF.keystone_authtoken.auth_version
-    api_v3 = _is_apiv3(auth_url, auth_version)
-
-    if api_v3:
-        from keystoneclient.v3 import client
-    else:
-        from keystoneclient.v2_0 import client
-
-    auth_url = get_keystone_url(auth_url, auth_version)
-    try:
-        ksclient = client.Client(username=CONF.keystone_authtoken.admin_user,
-                                 password=CONF.keystone_authtoken.admin_password,
-                                 tenant_name=CONF.keystone_authtoken.admin_tenant_name,
-                                 auth_url=auth_url)
-    except ksexception.Unauthorized:
-        raise exception.CatalogUnauthorized
-    except ksexception.AuthorizationFailure as err:
-        raise exception.CatalogFailure(_('Could not perform authorization '
-                                         'process for service catalog: %s')
-                                       % err)
+    ksclient = _get_ksclient()
     if not ksclient.has_service_catalog():
         raise exception.CatalogFailure(_('No keystone service catalog loaded'))

@@ -103,3 +106,9 @@ def get_service_url(service_type='baremetal', endpoint_type='internal'):
                                endpoint_type=endpoint_type)
 
     return endpoint
+
+
+def get_admin_auth_token():
+    """Get an admin auth_token from the Keystone."""
+    ksclient = _get_ksclient()
+    return ksclient.auth_token
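
The refactored _get_ksclient() lets other modules obtain an admin token
without duplicating the client setup. A usage sketch, mirroring what the
conductor hunk below does (assumes a configured [keystone_authtoken]
section):

    from ironic.common import keystone
    from ironic.openstack.common import context as ironic_context

    # Build an admin request context and attach a real admin token so that
    # service clients (e.g. glance) called from a periodic task do not
    # fail with 401 Unauthorized.
    admin_context = ironic_context.get_admin_context()
    admin_context.auth_token = keystone.get_admin_auth_token()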

@@ -61,6 +61,7 @@ from ironic.common.i18n import _LC
 from ironic.common.i18n import _LE
 from ironic.common.i18n import _LI
 from ironic.common.i18n import _LW
+from ironic.common import keystone
 from ironic.common import rpc
 from ironic.common import states
 from ironic.common import utils as ironic_utils
@@ -68,6 +69,7 @@ from ironic.conductor import task_manager
 from ironic.conductor import utils
 from ironic.db import api as dbapi
 from ironic import objects
+from ironic.openstack.common import context as ironic_context
 from ironic.openstack.common import lockutils
 from ironic.openstack.common import log
 from ironic.openstack.common import periodic_task
@@ -141,6 +143,15 @@ conductor_opts = [
                    ' sent to Ceilometer. The default value, "ALL", is a '
                    'special value meaning send all the sensor data.'
                ),
+        cfg.IntOpt('sync_local_state_interval',
+                   default=180,
+                   help='When conductors join or leave the cluster, existing '
+                        'conductors may need to update any persistent '
+                        'local state as nodes are moved around the cluster. '
+                        'This option controls how often, in seconds, each '
+                        'conductor will check for nodes that it should '
+                        '"take over". Set it to a negative value to disable '
+                        'the check entirely.'),
 ]
 
 CONF = cfg.CONF
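
For context (not part of this hunk, and an assumption about the
surrounding module): conductor_opts is registered under the [conductor]
group, which is how the CONF.conductor.sync_local_state_interval reference
in the decorator below resolves. A minimal equivalent:

    from oslo.config import cfg

    CONF = cfg.CONF
    # Assumed registration; in Ironic this happens near the opts definition.
    CONF.register_opts(conductor_opts, group='conductor')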
@@ -862,15 +873,72 @@ class ConductorManager(periodic_task.PeriodicTasks):
             if workers_count == CONF.conductor.periodic_max_workers:
                 break
 
-    def rebalance_node_ring(self):
-        """Perform any actions necessary when rebalancing the consistent hash.
-
-        This may trigger several actions, such as calling driver.deploy.prepare
-        for nodes which are now mapped to this conductor.
-
-        """
-        # TODO(deva): implement this
-        pass
+    def _do_takeover(self, task):
+        LOG.debug(('Conductor %(cdr)s taking over node %(node)s'),
+                  {'cdr': self.host, 'node': task.node.uuid})
+        task.driver.deploy.prepare(task)
+        task.driver.deploy.take_over(task)
+        # NOTE(lucasagomes): Set the ID of the new conductor managing
+        #                    this node
+        task.node.conductor_affinity = self.conductor.id
+        task.node.save()
+
+    @periodic_task.periodic_task(
+            spacing=CONF.conductor.sync_local_state_interval)
+    def _sync_local_state(self, context):
+        """Perform any actions necessary to sync local state.
+
+        This is called periodically to refresh the conductor's copy of the
+        consistent hash ring. If any mappings have changed, this method then
+        determines which, if any, nodes need to be "taken over".
+        The ensuing actions could include preparing a PXE environment,
+        updating the DHCP server, and so on.
+        """
+        self.ring_manager.reset()
+
+        filters = {'reserved': False,
+                   'maintenance': False,
+                   'provision_state': states.ACTIVE}
+        columns = ['id', 'uuid', 'driver', 'conductor_affinity']
+        node_list = self.dbapi.get_nodeinfo_list(
+                    columns=columns,
+                    filters=filters)
+
+        admin_context = None
+        workers_count = 0
+        for node_id, node_uuid, driver, conductor_affinity in node_list:
+            if not self._mapped_to_this_conductor(node_uuid, driver):
+                continue
+            if conductor_affinity == self.conductor.id:
+                continue
+
+            # NOTE(lucasagomes): The context provided by the periodic task
+            # will make the glance client fail with a 401 (Unauthorized),
+            # so we have to use the admin_context with an admin auth_token
+            if not admin_context:
+                admin_context = ironic_context.get_admin_context()
+                admin_context.auth_token = keystone.get_admin_auth_token()
+
+            # Node is mapped here, but not updated by this conductor last
+            try:
+                with task_manager.acquire(admin_context, node_id) as task:
+                    # NOTE(deva): now that we have the lock, check again to
+                    #             avoid racing with deletes and other state
+                    #             changes
+                    node = task.node
+                    if (node.maintenance or
+                            node.conductor_affinity == self.conductor.id or
+                            node.provision_state != states.ACTIVE):
+                        continue
+
+                    task.spawn_after(self._spawn_worker,
+                                     self._do_takeover, task)
+
+            except exception.NoFreeConductorWorker:
+                break
+            except (exception.NodeLocked, exception.NodeNotFound):
+                continue
+
+            workers_count += 1
+            if workers_count == CONF.conductor.periodic_max_workers:
+                break
 
     def _mapped_to_this_conductor(self, node_uuid, driver):
         """Check that node is mapped to this conductor.

@@ -26,6 +26,7 @@ from oslo import messaging
 from ironic.common import boot_devices
 from ironic.common import driver_factory
 from ironic.common import exception
+from ironic.common import keystone
 from ironic.common import states
 from ironic.common import utils as ironic_utils
 from ironic.conductor import manager
@@ -34,6 +35,7 @@ from ironic.conductor import utils as conductor_utils
 from ironic.db import api as dbapi
 from ironic.drivers import base as drivers_base
 from ironic import objects
+from ironic.openstack.common import context
 from ironic.tests import base as tests_base
 from ironic.tests.conductor import utils as mgr_utils
 from ironic.tests.db import base as tests_db_base
@@ -2292,3 +2294,185 @@ class ManagerTestProperties(tests_db_base.DbTestCase):
                           self.context, "bad-driver")
         # Compare true exception hidden by @messaging.expected_exceptions
         self.assertEqual(exception.DriverNotFound, exc.exc_info[0])
+
+
+@mock.patch.object(keystone, 'get_admin_auth_token')
+@mock.patch.object(task_manager, 'acquire')
+@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
+@mock.patch.object(dbapi.IMPL, 'get_nodeinfo_list')
+class ManagerSyncLocalStateTestCase(_CommonMixIn, tests_db_base.DbTestCase):
+
+    def setUp(self):
+        super(ManagerSyncLocalStateTestCase, self).setUp()
+
+        self.dbapi = dbapi.get_instance()
+        self.service = manager.ConductorManager('hostname', 'test-topic')
+
+        self.service.conductor = mock.Mock()
+        self.service.dbapi = self.dbapi
+        self.service.ring_manager = mock.Mock()
+
+        self.node = self._create_node(provision_state=states.ACTIVE)
+        self.task = self._create_task(node=self.node)
+
+        self.filters = {'reserved': False,
+                        'maintenance': False,
+                        'provision_state': states.ACTIVE}
+        self.columns = ['id', 'uuid', 'driver', 'conductor_affinity']
+
+    def _assert_get_nodeinfo_args(self, get_nodeinfo_mock):
+        get_nodeinfo_mock.assert_called_once_with(
+            columns=self.columns, filters=self.filters)
+
+    def test_not_mapped(self, get_nodeinfo_mock, mapped_mock, acquire_mock,
+                        get_authtoken_mock):
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = False
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        self.assertFalse(acquire_mock.called)
+        self.assertFalse(get_authtoken_mock.called)
+        self.service.ring_manager.reset.assert_called_once_with()
+
+    def test_already_mapped(self, get_nodeinfo_mock, mapped_mock,
+                            acquire_mock, get_authtoken_mock):
+        # Node is already mapped to the conductor running the periodic task
+        self.node.conductor_affinity = 123
+        self.service.conductor.id = 123
+
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = True
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        self.assertFalse(acquire_mock.called)
+        self.assertFalse(get_authtoken_mock.called)
+        self.service.ring_manager.reset.assert_called_once_with()
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_good(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                  acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        get_nodeinfo_mock.return_value = self._get_nodeinfo_list_response()
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = self._get_acquire_side_effect(self.task)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+        get_authtoken_mock.assert_called_once_with()
+        acquire_mock.assert_called_once_with(self.context, self.node.id)
+        # assert spawn_after has been called
+        self.task.spawn_after.assert_called_once_with(
+            self.service._spawn_worker,
+            self.service._do_takeover, self.task)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_no_free_worker(self, get_ctx_mock, get_nodeinfo_mock,
+                            mapped_mock, acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = \
+            self._get_acquire_side_effect([self.task] * 3)
+        self.task.spawn_after.side_effect = \
+            [None, exception.NoFreeConductorWorker('error')]
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called 2 times only
+        # instead of 3. When NoFreeConductorWorker is raised the loop
+        # should be broken
+        expected = [mock.call(self.node.uuid, self.node.driver)] * 2
+        self.assertEqual(expected, mapped_mock.call_args_list)
+
+        # assert acquire() gets called 2 times only instead of 3. When
+        # NoFreeConductorWorker is raised the loop should be broken
+        expected = [mock.call(self.context, self.node.id)] * 2
+        self.assertEqual(expected, acquire_mock.call_args_list)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called twice
+        expected = [mock.call(self.service._spawn_worker,
+                              self.service._do_takeover, self.task)] * 2
+        self.assertEqual(expected, self.task.spawn_after.call_args_list)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_node_locked(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                         acquire_mock, get_authtoken_mock):
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = self._get_acquire_side_effect(
+            [self.task, exception.NodeLocked('error'), self.task])
+        self.task.spawn_after.side_effect = [None, None]
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called 3 times
+        expected = [mock.call(self.node.uuid, self.node.driver)] * 3
+        self.assertEqual(expected, mapped_mock.call_args_list)
+
+        # assert acquire() gets called 3 times
+        expected = [mock.call(self.context, self.node.id)] * 3
+        self.assertEqual(expected, acquire_mock.call_args_list)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called only 2 times
+        expected = [mock.call(self.service._spawn_worker,
+                              self.service._do_takeover, self.task)] * 2
+        self.assertEqual(expected, self.task.spawn_after.call_args_list)
+
+    @mock.patch.object(context, 'get_admin_context')
+    def test_worker_limit(self, get_ctx_mock, get_nodeinfo_mock, mapped_mock,
+                          acquire_mock, get_authtoken_mock):
+        # Limit to only 1 worker
+        self.config(periodic_max_workers=1, group='conductor')
+        get_ctx_mock.return_value = self.context
+        mapped_mock.return_value = True
+        acquire_mock.side_effect = \
+            self._get_acquire_side_effect([self.task] * 3)
+        self.task.spawn_after.side_effect = [None] * 3
+
+        # 3 nodes to be checked
+        get_nodeinfo_mock.return_value = \
+            self._get_nodeinfo_list_response([self.node] * 3)
+
+        self.service._sync_local_state(self.context)
+
+        self._assert_get_nodeinfo_args(get_nodeinfo_mock)
+
+        # assert _mapped_to_this_conductor() gets called only once
+        # because of the worker limit
+        mapped_mock.assert_called_once_with(self.node.uuid, self.node.driver)
+
+        # assert acquire() gets called only once because of the worker limit
+        acquire_mock.assert_called_once_with(self.context, self.node.id)
+
+        # Only one auth token needed for all runs
+        get_authtoken_mock.assert_called_once_with()
+
+        # assert spawn_after has been called
+        self.task.spawn_after.assert_called_once_with(
+            self.service._spawn_worker,
+            self.service._do_takeover, self.task)
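
The tests above rely on _CommonMixIn helpers (_create_node, _create_task,
_get_nodeinfo_list_response, _get_acquire_side_effect) defined earlier in
the file and not shown in this hunk. As a purely hypothetical sketch of
the acquire side effect alone (the real mixin may differ), a factory like
this lets the mocked task_manager.acquire either yield a task or raise on
entry:

    import contextlib

    import mock


    def _get_acquire_side_effect(task_infos):
        """Hypothetical: yield tasks, or raise exceptions, in call order."""
        items = task_infos if isinstance(task_infos, list) else [task_infos]
        items = list(items)

        @contextlib.contextmanager
        def _acquire(context, node_id, *args, **kwargs):
            item = items.pop(0)
            if isinstance(item, Exception):
                raise item  # e.g. exception.NodeLocked
            yield item      # the mock standing in for the acquired task

        return _acquire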

@@ -110,3 +110,10 @@ class KeystoneTestCase(base.TestCase):
         mock_ks.assert_called_once_with(username='fake', password='fake',
                                         tenant_name='fake',
                                         auth_url=expected_url)
+
+    @mock.patch('keystoneclient.v2_0.client.Client')
+    def test_get_admin_auth_token(self, mock_ks):
+        fake_client = FakeClient()
+        fake_client.auth_token = '123456'
+        mock_ks.return_value = fake_client
+        self.assertEqual('123456', keystone.get_admin_auth_token())
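
FakeClient is defined earlier in this test module and is not shown in the
hunk; a hypothetical stand-in with just enough surface for these
assertions would be:

    class FakeClient(object):
        auth_token = None

        def has_service_catalog(self):
            # Mirrors the check in get_service_url(); always succeeds here.
            return True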