From 715fb3db7667205dbcdf361e4ccf51f585fa19dc Mon Sep 17 00:00:00 2001 From: Arnaud M Date: Thu, 24 Apr 2025 21:57:13 +0200 Subject: [PATCH] Remove kombu RPC driver Since last release, we decided to get rid of kombu RPC driver in mistral. Kombu RPC driver was experimental and not widely used. Having only one driver will simplify global mistral maintenance. Change-Id: Ia847d4383af54fcc33f039ddd6dc122c58b1f126 Signed-off-by: Arnaud M --- .zuul.yaml | 2 - mistral/config.py | 13 - mistral/rpc/base.py | 13 +- mistral/rpc/kombu/__init__.py | 0 mistral/rpc/kombu/base.py | 149 --------- mistral/rpc/kombu/examples/__init__.py | 0 mistral/rpc/kombu/examples/client.py | 44 --- mistral/rpc/kombu/examples/server.py | 53 --- mistral/rpc/kombu/kombu_client.py | 209 ------------ mistral/rpc/kombu/kombu_hosts.py | 35 -- mistral/rpc/kombu/kombu_listener.py | 127 ------- mistral/rpc/kombu/kombu_server.py | 281 ---------------- mistral/tests/unit/rpc/kombu/__init__.py | 0 mistral/tests/unit/rpc/kombu/base.py | 28 -- mistral/tests/unit/rpc/kombu/fake_kombu.py | 48 --- .../tests/unit/rpc/kombu/test_kombu_client.py | 100 ------ .../tests/unit/rpc/kombu/test_kombu_hosts.py | 101 ------ .../unit/rpc/kombu/test_kombu_listener.py | 219 ------------ .../tests/unit/rpc/kombu/test_kombu_server.py | 311 ------------------ .../remove-kombu-rpc-28a616c55fa051b5.yaml | 5 + requirements.txt | 1 - setup.cfg | 9 - 22 files changed, 9 insertions(+), 1739 deletions(-) delete mode 100644 mistral/rpc/kombu/__init__.py delete mode 100644 mistral/rpc/kombu/base.py delete mode 100644 mistral/rpc/kombu/examples/__init__.py delete mode 100644 mistral/rpc/kombu/examples/client.py delete mode 100644 mistral/rpc/kombu/examples/server.py delete mode 100644 mistral/rpc/kombu/kombu_client.py delete mode 100644 mistral/rpc/kombu/kombu_hosts.py delete mode 100644 mistral/rpc/kombu/kombu_listener.py delete mode 100644 mistral/rpc/kombu/kombu_server.py delete mode 100644 mistral/tests/unit/rpc/kombu/__init__.py delete mode 100644 mistral/tests/unit/rpc/kombu/base.py delete mode 100644 mistral/tests/unit/rpc/kombu/fake_kombu.py delete mode 100644 mistral/tests/unit/rpc/kombu/test_kombu_client.py delete mode 100644 mistral/tests/unit/rpc/kombu/test_kombu_hosts.py delete mode 100644 mistral/tests/unit/rpc/kombu/test_kombu_listener.py delete mode 100644 mistral/tests/unit/rpc/kombu/test_kombu_server.py create mode 100644 releasenotes/notes/remove-kombu-rpc-28a616c55fa051b5.yaml diff --git a/.zuul.yaml b/.zuul.yaml index f77ec0409..219efb3a6 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -111,7 +111,6 @@ - mistral-devstack-tempest-ipv6-only - mistral-devstack-non-apache-tempest-ipv6-only - mistral-devstack-non-apache - - mistral-devstack-kombu # Disable mysql / postgresql units as they are not working as expected # - mistral-tox-unit-mysql # - mistral-tox-unit-postgresql @@ -123,7 +122,6 @@ - mistral-devstack-tempest-ipv6-only - mistral-devstack-non-apache-tempest-ipv6-only - mistral-devstack-non-apache - - mistral-devstack-kombu # Disable mysql / postgresql units as they are not working as expected # - mistral-tox-unit-mysql # - mistral-tox-unit-postgresql diff --git a/mistral/config.py b/mistral/config.py index 69f30b2c6..533dbd7d7 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -193,17 +193,6 @@ js_impl_opt = cfg.StrOpt( 'action to evaluate scripts.') ) -rpc_impl_opt = cfg.StrOpt( - 'rpc_implementation', - default='oslo', - choices=['oslo', 'kombu'], - deprecated_for_removal=True, - deprecated_reason='Kombu driver is deprecated and will be removed ' - 'in the F release cycle', - help=_('Specifies RPC implementation for RPC client and server. ' - 'Support of kombu driver is experimental.') -) - oslo_rpc_executor = cfg.StrOpt( 'oslo_rpc_executor', default='threading', @@ -823,7 +812,6 @@ CONF.register_opt(wf_trace_log_name_opt) CONF.register_opt(auth_type_opt) CONF.register_opt(scheduler_type_opt) CONF.register_opt(js_impl_opt) -CONF.register_opt(rpc_impl_opt) CONF.register_opt(oslo_rpc_executor) CONF.register_opt(expiration_token_duration) CONF.register_opts(service_opts.service_opts) @@ -868,7 +856,6 @@ default_group_opts = CLI_OPTS + [ auth_type_opt, scheduler_type_opt, js_impl_opt, - rpc_impl_opt, oslo_rpc_executor, expiration_token_duration ] diff --git a/mistral/rpc/base.py b/mistral/rpc/base.py index dcc2defee..337dd019c 100644 --- a/mistral/rpc/base.py +++ b/mistral/rpc/base.py @@ -42,9 +42,6 @@ def cleanup(): _TRANSPORT = None -# TODO(rakhmerov): This method seems misplaced. Now we have different kind -# of transports (oslo, kombu) and this module should not have any oslo -# specific things anymore. def get_transport(): global _TRANSPORT @@ -54,27 +51,25 @@ def get_transport(): return _TRANSPORT +# TODO(amorin) maybe refactor this since we have only one impl now def get_rpc_server_driver(): - rpc_impl = cfg.CONF.rpc_implementation - global _IMPL_SERVER if not _IMPL_SERVER: _IMPL_SERVER = driver.DriverManager( 'mistral.rpc.backends', - '%s_server' % rpc_impl + 'oslo_server' ).driver return _IMPL_SERVER +# TODO(amorin) maybe refactor this since we have only one impl now def get_rpc_client_driver(): - rpc_impl = cfg.CONF.rpc_implementation - global _IMPL_CLIENT if not _IMPL_CLIENT: _IMPL_CLIENT = driver.DriverManager( 'mistral.rpc.backends', - '%s_client' % rpc_impl + 'oslo_client' ).driver return _IMPL_CLIENT diff --git a/mistral/rpc/kombu/__init__.py b/mistral/rpc/kombu/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mistral/rpc/kombu/base.py b/mistral/rpc/kombu/base.py deleted file mode 100644 index 64832a9eb..000000000 --- a/mistral/rpc/kombu/base.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import kombu - -from mistral_lib import serialization as mistral_serialization -import oslo_messaging as messaging - -from mistral import config as cfg -from mistral import exceptions as exc - -IS_RECEIVED = 'kombu_rpc_is_received' -RESULT = 'kombu_rpc_result' -CORR_ID = 'kombu_rpc_correlation_id' -TYPE = 'kombu_rpc_type' - - -CONF = cfg.CONF - - -def set_transport_options(check_backend=True): - # We can be sure that all needed transport options are registered - # only if we at least once called method get_transport(). Because - # this is the method that registers them. - messaging.get_transport(CONF) - - backend = messaging.TransportURL.parse(CONF, CONF.transport_url).transport - - if check_backend and backend not in ['rabbit', 'kombu']: - raise exc.MistralException("Unsupported backend: %s" % backend) - - -class Base(object): - """Base class for Client and Server.""" - def __init__(self): - self.serializer = None - - @staticmethod - def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password, - amqp_vhost): - """Create connection. - - This method creates object representing the connection to RabbitMQ. - - :param amqp_host: Address of RabbitMQ server. - :param amqp_user: Username for connecting to RabbitMQ. - :param amqp_password: Password matching the given username. - :param amqp_vhost: Virtual host to connect to. - :param amqp_port: Port of RabbitMQ server. - :return: New connection to RabbitMQ. - """ - return kombu.BrokerConnection( - hostname=amqp_host, - userid=amqp_user, - password=amqp_password, - virtual_host=amqp_vhost, - port=amqp_port, - transport_options={'confirm_publish': True} - ) - - @staticmethod - def _make_exchange(name, durable=False, auto_delete=True, - exchange_type='topic'): - """Make named exchange. - - This method creates object representing exchange on RabbitMQ. It would - create a new exchange if exchange with given name don't exists. - - :param name: Name of the exchange. - :param durable: If set to True, messages on this exchange would be - store on disk - therefore can be retrieve after - failure. - :param auto_delete: If set to True, exchange would be automatically - deleted when none is connected. - :param exchange_type: Type of the exchange. Can be one of 'direct', - 'topic', 'fanout', 'headers'. See Kombu docs for - further details. - :return: Kombu exchange object. - """ - return kombu.Exchange( - name=name, - type=exchange_type, - durable=durable, - auto_delete=auto_delete - ) - - @staticmethod - def _make_queue(name, exchange, routing_key='', - durable=False, auto_delete=True, **kwargs): - """Make named queue for a given exchange. - - This method creates object representing queue in RabbitMQ. It would - create a new queue if queue with given name don't exists. - - :param name: Name of the queue - :param exchange: Kombu Exchange object (can be created using - _make_exchange). - :param routing_key: Routing key for queue. It behaves differently - depending on exchange type. See Kombu docs for - further details. - :param durable: If set to True, messages on this queue would be - store on disk - therefore can be retrieve after - failure. - :param auto_delete: If set to True, queue would be automatically - deleted when none is connected. - :param kwargs: See kombu documentation for all parameters than may be - may be passed to Queue. - :return: Kombu Queue object. - """ - return kombu.Queue( - name=name, - routing_key=routing_key, - exchange=exchange, - durable=durable, - auto_delete=auto_delete, - **kwargs - ) - - def _register_mistral_serialization(self): - """Adds mistral serializer to available serializers in kombu.""" - - self.serializer = mistral_serialization.get_polymorphic_serializer() - - def _serialize_message(self, kwargs): - result = {} - - for argname, arg in kwargs.items(): - result[argname] = self.serializer.serialize(arg) - - return result - - def _deserialize_message(self, kwargs): - result = {} - - for argname, arg in kwargs.items(): - result[argname] = self.serializer.deserialize(arg) - - return result diff --git a/mistral/rpc/kombu/examples/__init__.py b/mistral/rpc/kombu/examples/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mistral/rpc/kombu/examples/client.py b/mistral/rpc/kombu/examples/client.py deleted file mode 100644 index 46c18fa8e..000000000 --- a/mistral/rpc/kombu/examples/client.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -from mistral.rpc.kombu import kombu_client - - -# Example of using Kombu based RPC client. -def main(): - conf = { - 'user_id': 'guest', - 'password': 'secret', - 'exchange': 'my_exchange', - 'topic': 'my_topic', - 'server_id': 'host', - 'host': 'localhost', - 'port': 5672, - 'virtual_host': '/' - } - kombu_rpc = kombu_client.KombuRPCClient(conf) - - print(" [x] Requesting ...") - - ctx = type('context', (object,), {'to_dict': lambda self: {}})() - - response = kombu_rpc.sync_call(ctx, 'fib', n=44) - - print(" [.] Got %r" % (response,)) - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/mistral/rpc/kombu/examples/server.py b/mistral/rpc/kombu/examples/server.py deleted file mode 100644 index 52f5aa0a8..000000000 --- a/mistral/rpc/kombu/examples/server.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -from mistral.rpc.kombu import kombu_server - - -# Simple example of endpoint of RPC server, which just -# calculates given fibonacci number. -class MyServer(object): - cache = {0: 0, 1: 1} - - def fib(self, rpc_ctx, n): - if self.cache.get(n) is None: - self.cache[n] = (self.fib(rpc_ctx, n - 1) - + self.fib(rpc_ctx, n - 2)) - return self.cache[n] - - def get_name(self, rpc_ctx): - return self.__class__.__name__ - - -# Example of using Kombu based RPC server. -def main(): - conf = { - 'user_id': 'guest', - 'password': 'secret', - 'exchange': 'my_exchange', - 'topic': 'my_topic', - 'server_id': 'host', - 'host': 'localhost', - 'port': 5672, - 'virtual_host': '/' - } - rpc_server = kombu_server.KombuRPCServer(conf) - rpc_server.register_endpoint(MyServer()) - rpc_server.run() - - -if __name__ == '__main__': - sys.exit(main()) diff --git a/mistral/rpc/kombu/kombu_client.py b/mistral/rpc/kombu/kombu_client.py deleted file mode 100644 index f7b93c6e5..000000000 --- a/mistral/rpc/kombu/kombu_client.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import socket - -import itertools - -import errno -import queue - -import kombu -from oslo_log import log as logging - -from mistral import config as cfg -from mistral import exceptions as exc -from mistral.rpc import base as rpc_base -from mistral.rpc.kombu import base as kombu_base -from mistral.rpc.kombu import kombu_hosts -from mistral.rpc.kombu import kombu_listener -from mistral_lib import utils - -#: When connection to the RabbitMQ server breaks, the -#: client will receive EPIPE socket errors. These indicate -#: an error that may be fixed by retrying. This constant -#: is a guess for how many times the retry may be reasonable -EPIPE_RETRIES = 4 - -LOG = logging.getLogger(__name__) - -CONF = cfg.CONF - - -class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): - def __init__(self, conf): - super(KombuRPCClient, self).__init__(conf) - - kombu_base.set_transport_options() - - self._register_mistral_serialization() - - self.topic = conf.topic - self.server_id = conf.host - - hosts = kombu_hosts.KombuHosts(CONF) - - self.exchange = CONF.control_exchange - self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues - self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete - # NOTE(amorin) let's use a hardcoded 60s value until we deprecate the - # kombu RPC driver - self._timeout = 60 - self.routing_key = self.topic - - connections = [] - - for host in hosts.hosts: - conn = self._make_connection( - host.hostname, - host.port, - host.username, - host.password, - hosts.virtual_host - ) - connections.append(conn) - - self._connections = itertools.cycle(connections) - - # Create exchange. - exchange = self._make_exchange( - self.exchange, - durable=self.durable_queue, - auto_delete=self.auto_delete - ) - - # Create queue. - self.queue_name = utils.generate_unicode_uuid() - self.callback_queue = kombu.Queue( - self.queue_name, - exchange=exchange, - routing_key=self.queue_name, - durable=False, - exclusive=True, - auto_delete=True - ) - - self._listener = kombu_listener.KombuRPCListener( - connections=self._connections, - callback_queue=self.callback_queue - ) - - self._listener.start() - - def _wait_for_result(self, correlation_id): - """Waits for the result from the server. - - Waits for the result from the server, checks every second if - a timeout occurred. If a timeout occurred - the `RpcTimeout` exception - will be raised. - """ - try: - return self._listener.get_result(correlation_id, self._timeout) - except queue.Empty: - raise exc.MistralException( - "RPC Request timeout, correlation_id = %s" % correlation_id - ) - - def _call(self, ctx, method, target, async_=False, **kwargs): - """Performs a remote call for the given method. - - :param ctx: authentication context associated with mistral - :param method: name of the method that should be executed - :param kwargs: keyword parameters for the remote-method - :param target: Server name - :param async: bool value means whether the request is - asynchronous or not. - :return: result of the method or None if async. - """ - correlation_id = utils.generate_unicode_uuid() - - body = { - 'rpc_ctx': ctx.to_dict(), - 'rpc_method': method, - 'arguments': self._serialize_message(kwargs), - 'async': async_ - } - - LOG.debug("Publish request: %s", body) - - try: - if not async_: - self._listener.add_listener(correlation_id) - - # Publish request. - for retry_round in range(EPIPE_RETRIES): - if self._publish_request(body, correlation_id): - break - - # Start waiting for response. - if async_: - return - - LOG.debug( - "Waiting a reply for sync call [reply_to = %s]", - self.queue_name - ) - - result = self._wait_for_result(correlation_id) - res_type = result[kombu_base.TYPE] - res_object = result[kombu_base.RESULT] - - if res_type == 'error': - raise res_object - else: - res_object = self._deserialize_message(res_object)['body'] - - finally: - if not async_: - self._listener.remove_listener(correlation_id) - - return res_object - - def _publish_request(self, body, correlation_id): - """Publishes the request message - - .. note:: - The :const:`errno.EPIPE` socket errors are suppressed - and result in False being returned. This is because - this type of error can usually be fixed by retrying. - - :param body: message body - :param correlation_id: correlation id - :return: True if publish succeeded, False otherwise - :rtype: bool - """ - try: - conn = self._listener.wait_ready() - if conn: - with kombu.producers[conn].acquire(block=True) as producer: - producer.publish( - body=body, - exchange=self.exchange, - routing_key=self.topic, - reply_to=self.queue_name, - correlation_id=correlation_id, - delivery_mode=2 - ) - return True - except socket.error as e: - if e.errno != errno.EPIPE: - raise - else: - LOG.debug('Retrying publish due to broker connection failure') - return False - - def sync_call(self, ctx, method, target=None, **kwargs): - return self._call(ctx, method, async_=False, target=target, **kwargs) - - def async_call(self, ctx, method, target=None, fanout=False, **kwargs): - return self._call(ctx, method, async_=True, target=target, **kwargs) diff --git a/mistral/rpc/kombu/kombu_hosts.py b/mistral/rpc/kombu/kombu_hosts.py deleted file mode 100644 index 9e1059255..000000000 --- a/mistral/rpc/kombu/kombu_hosts.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import itertools -import random - -import oslo_messaging as messaging - - -class KombuHosts(object): - def __init__(self, conf): - transport_url = messaging.TransportURL.parse(conf, conf.transport_url) - - self.virtual_host = transport_url.virtual_host - self.hosts = transport_url.hosts - - if len(self.hosts) > 1: - random.shuffle(self.hosts) - - self._hosts_cycle = itertools.cycle(self.hosts) - - def get_host(self): - return next(self._hosts_cycle) diff --git a/mistral/rpc/kombu/kombu_listener.py b/mistral/rpc/kombu/kombu_listener.py deleted file mode 100644 index 09ad701c9..000000000 --- a/mistral/rpc/kombu/kombu_listener.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (c) 2016 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import itertools -from kombu.mixins import ConsumerMixin -import queue -import threading - -from oslo_log import log as logging - -from mistral.rpc.kombu import base as kombu_base - -LOG = logging.getLogger(__name__) - - -class KombuRPCListener(ConsumerMixin): - - def __init__(self, connections, callback_queue): - self._results = {} - self._connections = itertools.cycle(connections) - self._callback_queue = callback_queue - self._thread = None - self.connection = next(self._connections) - - self.ready = threading.Event() - - def add_listener(self, correlation_id): - self._results[correlation_id] = queue.Queue() - - def remove_listener(self, correlation_id): - if correlation_id in self._results: - del self._results[correlation_id] - - def get_consumers(self, Consumer, channel): - consumers = [Consumer( - self._callback_queue, - callbacks=[self.on_message], - accept=['pickle', 'json'] - )] - self.ready.set() - - return consumers - - def start(self): - if self._thread is None: - self._thread = threading.Thread(target=self.run) - self._thread.daemon = True - self._thread.start() - - def on_message(self, response, message): - """Callback on response. - - This method is automatically called when a response is incoming and - decides if it is the message we are waiting for - the message with the - result. - - :param response: the body of the amqp message already deserialized - by kombu - :param message: the plain amqp kombu.message with additional - information - """ - LOG.debug("Got response: {}", response) - - try: - message.ack() - except Exception as e: - LOG.exception("Failed to acknowledge AMQP message: %s", e) - else: - LOG.debug("AMQP message acknowledged.") - - correlation_id = message.properties['correlation_id'] - - queue = self._results.get(correlation_id) - - if queue: - result = { - kombu_base.TYPE: 'error' - if message.properties.get('type') == 'error' - else None, - kombu_base.RESULT: response - } - - queue.put(result) - else: - LOG.debug( - "Got a response, but seems like no process is waiting for " - "it [correlation_id={}]", correlation_id - ) - - def get_result(self, correlation_id, timeout): - return self._results[correlation_id].get(block=True, timeout=timeout) - - def on_connection_error(self, exc, interval): - self.ready.clear() - - self.connection = next(self._connections) - - LOG.debug("Broker connection failed: %s", exc) - LOG.debug( - "Sleeping for %s seconds, then retrying connection", - interval - ) - - def wait_ready(self, timeout=10.0): - """Waits for the listener to successfully declare the consumer - - :param timeout: timeout for waiting in seconds - :return: same as :func:`~threading.Event.wait` - :rtype: bool - - """ - if self.ready.wait(timeout=timeout): - return self.connection - else: - return False diff --git a/mistral/rpc/kombu/kombu_server.py b/mistral/rpc/kombu/kombu_server.py deleted file mode 100644 index 979a386cd..000000000 --- a/mistral/rpc/kombu/kombu_server.py +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright 2015 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import amqp -import socket -import threading -import time - -import kombu -from oslo_config import cfg -from oslo_log import log as logging -from stevedore import driver - -from mistral import context as auth_ctx -from mistral import exceptions as exc -from mistral.rpc import base as rpc_base -from mistral.rpc.kombu import base as kombu_base -from mistral.rpc.kombu import kombu_hosts - - -LOG = logging.getLogger(__name__) - -CONF = cfg.CONF - - -class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): - def __init__(self, conf): - super(KombuRPCServer, self).__init__(conf) - - kombu_base.set_transport_options() - - self._register_mistral_serialization() - - self.topic = conf.topic - self.server_id = conf.host - - self._hosts = kombu_hosts.KombuHosts(CONF) - - # NOTE(amorin) let's use a hardcoded value until we deprecate the - # kombu RPC driver - self._executor_threads = 64 - self.exchange = CONF.control_exchange - # TODO(rakhmerov): We shouldn't rely on any properties related - # to oslo.messaging. Only "transport_url" should matter. - self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues - self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete - self.routing_key = self.topic - self.channel = None - self.conn = None - self._running = threading.Event() - self._stopped = threading.Event() - self.endpoints = [] - self._worker = None - self._thread = None - - # TODO(ddeja): Those 2 options should be gathered from config. - self._sleep_time = 1 - self._max_sleep_time = 10 - - @property - def is_running(self): - """Return whether server is running.""" - return self._running.is_set() - - def run(self, executor='threading'): - if self._thread is None: - self._thread = threading.Thread(target=self._run, args=(executor,)) - self._thread.daemon = True - self._thread.start() - - def _run(self, executor): - """Start the server.""" - self._prepare_worker(executor) - - while True: - try: - _retry_connection = False - host = self._hosts.get_host() - - self.conn = self._make_connection( - host.hostname, - host.port, - host.username, - host.password, - self._hosts.virtual_host, - ) - - conn = kombu.connections[self.conn].acquire(block=True) - - exchange = self._make_exchange( - self.exchange, - durable=self.durable_queue, - auto_delete=self.auto_delete - ) - - queue = self._make_queue( - self.topic, - exchange, - routing_key=self.routing_key, - durable=self.durable_queue, - auto_delete=self.auto_delete - ) - with conn.Consumer( - queues=queue, - callbacks=[self._process_message], - ) as consumer: - consumer.qos(prefetch_count=1) - - self._running.set() - self._stopped.clear() - - LOG.info( - "Connected to AMQP at %s:%s", - host.hostname, - host.port - ) - self._sleep_time = 1 - - while self.is_running: - try: - conn.drain_events(timeout=1) - except socket.timeout: - pass - except KeyboardInterrupt: - self.stop() - - LOG.info( - "Server with id='%s' stopped.", - self.server_id - ) - - return - except (socket.error, amqp.exceptions.ConnectionForced) as e: - LOG.debug("Broker connection failed: %s", e) - - _retry_connection = True - finally: - self._stopped.set() - - if _retry_connection: - LOG.debug( - "Sleeping for %s seconds, then retrying " - "connection", - self._sleep_time - ) - - time.sleep(self._sleep_time) - - self._sleep_time = min( - self._sleep_time * 2, - self._max_sleep_time - ) - - def stop(self, graceful=False): - self._running.clear() - - if graceful: - self.wait() - - def wait(self): - self._stopped.wait() - - try: - self._worker.shutdown(wait=True) - except AttributeError as e: - LOG.warning("Cannot stop worker in graceful way: %s", e) - - def _get_rpc_method(self, method_name): - for endpoint in self.endpoints: - if hasattr(endpoint, method_name): - return getattr(endpoint, method_name) - - return None - - @staticmethod - def _set_auth_ctx(ctx): - if not isinstance(ctx, dict): - return - - context = auth_ctx.MistralContext.from_dict(ctx) - auth_ctx.set_ctx(context) - - return context - - def publish_message(self, body, reply_to, corr_id, res_type='response'): - if res_type != 'error': - body = self._serialize_message({'body': body}) - - with kombu.producers[self.conn].acquire(block=True) as producer: - producer.publish( - body=body, - exchange=self.exchange, - routing_key=reply_to, - correlation_id=corr_id, - serializer='pickle' if res_type == 'error' else 'json', - type=res_type - ) - - def _on_message_safe(self, request, message): - try: - return self._on_message(request, message) - except Exception as e: - LOG.warning( - "Got exception while consuming message. Exception would be " - "send back to the caller." - ) - LOG.debug("Exceptions: %s", str(e)) - - # Wrap exception into another exception for compatibility - # with oslo. - self.publish_message( - exc.KombuException(e), - message.properties['reply_to'], - message.properties['correlation_id'], - res_type='error' - ) - finally: - message.ack() - - def _on_message(self, request, message): - LOG.debug('Received message %s', request) - - is_async = request.get('async', False) - rpc_ctx = request.get('rpc_ctx') - redelivered = message.delivery_info.get('redelivered') - rpc_method_name = request.get('rpc_method') - arguments = self._deserialize_message(request.get('arguments')) - correlation_id = message.properties['correlation_id'] - reply_to = message.properties['reply_to'] - - if redelivered is not None: - rpc_ctx['redelivered'] = redelivered - - rpc_context = self._set_auth_ctx(rpc_ctx) - - rpc_method = self._get_rpc_method(rpc_method_name) - - if not rpc_method: - raise exc.MistralException("No such method: %s" % rpc_method_name) - - response = rpc_method(rpc_ctx=rpc_context, **arguments) - - if not is_async: - LOG.debug( - "RPC server sent a reply [reply_to = %s, correlation_id = %s", - reply_to, - correlation_id - ) - - self.publish_message( - response, - reply_to, - correlation_id - ) - - def register_endpoint(self, endpoint): - self.endpoints.append(endpoint) - - def _process_message(self, request, message): - self._worker.submit(self._on_message_safe, request, message) - - def _prepare_worker(self, executor='blocking'): - mgr = driver.DriverManager('kombu_driver.executors', executor) - - executor_opts = {} - - if executor != 'blocking': - executor_opts['max_workers'] = self._executor_threads - - self._worker = mgr.driver(**executor_opts) diff --git a/mistral/tests/unit/rpc/kombu/__init__.py b/mistral/tests/unit/rpc/kombu/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mistral/tests/unit/rpc/kombu/base.py b/mistral/tests/unit/rpc/kombu/base.py deleted file mode 100644 index c00e106ec..000000000 --- a/mistral/tests/unit/rpc/kombu/base.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (c) 2016 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from mistral import config as cfg -from mistral.rpc.kombu import base as kombu_base -from mistral.tests.unit import base - - -class KombuTestCase(base.BaseTest): - - def setUp(self): - super(KombuTestCase, self).setUp() - - kombu_base.set_transport_options(check_backend=False) - - cfg.CONF.set_default('transport_url', 'rabbit://localhost:567') diff --git a/mistral/tests/unit/rpc/kombu/fake_kombu.py b/mistral/tests/unit/rpc/kombu/fake_kombu.py deleted file mode 100644 index 605116157..000000000 --- a/mistral/tests/unit/rpc/kombu/fake_kombu.py +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright (c) 2016 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from kombu import mixins as mx -from unittest import mock - - -# Hack for making tests works with kombu listener -mixins = mx - -producer = mock.MagicMock() - -producers = mock.MagicMock() -producers.__getitem__ = lambda *args, **kwargs: producer - -connection = mock.MagicMock() - -connections = mock.MagicMock() -connections.__getitem__ = lambda *args, **kwargs: connection - -serialization = mock.MagicMock() - - -def BrokerConnection(*args, **kwargs): - return mock.MagicMock() - - -def Exchange(*args, **kwargs): - return mock.MagicMock() - - -def Queue(*args, **kwargs): - return mock.MagicMock() - - -def Consumer(*args, **kwargs): - return mock.MagicMock() diff --git a/mistral/tests/unit/rpc/kombu/test_kombu_client.py b/mistral/tests/unit/rpc/kombu/test_kombu_client.py deleted file mode 100644 index 80215e6b6..000000000 --- a/mistral/tests/unit/rpc/kombu/test_kombu_client.py +++ /dev/null @@ -1,100 +0,0 @@ -# Copyright (c) 2016 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from mistral import exceptions as exc -from mistral.tests.unit.rpc.kombu import base -from mistral.tests.unit.rpc.kombu import fake_kombu -from unittest import mock - -import queue - -with mock.patch.dict('sys.modules', kombu=fake_kombu): - from mistral.rpc.kombu import base as kombu_base - from mistral.rpc.kombu import kombu_client - - -class TestException(exc.MistralException): - pass - - -class KombuClientTest(base.KombuTestCase): - - _RESPONSE = "response" - - def setUp(self): - super(KombuClientTest, self).setUp() - - conf = mock.MagicMock() - - listener_class = kombu_client.kombu_listener.KombuRPCListener - - kombu_client.kombu_listener.KombuRPCListener = mock.MagicMock() - - def restore_listener(): - kombu_client.kombu_listener.KombuRPCListener = listener_class - - self.addCleanup(restore_listener) - - self.client = kombu_client.KombuRPCClient(conf) - self.ctx = type( - 'context', - (object,), - {'to_dict': lambda self: {}} - )() - - def test_sync_call_result_get(self): - self.client._listener.get_result = mock.MagicMock( - return_value={ - kombu_base.TYPE: None, - kombu_base.RESULT: self.client._serialize_message({ - 'body': self._RESPONSE - }) - } - ) - - response = self.client.sync_call(self.ctx, 'method') - - self.assertEqual(response, self._RESPONSE) - - def test_sync_call_result_not_get(self): - self.client._listener.get_result = mock.MagicMock( - side_effect=queue.Empty - ) - - self.assertRaises( - exc.MistralException, - self.client.sync_call, - self.ctx, - 'method_not_found' - ) - - def test_sync_call_result_type_error(self): - def side_effect(*args, **kwargs): - return { - kombu_base.TYPE: 'error', - kombu_base.RESULT: TestException() - } - - self.client._wait_for_result = mock.MagicMock(side_effect=side_effect) - - self.assertRaises( - TestException, - self.client.sync_call, - self.ctx, - 'method' - ) - - def test_async_call(self): - self.assertIsNone(self.client.async_call(self.ctx, 'method')) diff --git a/mistral/tests/unit/rpc/kombu/test_kombu_hosts.py b/mistral/tests/unit/rpc/kombu/test_kombu_hosts.py deleted file mode 100644 index a80674ca2..000000000 --- a/mistral/tests/unit/rpc/kombu/test_kombu_hosts.py +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from mistral.rpc.kombu import kombu_hosts -from mistral.tests.unit import base -from oslo_config import cfg - -import functools -import oslo_messaging - - -HOST_1 = 'rabbitmq_1' -PORT_1 = 5671 -HOST_2 = 'rabbitmq_2' -PORT_2 = 5672 -USER_1 = 'u_mistral_1' -PASSWORD_1 = 'p_mistral_1' -USER_2 = 'u_mistral_2' -PASSWORD_2 = 'p_mistral_2' -VIRTUAL_HOST_1 = 'vhost_1' -VIRTUAL_HOST_2 = 'vhost_2' - - -class KombuHostsTest(base.BaseTest): - - def setUp(self): - super(KombuHostsTest, self).setUp() - - # Oslo messaging set a default config option - oslo_messaging.get_transport(cfg.CONF) - - def assert_transports_host(self, expected, result): - sorted_by_host = functools.partial(sorted, key=lambda x: x.hostname) - - self.assertListEqual(sorted_by_host(expected), sorted_by_host(result)) - - def test_transport_url(self): - self.override_config( - 'transport_url', - 'rabbit://{user}:{password}@{host}:{port}/{virtual_host}'.format( - user=USER_1, port=PORT_1, host=HOST_1, - password=PASSWORD_1, - virtual_host=VIRTUAL_HOST_1 - )) - - hosts = kombu_hosts.KombuHosts(cfg.CONF) - - self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host) - self.assert_transports_host([oslo_messaging.TransportHost( - hostname=HOST_1, - port=PORT_1, - username=USER_1, - password=PASSWORD_1, - )], hosts.hosts) - - def test_transport_url_multiple_hosts(self): - self.override_config( - 'transport_url', - 'rabbit://{user_1}:{password_1}@{host_1}:{port_1},' - '{user_2}:{password_2}@{host_2}:{port_2}/{virtual_host}'.format( - user_1=USER_1, - password_1=PASSWORD_1, - port_1=PORT_1, - host_1=HOST_1, - user_2=USER_2, - password_2=PASSWORD_2, - host_2=HOST_2, - port_2=PORT_2, - virtual_host=VIRTUAL_HOST_1 - )) - - hosts = kombu_hosts.KombuHosts(cfg.CONF) - - self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host) - self.assert_transports_host( - [ - oslo_messaging.TransportHost( - hostname=HOST_1, - port=PORT_1, - username=USER_1, - password=PASSWORD_1 - ), - oslo_messaging.TransportHost( - hostname=HOST_2, - port=PORT_2, - username=USER_2, - password=PASSWORD_2 - ) - ], - hosts.hosts - ) diff --git a/mistral/tests/unit/rpc/kombu/test_kombu_listener.py b/mistral/tests/unit/rpc/kombu/test_kombu_listener.py deleted file mode 100644 index 132bfa781..000000000 --- a/mistral/tests/unit/rpc/kombu/test_kombu_listener.py +++ /dev/null @@ -1,219 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from mistral import exceptions as exc -from mistral.tests.unit.rpc.kombu import base -from mistral.tests.unit.rpc.kombu import fake_kombu -from mistral_lib import utils -from unittest import mock - -import queue - -with mock.patch.dict('sys.modules', kombu=fake_kombu): - from mistral.rpc.kombu import base as kombu_base - from mistral.rpc.kombu import kombu_listener - - -class TestException(exc.MistralException): - pass - - -class KombuListenerTest(base.KombuTestCase): - - def setUp(self): - super(KombuListenerTest, self).setUp() - - self.listener = kombu_listener.KombuRPCListener( - [mock.MagicMock()], - mock.MagicMock() - ) - self.ctx = type('context', (object,), {'to_dict': lambda self: {}})() - - def test_add_listener(self): - correlation_id = utils.generate_unicode_uuid() - - self.listener.add_listener(correlation_id) - - self.assertEqual( - type(self.listener._results.get(correlation_id)), - queue.Queue - ) - - self.assertEqual(0, self.listener._results[correlation_id].qsize()) - - def test_remove_listener_correlation_id_in_results(self): - correlation_id = utils.generate_unicode_uuid() - - self.listener.add_listener(correlation_id) - - self.assertEqual( - type(self.listener._results.get(correlation_id)), - queue.Queue - ) - - self.listener.remove_listener(correlation_id) - - self.assertIsNone( - self.listener._results.get(correlation_id) - ) - - def test_remove_listener_correlation_id_not_in_results(self): - correlation_id = utils.generate_unicode_uuid() - - self.listener.add_listener(correlation_id) - - self.assertEqual( - type(self.listener._results.get(correlation_id)), - queue.Queue - ) - - self.listener.remove_listener(utils.generate_unicode_uuid()) - - self.assertEqual( - type(self.listener._results.get(correlation_id)), - queue.Queue - ) - - @mock.patch('threading.Thread') - def test_start_thread_not_set(self, thread_class_mock): - thread_mock = mock.MagicMock() - thread_class_mock.return_value = thread_mock - - self.listener.start() - - self.assertTrue(thread_mock.daemon) - self.assertEqual(thread_mock.start.call_count, 1) - - @mock.patch('threading.Thread') - def test_start_thread_set(self, thread_class_mock): - thread_mock = mock.MagicMock() - thread_class_mock.return_value = thread_mock - - self.listener._thread = mock.MagicMock() - self.listener.start() - - self.assertEqual(thread_mock.start.call_count, 0) - - def test_get_result_results_in_queue(self): - expected_result = 'abcd' - correlation_id = utils.generate_unicode_uuid() - - self.listener.add_listener(correlation_id) - self.listener._results.get(correlation_id).put(expected_result) - - result = self.listener.get_result(correlation_id, 5) - - self.assertEqual(result, expected_result) - - def test_get_result_not_in_queue(self): - correlation_id = utils.generate_unicode_uuid() - - self.listener.add_listener(correlation_id) - - self.assertRaises( - queue.Empty, - self.listener.get_result, - correlation_id, - 1 # timeout - ) - - def test_get_result_lack_of_queue(self): - correlation_id = utils.generate_unicode_uuid() - - self.assertRaises( - KeyError, - self.listener.get_result, - correlation_id, - 1 # timeout - ) - - def test__on_response_message_ack_fail(self): - message = mock.MagicMock() - message.ack.side_effect = Exception('Test Exception') - response = 'response' - - kombu_listener.LOG = mock.MagicMock() - - self.listener.on_message(response, message) - self.assertEqual(kombu_listener.LOG.debug.call_count, 1) - self.assertEqual(kombu_listener.LOG.exception.call_count, 1) - - def test__on_response_message_ack_ok_corr_id_not_match(self): - message = mock.MagicMock() - message.properties = mock.MagicMock() - message.properties.__getitem__ = lambda *args, **kwargs: True - response = 'response' - - kombu_listener.LOG = mock.MagicMock() - - self.listener.on_message(response, message) - self.assertEqual(kombu_listener.LOG.debug.call_count, 3) - self.assertEqual(kombu_listener.LOG.exception.call_count, 0) - - def test__on_response_message_ack_ok_messsage_type_error(self): - correlation_id = utils.generate_unicode_uuid() - - message = mock.MagicMock() - message.properties = dict() - message.properties['type'] = 'error' - message.properties['correlation_id'] = correlation_id - - response = TestException('response') - - kombu_listener.LOG = mock.MagicMock() - - self.listener.add_listener(correlation_id) - self.listener.on_message(response, message) - - self.assertEqual(kombu_listener.LOG.debug.call_count, 2) - self.assertEqual(kombu_listener.LOG.exception.call_count, 0) - - result = self.listener.get_result(correlation_id, 5) - - self.assertDictEqual( - result, - { - kombu_base.TYPE: 'error', - kombu_base.RESULT: response - } - ) - - def test__on_response_message_ack_ok(self): - correlation_id = utils.generate_unicode_uuid() - - message = mock.MagicMock() - message.properties = dict() - message.properties['type'] = None - message.properties['correlation_id'] = correlation_id - - response = 'response' - - kombu_listener.LOG = mock.MagicMock() - - self.listener.add_listener(correlation_id) - self.listener.on_message(response, message) - - self.assertEqual(kombu_listener.LOG.debug.call_count, 2) - self.assertEqual(kombu_listener.LOG.exception.call_count, 0) - - result = self.listener.get_result(correlation_id, 5) - - self.assertDictEqual( - result, - { - kombu_base.TYPE: None, - kombu_base.RESULT: response - } - ) diff --git a/mistral/tests/unit/rpc/kombu/test_kombu_server.py b/mistral/tests/unit/rpc/kombu/test_kombu_server.py deleted file mode 100644 index f581c69c6..000000000 --- a/mistral/tests/unit/rpc/kombu/test_kombu_server.py +++ /dev/null @@ -1,311 +0,0 @@ -# Copyright (c) 2016 Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import futurist -from mistral import context -from mistral import exceptions as exc -from mistral.tests.unit.rpc.kombu import base -from mistral.tests.unit.rpc.kombu import fake_kombu -from unittest import mock - -import socket -import time - -from stevedore import driver - -with mock.patch.dict('sys.modules', kombu=fake_kombu): - from mistral.rpc.kombu import kombu_server - - -class TestException(exc.MistralError): - pass - - -class KombuServerTest(base.KombuTestCase): - - def setUp(self): - super(KombuServerTest, self).setUp() - - self.conf = mock.MagicMock() - self.conf.host = 'fakehost' - self.server = kombu_server.KombuRPCServer(self.conf) - self.ctx = type('context', (object,), {'to_dict': lambda self: {}})() - - def test_is_running_is_running(self): - self.server._running.set() - self.assertTrue(self.server.is_running) - - def test_is_running_is_not_running(self): - self.server._running.clear() - self.assertFalse(self.server.is_running) - - def test_stop(self): - self.server.stop() - self.assertFalse(self.server.is_running) - - def test_publish_message(self): - body = 'body' - reply_to = 'reply_to' - corr_id = 'corr_id' - type = 'type' - - acquire_mock = mock.MagicMock() - fake_kombu.producer.acquire.return_value = acquire_mock - - enter_mock = mock.MagicMock() - acquire_mock.__enter__.return_value = enter_mock - - self.server.publish_message(body, reply_to, corr_id, type) - enter_mock.publish.assert_called_once_with( - body={'body': '"body"'}, - exchange='openstack', - routing_key=reply_to, - correlation_id=corr_id, - type=type, - serializer='json' - ) - - def test_run_launch_successfully(self): - acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = TestException() - fake_kombu.connection.acquire.return_value = acquire_mock - - self.assertRaises(TestException, self.server._run, 'blocking') - self.assertTrue(self.server.is_running) - - def test_run_launch_successfully_than_stop(self): - - def side_effect(*args, **kwargs): - self.assertTrue(self.server.is_running) - raise KeyboardInterrupt - - acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = side_effect - fake_kombu.connection.acquire.return_value = acquire_mock - - self.server._run('blocking') - self.assertFalse(self.server.is_running) - self.assertEqual(self.server._sleep_time, 1) - - def test_run_socket_error_reconnect(self): - - def side_effect(*args, **kwargs): - if acquire_mock.drain_events.call_count == 1: - raise socket.error() - raise TestException() - - acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = side_effect - fake_kombu.connection.acquire.return_value = acquire_mock - - self.assertRaises(TestException, self.server._run, 'blocking') - self.assertEqual(self.server._sleep_time, 1) - - def test_run_socket_timeout_still_running(self): - - def side_effect(*args, **kwargs): - if acquire_mock.drain_events.call_count == 0: - raise socket.timeout() - raise TestException() - - acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = side_effect - fake_kombu.connection.acquire.return_value = acquire_mock - - self.assertRaises( - TestException, - self.server._run, - 'blocking' - ) - self.assertTrue(self.server.is_running) - - def test_run_keyboard_interrupt_not_running(self): - acquire_mock = mock.MagicMock() - acquire_mock.drain_events.side_effect = KeyboardInterrupt() - fake_kombu.connection.acquire.return_value = acquire_mock - - self.assertIsNone(self.server.run()) - # Wait 1 sec so the thread start listening on RPC and receive the - # side_effect - time.sleep(1) - self.assertFalse(self.server.is_running) - - @mock.patch.object( - kombu_server.KombuRPCServer, - '_on_message', - mock.MagicMock() - ) - @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') - def test__on_message_safe_message_processing_ok(self, publish_message): - message = mock.MagicMock() - - self.server._on_message_safe(None, message) - - self.assertEqual(message.ack.call_count, 1) - self.assertEqual(publish_message.call_count, 0) - - @mock.patch.object(kombu_server.KombuRPCServer, '_on_message') - @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') - def test__on_message_safe_message_processing_raise( - self, - publish_message, - _on_message - ): - reply_to = 'reply_to' - correlation_id = 'corr_id' - message = mock.MagicMock() - message.properties = { - 'reply_to': reply_to, - 'correlation_id': correlation_id - } - - test_exception = TestException() - _on_message.side_effect = test_exception - - self.server._on_message_safe(None, message) - - self.assertEqual(message.ack.call_count, 1) - self.assertEqual(publish_message.call_count, 1) - - @mock.patch.object( - kombu_server.KombuRPCServer, - '_get_rpc_method', - mock.MagicMock(return_value=None) - ) - def test__on_message_rpc_method_not_found(self): - request = { - 'rpc_ctx': {}, - 'rpc_method': 'not_found_method', - 'arguments': {} - } - - message = mock.MagicMock() - message.properties = { - 'reply_to': None, - 'correlation_id': None - } - - self.assertRaises( - exc.MistralException, - self.server._on_message, - request, - message - ) - - @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') - @mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method') - @mock.patch('mistral.context.MistralContext.from_dict') - def test__on_message_is_async(self, mock_get_context, get_rpc_method, - publish_message): - result = 'result' - request = { - 'async': True, - 'rpc_ctx': {}, - 'rpc_method': 'found_method', - 'arguments': self.server._serialize_message({ - 'a': 1, - 'b': 2 - }) - } - - message = mock.MagicMock() - message.properties = { - 'reply_to': None, - 'correlation_id': None - } - message.delivery_info.get.return_value = False - - rpc_method = mock.MagicMock(return_value=result) - get_rpc_method.return_value = rpc_method - - ctx = context.MistralContext() - mock_get_context.return_value = ctx - - self.server._on_message(request, message) - - rpc_method.assert_called_once_with( - rpc_ctx=ctx, - a=1, - b=2 - ) - self.assertEqual(publish_message.call_count, 0) - - @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') - @mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method') - @mock.patch('mistral.context.MistralContext.from_dict') - def test__on_message_is_sync(self, mock_get_context, get_rpc_method, - publish_message): - result = 'result' - request = { - 'async': False, - 'rpc_ctx': {}, - 'rpc_method': 'found_method', - 'arguments': self.server._serialize_message({ - 'a': 1, - 'b': 2 - }) - } - - reply_to = 'reply_to' - correlation_id = 'corr_id' - message = mock.MagicMock() - message.properties = { - 'reply_to': reply_to, - 'correlation_id': correlation_id - } - message.delivery_info.get.return_value = False - - rpc_method = mock.MagicMock(return_value=result) - get_rpc_method.return_value = rpc_method - - ctx = context.MistralContext() - mock_get_context.return_value = ctx - - self.server._on_message(request, message) - - rpc_method.assert_called_once_with( - rpc_ctx=ctx, - a=1, - b=2 - ) - publish_message.assert_called_once_with( - result, - reply_to, - correlation_id - ) - - def test__prepare_worker(self): - self.server._prepare_worker('blocking') - self.assertEqual( - futurist.SynchronousExecutor, - type(self.server._worker) - ) - - self.server._prepare_worker('threading') - self.assertEqual( - futurist.ThreadPoolExecutor, - type(self.server._worker) - ) - - @mock.patch('stevedore.driver.DriverManager') - def test__prepare_worker_no_valid_executor(self, driver_manager_mock): - - driver_manager_mock.side_effect = driver.NoMatches() - - self.assertRaises( - driver.NoMatches, - self.server._prepare_worker, - 'non_valid_executor' - ) diff --git a/releasenotes/notes/remove-kombu-rpc-28a616c55fa051b5.yaml b/releasenotes/notes/remove-kombu-rpc-28a616c55fa051b5.yaml new file mode 100644 index 000000000..51d84ed44 --- /dev/null +++ b/releasenotes/notes/remove-kombu-rpc-28a616c55fa051b5.yaml @@ -0,0 +1,5 @@ +--- +deprecations: + - | + Kombu RPC driver is now deleted from mistral RPC drivers. + Only oslo messaging driver is supported. diff --git a/requirements.txt b/requirements.txt index 0fde366ad..f669a1b50 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,7 +10,6 @@ eventlet>=0.27.0 # MIT Jinja2>=2.10 # BSD License (3 clause) jsonschema>=3.2.0 # MIT keystonemiddleware>=4.18.0 # Apache-2.0 -kombu!=4.0.2,>=4.6.1 # BSD mistral-lib>=2.3.0 # Apache-2.0 networkx>=2.3 # BSD oslo.concurrency>=3.26.0 # Apache-2.0 diff --git a/setup.cfg b/setup.cfg index 41c9d145e..3648aabb0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -36,9 +36,6 @@ wsgi_scripts = mistral.rpc.backends = oslo_client = mistral.rpc.oslo.oslo_client:OsloRPCClient oslo_server = mistral.rpc.oslo.oslo_server:OsloRPCServer - # NOTE(amorin) Kombu driver is deprecated - kombu_client = mistral.rpc.kombu.kombu_client:KombuRPCClient - kombu_server = mistral.rpc.kombu.kombu_server:KombuRPCServer oslo.config.opts = mistral.config = mistral.config:list_opts @@ -108,12 +105,6 @@ mistral.auth = keystone = mistral.auth.keystone:KeystoneAuthHandler keycloak-oidc = mistral.auth.keycloak:KeycloakAuthHandler -# NOTE(amorin) Kombu driver is deprecated -kombu_driver.executors = - blocking = futurist:SynchronousExecutor - threading = futurist:ThreadPoolExecutor - eventlet = futurist:GreenThreadPoolExecutor - pygments.lexers = mistral = mistral.ext.pygmentplugin:MistralLexer