diff --git a/openstack/_adapter.py b/openstack/_adapter.py index dfe2e3691..7f5da392b 100644 --- a/openstack/_adapter.py +++ b/openstack/_adapter.py @@ -116,11 +116,18 @@ class OpenStackSDKAdapter(adapter.Adapter): This allows using the nodepool MultiThreaded Rate Limiting TaskManager. """ - def __init__(self, session=None, task_manager=None, *args, **kwargs): + def __init__( + self, session=None, + task_manager=None, + rate_limit=None, concurrency=None, + *args, **kwargs): super(OpenStackSDKAdapter, self).__init__( session=session, *args, **kwargs) if not task_manager: - task_manager = _task_manager.TaskManager(name=self.service_type) + task_manager = _task_manager.TaskManager( + name=self.service_type, + rate=rate_limit, + workers=concurrency) task_manager.start() self.task_manager = task_manager @@ -143,6 +150,7 @@ class OpenStackSDKAdapter(adapter.Adapter): ret = self.task_manager.submit_function( request_method, run_async=True, name=name, connect_retries=connect_retries, raise_exc=raise_exc, + tag=self.service_type, **kwargs) if run_async: return ret diff --git a/openstack/config/cloud_region.py b/openstack/config/cloud_region.py index 1bb421d84..7568673bf 100644 --- a/openstack/config/cloud_region.py +++ b/openstack/config/cloud_region.py @@ -15,7 +15,6 @@ import copy import warnings -from keystoneauth1 import adapter from keystoneauth1 import discover import keystoneauth1.exceptions.catalog from keystoneauth1 import session as ks_session @@ -23,6 +22,7 @@ import os_service_types import requestsexceptions from six.moves import urllib +from openstack import _adapter from openstack import version as openstack_version from openstack import _log from openstack.config import _util @@ -247,6 +247,17 @@ class CloudRegion(object): value = converter(value) return value + def _get_service_config(self, key, service_type): + config_dict = self.config.get(key) + if not config_dict: + return None + if not isinstance(config_dict, dict): + return config_dict + + for st in self._service_type_manager.get_all_types(service_type): + if st in config_dict: + return config_dict[st] + def get_interface(self, service_type=None): return self._get_config( 'interface', service_type, fallback_to_unprefixed=True) @@ -438,7 +449,8 @@ class CloudRegion(object): return interface_versions.get(service_type, []) def get_session_client( - self, service_type, version=None, constructor=adapter.Adapter, + self, service_type, version=None, + constructor=_adapter.OpenStackSDKAdapter, **kwargs): """Return a prepped keystoneauth Adapter for a given service. @@ -498,6 +510,8 @@ class CloudRegion(object): max_version=max_api_version, endpoint_override=endpoint_override, default_microversion=version_request.default_microversion, + rate_limit=self.get_rate_limit(service_type), + concurrency=self.get_concurrency(service_type), **kwargs) if version_request.default_microversion: default_microversion = version_request.default_microversion @@ -724,3 +738,11 @@ class CloudRegion(object): def get_password_callback(self): return self._password_callback + + def get_rate_limit(self, service_type=None): + return self._get_service_config( + 'rate_limit', service_type=service_type) + + def get_concurrency(self, service_type=None): + return self._get_service_config( + 'concurrency', service_type=service_type) diff --git a/openstack/connection.py b/openstack/connection.py index 945283598..ad094319e 100644 --- a/openstack/connection.py +++ b/openstack/connection.py @@ -220,6 +220,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, strict=False, use_direct_get=False, task_manager=None, + rate_limit=None, **kwargs): """Create a connection to a cloud. @@ -262,6 +263,12 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, Defaults to None which causes a direct-action Task Manager to be used. :type manager: :class:`~openstack.task_manager.TaskManager` + :param rate_limit: + Client-side rate limit, expressed in calls per second. The + parameter can either be a single float, or it can be a dict with + keys as service-type and values as floats expressing the calls + per second for that service. Defaults to None, which means no + rate-limiting is performed. :param kwargs: If a config is not provided, the rest of the parameters provided are assumed to be arguments to be passed to the CloudRegion contructor. @@ -294,7 +301,8 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta, self.task_manager = task_manager else: self.task_manager = _task_manager.TaskManager( - self.config.full_name) + self.config.full_name, + rate=rate_limit) self.task_manager.start() self._session = None diff --git a/openstack/task_manager.py b/openstack/task_manager.py index 6a58bb06a..37c5ed1ee 100644 --- a/openstack/task_manager.py +++ b/openstack/task_manager.py @@ -45,7 +45,9 @@ class Task(object): the main payload at execution time. """ - def __init__(self, main=None, name=None, run_async=False, *args, **kwargs): + def __init__( + self, main=None, name=None, run_async=False, + tag=None, *args, **kwargs): self._exception = None self._traceback = None self._result = None @@ -56,6 +58,7 @@ class Task(object): self.args = args self.kwargs = kwargs self.name = name or type(self).__name__ + self.tag = tag def main(self): return self._main(*self.args, **self.kwargs) @@ -103,12 +106,22 @@ class TaskManager(object): self.daemon = True self.queue = queue.Queue() self._running = True - if rate is not None: - rate = float(rate) - self.rate = rate + if isinstance(rate, dict): + self._waits = {} + for (k, v) in rate.items(): + if v: + self._waits[k] = 1.0 / v + else: + if rate: + self._waits = {None: 1.0 / rate} + else: + self._waits = {} self._thread = threading.Thread(name=name, target=self.run) self._thread.daemon = True + def _get_wait(self, tag): + return self._waits.get(tag, self._waits.get(None)) + @property def executor(self): if not self._executor: @@ -129,7 +142,7 @@ class TaskManager(object): self._thread.join() def run(self): - last_ts = 0 + last_ts_dict = {} try: while True: task = self.queue.get() @@ -137,12 +150,15 @@ class TaskManager(object): if not self._running: break continue - if self.rate: + wait = self._get_wait(task.tag) + if wait: + last_ts = last_ts_dict.get(task.tag, 0) while True: delta = time.time() - last_ts - if delta >= self.rate: + if delta >= wait: break - time.sleep(self.rate - delta) + time.sleep(wait - delta) + last_ts_dict[task.tag] = time.time() self._log.debug( "TaskManager {name} queue size: {size})".format( name=self.name, @@ -171,12 +187,14 @@ class TaskManager(object): return task.wait() def submit_function( - self, method, name=None, run_async=False, *args, **kwargs): + self, method, name=None, run_async=False, tag=None, + *args, **kwargs): """ Allows submitting an arbitrary method for work. :param method: Callable to run in the TaskManager. :param str name: Name to use for the generated Task object. :param bool run_async: Whether to run this task async or not. + :param str tag: Named rate-limiting context for the task. :param args: positional arguments to pass to the method when it runs. :param kwargs: keyword arguments to pass to the method when it runs. """ @@ -185,10 +203,12 @@ class TaskManager(object): self.executor.submit, method, *args, **kwargs) task = Task( main=payload, name=name, - run_async=run_async) + run_async=run_async, + tag=tag) else: task = Task( main=method, name=name, + tag=tag, *args, **kwargs) return self.submit_task(task) diff --git a/openstack/tests/unit/base.py b/openstack/tests/unit/base.py index 8062486bf..993346772 100644 --- a/openstack/tests/unit/base.py +++ b/openstack/tests/unit/base.py @@ -124,6 +124,8 @@ class TestCase(base.TestCase): self.strict_cloud = openstack.connection.Connection( config=self.cloud_config, strict=True) + self.addCleanup(self.cloud.task_manager.stop) + self.addCleanup(self.strict_cloud.task_manager.stop) # FIXME(notmorgan): Convert the uri_registry, discovery.json, and # use of keystone_v3/v2 to a proper fixtures.Fixture. For now this diff --git a/openstack/tests/unit/cloud/test_task_manager.py b/openstack/tests/unit/cloud/test_task_manager.py index 6c4b19c0c..9f471a208 100644 --- a/openstack/tests/unit/cloud/test_task_manager.py +++ b/openstack/tests/unit/cloud/test_task_manager.py @@ -63,6 +63,25 @@ class TaskTestSet(task_manager.Task): return set([1, 2]) +class TestRateTransforms(base.TestCase): + + def test_rate_parameter_scalar(self): + manager = task_manager.TaskManager(name='test', rate=0.1234) + self.assertEqual(1 / 0.1234, manager._get_wait('compute')) + self.assertEqual(1 / 0.1234, manager._get_wait(None)) + + def test_rate_parameter_dict(self): + manager = task_manager.TaskManager( + name='test', + rate={ + 'compute': 20, + 'network': 10, + }) + self.assertEqual(1 / 20, manager._get_wait('compute')) + self.assertEqual(1 / 10, manager._get_wait('network')) + self.assertIsNone(manager._get_wait('object-store')) + + class TestTaskManager(base.TestCase): def setUp(self): diff --git a/openstack/tests/unit/test_connection.py b/openstack/tests/unit/test_connection.py index a14e5af2d..2605525d7 100644 --- a/openstack/tests/unit/test_connection.py +++ b/openstack/tests/unit/test_connection.py @@ -84,6 +84,22 @@ class TestConnection(base.TestCase): self.assertEqual(mock_session, conn.session) self.assertEqual('auth.example.com', conn.config.name) + def test_task_manager_rate_scalar(self): + conn = connection.Connection(cloud='sample', rate_limit=20) + self.assertEqual(1 / 20, conn.task_manager._get_wait('object-store')) + self.assertEqual(1 / 20, conn.task_manager._get_wait(None)) + + def test_task_manager_rate_dict(self): + conn = connection.Connection( + cloud='sample', + rate_limit={ + 'compute': 20, + 'network': 10, + }) + self.assertEqual(1 / 20, conn.task_manager._get_wait('compute')) + self.assertEqual(1 / 10, conn.task_manager._get_wait('network')) + self.assertIsNone(conn.task_manager._get_wait('object-store')) + def test_create_session(self): conn = connection.Connection(cloud='sample') self.assertIsNotNone(conn) diff --git a/releasenotes/notes/expose-client-side-rate-limit-ddb82df7cb92091c.yaml b/releasenotes/notes/expose-client-side-rate-limit-ddb82df7cb92091c.yaml new file mode 100644 index 000000000..3d7b503f3 --- /dev/null +++ b/releasenotes/notes/expose-client-side-rate-limit-ddb82df7cb92091c.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Client-side rate limiting is now directly exposed via ``rate_limit`` + and ``concurrency`` parameters. A single value can be given that applies + to all services, or a dict of service-type and value if different + client-side rate or concurrency limits should be used for different + services.