Merge "Remove kombu RPC driver"
This commit is contained in:
commit
c2d529d35e
@ -111,7 +111,6 @@
|
||||
- mistral-devstack-tempest-ipv6-only
|
||||
- mistral-devstack-non-apache-tempest-ipv6-only
|
||||
- mistral-devstack-non-apache
|
||||
- mistral-devstack-kombu
|
||||
# Disable mysql / postgresql units as they are not working as expected
|
||||
# - mistral-tox-unit-mysql
|
||||
# - mistral-tox-unit-postgresql
|
||||
@ -123,7 +122,6 @@
|
||||
- mistral-devstack-tempest-ipv6-only
|
||||
- mistral-devstack-non-apache-tempest-ipv6-only
|
||||
- mistral-devstack-non-apache
|
||||
- mistral-devstack-kombu
|
||||
# Disable mysql / postgresql units as they are not working as expected
|
||||
# - mistral-tox-unit-mysql
|
||||
# - mistral-tox-unit-postgresql
|
||||
|
@ -193,17 +193,6 @@ js_impl_opt = cfg.StrOpt(
|
||||
'action to evaluate scripts.')
|
||||
)
|
||||
|
||||
rpc_impl_opt = cfg.StrOpt(
|
||||
'rpc_implementation',
|
||||
default='oslo',
|
||||
choices=['oslo', 'kombu'],
|
||||
deprecated_for_removal=True,
|
||||
deprecated_reason='Kombu driver is deprecated and will be removed '
|
||||
'in the F release cycle',
|
||||
help=_('Specifies RPC implementation for RPC client and server. '
|
||||
'Support of kombu driver is experimental.')
|
||||
)
|
||||
|
||||
oslo_rpc_executor = cfg.StrOpt(
|
||||
'oslo_rpc_executor',
|
||||
default='threading',
|
||||
@ -805,7 +794,6 @@ CONF.register_opt(wf_trace_log_name_opt)
|
||||
CONF.register_opt(auth_type_opt)
|
||||
CONF.register_opt(scheduler_type_opt)
|
||||
CONF.register_opt(js_impl_opt)
|
||||
CONF.register_opt(rpc_impl_opt)
|
||||
CONF.register_opt(oslo_rpc_executor)
|
||||
CONF.register_opt(expiration_token_duration)
|
||||
CONF.register_opts(service_opts.service_opts)
|
||||
@ -849,7 +837,6 @@ default_group_opts = CLI_OPTS + [
|
||||
auth_type_opt,
|
||||
scheduler_type_opt,
|
||||
js_impl_opt,
|
||||
rpc_impl_opt,
|
||||
oslo_rpc_executor,
|
||||
expiration_token_duration
|
||||
]
|
||||
|
@ -42,9 +42,6 @@ def cleanup():
|
||||
_TRANSPORT = None
|
||||
|
||||
|
||||
# TODO(rakhmerov): This method seems misplaced. Now we have different kind
|
||||
# of transports (oslo, kombu) and this module should not have any oslo
|
||||
# specific things anymore.
|
||||
def get_transport():
|
||||
global _TRANSPORT
|
||||
|
||||
@ -54,27 +51,25 @@ def get_transport():
|
||||
return _TRANSPORT
|
||||
|
||||
|
||||
# TODO(amorin) maybe refactor this since we have only one impl now
|
||||
def get_rpc_server_driver():
|
||||
rpc_impl = cfg.CONF.rpc_implementation
|
||||
|
||||
global _IMPL_SERVER
|
||||
if not _IMPL_SERVER:
|
||||
_IMPL_SERVER = driver.DriverManager(
|
||||
'mistral.rpc.backends',
|
||||
'%s_server' % rpc_impl
|
||||
'oslo_server'
|
||||
).driver
|
||||
|
||||
return _IMPL_SERVER
|
||||
|
||||
|
||||
# TODO(amorin) maybe refactor this since we have only one impl now
|
||||
def get_rpc_client_driver():
|
||||
rpc_impl = cfg.CONF.rpc_implementation
|
||||
|
||||
global _IMPL_CLIENT
|
||||
if not _IMPL_CLIENT:
|
||||
_IMPL_CLIENT = driver.DriverManager(
|
||||
'mistral.rpc.backends',
|
||||
'%s_client' % rpc_impl
|
||||
'oslo_client'
|
||||
).driver
|
||||
|
||||
return _IMPL_CLIENT
|
||||
|
@ -1,149 +0,0 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import kombu
|
||||
|
||||
from mistral_lib import serialization as mistral_serialization
|
||||
import oslo_messaging as messaging
|
||||
|
||||
from mistral import config as cfg
|
||||
from mistral import exceptions as exc
|
||||
|
||||
IS_RECEIVED = 'kombu_rpc_is_received'
|
||||
RESULT = 'kombu_rpc_result'
|
||||
CORR_ID = 'kombu_rpc_correlation_id'
|
||||
TYPE = 'kombu_rpc_type'
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def set_transport_options(check_backend=True):
|
||||
# We can be sure that all needed transport options are registered
|
||||
# only if we at least once called method get_transport(). Because
|
||||
# this is the method that registers them.
|
||||
messaging.get_transport(CONF)
|
||||
|
||||
backend = messaging.TransportURL.parse(CONF, CONF.transport_url).transport
|
||||
|
||||
if check_backend and backend not in ['rabbit', 'kombu']:
|
||||
raise exc.MistralException("Unsupported backend: %s" % backend)
|
||||
|
||||
|
||||
class Base(object):
|
||||
"""Base class for Client and Server."""
|
||||
def __init__(self):
|
||||
self.serializer = None
|
||||
|
||||
@staticmethod
|
||||
def _make_connection(amqp_host, amqp_port, amqp_user, amqp_password,
|
||||
amqp_vhost):
|
||||
"""Create connection.
|
||||
|
||||
This method creates object representing the connection to RabbitMQ.
|
||||
|
||||
:param amqp_host: Address of RabbitMQ server.
|
||||
:param amqp_user: Username for connecting to RabbitMQ.
|
||||
:param amqp_password: Password matching the given username.
|
||||
:param amqp_vhost: Virtual host to connect to.
|
||||
:param amqp_port: Port of RabbitMQ server.
|
||||
:return: New connection to RabbitMQ.
|
||||
"""
|
||||
return kombu.BrokerConnection(
|
||||
hostname=amqp_host,
|
||||
userid=amqp_user,
|
||||
password=amqp_password,
|
||||
virtual_host=amqp_vhost,
|
||||
port=amqp_port,
|
||||
transport_options={'confirm_publish': True}
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_exchange(name, durable=False, auto_delete=True,
|
||||
exchange_type='topic'):
|
||||
"""Make named exchange.
|
||||
|
||||
This method creates object representing exchange on RabbitMQ. It would
|
||||
create a new exchange if exchange with given name don't exists.
|
||||
|
||||
:param name: Name of the exchange.
|
||||
:param durable: If set to True, messages on this exchange would be
|
||||
store on disk - therefore can be retrieve after
|
||||
failure.
|
||||
:param auto_delete: If set to True, exchange would be automatically
|
||||
deleted when none is connected.
|
||||
:param exchange_type: Type of the exchange. Can be one of 'direct',
|
||||
'topic', 'fanout', 'headers'. See Kombu docs for
|
||||
further details.
|
||||
:return: Kombu exchange object.
|
||||
"""
|
||||
return kombu.Exchange(
|
||||
name=name,
|
||||
type=exchange_type,
|
||||
durable=durable,
|
||||
auto_delete=auto_delete
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _make_queue(name, exchange, routing_key='',
|
||||
durable=False, auto_delete=True, **kwargs):
|
||||
"""Make named queue for a given exchange.
|
||||
|
||||
This method creates object representing queue in RabbitMQ. It would
|
||||
create a new queue if queue with given name don't exists.
|
||||
|
||||
:param name: Name of the queue
|
||||
:param exchange: Kombu Exchange object (can be created using
|
||||
_make_exchange).
|
||||
:param routing_key: Routing key for queue. It behaves differently
|
||||
depending on exchange type. See Kombu docs for
|
||||
further details.
|
||||
:param durable: If set to True, messages on this queue would be
|
||||
store on disk - therefore can be retrieve after
|
||||
failure.
|
||||
:param auto_delete: If set to True, queue would be automatically
|
||||
deleted when none is connected.
|
||||
:param kwargs: See kombu documentation for all parameters than may be
|
||||
may be passed to Queue.
|
||||
:return: Kombu Queue object.
|
||||
"""
|
||||
return kombu.Queue(
|
||||
name=name,
|
||||
routing_key=routing_key,
|
||||
exchange=exchange,
|
||||
durable=durable,
|
||||
auto_delete=auto_delete,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
def _register_mistral_serialization(self):
|
||||
"""Adds mistral serializer to available serializers in kombu."""
|
||||
|
||||
self.serializer = mistral_serialization.get_polymorphic_serializer()
|
||||
|
||||
def _serialize_message(self, kwargs):
|
||||
result = {}
|
||||
|
||||
for argname, arg in kwargs.items():
|
||||
result[argname] = self.serializer.serialize(arg)
|
||||
|
||||
return result
|
||||
|
||||
def _deserialize_message(self, kwargs):
|
||||
result = {}
|
||||
|
||||
for argname, arg in kwargs.items():
|
||||
result[argname] = self.serializer.deserialize(arg)
|
||||
|
||||
return result
|
@ -1,44 +0,0 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from mistral.rpc.kombu import kombu_client
|
||||
|
||||
|
||||
# Example of using Kombu based RPC client.
|
||||
def main():
|
||||
conf = {
|
||||
'user_id': 'guest',
|
||||
'password': 'secret',
|
||||
'exchange': 'my_exchange',
|
||||
'topic': 'my_topic',
|
||||
'server_id': 'host',
|
||||
'host': 'localhost',
|
||||
'port': 5672,
|
||||
'virtual_host': '/'
|
||||
}
|
||||
kombu_rpc = kombu_client.KombuRPCClient(conf)
|
||||
|
||||
print(" [x] Requesting ...")
|
||||
|
||||
ctx = type('context', (object,), {'to_dict': lambda self: {}})()
|
||||
|
||||
response = kombu_rpc.sync_call(ctx, 'fib', n=44)
|
||||
|
||||
print(" [.] Got %r" % (response,))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
@ -1,53 +0,0 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from mistral.rpc.kombu import kombu_server
|
||||
|
||||
|
||||
# Simple example of endpoint of RPC server, which just
|
||||
# calculates given fibonacci number.
|
||||
class MyServer(object):
|
||||
cache = {0: 0, 1: 1}
|
||||
|
||||
def fib(self, rpc_ctx, n):
|
||||
if self.cache.get(n) is None:
|
||||
self.cache[n] = (self.fib(rpc_ctx, n - 1)
|
||||
+ self.fib(rpc_ctx, n - 2))
|
||||
return self.cache[n]
|
||||
|
||||
def get_name(self, rpc_ctx):
|
||||
return self.__class__.__name__
|
||||
|
||||
|
||||
# Example of using Kombu based RPC server.
|
||||
def main():
|
||||
conf = {
|
||||
'user_id': 'guest',
|
||||
'password': 'secret',
|
||||
'exchange': 'my_exchange',
|
||||
'topic': 'my_topic',
|
||||
'server_id': 'host',
|
||||
'host': 'localhost',
|
||||
'port': 5672,
|
||||
'virtual_host': '/'
|
||||
}
|
||||
rpc_server = kombu_server.KombuRPCServer(conf)
|
||||
rpc_server.register_endpoint(MyServer())
|
||||
rpc_server.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main())
|
@ -1,209 +0,0 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import socket
|
||||
|
||||
import itertools
|
||||
|
||||
import errno
|
||||
import queue
|
||||
|
||||
import kombu
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral import config as cfg
|
||||
from mistral import exceptions as exc
|
||||
from mistral.rpc import base as rpc_base
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
from mistral.rpc.kombu import kombu_hosts
|
||||
from mistral.rpc.kombu import kombu_listener
|
||||
from mistral_lib import utils
|
||||
|
||||
#: When connection to the RabbitMQ server breaks, the
|
||||
#: client will receive EPIPE socket errors. These indicate
|
||||
#: an error that may be fixed by retrying. This constant
|
||||
#: is a guess for how many times the retry may be reasonable
|
||||
EPIPE_RETRIES = 4
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
def __init__(self, conf):
|
||||
super(KombuRPCClient, self).__init__(conf)
|
||||
|
||||
kombu_base.set_transport_options()
|
||||
|
||||
self._register_mistral_serialization()
|
||||
|
||||
self.topic = conf.topic
|
||||
self.server_id = conf.host
|
||||
|
||||
hosts = kombu_hosts.KombuHosts(CONF)
|
||||
|
||||
self.exchange = CONF.control_exchange
|
||||
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
|
||||
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
|
||||
# NOTE(amorin) let's use a hardcoded 60s value until we deprecate the
|
||||
# kombu RPC driver
|
||||
self._timeout = 60
|
||||
self.routing_key = self.topic
|
||||
|
||||
connections = []
|
||||
|
||||
for host in hosts.hosts:
|
||||
conn = self._make_connection(
|
||||
host.hostname,
|
||||
host.port,
|
||||
host.username,
|
||||
host.password,
|
||||
hosts.virtual_host
|
||||
)
|
||||
connections.append(conn)
|
||||
|
||||
self._connections = itertools.cycle(connections)
|
||||
|
||||
# Create exchange.
|
||||
exchange = self._make_exchange(
|
||||
self.exchange,
|
||||
durable=self.durable_queue,
|
||||
auto_delete=self.auto_delete
|
||||
)
|
||||
|
||||
# Create queue.
|
||||
self.queue_name = utils.generate_unicode_uuid()
|
||||
self.callback_queue = kombu.Queue(
|
||||
self.queue_name,
|
||||
exchange=exchange,
|
||||
routing_key=self.queue_name,
|
||||
durable=False,
|
||||
exclusive=True,
|
||||
auto_delete=True
|
||||
)
|
||||
|
||||
self._listener = kombu_listener.KombuRPCListener(
|
||||
connections=self._connections,
|
||||
callback_queue=self.callback_queue
|
||||
)
|
||||
|
||||
self._listener.start()
|
||||
|
||||
def _wait_for_result(self, correlation_id):
|
||||
"""Waits for the result from the server.
|
||||
|
||||
Waits for the result from the server, checks every second if
|
||||
a timeout occurred. If a timeout occurred - the `RpcTimeout` exception
|
||||
will be raised.
|
||||
"""
|
||||
try:
|
||||
return self._listener.get_result(correlation_id, self._timeout)
|
||||
except queue.Empty:
|
||||
raise exc.MistralException(
|
||||
"RPC Request timeout, correlation_id = %s" % correlation_id
|
||||
)
|
||||
|
||||
def _call(self, ctx, method, target, async_=False, **kwargs):
|
||||
"""Performs a remote call for the given method.
|
||||
|
||||
:param ctx: authentication context associated with mistral
|
||||
:param method: name of the method that should be executed
|
||||
:param kwargs: keyword parameters for the remote-method
|
||||
:param target: Server name
|
||||
:param async: bool value means whether the request is
|
||||
asynchronous or not.
|
||||
:return: result of the method or None if async.
|
||||
"""
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
body = {
|
||||
'rpc_ctx': ctx.to_dict(),
|
||||
'rpc_method': method,
|
||||
'arguments': self._serialize_message(kwargs),
|
||||
'async': async_
|
||||
}
|
||||
|
||||
LOG.debug("Publish request: %s", body)
|
||||
|
||||
try:
|
||||
if not async_:
|
||||
self._listener.add_listener(correlation_id)
|
||||
|
||||
# Publish request.
|
||||
for retry_round in range(EPIPE_RETRIES):
|
||||
if self._publish_request(body, correlation_id):
|
||||
break
|
||||
|
||||
# Start waiting for response.
|
||||
if async_:
|
||||
return
|
||||
|
||||
LOG.debug(
|
||||
"Waiting a reply for sync call [reply_to = %s]",
|
||||
self.queue_name
|
||||
)
|
||||
|
||||
result = self._wait_for_result(correlation_id)
|
||||
res_type = result[kombu_base.TYPE]
|
||||
res_object = result[kombu_base.RESULT]
|
||||
|
||||
if res_type == 'error':
|
||||
raise res_object
|
||||
else:
|
||||
res_object = self._deserialize_message(res_object)['body']
|
||||
|
||||
finally:
|
||||
if not async_:
|
||||
self._listener.remove_listener(correlation_id)
|
||||
|
||||
return res_object
|
||||
|
||||
def _publish_request(self, body, correlation_id):
|
||||
"""Publishes the request message
|
||||
|
||||
.. note::
|
||||
The :const:`errno.EPIPE` socket errors are suppressed
|
||||
and result in False being returned. This is because
|
||||
this type of error can usually be fixed by retrying.
|
||||
|
||||
:param body: message body
|
||||
:param correlation_id: correlation id
|
||||
:return: True if publish succeeded, False otherwise
|
||||
:rtype: bool
|
||||
"""
|
||||
try:
|
||||
conn = self._listener.wait_ready()
|
||||
if conn:
|
||||
with kombu.producers[conn].acquire(block=True) as producer:
|
||||
producer.publish(
|
||||
body=body,
|
||||
exchange=self.exchange,
|
||||
routing_key=self.topic,
|
||||
reply_to=self.queue_name,
|
||||
correlation_id=correlation_id,
|
||||
delivery_mode=2
|
||||
)
|
||||
return True
|
||||
except socket.error as e:
|
||||
if e.errno != errno.EPIPE:
|
||||
raise
|
||||
else:
|
||||
LOG.debug('Retrying publish due to broker connection failure')
|
||||
return False
|
||||
|
||||
def sync_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._call(ctx, method, async_=False, target=target, **kwargs)
|
||||
|
||||
def async_call(self, ctx, method, target=None, fanout=False, **kwargs):
|
||||
return self._call(ctx, method, async_=True, target=target, **kwargs)
|
@ -1,35 +0,0 @@
|
||||
# Copyright (c) 2017 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import random
|
||||
|
||||
import oslo_messaging as messaging
|
||||
|
||||
|
||||
class KombuHosts(object):
|
||||
def __init__(self, conf):
|
||||
transport_url = messaging.TransportURL.parse(conf, conf.transport_url)
|
||||
|
||||
self.virtual_host = transport_url.virtual_host
|
||||
self.hosts = transport_url.hosts
|
||||
|
||||
if len(self.hosts) > 1:
|
||||
random.shuffle(self.hosts)
|
||||
|
||||
self._hosts_cycle = itertools.cycle(self.hosts)
|
||||
|
||||
def get_host(self):
|
||||
return next(self._hosts_cycle)
|
@ -1,127 +0,0 @@
|
||||
# Copyright (c) 2016 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
from kombu.mixins import ConsumerMixin
|
||||
import queue
|
||||
import threading
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KombuRPCListener(ConsumerMixin):
|
||||
|
||||
def __init__(self, connections, callback_queue):
|
||||
self._results = {}
|
||||
self._connections = itertools.cycle(connections)
|
||||
self._callback_queue = callback_queue
|
||||
self._thread = None
|
||||
self.connection = next(self._connections)
|
||||
|
||||
self.ready = threading.Event()
|
||||
|
||||
def add_listener(self, correlation_id):
|
||||
self._results[correlation_id] = queue.Queue()
|
||||
|
||||
def remove_listener(self, correlation_id):
|
||||
if correlation_id in self._results:
|
||||
del self._results[correlation_id]
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
consumers = [Consumer(
|
||||
self._callback_queue,
|
||||
callbacks=[self.on_message],
|
||||
accept=['pickle', 'json']
|
||||
)]
|
||||
self.ready.set()
|
||||
|
||||
return consumers
|
||||
|
||||
def start(self):
|
||||
if self._thread is None:
|
||||
self._thread = threading.Thread(target=self.run)
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def on_message(self, response, message):
|
||||
"""Callback on response.
|
||||
|
||||
This method is automatically called when a response is incoming and
|
||||
decides if it is the message we are waiting for - the message with the
|
||||
result.
|
||||
|
||||
:param response: the body of the amqp message already deserialized
|
||||
by kombu
|
||||
:param message: the plain amqp kombu.message with additional
|
||||
information
|
||||
"""
|
||||
LOG.debug("Got response: {}", response)
|
||||
|
||||
try:
|
||||
message.ack()
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to acknowledge AMQP message: %s", e)
|
||||
else:
|
||||
LOG.debug("AMQP message acknowledged.")
|
||||
|
||||
correlation_id = message.properties['correlation_id']
|
||||
|
||||
queue = self._results.get(correlation_id)
|
||||
|
||||
if queue:
|
||||
result = {
|
||||
kombu_base.TYPE: 'error'
|
||||
if message.properties.get('type') == 'error'
|
||||
else None,
|
||||
kombu_base.RESULT: response
|
||||
}
|
||||
|
||||
queue.put(result)
|
||||
else:
|
||||
LOG.debug(
|
||||
"Got a response, but seems like no process is waiting for "
|
||||
"it [correlation_id={}]", correlation_id
|
||||
)
|
||||
|
||||
def get_result(self, correlation_id, timeout):
|
||||
return self._results[correlation_id].get(block=True, timeout=timeout)
|
||||
|
||||
def on_connection_error(self, exc, interval):
|
||||
self.ready.clear()
|
||||
|
||||
self.connection = next(self._connections)
|
||||
|
||||
LOG.debug("Broker connection failed: %s", exc)
|
||||
LOG.debug(
|
||||
"Sleeping for %s seconds, then retrying connection",
|
||||
interval
|
||||
)
|
||||
|
||||
def wait_ready(self, timeout=10.0):
|
||||
"""Waits for the listener to successfully declare the consumer
|
||||
|
||||
:param timeout: timeout for waiting in seconds
|
||||
:return: same as :func:`~threading.Event.wait`
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if self.ready.wait(timeout=timeout):
|
||||
return self.connection
|
||||
else:
|
||||
return False
|
@ -1,281 +0,0 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import amqp
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
import kombu
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from stevedore import driver
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral import exceptions as exc
|
||||
from mistral.rpc import base as rpc_base
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
from mistral.rpc.kombu import kombu_hosts
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
def __init__(self, conf):
|
||||
super(KombuRPCServer, self).__init__(conf)
|
||||
|
||||
kombu_base.set_transport_options()
|
||||
|
||||
self._register_mistral_serialization()
|
||||
|
||||
self.topic = conf.topic
|
||||
self.server_id = conf.host
|
||||
|
||||
self._hosts = kombu_hosts.KombuHosts(CONF)
|
||||
|
||||
# NOTE(amorin) let's use a hardcoded value until we deprecate the
|
||||
# kombu RPC driver
|
||||
self._executor_threads = 64
|
||||
self.exchange = CONF.control_exchange
|
||||
# TODO(rakhmerov): We shouldn't rely on any properties related
|
||||
# to oslo.messaging. Only "transport_url" should matter.
|
||||
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
|
||||
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
|
||||
self.routing_key = self.topic
|
||||
self.channel = None
|
||||
self.conn = None
|
||||
self._running = threading.Event()
|
||||
self._stopped = threading.Event()
|
||||
self.endpoints = []
|
||||
self._worker = None
|
||||
self._thread = None
|
||||
|
||||
# TODO(ddeja): Those 2 options should be gathered from config.
|
||||
self._sleep_time = 1
|
||||
self._max_sleep_time = 10
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
"""Return whether server is running."""
|
||||
return self._running.is_set()
|
||||
|
||||
def run(self, executor='threading'):
|
||||
if self._thread is None:
|
||||
self._thread = threading.Thread(target=self._run, args=(executor,))
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def _run(self, executor):
|
||||
"""Start the server."""
|
||||
self._prepare_worker(executor)
|
||||
|
||||
while True:
|
||||
try:
|
||||
_retry_connection = False
|
||||
host = self._hosts.get_host()
|
||||
|
||||
self.conn = self._make_connection(
|
||||
host.hostname,
|
||||
host.port,
|
||||
host.username,
|
||||
host.password,
|
||||
self._hosts.virtual_host,
|
||||
)
|
||||
|
||||
conn = kombu.connections[self.conn].acquire(block=True)
|
||||
|
||||
exchange = self._make_exchange(
|
||||
self.exchange,
|
||||
durable=self.durable_queue,
|
||||
auto_delete=self.auto_delete
|
||||
)
|
||||
|
||||
queue = self._make_queue(
|
||||
self.topic,
|
||||
exchange,
|
||||
routing_key=self.routing_key,
|
||||
durable=self.durable_queue,
|
||||
auto_delete=self.auto_delete
|
||||
)
|
||||
with conn.Consumer(
|
||||
queues=queue,
|
||||
callbacks=[self._process_message],
|
||||
) as consumer:
|
||||
consumer.qos(prefetch_count=1)
|
||||
|
||||
self._running.set()
|
||||
self._stopped.clear()
|
||||
|
||||
LOG.info(
|
||||
"Connected to AMQP at %s:%s",
|
||||
host.hostname,
|
||||
host.port
|
||||
)
|
||||
self._sleep_time = 1
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
conn.drain_events(timeout=1)
|
||||
except socket.timeout:
|
||||
pass
|
||||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
|
||||
LOG.info(
|
||||
"Server with id='%s' stopped.",
|
||||
self.server_id
|
||||
)
|
||||
|
||||
return
|
||||
except (socket.error, amqp.exceptions.ConnectionForced) as e:
|
||||
LOG.debug("Broker connection failed: %s", e)
|
||||
|
||||
_retry_connection = True
|
||||
finally:
|
||||
self._stopped.set()
|
||||
|
||||
if _retry_connection:
|
||||
LOG.debug(
|
||||
"Sleeping for %s seconds, then retrying "
|
||||
"connection",
|
||||
self._sleep_time
|
||||
)
|
||||
|
||||
time.sleep(self._sleep_time)
|
||||
|
||||
self._sleep_time = min(
|
||||
self._sleep_time * 2,
|
||||
self._max_sleep_time
|
||||
)
|
||||
|
||||
def stop(self, graceful=False):
|
||||
self._running.clear()
|
||||
|
||||
if graceful:
|
||||
self.wait()
|
||||
|
||||
def wait(self):
|
||||
self._stopped.wait()
|
||||
|
||||
try:
|
||||
self._worker.shutdown(wait=True)
|
||||
except AttributeError as e:
|
||||
LOG.warning("Cannot stop worker in graceful way: %s", e)
|
||||
|
||||
def _get_rpc_method(self, method_name):
|
||||
for endpoint in self.endpoints:
|
||||
if hasattr(endpoint, method_name):
|
||||
return getattr(endpoint, method_name)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _set_auth_ctx(ctx):
|
||||
if not isinstance(ctx, dict):
|
||||
return
|
||||
|
||||
context = auth_ctx.MistralContext.from_dict(ctx)
|
||||
auth_ctx.set_ctx(context)
|
||||
|
||||
return context
|
||||
|
||||
def publish_message(self, body, reply_to, corr_id, res_type='response'):
|
||||
if res_type != 'error':
|
||||
body = self._serialize_message({'body': body})
|
||||
|
||||
with kombu.producers[self.conn].acquire(block=True) as producer:
|
||||
producer.publish(
|
||||
body=body,
|
||||
exchange=self.exchange,
|
||||
routing_key=reply_to,
|
||||
correlation_id=corr_id,
|
||||
serializer='pickle' if res_type == 'error' else 'json',
|
||||
type=res_type
|
||||
)
|
||||
|
||||
def _on_message_safe(self, request, message):
|
||||
try:
|
||||
return self._on_message(request, message)
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
"Got exception while consuming message. Exception would be "
|
||||
"send back to the caller."
|
||||
)
|
||||
LOG.debug("Exceptions: %s", str(e))
|
||||
|
||||
# Wrap exception into another exception for compatibility
|
||||
# with oslo.
|
||||
self.publish_message(
|
||||
exc.KombuException(e),
|
||||
message.properties['reply_to'],
|
||||
message.properties['correlation_id'],
|
||||
res_type='error'
|
||||
)
|
||||
finally:
|
||||
message.ack()
|
||||
|
||||
def _on_message(self, request, message):
|
||||
LOG.debug('Received message %s', request)
|
||||
|
||||
is_async = request.get('async', False)
|
||||
rpc_ctx = request.get('rpc_ctx')
|
||||
redelivered = message.delivery_info.get('redelivered')
|
||||
rpc_method_name = request.get('rpc_method')
|
||||
arguments = self._deserialize_message(request.get('arguments'))
|
||||
correlation_id = message.properties['correlation_id']
|
||||
reply_to = message.properties['reply_to']
|
||||
|
||||
if redelivered is not None:
|
||||
rpc_ctx['redelivered'] = redelivered
|
||||
|
||||
rpc_context = self._set_auth_ctx(rpc_ctx)
|
||||
|
||||
rpc_method = self._get_rpc_method(rpc_method_name)
|
||||
|
||||
if not rpc_method:
|
||||
raise exc.MistralException("No such method: %s" % rpc_method_name)
|
||||
|
||||
response = rpc_method(rpc_ctx=rpc_context, **arguments)
|
||||
|
||||
if not is_async:
|
||||
LOG.debug(
|
||||
"RPC server sent a reply [reply_to = %s, correlation_id = %s",
|
||||
reply_to,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
self.publish_message(
|
||||
response,
|
||||
reply_to,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
def register_endpoint(self, endpoint):
|
||||
self.endpoints.append(endpoint)
|
||||
|
||||
def _process_message(self, request, message):
|
||||
self._worker.submit(self._on_message_safe, request, message)
|
||||
|
||||
def _prepare_worker(self, executor='blocking'):
|
||||
mgr = driver.DriverManager('kombu_driver.executors', executor)
|
||||
|
||||
executor_opts = {}
|
||||
|
||||
if executor != 'blocking':
|
||||
executor_opts['max_workers'] = self._executor_threads
|
||||
|
||||
self._worker = mgr.driver(**executor_opts)
|
@ -1,28 +0,0 @@
|
||||
# Copyright (c) 2016 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mistral import config as cfg
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
from mistral.tests.unit import base
|
||||
|
||||
|
||||
class KombuTestCase(base.BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
super(KombuTestCase, self).setUp()
|
||||
|
||||
kombu_base.set_transport_options(check_backend=False)
|
||||
|
||||
cfg.CONF.set_default('transport_url', 'rabbit://localhost:567')
|
@ -1,48 +0,0 @@
|
||||
# Copyright (c) 2016 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from kombu import mixins as mx
|
||||
from unittest import mock
|
||||
|
||||
|
||||
# Hack for making tests works with kombu listener
|
||||
mixins = mx
|
||||
|
||||
producer = mock.MagicMock()
|
||||
|
||||
producers = mock.MagicMock()
|
||||
producers.__getitem__ = lambda *args, **kwargs: producer
|
||||
|
||||
connection = mock.MagicMock()
|
||||
|
||||
connections = mock.MagicMock()
|
||||
connections.__getitem__ = lambda *args, **kwargs: connection
|
||||
|
||||
serialization = mock.MagicMock()
|
||||
|
||||
|
||||
def BrokerConnection(*args, **kwargs):
|
||||
return mock.MagicMock()
|
||||
|
||||
|
||||
def Exchange(*args, **kwargs):
|
||||
return mock.MagicMock()
|
||||
|
||||
|
||||
def Queue(*args, **kwargs):
|
||||
return mock.MagicMock()
|
||||
|
||||
|
||||
def Consumer(*args, **kwargs):
|
||||
return mock.MagicMock()
|
@ -1,100 +0,0 @@
|
||||
# Copyright (c) 2016 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mistral import exceptions as exc
|
||||
from mistral.tests.unit.rpc.kombu import base
|
||||
from mistral.tests.unit.rpc.kombu import fake_kombu
|
||||
from unittest import mock
|
||||
|
||||
import queue
|
||||
|
||||
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
from mistral.rpc.kombu import kombu_client
|
||||
|
||||
|
||||
class TestException(exc.MistralException):
|
||||
pass
|
||||
|
||||
|
||||
class KombuClientTest(base.KombuTestCase):
|
||||
|
||||
_RESPONSE = "response"
|
||||
|
||||
def setUp(self):
|
||||
super(KombuClientTest, self).setUp()
|
||||
|
||||
conf = mock.MagicMock()
|
||||
|
||||
listener_class = kombu_client.kombu_listener.KombuRPCListener
|
||||
|
||||
kombu_client.kombu_listener.KombuRPCListener = mock.MagicMock()
|
||||
|
||||
def restore_listener():
|
||||
kombu_client.kombu_listener.KombuRPCListener = listener_class
|
||||
|
||||
self.addCleanup(restore_listener)
|
||||
|
||||
self.client = kombu_client.KombuRPCClient(conf)
|
||||
self.ctx = type(
|
||||
'context',
|
||||
(object,),
|
||||
{'to_dict': lambda self: {}}
|
||||
)()
|
||||
|
||||
def test_sync_call_result_get(self):
|
||||
self.client._listener.get_result = mock.MagicMock(
|
||||
return_value={
|
||||
kombu_base.TYPE: None,
|
||||
kombu_base.RESULT: self.client._serialize_message({
|
||||
'body': self._RESPONSE
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
response = self.client.sync_call(self.ctx, 'method')
|
||||
|
||||
self.assertEqual(response, self._RESPONSE)
|
||||
|
||||
def test_sync_call_result_not_get(self):
|
||||
self.client._listener.get_result = mock.MagicMock(
|
||||
side_effect=queue.Empty
|
||||
)
|
||||
|
||||
self.assertRaises(
|
||||
exc.MistralException,
|
||||
self.client.sync_call,
|
||||
self.ctx,
|
||||
'method_not_found'
|
||||
)
|
||||
|
||||
def test_sync_call_result_type_error(self):
|
||||
def side_effect(*args, **kwargs):
|
||||
return {
|
||||
kombu_base.TYPE: 'error',
|
||||
kombu_base.RESULT: TestException()
|
||||
}
|
||||
|
||||
self.client._wait_for_result = mock.MagicMock(side_effect=side_effect)
|
||||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.client.sync_call,
|
||||
self.ctx,
|
||||
'method'
|
||||
)
|
||||
|
||||
def test_async_call(self):
|
||||
self.assertIsNone(self.client.async_call(self.ctx, 'method'))
|
@ -1,101 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral.rpc.kombu import kombu_hosts
|
||||
from mistral.tests.unit import base
|
||||
from oslo_config import cfg
|
||||
|
||||
import functools
|
||||
import oslo_messaging
|
||||
|
||||
|
||||
HOST_1 = 'rabbitmq_1'
|
||||
PORT_1 = 5671
|
||||
HOST_2 = 'rabbitmq_2'
|
||||
PORT_2 = 5672
|
||||
USER_1 = 'u_mistral_1'
|
||||
PASSWORD_1 = 'p_mistral_1'
|
||||
USER_2 = 'u_mistral_2'
|
||||
PASSWORD_2 = 'p_mistral_2'
|
||||
VIRTUAL_HOST_1 = 'vhost_1'
|
||||
VIRTUAL_HOST_2 = 'vhost_2'
|
||||
|
||||
|
||||
class KombuHostsTest(base.BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
super(KombuHostsTest, self).setUp()
|
||||
|
||||
# Oslo messaging set a default config option
|
||||
oslo_messaging.get_transport(cfg.CONF)
|
||||
|
||||
def assert_transports_host(self, expected, result):
|
||||
sorted_by_host = functools.partial(sorted, key=lambda x: x.hostname)
|
||||
|
||||
self.assertListEqual(sorted_by_host(expected), sorted_by_host(result))
|
||||
|
||||
def test_transport_url(self):
|
||||
self.override_config(
|
||||
'transport_url',
|
||||
'rabbit://{user}:{password}@{host}:{port}/{virtual_host}'.format(
|
||||
user=USER_1, port=PORT_1, host=HOST_1,
|
||||
password=PASSWORD_1,
|
||||
virtual_host=VIRTUAL_HOST_1
|
||||
))
|
||||
|
||||
hosts = kombu_hosts.KombuHosts(cfg.CONF)
|
||||
|
||||
self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host)
|
||||
self.assert_transports_host([oslo_messaging.TransportHost(
|
||||
hostname=HOST_1,
|
||||
port=PORT_1,
|
||||
username=USER_1,
|
||||
password=PASSWORD_1,
|
||||
)], hosts.hosts)
|
||||
|
||||
def test_transport_url_multiple_hosts(self):
|
||||
self.override_config(
|
||||
'transport_url',
|
||||
'rabbit://{user_1}:{password_1}@{host_1}:{port_1},'
|
||||
'{user_2}:{password_2}@{host_2}:{port_2}/{virtual_host}'.format(
|
||||
user_1=USER_1,
|
||||
password_1=PASSWORD_1,
|
||||
port_1=PORT_1,
|
||||
host_1=HOST_1,
|
||||
user_2=USER_2,
|
||||
password_2=PASSWORD_2,
|
||||
host_2=HOST_2,
|
||||
port_2=PORT_2,
|
||||
virtual_host=VIRTUAL_HOST_1
|
||||
))
|
||||
|
||||
hosts = kombu_hosts.KombuHosts(cfg.CONF)
|
||||
|
||||
self.assertEqual(VIRTUAL_HOST_1, hosts.virtual_host)
|
||||
self.assert_transports_host(
|
||||
[
|
||||
oslo_messaging.TransportHost(
|
||||
hostname=HOST_1,
|
||||
port=PORT_1,
|
||||
username=USER_1,
|
||||
password=PASSWORD_1
|
||||
),
|
||||
oslo_messaging.TransportHost(
|
||||
hostname=HOST_2,
|
||||
port=PORT_2,
|
||||
username=USER_2,
|
||||
password=PASSWORD_2
|
||||
)
|
||||
],
|
||||
hosts.hosts
|
||||
)
|
@ -1,219 +0,0 @@
|
||||
# Copyright (c) 2017 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mistral import exceptions as exc
|
||||
from mistral.tests.unit.rpc.kombu import base
|
||||
from mistral.tests.unit.rpc.kombu import fake_kombu
|
||||
from mistral_lib import utils
|
||||
from unittest import mock
|
||||
|
||||
import queue
|
||||
|
||||
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||
from mistral.rpc.kombu import base as kombu_base
|
||||
from mistral.rpc.kombu import kombu_listener
|
||||
|
||||
|
||||
class TestException(exc.MistralException):
|
||||
pass
|
||||
|
||||
|
||||
class KombuListenerTest(base.KombuTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(KombuListenerTest, self).setUp()
|
||||
|
||||
self.listener = kombu_listener.KombuRPCListener(
|
||||
[mock.MagicMock()],
|
||||
mock.MagicMock()
|
||||
)
|
||||
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
|
||||
|
||||
def test_add_listener(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
|
||||
self.assertEqual(
|
||||
type(self.listener._results.get(correlation_id)),
|
||||
queue.Queue
|
||||
)
|
||||
|
||||
self.assertEqual(0, self.listener._results[correlation_id].qsize())
|
||||
|
||||
def test_remove_listener_correlation_id_in_results(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
|
||||
self.assertEqual(
|
||||
type(self.listener._results.get(correlation_id)),
|
||||
queue.Queue
|
||||
)
|
||||
|
||||
self.listener.remove_listener(correlation_id)
|
||||
|
||||
self.assertIsNone(
|
||||
self.listener._results.get(correlation_id)
|
||||
)
|
||||
|
||||
def test_remove_listener_correlation_id_not_in_results(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
|
||||
self.assertEqual(
|
||||
type(self.listener._results.get(correlation_id)),
|
||||
queue.Queue
|
||||
)
|
||||
|
||||
self.listener.remove_listener(utils.generate_unicode_uuid())
|
||||
|
||||
self.assertEqual(
|
||||
type(self.listener._results.get(correlation_id)),
|
||||
queue.Queue
|
||||
)
|
||||
|
||||
@mock.patch('threading.Thread')
|
||||
def test_start_thread_not_set(self, thread_class_mock):
|
||||
thread_mock = mock.MagicMock()
|
||||
thread_class_mock.return_value = thread_mock
|
||||
|
||||
self.listener.start()
|
||||
|
||||
self.assertTrue(thread_mock.daemon)
|
||||
self.assertEqual(thread_mock.start.call_count, 1)
|
||||
|
||||
@mock.patch('threading.Thread')
|
||||
def test_start_thread_set(self, thread_class_mock):
|
||||
thread_mock = mock.MagicMock()
|
||||
thread_class_mock.return_value = thread_mock
|
||||
|
||||
self.listener._thread = mock.MagicMock()
|
||||
self.listener.start()
|
||||
|
||||
self.assertEqual(thread_mock.start.call_count, 0)
|
||||
|
||||
def test_get_result_results_in_queue(self):
|
||||
expected_result = 'abcd'
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
self.listener._results.get(correlation_id).put(expected_result)
|
||||
|
||||
result = self.listener.get_result(correlation_id, 5)
|
||||
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
def test_get_result_not_in_queue(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
|
||||
self.assertRaises(
|
||||
queue.Empty,
|
||||
self.listener.get_result,
|
||||
correlation_id,
|
||||
1 # timeout
|
||||
)
|
||||
|
||||
def test_get_result_lack_of_queue(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
self.assertRaises(
|
||||
KeyError,
|
||||
self.listener.get_result,
|
||||
correlation_id,
|
||||
1 # timeout
|
||||
)
|
||||
|
||||
def test__on_response_message_ack_fail(self):
|
||||
message = mock.MagicMock()
|
||||
message.ack.side_effect = Exception('Test Exception')
|
||||
response = 'response'
|
||||
|
||||
kombu_listener.LOG = mock.MagicMock()
|
||||
|
||||
self.listener.on_message(response, message)
|
||||
self.assertEqual(kombu_listener.LOG.debug.call_count, 1)
|
||||
self.assertEqual(kombu_listener.LOG.exception.call_count, 1)
|
||||
|
||||
def test__on_response_message_ack_ok_corr_id_not_match(self):
|
||||
message = mock.MagicMock()
|
||||
message.properties = mock.MagicMock()
|
||||
message.properties.__getitem__ = lambda *args, **kwargs: True
|
||||
response = 'response'
|
||||
|
||||
kombu_listener.LOG = mock.MagicMock()
|
||||
|
||||
self.listener.on_message(response, message)
|
||||
self.assertEqual(kombu_listener.LOG.debug.call_count, 3)
|
||||
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
|
||||
|
||||
def test__on_response_message_ack_ok_messsage_type_error(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
message = mock.MagicMock()
|
||||
message.properties = dict()
|
||||
message.properties['type'] = 'error'
|
||||
message.properties['correlation_id'] = correlation_id
|
||||
|
||||
response = TestException('response')
|
||||
|
||||
kombu_listener.LOG = mock.MagicMock()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
self.listener.on_message(response, message)
|
||||
|
||||
self.assertEqual(kombu_listener.LOG.debug.call_count, 2)
|
||||
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
|
||||
|
||||
result = self.listener.get_result(correlation_id, 5)
|
||||
|
||||
self.assertDictEqual(
|
||||
result,
|
||||
{
|
||||
kombu_base.TYPE: 'error',
|
||||
kombu_base.RESULT: response
|
||||
}
|
||||
)
|
||||
|
||||
def test__on_response_message_ack_ok(self):
|
||||
correlation_id = utils.generate_unicode_uuid()
|
||||
|
||||
message = mock.MagicMock()
|
||||
message.properties = dict()
|
||||
message.properties['type'] = None
|
||||
message.properties['correlation_id'] = correlation_id
|
||||
|
||||
response = 'response'
|
||||
|
||||
kombu_listener.LOG = mock.MagicMock()
|
||||
|
||||
self.listener.add_listener(correlation_id)
|
||||
self.listener.on_message(response, message)
|
||||
|
||||
self.assertEqual(kombu_listener.LOG.debug.call_count, 2)
|
||||
self.assertEqual(kombu_listener.LOG.exception.call_count, 0)
|
||||
|
||||
result = self.listener.get_result(correlation_id, 5)
|
||||
|
||||
self.assertDictEqual(
|
||||
result,
|
||||
{
|
||||
kombu_base.TYPE: None,
|
||||
kombu_base.RESULT: response
|
||||
}
|
||||
)
|
@ -1,311 +0,0 @@
|
||||
# Copyright (c) 2016 Intel Corporation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import futurist
|
||||
from mistral import context
|
||||
from mistral import exceptions as exc
|
||||
from mistral.tests.unit.rpc.kombu import base
|
||||
from mistral.tests.unit.rpc.kombu import fake_kombu
|
||||
from unittest import mock
|
||||
|
||||
import socket
|
||||
import time
|
||||
|
||||
from stevedore import driver
|
||||
|
||||
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||
from mistral.rpc.kombu import kombu_server
|
||||
|
||||
|
||||
class TestException(exc.MistralError):
|
||||
pass
|
||||
|
||||
|
||||
class KombuServerTest(base.KombuTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(KombuServerTest, self).setUp()
|
||||
|
||||
self.conf = mock.MagicMock()
|
||||
self.conf.host = 'fakehost'
|
||||
self.server = kombu_server.KombuRPCServer(self.conf)
|
||||
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
|
||||
|
||||
def test_is_running_is_running(self):
|
||||
self.server._running.set()
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
def test_is_running_is_not_running(self):
|
||||
self.server._running.clear()
|
||||
self.assertFalse(self.server.is_running)
|
||||
|
||||
def test_stop(self):
|
||||
self.server.stop()
|
||||
self.assertFalse(self.server.is_running)
|
||||
|
||||
def test_publish_message(self):
|
||||
body = 'body'
|
||||
reply_to = 'reply_to'
|
||||
corr_id = 'corr_id'
|
||||
type = 'type'
|
||||
|
||||
acquire_mock = mock.MagicMock()
|
||||
fake_kombu.producer.acquire.return_value = acquire_mock
|
||||
|
||||
enter_mock = mock.MagicMock()
|
||||
acquire_mock.__enter__.return_value = enter_mock
|
||||
|
||||
self.server.publish_message(body, reply_to, corr_id, type)
|
||||
enter_mock.publish.assert_called_once_with(
|
||||
body={'body': '"body"'},
|
||||
exchange='openstack',
|
||||
routing_key=reply_to,
|
||||
correlation_id=corr_id,
|
||||
type=type,
|
||||
serializer='json'
|
||||
)
|
||||
|
||||
def test_run_launch_successfully(self):
|
||||
acquire_mock = mock.MagicMock()
|
||||
acquire_mock.drain_events.side_effect = TestException()
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
def test_run_launch_successfully_than_stop(self):
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
self.assertTrue(self.server.is_running)
|
||||
raise KeyboardInterrupt
|
||||
|
||||
acquire_mock = mock.MagicMock()
|
||||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.server._run('blocking')
|
||||
self.assertFalse(self.server.is_running)
|
||||
self.assertEqual(self.server._sleep_time, 1)
|
||||
|
||||
def test_run_socket_error_reconnect(self):
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
if acquire_mock.drain_events.call_count == 1:
|
||||
raise socket.error()
|
||||
raise TestException()
|
||||
|
||||
acquire_mock = mock.MagicMock()
|
||||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertEqual(self.server._sleep_time, 1)
|
||||
|
||||
def test_run_socket_timeout_still_running(self):
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
if acquire_mock.drain_events.call_count == 0:
|
||||
raise socket.timeout()
|
||||
raise TestException()
|
||||
|
||||
acquire_mock = mock.MagicMock()
|
||||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.server._run,
|
||||
'blocking'
|
||||
)
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
def test_run_keyboard_interrupt_not_running(self):
|
||||
acquire_mock = mock.MagicMock()
|
||||
acquire_mock.drain_events.side_effect = KeyboardInterrupt()
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertIsNone(self.server.run())
|
||||
# Wait 1 sec so the thread start listening on RPC and receive the
|
||||
# side_effect
|
||||
time.sleep(1)
|
||||
self.assertFalse(self.server.is_running)
|
||||
|
||||
@mock.patch.object(
|
||||
kombu_server.KombuRPCServer,
|
||||
'_on_message',
|
||||
mock.MagicMock()
|
||||
)
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
def test__on_message_safe_message_processing_ok(self, publish_message):
|
||||
message = mock.MagicMock()
|
||||
|
||||
self.server._on_message_safe(None, message)
|
||||
|
||||
self.assertEqual(message.ack.call_count, 1)
|
||||
self.assertEqual(publish_message.call_count, 0)
|
||||
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, '_on_message')
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
def test__on_message_safe_message_processing_raise(
|
||||
self,
|
||||
publish_message,
|
||||
_on_message
|
||||
):
|
||||
reply_to = 'reply_to'
|
||||
correlation_id = 'corr_id'
|
||||
message = mock.MagicMock()
|
||||
message.properties = {
|
||||
'reply_to': reply_to,
|
||||
'correlation_id': correlation_id
|
||||
}
|
||||
|
||||
test_exception = TestException()
|
||||
_on_message.side_effect = test_exception
|
||||
|
||||
self.server._on_message_safe(None, message)
|
||||
|
||||
self.assertEqual(message.ack.call_count, 1)
|
||||
self.assertEqual(publish_message.call_count, 1)
|
||||
|
||||
@mock.patch.object(
|
||||
kombu_server.KombuRPCServer,
|
||||
'_get_rpc_method',
|
||||
mock.MagicMock(return_value=None)
|
||||
)
|
||||
def test__on_message_rpc_method_not_found(self):
|
||||
request = {
|
||||
'rpc_ctx': {},
|
||||
'rpc_method': 'not_found_method',
|
||||
'arguments': {}
|
||||
}
|
||||
|
||||
message = mock.MagicMock()
|
||||
message.properties = {
|
||||
'reply_to': None,
|
||||
'correlation_id': None
|
||||
}
|
||||
|
||||
self.assertRaises(
|
||||
exc.MistralException,
|
||||
self.server._on_message,
|
||||
request,
|
||||
message
|
||||
)
|
||||
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||
@mock.patch('mistral.context.MistralContext.from_dict')
|
||||
def test__on_message_is_async(self, mock_get_context, get_rpc_method,
|
||||
publish_message):
|
||||
result = 'result'
|
||||
request = {
|
||||
'async': True,
|
||||
'rpc_ctx': {},
|
||||
'rpc_method': 'found_method',
|
||||
'arguments': self.server._serialize_message({
|
||||
'a': 1,
|
||||
'b': 2
|
||||
})
|
||||
}
|
||||
|
||||
message = mock.MagicMock()
|
||||
message.properties = {
|
||||
'reply_to': None,
|
||||
'correlation_id': None
|
||||
}
|
||||
message.delivery_info.get.return_value = False
|
||||
|
||||
rpc_method = mock.MagicMock(return_value=result)
|
||||
get_rpc_method.return_value = rpc_method
|
||||
|
||||
ctx = context.MistralContext()
|
||||
mock_get_context.return_value = ctx
|
||||
|
||||
self.server._on_message(request, message)
|
||||
|
||||
rpc_method.assert_called_once_with(
|
||||
rpc_ctx=ctx,
|
||||
a=1,
|
||||
b=2
|
||||
)
|
||||
self.assertEqual(publish_message.call_count, 0)
|
||||
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||
@mock.patch('mistral.context.MistralContext.from_dict')
|
||||
def test__on_message_is_sync(self, mock_get_context, get_rpc_method,
|
||||
publish_message):
|
||||
result = 'result'
|
||||
request = {
|
||||
'async': False,
|
||||
'rpc_ctx': {},
|
||||
'rpc_method': 'found_method',
|
||||
'arguments': self.server._serialize_message({
|
||||
'a': 1,
|
||||
'b': 2
|
||||
})
|
||||
}
|
||||
|
||||
reply_to = 'reply_to'
|
||||
correlation_id = 'corr_id'
|
||||
message = mock.MagicMock()
|
||||
message.properties = {
|
||||
'reply_to': reply_to,
|
||||
'correlation_id': correlation_id
|
||||
}
|
||||
message.delivery_info.get.return_value = False
|
||||
|
||||
rpc_method = mock.MagicMock(return_value=result)
|
||||
get_rpc_method.return_value = rpc_method
|
||||
|
||||
ctx = context.MistralContext()
|
||||
mock_get_context.return_value = ctx
|
||||
|
||||
self.server._on_message(request, message)
|
||||
|
||||
rpc_method.assert_called_once_with(
|
||||
rpc_ctx=ctx,
|
||||
a=1,
|
||||
b=2
|
||||
)
|
||||
publish_message.assert_called_once_with(
|
||||
result,
|
||||
reply_to,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
def test__prepare_worker(self):
|
||||
self.server._prepare_worker('blocking')
|
||||
self.assertEqual(
|
||||
futurist.SynchronousExecutor,
|
||||
type(self.server._worker)
|
||||
)
|
||||
|
||||
self.server._prepare_worker('threading')
|
||||
self.assertEqual(
|
||||
futurist.ThreadPoolExecutor,
|
||||
type(self.server._worker)
|
||||
)
|
||||
|
||||
@mock.patch('stevedore.driver.DriverManager')
|
||||
def test__prepare_worker_no_valid_executor(self, driver_manager_mock):
|
||||
|
||||
driver_manager_mock.side_effect = driver.NoMatches()
|
||||
|
||||
self.assertRaises(
|
||||
driver.NoMatches,
|
||||
self.server._prepare_worker,
|
||||
'non_valid_executor'
|
||||
)
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
deprecations:
|
||||
- |
|
||||
Kombu RPC driver is now deleted from mistral RPC drivers.
|
||||
Only oslo messaging driver is supported.
|
@ -10,7 +10,6 @@ eventlet>=0.27.0 # MIT
|
||||
Jinja2>=2.10 # BSD License (3 clause)
|
||||
jsonschema>=3.2.0 # MIT
|
||||
keystonemiddleware>=4.18.0 # Apache-2.0
|
||||
kombu!=4.0.2,>=4.6.1 # BSD
|
||||
mistral-lib>=2.3.0 # Apache-2.0
|
||||
networkx>=2.3 # BSD
|
||||
oslo.concurrency>=3.26.0 # Apache-2.0
|
||||
|
@ -36,9 +36,6 @@ wsgi_scripts =
|
||||
mistral.rpc.backends =
|
||||
oslo_client = mistral.rpc.oslo.oslo_client:OsloRPCClient
|
||||
oslo_server = mistral.rpc.oslo.oslo_server:OsloRPCServer
|
||||
# NOTE(amorin) Kombu driver is deprecated
|
||||
kombu_client = mistral.rpc.kombu.kombu_client:KombuRPCClient
|
||||
kombu_server = mistral.rpc.kombu.kombu_server:KombuRPCServer
|
||||
|
||||
oslo.config.opts =
|
||||
mistral.config = mistral.config:list_opts
|
||||
@ -108,12 +105,6 @@ mistral.auth =
|
||||
keystone = mistral.auth.keystone:KeystoneAuthHandler
|
||||
keycloak-oidc = mistral.auth.keycloak:KeycloakAuthHandler
|
||||
|
||||
# NOTE(amorin) Kombu driver is deprecated
|
||||
kombu_driver.executors =
|
||||
blocking = futurist:SynchronousExecutor
|
||||
threading = futurist:ThreadPoolExecutor
|
||||
eventlet = futurist:GreenThreadPoolExecutor
|
||||
|
||||
pygments.lexers =
|
||||
mistral = mistral.ext.pygmentplugin:MistralLexer
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user