diff --git a/.gitreview b/.gitreview index beb811ae3..24d3e8df3 100644 --- a/.gitreview +++ b/.gitreview @@ -2,3 +2,4 @@ host=review.openstack.org port=29418 project=openstack/oslo.messaging.git +branch=feature/pika diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py new file mode 100644 index 000000000..3d633a5b1 --- /dev/null +++ b/oslo_messaging/_drivers/impl_pika.py @@ -0,0 +1,276 @@ +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_messaging import exceptions +import pika_pool +import retrying + +from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc +from oslo_messaging._drivers.pika_driver import pika_listener as pika_drv_lstnr +from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg +from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller + +LOG = logging.getLogger(__name__) + +pika_opts = [ + cfg.IntOpt('channel_max', default=None, + help='Maximum number of channels to allow'), + cfg.IntOpt('frame_max', default=None, + help='The maximum byte size for an AMQP frame'), + cfg.IntOpt('heartbeat_interval', default=1, + help="How often to send heartbeats for consumer's connections"), + cfg.BoolOpt('ssl', default=None, + help='Enable SSL'), + cfg.DictOpt('ssl_options', default=None, + help='Arguments passed to ssl.wrap_socket'), + cfg.FloatOpt('socket_timeout', default=0.25, + help="Set socket timeout in seconds for connection's socket"), + cfg.FloatOpt('tcp_user_timeout', default=0.25, + help="Set TCP_USER_TIMEOUT in seconds for connection's " + "socket"), + cfg.FloatOpt('host_connection_reconnect_delay', default=0.25, + help="Set delay for reconnection to some host which has " + "connection error") +] + +pika_pool_opts = [ + cfg.IntOpt('pool_max_size', default=10, + help="Maximum number of connections to keep queued."), + cfg.IntOpt('pool_max_overflow', default=0, + help="Maximum number of connections to create above " + "`pool_max_size`."), + cfg.IntOpt('pool_timeout', default=30, + help="Default number of seconds to wait for a connections to " + "available"), + cfg.IntOpt('pool_recycle', default=600, + help="Lifetime of a connection (since creation) in seconds " + "or None for no recycling. Expired connections are " + "closed on acquire."), + cfg.IntOpt('pool_stale', default=60, + help="Threshold at which inactive (since release) connections " + "are considered stale in seconds or None for no " + "staleness. Stale connections are closed on acquire.") +] + +notification_opts = [ + cfg.BoolOpt('notification_persistence', default=False, + help="Persist notification messages."), + cfg.StrOpt('default_notification_exchange', + default="${control_exchange}_notification", + help="Exchange name for for sending notifications"), + cfg.IntOpt( + 'default_notification_retry_attempts', default=-1, + help="Reconnecting retry count in case of connectivity problem during " + "sending notification, -1 means infinite retry." + ), + cfg.FloatOpt( + 'notification_retry_delay', default=0.25, + help="Reconnecting retry delay in case of connectivity problem during " + "sending notification message" + ) +] + +rpc_opts = [ + cfg.IntOpt('rpc_queue_expiration', default=60, + help="Time to live for rpc queues without consumers in " + "seconds."), + cfg.StrOpt('default_rpc_exchange', default="${control_exchange}_rpc", + help="Exchange name for for sending RPC messages"), + cfg.StrOpt('rpc_reply_exchange', default="${control_exchange}_rpc_reply", + help="Exchange name for for receiving RPC replies"), + cfg.IntOpt( + 'rpc_listener_prefetch_count', default=10, + help="Max number of not acknowledged message which RabbitMQ can send " + "to rpc listener. Works only if rpc_listener_ack == True" + ), + cfg.IntOpt( + 'rpc_reply_listener_prefetch_count', default=10, + help="Max number of not acknowledged message which RabbitMQ can send " + "to rpc reply listener. Works only if rpc_reply_listener_ack == " + "True" + ), + cfg.IntOpt( + 'rpc_reply_retry_attempts', default=-1, + help="Reconnecting retry count in case of connectivity problem during " + "sending reply. -1 means infinite retry during rpc_timeout" + ), + cfg.FloatOpt( + 'rpc_reply_retry_delay', default=0.25, + help="Reconnecting retry delay in case of connectivity problem during " + "sending reply." + ), + cfg.IntOpt( + 'default_rpc_retry_attempts', default=-1, + help="Reconnecting retry count in case of connectivity problem during " + "sending RPC message, -1 means infinite retry. If actual " + "retry attempts in not 0 the rpc request could be processed more " + "then one time" + ), + cfg.FloatOpt( + 'rpc_retry_delay', default=0.25, + help="Reconnecting retry delay in case of connectivity problem during " + "sending RPC message" + ) +] + + +class PikaDriver(object): + def __init__(self, conf, url, default_exchange=None, + allowed_remote_exmods=None): + opt_group = cfg.OptGroup(name='oslo_messaging_pika', + title='Pika driver options') + conf.register_group(opt_group) + conf.register_opts(pika_opts, group=opt_group) + conf.register_opts(pika_pool_opts, group=opt_group) + conf.register_opts(rpc_opts, group=opt_group) + conf.register_opts(notification_opts, group=opt_group) + + self.conf = conf + + self._pika_engine = pika_drv_engine.PikaEngine( + conf, url, default_exchange, allowed_remote_exmods + ) + self._reply_listener = pika_drv_lstnr.RpcReplyPikaListener( + self._pika_engine + ) + + def require_features(self, requeue=False): + pass + + def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, + retry=None): + expiration_time = None if timeout is None else time.time() + timeout + + if retry is None: + retry = self._pika_engine.default_rpc_retry_attempts + + def on_exception(ex): + if isinstance(ex, (pika_drv_exc.ConnectionException, + exceptions.MessageDeliveryFailure)): + LOG.warn(str(ex)) + return True + else: + return False + + retrier = ( + None if retry == 0 else + retrying.retry( + stop_max_attempt_number=(None if retry == -1 else retry), + retry_on_exception=on_exception, + wait_fixed=self._pika_engine.rpc_retry_delay * 1000, + ) + ) + + msg = pika_drv_msg.RpcPikaOutgoingMessage(self._pika_engine, message, + ctxt) + reply = msg.send( + target, + reply_listener=self._reply_listener if wait_for_reply else None, + expiration_time=expiration_time, + retrier=retrier + ) + + if reply is not None: + if reply.failure is not None: + raise reply.failure + + return reply.result + + def _declare_notification_queue_binding(self, target, timeout=None): + if timeout is not None and timeout < 0: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired." + ) + try: + with (self._pika_engine.connection_without_confirmation_pool + .acquire)(timeout=timeout) as conn: + self._pika_engine.declare_queue_binding_by_channel( + conn.channel, + exchange=( + target.exchange or + self._pika_engine.default_notification_exchange + ), + queue=target.topic, + routing_key=target.topic, + exchange_type='direct', + queue_expiration=None, + durable=self._pika_engine.notification_persistence, + ) + except pika_pool.Timeout as e: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired. {}.".format(str(e)) + ) + + def send_notification(self, target, ctxt, message, version, retry=None): + if retry is None: + retry = self._pika_engine.default_notification_retry_attempts + + def on_exception(ex): + if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException, + pika_drv_exc.RoutingException)): + LOG.warn(str(ex)) + try: + self._declare_notification_queue_binding(target) + except pika_drv_exc.ConnectionException as e: + LOG.warn(str(e)) + return True + elif isinstance(ex, (pika_drv_exc.ConnectionException, + pika_drv_exc.MessageRejectedException)): + LOG.warn(str(ex)) + return True + else: + return False + + retrier = retrying.retry( + stop_max_attempt_number=(None if retry == -1 else retry), + retry_on_exception=on_exception, + wait_fixed=self._pika_engine.notification_retry_delay * 1000, + ) + + msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, + ctxt) + return msg.send( + exchange=( + target.exchange or + self._pika_engine.default_notification_exchange + ), + routing_key=target.topic, + confirm=True, + mandatory=True, + persistent=self._pika_engine.notification_persistence, + retrier=retrier + ) + + def listen(self, target): + listener = pika_drv_poller.RpcServicePikaPoller( + self._pika_engine, target, + prefetch_count=self._pika_engine.rpc_listener_prefetch_count + ) + listener.start() + return listener + + def listen_for_notifications(self, targets_and_priorities, pool): + listener = pika_drv_poller.NotificationPikaPoller( + self._pika_engine, targets_and_priorities, pool + ) + listener.start() + return listener + + def cleanup(self): + self._reply_listener.cleanup() diff --git a/oslo_messaging/_drivers/pika_driver/__init__.py b/oslo_messaging/_drivers/pika_driver/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py new file mode 100644 index 000000000..4f38295a8 --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -0,0 +1,434 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import random +import socket +import sys +import threading +import time + +from oslo_log import log as logging +import pika +from pika.adapters import select_connection +from pika import credentials as pika_credentials +import pika_pool +import six + +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc + +LOG = logging.getLogger(__name__) + +_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins' + + +def _is_eventlet_monkey_patched(module): + """Determines safely is eventlet patching for module enabled or not + + :param module: String, module name + :return Bool, True if module is pathed, False otherwise + """ + + if 'eventlet.patcher' not in sys.modules: + return False + import eventlet.patcher + return eventlet.patcher.is_monkey_patched(module) + + +def _create_select_poller_connection_impl( + parameters, on_open_callback, on_open_error_callback, + on_close_callback, stop_ioloop_on_close): + """Used for disabling autochoise of poller ('select', 'poll', 'epool', etc) + inside default 'SelectConnection.__init__(...)' logic. It is necessary to + force 'select' poller usage if eventlet is monkeypatched because eventlet + patches only 'select' system call + + Method signature is copied form 'SelectConnection.__init__(...)', because + it is used as replacement of 'SelectConnection' class to create instances + """ + return select_connection.SelectConnection( + parameters=parameters, + on_open_callback=on_open_callback, + on_open_error_callback=on_open_error_callback, + on_close_callback=on_close_callback, + stop_ioloop_on_close=stop_ioloop_on_close, + custom_ioloop=select_connection.SelectPoller() + ) + + +class _PooledConnectionWithConfirmations(pika_pool.Connection): + """Derived from 'pika_pool.Connection' and extends its logic - adds + 'confirm_delivery' call after channel creation to enable delivery + confirmation for channel + """ + @property + def channel(self): + if self.fairy.channel is None: + self.fairy.channel = self.fairy.cxn.channel() + self.fairy.channel.confirm_delivery() + return self.fairy.channel + + +class PikaEngine(object): + """Used for shared functionality between other pika driver modules, like + connection factory, connection pools, processing and holding configuration, + etc. + """ + + # constants for creating connection statistics + HOST_CONNECTION_LAST_TRY_TIME = "last_try_time" + HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time" + + # constant for setting tcp_user_timeout socket option + # (it should be defined in 'select' module of standard library in future) + TCP_USER_TIMEOUT = 18 + + def __init__(self, conf, url, default_exchange=None, + allowed_remote_exmods=None): + self.conf = conf + + self._force_select_poller_use = _is_eventlet_monkey_patched('select') + + # processing rpc options + self.default_rpc_exchange = ( + conf.oslo_messaging_pika.default_rpc_exchange if + conf.oslo_messaging_pika.default_rpc_exchange else + default_exchange + ) + self.rpc_reply_exchange = ( + conf.oslo_messaging_pika.rpc_reply_exchange if + conf.oslo_messaging_pika.rpc_reply_exchange else + default_exchange + ) + + self.allowed_remote_exmods = [_EXCEPTIONS_MODULE] + if allowed_remote_exmods: + self.allowed_remote_exmods.extend(allowed_remote_exmods) + + self.rpc_listener_prefetch_count = ( + conf.oslo_messaging_pika.rpc_listener_prefetch_count + ) + + self.rpc_reply_listener_prefetch_count = ( + conf.oslo_messaging_pika.rpc_listener_prefetch_count + ) + + self.rpc_reply_retry_attempts = ( + conf.oslo_messaging_pika.rpc_reply_retry_attempts + ) + if self.rpc_reply_retry_attempts is None: + raise ValueError("rpc_reply_retry_attempts should be integer") + self.rpc_reply_retry_delay = ( + conf.oslo_messaging_pika.rpc_reply_retry_delay + ) + if (self.rpc_reply_retry_delay is None or + self.rpc_reply_retry_delay < 0): + raise ValueError("rpc_reply_retry_delay should be non-negative " + "integer") + + self.rpc_queue_expiration = ( + self.conf.oslo_messaging_pika.rpc_queue_expiration + ) + + # processing notification options + self.default_notification_exchange = ( + conf.oslo_messaging_pika.default_notification_exchange if + conf.oslo_messaging_pika.default_notification_exchange else + default_exchange + ) + + self.notification_persistence = ( + conf.oslo_messaging_pika.notification_persistence + ) + + self.default_rpc_retry_attempts = ( + conf.oslo_messaging_pika.default_rpc_retry_attempts + ) + if self.default_rpc_retry_attempts is None: + raise ValueError("default_rpc_retry_attempts should be an integer") + self.rpc_retry_delay = ( + conf.oslo_messaging_pika.rpc_retry_delay + ) + if (self.rpc_retry_delay is None or + self.rpc_retry_delay < 0): + raise ValueError("rpc_retry_delay should be non-negative integer") + + self.default_notification_retry_attempts = ( + conf.oslo_messaging_pika.default_notification_retry_attempts + ) + if self.default_notification_retry_attempts is None: + raise ValueError("default_notification_retry_attempts should be " + "an integer") + self.notification_retry_delay = ( + conf.oslo_messaging_pika.notification_retry_delay + ) + if (self.notification_retry_delay is None or + self.notification_retry_delay < 0): + raise ValueError("notification_retry_delay should be non-negative " + "integer") + + self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout + self.host_connection_reconnect_delay = ( + self.conf.oslo_messaging_pika.host_connection_reconnect_delay + ) + self._heartbeat_interval = ( + self.conf.oslo_messaging_pika.heartbeat_interval + ) + + # initializing connection parameters for configured RabbitMQ hosts + common_pika_params = { + 'virtual_host': url.virtual_host, + 'channel_max': self.conf.oslo_messaging_pika.channel_max, + 'frame_max': self.conf.oslo_messaging_pika.frame_max, + 'ssl': self.conf.oslo_messaging_pika.ssl, + 'ssl_options': self.conf.oslo_messaging_pika.ssl_options, + 'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout, + } + + self._connection_lock = threading.Lock() + + self._connection_host_param_list = [] + self._connection_host_status_list = [] + + if not url.hosts: + raise ValueError("You should provide at least one RabbitMQ host") + + for transport_host in url.hosts: + pika_params = common_pika_params.copy() + pika_params.update( + host=transport_host.hostname, + port=transport_host.port, + credentials=pika_credentials.PlainCredentials( + transport_host.username, transport_host.password + ), + ) + self._connection_host_param_list.append(pika_params) + self._connection_host_status_list.append({ + self.HOST_CONNECTION_LAST_TRY_TIME: 0, + self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0 + }) + + self._next_connection_host_num = random.randint( + 0, len(self._connection_host_param_list) - 1 + ) + + # initializing 2 connection pools: 1st for connections without + # confirmations, 2nd - with confirmations + self.connection_without_confirmation_pool = pika_pool.QueuedPool( + create=self.create_connection, + max_size=self.conf.oslo_messaging_pika.pool_max_size, + max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, + timeout=self.conf.oslo_messaging_pika.pool_timeout, + recycle=self.conf.oslo_messaging_pika.pool_recycle, + stale=self.conf.oslo_messaging_pika.pool_stale, + ) + + self.connection_with_confirmation_pool = pika_pool.QueuedPool( + create=self.create_connection, + max_size=self.conf.oslo_messaging_pika.pool_max_size, + max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow, + timeout=self.conf.oslo_messaging_pika.pool_timeout, + recycle=self.conf.oslo_messaging_pika.pool_recycle, + stale=self.conf.oslo_messaging_pika.pool_stale, + ) + + self.connection_with_confirmation_pool.Connection = ( + _PooledConnectionWithConfirmations + ) + + def _next_connection_num(self): + """Used for creating connections to different RabbitMQ nodes in + round robin order + + :return: next host number to create connection to + """ + with self._connection_lock: + cur_num = self._next_connection_host_num + self._next_connection_host_num += 1 + self._next_connection_host_num %= len( + self._connection_host_param_list + ) + return cur_num + + def create_connection(self, for_listening=False): + """Create and return connection to any available host. + + :return: created connection + :raise: ConnectionException if all hosts are not reachable + """ + host_count = len(self._connection_host_param_list) + connection_attempts = host_count + + pika_next_connection_num = self._next_connection_num() + + while connection_attempts > 0: + try: + return self.create_host_connection( + pika_next_connection_num, for_listening + ) + except pika_pool.Connection.connectivity_errors as e: + LOG.warn(str(e)) + except pika_drv_exc.HostConnectionNotAllowedException as e: + LOG.warn(str(e)) + + connection_attempts -= 1 + pika_next_connection_num += 1 + pika_next_connection_num %= host_count + + raise pika_drv_exc.EstablishConnectionException( + "Can not establish connection to any configured RabbitMQ host: " + + str(self._connection_host_param_list) + ) + + def _set_tcp_user_timeout(self, s): + if not self._tcp_user_timeout: + return + try: + s.setsockopt( + socket.IPPROTO_TCP, self.TCP_USER_TIMEOUT, + int(self._tcp_user_timeout * 1000) + ) + except socket.error: + LOG.warn( + "Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT." + ) + + def create_host_connection(self, host_index, for_listening=False): + """Create new connection to host #host_index + :param host_index: Integer, number of host for connection establishing + :param for_listening: Boolean, creates connection for listening + (enable heartbeats) if True + :return: New connection + """ + + with self._connection_lock: + cur_time = time.time() + + last_success_time = self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] + last_time = self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_TRY_TIME + ] + + # raise HostConnectionNotAllowedException if we tried to establish + # connection in last 'host_connection_reconnect_delay' and got + # failure + if (last_time != last_success_time and + cur_time - last_time < + self.host_connection_reconnect_delay): + raise pika_drv_exc.HostConnectionNotAllowedException( + "Connection to host #{} is not allowed now because of " + "previous failure".format(host_index) + ) + + try: + base_host_params = self._connection_host_param_list[host_index] + + connection = pika.BlockingConnection( + parameters=pika.ConnectionParameters( + heartbeat_interval=( + self._heartbeat_interval + if for_listening else None + ), + **base_host_params + ), + _impl_class=(_create_select_poller_connection_impl + if self._force_select_poller_use else None) + ) + + self._set_tcp_user_timeout(connection._impl.socket) + + self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME + ] = cur_time + + return connection + finally: + self._connection_host_status_list[host_index][ + self.HOST_CONNECTION_LAST_TRY_TIME + ] = cur_time + + @staticmethod + def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, + exchange_type, queue_expiration, + durable): + """Declare exchange, queue and bind them using already created + channel, if they don't exist + + :param channel: Channel for communication with RabbitMQ + :param exchange: String, RabbitMQ exchange name + :param queue: Sting, RabbitMQ queue name + :param routing_key: Sting, RabbitMQ routing key for queue binding + :param exchange_type: String ('direct', 'topic' or 'fanout') + exchange type for exchange to be declared + :param queue_expiration: Integer, time in seconds which queue will + remain existing in RabbitMQ when there no consumers connected + :param durable: Boolean, creates durable exchange and queue if true + """ + try: + channel.exchange_declare( + exchange, exchange_type, auto_delete=True, durable=durable + ) + arguments = {} + + if queue_expiration > 0: + arguments['x-expires'] = queue_expiration * 1000 + + channel.queue_declare(queue, durable=durable, arguments=arguments) + + channel.queue_bind(queue, exchange, routing_key) + except pika_pool.Connection.connectivity_errors as e: + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during declaring queue " + "binding: exchange:{}, queue: {}, routing_key: {}, " + "exchange_type: {}, queue_expiration: {}, " + "durable: {}. {}".format( + exchange, queue, routing_key, exchange_type, + queue_expiration, durable, str(e) + ) + ) + + def get_rpc_exchange_name(self, exchange, topic, fanout, no_ack): + """Returns RabbitMQ exchange name for given rpc request + + :param exchange: String, oslo.messaging target's exchange + :param topic: String, oslo.messaging target's topic + :param fanout: Boolean, oslo.messaging target's fanout mode + :param no_ack: Boolean, use message delivery with acknowledges or not + + :return: String, RabbitMQ exchange name + """ + exchange = (exchange or self.default_rpc_exchange) + + if fanout: + exchange = '{}_fanout_{}_{}'.format( + exchange, "no_ack" if no_ack else "with_ack", topic + ) + return exchange + + @staticmethod + def get_rpc_queue_name(topic, server, no_ack): + """Returns RabbitMQ queue name for given rpc request + + :param topic: String, oslo.messaging target's topic + :param server: String, oslo.messaging target's server + :param no_ack: Boolean, use message delivery with acknowledges or not + + :return: String, RabbitMQ exchange name + """ + queue_parts = ["no_ack" if no_ack else "with_ack", topic] + if server is not None: + queue_parts.append(server) + queue = '.'.join(queue_parts) + return queue diff --git a/oslo_messaging/_drivers/pika_driver/pika_exceptions.py b/oslo_messaging/_drivers/pika_driver/pika_exceptions.py new file mode 100644 index 000000000..c32d7e401 --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_exceptions.py @@ -0,0 +1,68 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_messaging import exceptions + + +class ExchangeNotFoundException(exceptions.MessageDeliveryFailure): + """Is raised if specified exchange is not found in RabbitMQ.""" + pass + + +class MessageRejectedException(exceptions.MessageDeliveryFailure): + """Is raised if message which you are trying to send was nacked by RabbitMQ + it may happen if RabbitMQ is not able to process message + """ + pass + + +class RoutingException(exceptions.MessageDeliveryFailure): + """Is raised if message can not be delivered to any queue. Usually it means + that any queue is not binded to given exchange with given routing key. + Raised if 'mandatory' flag specified only + """ + pass + + +class ConnectionException(exceptions.MessagingException): + """Is raised if some operation can not be performed due to connectivity + problem + """ + pass + + +class TimeoutConnectionException(ConnectionException): + """Is raised if socket timeout was expired during network interaction""" + pass + + +class EstablishConnectionException(ConnectionException): + """Is raised if we have some problem during establishing connection + procedure + """ + pass + + +class HostConnectionNotAllowedException(EstablishConnectionException): + """Is raised in case of try to establish connection to temporary + not allowed host (because of reconnection policy for example) + """ + pass + + +class UnsupportedDriverVersion(exceptions.MessagingException): + """Is raised when message is received but was sent by different, + not supported driver version + """ + pass diff --git a/oslo_messaging/_drivers/pika_driver/pika_listener.py b/oslo_messaging/_drivers/pika_driver/pika_listener.py new file mode 100644 index 000000000..2c33168e5 --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_listener.py @@ -0,0 +1,155 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import time +import uuid + +from concurrent import futures +from oslo_log import log as logging + +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc +from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller + +LOG = logging.getLogger(__name__) + + +class RpcReplyPikaListener(object): + """Provide functionality for listening RPC replies. Create and handle + reply poller and coroutine for performing polling job + """ + + def __init__(self, pika_engine): + self._pika_engine = pika_engine + + # preparing poller for listening replies + self._reply_queue = None + + self._reply_poller = None + self._reply_waiting_futures = {} + + self._reply_consumer_initialized = False + self._reply_consumer_initialization_lock = threading.Lock() + self._poller_thread = None + + def get_reply_qname(self, expiration_time=None): + """As result return reply queue name, shared for whole process, + but before this check is RPC listener initialized or not and perform + initialization if needed + + :param expiration_time: Float, expiration time in seconds + (like time.time()), + :return: String, queue name which hould be used for reply sending + """ + if self._reply_consumer_initialized: + return self._reply_queue + + with self._reply_consumer_initialization_lock: + if self._reply_consumer_initialized: + return self._reply_queue + + # generate reply queue name if needed + if self._reply_queue is None: + self._reply_queue = "reply.{}.{}.{}".format( + self._pika_engine.conf.project, + self._pika_engine.conf.prog, uuid.uuid4().hex + ) + + # initialize reply poller if needed + if self._reply_poller is None: + self._reply_poller = pika_drv_poller.RpcReplyPikaPoller( + pika_engine=self._pika_engine, + exchange=self._pika_engine.rpc_reply_exchange, + queue=self._reply_queue, + prefetch_count=( + self._pika_engine.rpc_reply_listener_prefetch_count + ) + ) + + self._reply_poller.start(timeout=expiration_time - time.time()) + + # start reply poller job thread if needed + if self._poller_thread is None: + self._poller_thread = threading.Thread(target=self._poller) + self._poller_thread.daemon = True + + if not self._poller_thread.is_alive(): + self._poller_thread.start() + + self._reply_consumer_initialized = True + + return self._reply_queue + + def _poller(self): + """Reply polling job. Poll replies in infinite loop and notify + registered features + """ + while self._reply_poller: + try: + try: + messages = self._reply_poller.poll() + except pika_drv_exc.EstablishConnectionException: + LOG.exception("Problem during establishing connection for " + "reply polling") + time.sleep( + self._pika_engine.host_connection_reconnect_delay + ) + continue + + for message in messages: + try: + message.acknowledge() + future = self._reply_waiting_futures.pop( + message.msg_id, None + ) + if future is not None: + future.set_result(message) + except Exception: + LOG.exception("Unexpected exception during processing" + "reply message") + except BaseException: + LOG.exception("Unexpected exception during reply polling") + + def register_reply_waiter(self, msg_id): + """Register reply waiter. Should be called before message sending to + the server + :param msg_id: String, message_id of expected reply + :return future: Future, container for expected reply to be returned + over + """ + future = futures.Future() + self._reply_waiting_futures[msg_id] = future + return future + + def unregister_reply_waiter(self, msg_id): + """Unregister reply waiter. Should be called if client has not got + reply and doesn't want to continue waiting (if timeout_expired for + example) + :param msg_id: + """ + self._reply_waiting_futures.pop(msg_id, None) + + def cleanup(self): + """Stop replies consuming and cleanup resources""" + if self._reply_poller: + self._reply_poller.stop() + self._reply_poller.cleanup() + self._reply_poller = None + + if self._poller_thread: + if self._poller_thread.is_alive(): + self._poller_thread.join() + self._poller_thread = None + + self._reply_queue = None diff --git a/oslo_messaging/_drivers/pika_driver/pika_message.py b/oslo_messaging/_drivers/pika_driver/pika_message.py new file mode 100644 index 000000000..eac2be938 --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_message.py @@ -0,0 +1,621 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import socket +import time +import traceback +import uuid + +from concurrent import futures +from oslo_log import log as logging +from oslo_serialization import jsonutils +from oslo_utils import importutils +from pika import exceptions as pika_exceptions +from pika import spec as pika_spec +import pika_pool +import retrying +import six + + +import oslo_messaging +from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc +from oslo_messaging import _utils as utils +from oslo_messaging import exceptions + + +LOG = logging.getLogger(__name__) + +_VERSION_HEADER = "version" +_VERSION = "1.0" + + +class RemoteExceptionMixin(object): + """Used for constructing dynamic exception type during deserialization of + remote exception. It defines unified '__init__' method signature and + exception message format + """ + def __init__(self, module, clazz, message, trace): + """Store serialized data + :param module: String, module name for importing original exception + class of serialized remote exception + :param clazz: String, original class name of serialized remote + exception + :param message: String, original message of serialized remote + exception + :param trace: String, original trace of serialized remote exception + """ + self.module = module + self.clazz = clazz + self.message = message + self.trace = trace + + self._str_msgs = message + "\n" + "\n".join(trace) + + def __str__(self): + return self._str_msgs + + +class PikaIncomingMessage(object): + """Driver friendly adapter for received message. Extract message + information from RabbitMQ message and provide access to it + """ + + def __init__(self, pika_engine, channel, method, properties, body): + """Parse RabbitMQ message + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery, used for sending ack back. + If None - ack is not required + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + """ + headers = getattr(properties, "headers", {}) + version = headers.get(_VERSION_HEADER, None) + if not utils.version_is_compatible(version, _VERSION): + raise pika_drv_exc.UnsupportedDriverVersion( + "Message's version: {} is not compatible with driver version: " + "{}".format(version, _VERSION)) + + self._pika_engine = pika_engine + self._channel = channel + self._delivery_tag = method.delivery_tag + + self._version = version + + self._content_type = properties.content_type + self._content_encoding = properties.content_encoding + self.unique_id = properties.message_id + + self.expiration_time = ( + None if properties.expiration is None else + time.time() + float(properties.expiration) / 1000 + ) + + if self._content_type != "application/json": + raise NotImplementedError( + "Content-type['{}'] is not valid, " + "'application/json' only is supported.".format( + self._content_type + ) + ) + + message_dict = jsonutils.loads(body, encoding=self._content_encoding) + + context_dict = {} + + for key in list(message_dict.keys()): + key = six.text_type(key) + if key.startswith('_$_'): + value = message_dict.pop(key) + context_dict[key[3:]] = value + self.message = message_dict + self.ctxt = context_dict + + def need_ack(self): + return self._channel is not None + + def acknowledge(self): + """Ack the message. Should be called by message processing logic when + it considered as consumed (means that we don't need redelivery of this + message anymore) + """ + if self.need_ack(): + self._channel.basic_ack(delivery_tag=self._delivery_tag) + + def requeue(self): + """Rollback the message. Should be called by message processing logic + when it can not process the message right now and should be redelivered + later if it is possible + """ + if self.need_ack(): + return self._channel.basic_nack(delivery_tag=self._delivery_tag, + requeue=True) + + +class RpcPikaIncomingMessage(PikaIncomingMessage): + """PikaIncomingMessage implementation for RPC messages. It expects + extra RPC related fields in message body (msg_id and reply_q). Also 'reply' + method added to allow consumer to send RPC reply back to the RPC client + """ + + def __init__(self, pika_engine, channel, method, properties, body): + """Defines default values of msg_id and reply_q fields and just call + super.__init__ method + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery, used for sending ack back. + If None - ack is not required + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + """ + super(RpcPikaIncomingMessage, self).__init__( + pika_engine, channel, method, properties, body + ) + self.reply_q = properties.reply_to + self.msg_id = properties.correlation_id + + def reply(self, reply=None, failure=None, log_failure=True): + """Send back reply to the RPC client + :param reply: Dictionary, reply. In case of exception should be None + :param failure: Tuple, should be a sys.exc_info() tuple. + Should be None if RPC request was successfully processed. + :param log_failure: Boolean, not used in this implementation. + It present here to be compatible with driver API + + :return RpcReplyPikaIncomingMessage, message with reply + """ + + if self.reply_q is None: + return + + reply_outgoing_message = RpcReplyPikaOutgoingMessage( + self._pika_engine, self.msg_id, reply=reply, failure_info=failure, + content_type=self._content_type, + content_encoding=self._content_encoding + ) + + def on_exception(ex): + if isinstance(ex, pika_drv_exc.ConnectionException): + LOG.warn(str(ex)) + return True + else: + return False + + retrier = retrying.retry( + stop_max_attempt_number=( + None if self._pika_engine.rpc_reply_retry_attempts == -1 + else self._pika_engine.rpc_reply_retry_attempts + ), + retry_on_exception=on_exception, + wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, + ) if self._pika_engine.rpc_reply_retry_attempts else None + + try: + reply_outgoing_message.send( + reply_q=self.reply_q, + expiration_time=self.expiration_time, + retrier=retrier + ) + LOG.debug( + "Message [id:'{}'] replied to '{}'.".format( + self.msg_id, self.reply_q + ) + ) + except Exception: + LOG.exception( + "Message [id:'{}'] wasn't replied to : {}".format( + self.msg_id, self.reply_q + ) + ) + + +class RpcReplyPikaIncomingMessage(PikaIncomingMessage): + """PikaIncomingMessage implementation for RPC reply messages. It expects + extra RPC reply related fields in message body (result and failure). + """ + def __init__(self, pika_engine, channel, method, properties, body): + """Defines default values of result and failure fields, call + super.__init__ method and then construct Exception object if failure is + not None + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param channel: Channel, RabbitMQ channel which was used for + this message delivery, used for sending ack back. + If None - ack is not required + :param method: Method, RabbitMQ message method + :param properties: Properties, RabbitMQ message properties + :param body: Bytes, RabbitMQ message body + """ + super(RpcReplyPikaIncomingMessage, self).__init__( + pika_engine, channel, method, properties, body + ) + + self.msg_id = properties.correlation_id + + self.result = self.message.get("s", None) + self.failure = self.message.get("e", None) + + if self.failure is not None: + trace = self.failure.get('t', []) + message = self.failure.get('s', "") + class_name = self.failure.get('c') + module_name = self.failure.get('m') + + res_exc = None + + if module_name in pika_engine.allowed_remote_exmods: + try: + module = importutils.import_module(module_name) + klass = getattr(module, class_name) + + ex_type = type( + klass.__name__, + (RemoteExceptionMixin, klass), + {} + ) + + res_exc = ex_type(module_name, class_name, message, trace) + except ImportError as e: + LOG.warn( + "Can not deserialize remote exception [module:{}, " + "class:{}]. {}".format(module_name, class_name, str(e)) + ) + + # if we have not processed failure yet, use RemoteError class + if res_exc is None: + res_exc = oslo_messaging.RemoteError( + class_name, message, trace + ) + self.failure = res_exc + + +class PikaOutgoingMessage(object): + """Driver friendly adapter for sending message. Construct RabbitMQ message + and send it + """ + + def __init__(self, pika_engine, message, context, + content_type="application/json", content_encoding="utf-8"): + """Parse RabbitMQ message + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param message: Dictionary, user's message fields + :param context: Dictionary, request context's fields + :param content_type: String, content-type header, defines serialization + mechanism + :param content_encoding: String, defines encoding for text data + """ + + self._pika_engine = pika_engine + + self._content_type = content_type + self._content_encoding = content_encoding + + if self._content_type != "application/json": + raise NotImplementedError( + "Content-type['{}'] is not valid, " + "'application/json' only is supported.".format( + self._content_type + ) + ) + + self.message = message + self.context = context + + self.unique_id = uuid.uuid4().hex + + def _prepare_message_to_send(self): + """Combine user's message fields an system fields (_unique_id, + context's data etc) + """ + msg = self.message.copy() + + if self.context: + for key, value in six.iteritems(self.context): + key = six.text_type(key) + msg['_$_' + key] = value + + props = pika_spec.BasicProperties( + content_encoding=self._content_encoding, + content_type=self._content_type, + headers={_VERSION_HEADER: _VERSION}, + message_id=self.unique_id, + ) + return msg, props + + @staticmethod + def _publish(pool, exchange, routing_key, body, properties, mandatory, + expiration_time): + """Execute pika publish method using connection from connection pool + Also this message catches all pika related exceptions and raise + oslo.messaging specific exceptions + + :param pool: Pool, pika connection pool for connection choosing + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param body: Bytes, RabbitMQ message payload + :param properties: Properties, RabbitMQ message properties + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param expiration_time: Float, expiration time in seconds + (like time.time()) + """ + timeout = (None if expiration_time is None else + expiration_time - time.time()) + if timeout is not None and timeout < 0: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired." + ) + try: + with pool.acquire(timeout=timeout) as conn: + if timeout is not None: + properties.expiration = str(int(timeout * 1000)) + conn.channel.publish( + exchange=exchange, + routing_key=routing_key, + body=body, + properties=properties, + mandatory=mandatory + ) + except pika_exceptions.NackError as e: + raise pika_drv_exc.MessageRejectedException( + "Can not send message: [body: {}], properties: {}] to " + "target [exchange: {}, routing_key: {}]. {}".format( + body, properties, exchange, routing_key, str(e) + ) + ) + except pika_exceptions.UnroutableError as e: + raise pika_drv_exc.RoutingException( + "Can not deliver message:[body:{}, properties: {}] to any" + "queue using target: [exchange:{}, " + "routing_key:{}]. {}".format( + body, properties, exchange, routing_key, str(e) + ) + ) + except pika_pool.Timeout as e: + raise exceptions.MessagingTimeout( + "Timeout for current operation was expired. {}".format(str(e)) + ) + except pika_pool.Connection.connectivity_errors as e: + if (isinstance(e, pika_exceptions.ChannelClosed) + and e.args and e.args[0] == 404): + raise pika_drv_exc.ExchangeNotFoundException( + "Attempt to send message to not existing exchange " + "detected, message: [body:{}, properties: {}], target: " + "[exchange:{}, routing_key:{}]. {}".format( + body, properties, exchange, routing_key, str(e) + ) + ) + + raise pika_drv_exc.ConnectionException( + "Connectivity problem detected during sending the message: " + "[body:{}, properties: {}] to target: [exchange:{}, " + "routing_key:{}]. {}".format( + body, properties, exchange, routing_key, str(e) + ) + ) + except socket.timeout: + raise pika_drv_exc.TimeoutConnectionException( + "Socket timeout exceeded." + ) + + def _do_send(self, exchange, routing_key, msg_dict, msg_props, + confirm=True, mandatory=True, persistent=False, + expiration_time=None, retrier=None): + """Send prepared message with configured retrying + + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param msg_dict: Dictionary, message payload + :param msg_props: Properties, message properties + :param confirm: Boolean, enable publisher confirmation if True + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param persistent: Boolean, send persistent message if True, works only + for routing into durable queues + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ + msg_props.delivery_mode = 2 if persistent else 1 + + pool = (self._pika_engine.connection_with_confirmation_pool + if confirm else + self._pika_engine.connection_without_confirmation_pool) + + body = jsonutils.dump_as_bytes(msg_dict, + encoding=self._content_encoding) + + LOG.debug( + "Sending message:[body:{}; properties: {}] to target: " + "[exchange:{}; routing_key:{}]".format( + body, msg_props, exchange, routing_key + ) + ) + + publish = (self._publish if retrier is None else + retrier(self._publish)) + + return publish(pool, exchange, routing_key, body, msg_props, + mandatory, expiration_time) + + def send(self, exchange, routing_key='', confirm=True, mandatory=True, + persistent=False, expiration_time=None, retrier=None): + """Send message with configured retrying + + :param exchange: String, RabbitMQ exchange name for message sending + :param routing_key: String, RabbitMQ routing key for message routing + :param confirm: Boolean, enable publisher confirmation if True + :param mandatory: Boolean, RabbitMQ publish mandatory flag (raise + exception if it is not possible to deliver message to any queue) + :param persistent: Boolean, send persistent message if True, works only + for routing into durable queues + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ + msg_dict, msg_props = self._prepare_message_to_send() + + return self._do_send(exchange, routing_key, msg_dict, msg_props, + confirm, mandatory, persistent, expiration_time, + retrier) + + +class RpcPikaOutgoingMessage(PikaOutgoingMessage): + """PikaOutgoingMessage implementation for RPC messages. It adds + possibility to wait and receive RPC reply + """ + def __init__(self, pika_engine, message, context, + content_type="application/json", content_encoding="utf-8"): + super(RpcPikaOutgoingMessage, self).__init__( + pika_engine, message, context, content_type, content_encoding + ) + self.msg_id = None + self.reply_q = None + + def send(self, target, reply_listener=None, expiration_time=None, + retrier=None): + """Send RPC message with configured retrying + + :param target: Target, oslo.messaging target which defines RPC service + :param reply_listener: RpcReplyPikaListener, listener for waiting + reply. If None - return immediately without reply waiting + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ + + exchange = self._pika_engine.get_rpc_exchange_name( + target.exchange, target.topic, target.fanout, retrier is None + ) + + queue = "" if target.fanout else self._pika_engine.get_rpc_queue_name( + target.topic, target.server, retrier is None + ) + + msg_dict, msg_props = self._prepare_message_to_send() + + if reply_listener: + self.msg_id = uuid.uuid4().hex + msg_props.correlation_id = self.msg_id + LOG.debug('MSG_ID is %s', self.msg_id) + + self.reply_q = reply_listener.get_reply_qname( + expiration_time - time.time() + ) + msg_props.reply_to = self.reply_q + + future = reply_listener.register_reply_waiter(msg_id=self.msg_id) + + self._do_send( + exchange=exchange, routing_key=queue, msg_dict=msg_dict, + msg_props=msg_props, confirm=True, mandatory=True, + persistent=False, expiration_time=expiration_time, + retrier=retrier + ) + + try: + return future.result(expiration_time - time.time()) + except BaseException as e: + reply_listener.unregister_reply_waiter(self.msg_id) + if isinstance(e, futures.TimeoutError): + e = exceptions.MessagingTimeout() + raise e + else: + self._do_send( + exchange=exchange, routing_key=queue, msg_dict=msg_dict, + msg_props=msg_props, confirm=True, mandatory=True, + persistent=False, expiration_time=expiration_time, + retrier=retrier + ) + + +class RpcReplyPikaOutgoingMessage(PikaOutgoingMessage): + """PikaOutgoingMessage implementation for RPC reply messages. It sets + correlation_id AMQP property to link this reply with response + """ + def __init__(self, pika_engine, msg_id, reply=None, failure_info=None, + content_type="application/json", content_encoding="utf-8"): + """Initialize with reply information for sending + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param msg_id: String, msg_id of RPC request, which waits for reply + :param reply: Dictionary, reply. In case of exception should be None + :param failure_info: Tuple, should be a sys.exc_info() tuple. + Should be None if RPC request was successfully processed. + :param content_type: String, content-type header, defines serialization + mechanism + :param content_encoding: String, defines encoding for text data + """ + self.msg_id = msg_id + + if failure_info is not None: + ex_class = failure_info[0] + ex = failure_info[1] + tb = traceback.format_exception(*failure_info) + if issubclass(ex_class, RemoteExceptionMixin): + failure_data = { + 'c': ex.clazz, + 'm': ex.module, + 's': ex.message, + 't': tb + } + else: + failure_data = { + 'c': six.text_type(ex_class.__name__), + 'm': six.text_type(ex_class.__module__), + 's': six.text_type(ex), + 't': tb + } + + msg = {'e': failure_data} + else: + msg = {'s': reply} + + super(RpcReplyPikaOutgoingMessage, self).__init__( + pika_engine, msg, None, content_type, content_encoding + ) + + def send(self, reply_q, expiration_time=None, retrier=None): + """Send RPC message with configured retrying + + :param reply_q: String, queue name for sending reply + :param expiration_time: Float, expiration time in seconds + (like time.time()) + :param retrier: retrying.Retrier, configured retrier object for sending + message, if None no retrying is performed + """ + + msg_dict, msg_props = self._prepare_message_to_send() + msg_props.correlation_id = self.msg_id + + self._do_send( + exchange=self._pika_engine.rpc_reply_exchange, routing_key=reply_q, + msg_dict=msg_dict, msg_props=msg_props, confirm=True, + mandatory=True, persistent=False, expiration_time=expiration_time, + retrier=retrier + ) diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py new file mode 100644 index 000000000..3533dad2f --- /dev/null +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -0,0 +1,402 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import time + +from oslo_log import log as logging +import pika_pool +import retrying +import six + +from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg + +LOG = logging.getLogger(__name__) + + +class PikaPoller(object): + """Provides user friendly functionality for RabbitMQ message consuming, + handles low level connectivity problems and restore connection if some + connectivity related problem detected + """ + + def __init__(self, pika_engine, prefetch_count, incoming_message_class): + """Initialize required fields + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param prefetch_count: Integer, maximum count of unacknowledged + messages which RabbitMQ broker sends to this consumer + :param incoming_message_class: PikaIncomingMessage, wrapper for + consumed RabbitMQ message + """ + self._pika_engine = pika_engine + self._prefetch_count = prefetch_count + self._incoming_message_class = incoming_message_class + + self._connection = None + self._channel = None + self._lock = threading.Lock() + + self._started = False + + self._queues_to_consume = None + + self._message_queue = [] + + def _reconnect(self): + """Performs reconnection to the broker. It is unsafe method for + internal use only + """ + self._connection = self._pika_engine.create_connection( + for_listening=True + ) + self._channel = self._connection.channel() + self._channel.basic_qos(prefetch_count=self._prefetch_count) + + if self._queues_to_consume is None: + self._queues_to_consume = self._declare_queue_binding() + + for queue, no_ack in six.iteritems(self._queues_to_consume): + self._start_consuming(queue, no_ack) + + def _declare_queue_binding(self): + """Is called by recovering connection logic if target RabbitMQ + exchange and (or) queue do not exist. Should be overridden in child + classes + + :return Dictionary, declared_queue_name -> no_ack_mode + """ + raise NotImplementedError( + "It is base class. Please declare exchanges and queues here" + ) + + def _start_consuming(self, queue, no_ack): + """Is called by recovering connection logic for starting consumption + of the RabbitMQ queue + + :param queue: String, RabbitMQ queue name for consuming + :param no_ack: Boolean, Choose consuming acknowledgement mode. If True, + acknowledges are not needed. RabbitMQ considers message consumed + after sending it to consumer immediately + """ + on_message_no_ack_callback = ( + self._on_message_no_ack_callback if no_ack + else self._on_message_with_ack_callback + ) + + try: + self._channel.basic_consume(on_message_no_ack_callback, queue, + no_ack=no_ack) + except Exception: + self._queues_to_consume = None + raise + + def _on_message_no_ack_callback(self, unused, method, properties, body): + """Is called by Pika when message was received from queue listened with + no_ack=True mode + """ + self._message_queue.append( + self._incoming_message_class( + self._pika_engine, None, method, properties, body + ) + ) + + def _on_message_with_ack_callback(self, unused, method, properties, body): + """Is called by Pika when message was received from queue listened with + no_ack=False mode + """ + self._message_queue.append( + self._incoming_message_class( + self._pika_engine, self._channel, method, properties, body + ) + ) + + def _cleanup(self): + """Cleanup allocated resources (channel, connection, etc). It is unsafe + method for internal use only + """ + if self._channel: + try: + self._channel.close() + except Exception as ex: + if not pika_pool.Connection.is_connection_invalidated(ex): + LOG.exception("Unexpected error during closing channel") + self._channel = None + + if self._connection: + try: + self._connection.close() + except Exception as ex: + if not pika_pool.Connection.is_connection_invalidated(ex): + LOG.exception("Unexpected error during closing connection") + self._connection = None + + for i in xrange(len(self._message_queue) - 1, -1, -1): + message = self._message_queue[i] + if message.need_ack(): + del self._message_queue[i] + + def poll(self, timeout=None, prefetch_size=1): + """Main method of this class - consumes message from RabbitMQ + + :param: timeout: float, seconds, timeout for waiting new incoming + message, None means wait forever + :param: prefetch_size: Integer, count of messages which we are want to + poll. It blocks until prefetch_size messages are consumed or until + timeout gets expired + :return: list of PikaIncomingMessage, RabbitMQ messages + """ + expiration_time = time.time() + timeout if timeout else None + + while True: + with self._lock: + if timeout is not None: + timeout = expiration_time - time.time() + if (len(self._message_queue) < prefetch_size and + self._started and ((timeout is None) or timeout > 0)): + try: + if self._channel is None: + self._reconnect() + # we need some time_limit here, not too small to avoid + # a lot of not needed iterations but not too large to + # release lock time to time and give a chance to + # perform another method waiting this lock + self._connection.process_data_events( + time_limit=0.25 + ) + except pika_pool.Connection.connectivity_errors: + self._cleanup() + raise + else: + result = self._message_queue[:prefetch_size] + del self._message_queue[:prefetch_size] + return result + + def start(self): + """Starts poller. Should be called before polling to allow message + consuming + """ + self._started = True + + def stop(self): + """Stops poller. Should be called when polling is not needed anymore to + stop new message consuming. After that it is necessary to poll already + prefetched messages + """ + with self._lock: + if not self._started: + return + + self._started = False + + def reconnect(self): + """Safe version of _reconnect. Performs reconnection to the broker.""" + with self._lock: + self._cleanup() + try: + self._reconnect() + except Exception: + self._cleanup() + raise + + def cleanup(self): + """Safe version of _cleanup. Cleans up allocated resources (channel, + connection, etc). + """ + with self._lock: + self._cleanup() + + +class RpcServicePikaPoller(PikaPoller): + """PikaPoller implementation for polling RPC messages. Overrides base + functionality according to RPC specific + """ + def __init__(self, pika_engine, target, prefetch_count): + """Adds target parameter for declaring RPC specific exchanges and + queues + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param target: Target, oslo.messaging Target object which defines RPC + endpoint + :param prefetch_count: Integer, maximum count of unacknowledged + messages which RabbitMQ broker sends to this consumer + """ + self._target = target + + super(RpcServicePikaPoller, self).__init__( + pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.RpcPikaIncomingMessage + ) + + def _declare_queue_binding(self): + """Overrides base method and perform declaration of RabbitMQ exchanges + and queues which correspond to oslo.messaging RPC target + + :return Dictionary, declared_queue_name -> no_ack_mode + """ + queue_expiration = self._pika_engine.rpc_queue_expiration + + queues_to_consume = {} + + for no_ack in [True, False]: + exchange = self._pika_engine.get_rpc_exchange_name( + self._target.exchange, self._target.topic, False, no_ack + ) + fanout_exchange = self._pika_engine.get_rpc_exchange_name( + self._target.exchange, self._target.topic, True, no_ack + ) + queue = self._pika_engine.get_rpc_queue_name( + self._target.topic, None, no_ack + ) + server_queue = self._pika_engine.get_rpc_queue_name( + self._target.topic, self._target.server, no_ack + ) + + queues_to_consume[queue] = no_ack + queues_to_consume[server_queue] = no_ack + + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, queue=queue, + routing_key=queue, exchange_type='direct', durable=False, + queue_expiration=queue_expiration + ) + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=exchange, queue=server_queue, + routing_key=server_queue, exchange_type='direct', + queue_expiration=queue_expiration, durable=False + ) + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, exchange=fanout_exchange, durable=False, + queue=server_queue, routing_key="", exchange_type='fanout', + queue_expiration=queue_expiration + ) + return queues_to_consume + + +class RpcReplyPikaPoller(PikaPoller): + """PikaPoller implementation for polling RPC reply messages. Overrides + base functionality according to RPC reply specific + """ + def __init__(self, pika_engine, exchange, queue, prefetch_count): + """Adds exchange and queue parameter for declaring exchange and queue + used for RPC reply delivery + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param exchange: String, exchange name used for RPC reply delivery + :param queue: String, queue name used for RPC reply delivery + :param prefetch_count: Integer, maximum count of unacknowledged + messages which RabbitMQ broker sends to this consumer + """ + self._exchange = exchange + self._queue = queue + + super(RpcReplyPikaPoller, self).__init__( + pika_engine=pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.RpcReplyPikaIncomingMessage + ) + + def _declare_queue_binding(self): + """Overrides base method and perform declaration of RabbitMQ exchange + and queue used for RPC reply delivery + + :return Dictionary, declared_queue_name -> no_ack_mode + """ + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, + exchange=self._exchange, queue=self._queue, + routing_key=self._queue, exchange_type='direct', + queue_expiration=self._pika_engine.rpc_queue_expiration, + durable=False + ) + + return {self._queue: False} + + def start(self, timeout=None): + """Overrides default behaviour of start method. Base start method + does not create connection to RabbitMQ during start method (uses + lazy connecting during first poll method call). This class should be + connected after start call to ensure that exchange and queue for reply + delivery are created before RPC request sending + """ + super(RpcReplyPikaPoller, self).start() + + def on_exception(ex): + LOG.warn(str(ex)) + + return True + + retrier = retrying.retry( + stop_max_attempt_number=self._pika_engine.rpc_reply_retry_attempts, + stop_max_delay=None if timeout is None else timeout * 1000, + wait_fixed=self._pika_engine.rpc_reply_retry_delay * 1000, + retry_on_exception=on_exception, + ) + + retrier(self.reconnect)() + + +class NotificationPikaPoller(PikaPoller): + """PikaPoller implementation for polling Notification messages. Overrides + base functionality according to Notification specific + """ + def __init__(self, pika_engine, targets_and_priorities, + queue_name=None, prefetch_count=100): + """Adds targets_and_priorities and queue_name parameter + for declaring exchanges and queues used for notification delivery + + :param pika_engine: PikaEngine, shared object with configuration and + shared driver functionality + :param targets_and_priorities: list of (target, priority), defines + default queue names for corresponding notification types + :param queue: String, alternative queue name used for this poller + instead of default queue name + :param prefetch_count: Integer, maximum count of unacknowledged + messages which RabbitMQ broker sends to this consumer + """ + self._targets_and_priorities = targets_and_priorities + self._queue_name = queue_name + + super(NotificationPikaPoller, self).__init__( + pika_engine, prefetch_count=prefetch_count, + incoming_message_class=pika_drv_msg.PikaIncomingMessage + ) + + def _declare_queue_binding(self): + """Overrides base method and perform declaration of RabbitMQ exchanges + and queues used for notification delivery + + :return Dictionary, declared_queue_name -> no_ack_mode + """ + queues_to_consume = {} + for target, priority in self._targets_and_priorities: + routing_key = '%s.%s' % (target.topic, priority) + queue = self._queue_name or routing_key + self._pika_engine.declare_queue_binding_by_channel( + channel=self._channel, + exchange=( + target.exchange or + self._pika_engine.default_notification_exchange + ), + queue = queue, + routing_key=routing_key, + exchange_type='direct', + queue_expiration=None, + durable=self._pika_engine.notification_persistence, + ) + queues_to_consume[queue] = False + + return queues_to_consume diff --git a/oslo_messaging/tests/drivers/pika/__init__.py b/oslo_messaging/tests/drivers/pika/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/tests/drivers/pika/test_message.py b/oslo_messaging/tests/drivers/pika/test_message.py new file mode 100644 index 000000000..3c3f87e39 --- /dev/null +++ b/oslo_messaging/tests/drivers/pika/test_message.py @@ -0,0 +1,622 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import functools +import time +import unittest + +from concurrent import futures +from mock import mock, patch +from oslo_serialization import jsonutils +import pika +from pika import spec + +import oslo_messaging +from oslo_messaging._drivers.pika_driver import pika_engine +from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg + + +class PikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + self._properties = pika.BasicProperties( + content_type="application/json", + headers={"version": "1.0"}, + ) + self._body = ( + b'{"_$_key_context":"context_value",' + b'"payload_key": "payload_value"}' + ) + + def test_message_body_parsing(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + def test_message_acknowledge(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + message.acknowledge() + + self.assertEqual(1, self._channel.basic_ack.call_count) + self.assertEqual({"delivery_tag": self._delivery_tag}, + self._channel.basic_ack.call_args[1]) + + def test_message_acknowledge_no_ack(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, None, self._method, self._properties, + self._body + ) + + message.acknowledge() + + self.assertEqual(0, self._channel.basic_ack.call_count) + + def test_message_requeue(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + message.requeue() + + self.assertEqual(1, self._channel.basic_nack.call_count) + self.assertEqual({"delivery_tag": self._delivery_tag, 'requeue': True}, + self._channel.basic_nack.call_args[1]) + + def test_message_requeue_no_ack(self): + message = pika_drv_msg.PikaIncomingMessage( + self._pika_engine, None, self._method, self._properties, + self._body + ) + + message.requeue() + + self.assertEqual(0, self._channel.basic_nack.call_count) + + +class RpcPikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._pika_engine.rpc_reply_retry_attempts = 3 + self._pika_engine.rpc_reply_retry_delay = 0.25 + + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + self._body = ( + b'{"_$_key_context":"context_value",' + b'"payload_key":"payload_value"}' + ) + self._properties = pika.BasicProperties( + content_type="application/json", + content_encoding="utf-8", + headers={"version": "1.0"}, + ) + + def test_call_message_body_parsing(self): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + def test_cast_message_body_parsing(self): + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, None) + self.assertEqual(message.reply_q, None) + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + @patch(("oslo_messaging._drivers.pika_driver.pika_message." + "PikaOutgoingMessage.send")) + def test_reply_for_cast_message(self, send_reply_mock): + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, None) + self.assertEqual(message.reply_q, None) + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + message.reply(reply=object()) + + self.assertEqual(send_reply_mock.call_count, 0) + + @patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcReplyPikaOutgoingMessage") + @patch("retrying.retry") + def test_positive_reply_for_call_message(self, + retry_mock, + outgoing_message_mock): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + reply = "all_fine" + message.reply(reply=reply) + + outgoing_message_mock.assert_called_once_with( + self._pika_engine, 123456789, failure_info=None, reply='all_fine', + content_encoding='utf-8', content_type='application/json' + ) + outgoing_message_mock().send.assert_called_once_with( + expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + ) + retry_mock.assert_called_once_with( + retry_on_exception=mock.ANY, stop_max_attempt_number=3, + wait_fixed=250.0 + ) + + @patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcReplyPikaOutgoingMessage") + @patch("retrying.retry") + def test_negative_reply_for_call_message(self, + retry_mock, + outgoing_message_mock): + self._properties.correlation_id = 123456789 + self._properties.reply_to = "reply_queue" + + message = pika_drv_msg.RpcPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + self._body + ) + + self.assertEqual(message.ctxt.get("key_context", None), + "context_value") + self.assertEqual(message.msg_id, 123456789) + self.assertEqual(message.reply_q, "reply_queue") + + self.assertEqual(message.message.get("payload_key", None), + "payload_value") + + failure_info = object() + message.reply(failure=failure_info) + + outgoing_message_mock.assert_called_once_with( + self._pika_engine, 123456789, + failure_info=failure_info, + reply=None, + content_encoding='utf-8', + content_type='application/json' + ) + outgoing_message_mock().send.assert_called_once_with( + expiration_time=None, reply_q='reply_queue', retrier=mock.ANY + ) + retry_mock.assert_called_once_with( + retry_on_exception=mock.ANY, stop_max_attempt_number=3, + wait_fixed=250.0 + ) + + +class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._pika_engine.allowed_remote_exmods = [ + pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions" + ] + + self._channel = mock.Mock() + + self._delivery_tag = 12345 + + self._method = pika.spec.Basic.Deliver(delivery_tag=self._delivery_tag) + + self._properties = pika.BasicProperties( + content_type="application/json", + content_encoding="utf-8", + headers={"version": "1.0"}, + correlation_id=123456789 + ) + + def test_positive_reply_message_body_parsing(self): + + body = b'{"s": "all fine"}' + + message = pika_drv_msg.RpcReplyPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + body + ) + + self.assertEqual(message.msg_id, 123456789) + self.assertIsNone(message.failure) + self.assertEquals(message.result, "all fine") + + def test_negative_reply_message_body_parsing(self): + + body = (b'{' + b' "e": {' + b' "s": "Error message",' + b' "t": ["TRACE HERE"],' + b' "c": "MessagingException",' + b' "m": "oslo_messaging.exceptions"' + b' }' + b'}') + + message = pika_drv_msg.RpcReplyPikaIncomingMessage( + self._pika_engine, self._channel, self._method, self._properties, + body + ) + + self.assertEqual(message.msg_id, 123456789) + self.assertIsNone(message.result) + self.assertEquals( + str(message.failure), + 'Error message\n' + 'TRACE HERE' + ) + self.assertIsInstance(message.failure, + oslo_messaging.MessagingException) + + +class PikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.MagicMock() + self._exchange = "it is exchange" + self._routing_key = "it is routing key" + self._expiration = 1 + self._expiration_time = time.time() + self._expiration + self._mandatory = object() + + self._message = {"msg_type": 1, "msg_str": "hello"} + self._context = {"request_id": 555, "token": "it is a token"} + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_with_confirmation(self): + message = pika_drv_msg.PikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + message.send( + exchange=self._exchange, + routing_key=self._routing_key, + confirm=True, + mandatory=self._mandatory, + persistent=True, + expiration_time=self._expiration_time, + retrier=None + ) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=self._mandatory, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 2) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertTrue(props.message_id) + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_without_confirmation(self): + message = pika_drv_msg.PikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + message.send( + exchange=self._exchange, + routing_key=self._routing_key, + confirm=False, + mandatory=self._mandatory, + persistent=False, + expiration_time=self._expiration_time, + retrier=None + ) + + self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=self._mandatory, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_without_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) + < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertTrue(props.message_id) + + +class RpcPikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._exchange = "it is exchange" + self._routing_key = "it is routing key" + + self._pika_engine = mock.MagicMock() + self._pika_engine.get_rpc_exchange_name.return_value = self._exchange + self._pika_engine.get_rpc_queue_name.return_value = self._routing_key + + self._message = {"msg_type": 1, "msg_str": "hello"} + self._context = {"request_id": 555, "token": "it is a token"} + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_cast_message(self): + message = pika_drv_msg.RpcPikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + expiration = 1 + expiration_time = time.time() + expiration + + message.send( + target=oslo_messaging.Target(exchange=self._exchange, + topic=self._routing_key), + reply_listener=None, + expiration_time=expiration_time, + retrier=None + ) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(expiration * 1000 - float(props.expiration) < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertIsNone(props.correlation_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id) + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_send_call_message(self): + message = pika_drv_msg.RpcPikaOutgoingMessage( + self._pika_engine, self._message, self._context + ) + + expiration = 1 + expiration_time = time.time() + expiration + + result = "it is a result" + reply_queue_name = "reply_queue_name" + + future = futures.Future() + future.set_result(result) + reply_listener = mock.Mock() + reply_listener.register_reply_waiter.return_value = future + reply_listener.get_reply_qname.return_value = reply_queue_name + + res = message.send( + target=oslo_messaging.Target(exchange=self._exchange, + topic=self._routing_key), + reply_listener=reply_listener, + expiration_time=expiration_time, + retrier=None + ) + + self.assertEqual(result, res) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._routing_key + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + + self.assertEqual( + b'{"_$_request_id": 555, "_$_token": "it is a token", ' + b'"msg_str": "hello", "msg_type": 1}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(expiration * 1000 - float(props.expiration) < 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertEquals(props.reply_to, reply_queue_name) + self.assertTrue(props.message_id) + + +class RpcReplyPikaOutgoingMessageTestCase(unittest.TestCase): + def setUp(self): + self._reply_q = "reply_queue_name" + + self._expiration = 1 + self._expiration_time = time.time() + self._expiration + + self._pika_engine = mock.MagicMock() + + self._rpc_reply_exchange = "rpc_reply_exchange" + self._pika_engine.rpc_reply_exchange = self._rpc_reply_exchange + + self._msg_id = 12345567 + + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_success_message_send(self): + message = pika_drv_msg.RpcReplyPikaOutgoingMessage( + self._pika_engine, self._msg_id, reply="all_fine" + ) + + message.send(self._reply_q, expiration_time=self._expiration_time, + retrier=None) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=b'{"s": "all_fine"}', + exchange=self._rpc_reply_exchange, mandatory=True, + properties=mock.ANY, + routing_key=self._reply_q + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id) + + @patch("traceback.format_exception", new=lambda x,y,z:z) + @patch("oslo_serialization.jsonutils.dumps", + new=functools.partial(jsonutils.dumps, sort_keys=True)) + def test_failure_message_send(self): + failure_info = (oslo_messaging.MessagingException, + oslo_messaging.MessagingException("Error message"), + ['It is a trace']) + + + message = pika_drv_msg.RpcReplyPikaOutgoingMessage( + self._pika_engine, self._msg_id, failure_info=failure_info + ) + + message.send(self._reply_q, expiration_time=self._expiration_time, + retrier=None) + + self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.assert_called_once_with( + body=mock.ANY, + exchange=self._rpc_reply_exchange, + mandatory=True, + properties=mock.ANY, + routing_key=self._reply_q + ) + + body = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["body"] + self.assertEqual( + b'{"e": {"c": "MessagingException", ' + b'"m": "oslo_messaging.exceptions", "s": "Error message", ' + b'"t": ["It is a trace"]}}', + body + ) + + props = self._pika_engine.connection_with_confirmation_pool.acquire( + ).__enter__().channel.publish.call_args[1]["properties"] + + self.assertEqual(props.content_encoding, 'utf-8') + self.assertEqual(props.content_type, 'application/json') + self.assertEqual(props.delivery_mode, 1) + self.assertTrue(self._expiration * 1000 - float(props.expiration) < + 100) + self.assertEqual(props.headers, {'version': '1.0'}) + self.assertEqual(props.correlation_id, message.msg_id) + self.assertIsNone(props.reply_to) + self.assertTrue(props.message_id) diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py new file mode 100644 index 000000000..77a3b6b29 --- /dev/null +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -0,0 +1,536 @@ +# Copyright 2015 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time +import unittest + +import mock + +from oslo_messaging._drivers.pika_driver import pika_poller + + +class PikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + self._prefetch_count = 123 + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + unused = object() + method = object() + properties = object() + body = object() + + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + unused, method, properties, body + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], incoming_message_class_mock.return_value) + incoming_message_class_mock.assert_called_once_with( + self._pika_engine, self._poller_channel_mock, method, properties, + body + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_after_stop(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + params = [] + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + for i in range(10): + poller._on_message_no_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=1) + self.assertEqual(len(res), 1) + self.assertEqual(res[0], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[0][0], + (self._pika_engine, None) + params[0][1:] + ) + + poller.stop() + + res2 = poller.poll(prefetch_size=n) + + self.assertEqual(len(res2), n-1) + self.assertEqual(incoming_message_class_mock.call_count, n) + + self.assertEqual( + self._poller_connection_mock.process_data_events.call_count, 1) + + for i in range(n-1): + self.assertEqual(res2[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i+1][0], + (self._pika_engine, None) + params[i+1][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_batch(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + params = [] + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + poller._on_message_with_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=n) + + self.assertEqual(len(res), n) + self.assertEqual(incoming_message_class_mock.call_count, n) + + for i in range(n): + self.assertEqual(res[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i][0], + (self._pika_engine, self._poller_channel_mock) + params[i][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." + "_declare_queue_binding") + def test_poll_batch_with_timeout(self, declare_queue_binding_mock): + incoming_message_class_mock = mock.Mock() + poller = pika_poller.PikaPoller( + self._pika_engine, self._prefetch_count, + incoming_message_class=incoming_message_class_mock + ) + + n = 10 + timeout = 1 + sleep_time = 0.2 + params = [] + + success_count = 5 + + for i in range(n): + params.append((object(), object(), object(), object())) + + index = [0] + + def f(time_limit): + time.sleep(sleep_time) + poller._on_message_with_ack_callback( + *params[index[0]] + ) + index[0] += 1 + + self._poller_connection_mock.process_data_events.side_effect = f + + poller.start() + res = poller.poll(prefetch_size=n, timeout=timeout) + + self.assertEqual(len(res), success_count) + self.assertEqual(incoming_message_class_mock.call_count, success_count) + + for i in range(success_count): + self.assertEqual(res[i], incoming_message_class_mock.return_value) + self.assertEqual( + incoming_message_class_mock.call_args_list[i][0], + (self._pika_engine, self._poller_channel_mock) + params[i][1:] + ) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + self.assertTrue(declare_queue_binding_mock.called) + + +class RpcServicePikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._pika_engine.get_rpc_queue_name.side_effect = ( + lambda topic, server, no_ack: "_".join( + [topic, str(server), str(no_ack)] + ) + ) + + self._pika_engine.get_rpc_exchange_name.side_effect = ( + lambda exchange, topic, fanout, no_ack: "_".join( + [exchange, topic, str(fanout), str(no_ack)] + ) + ) + + self._prefetch_count = 123 + self._target = mock.Mock(exchange="exchange", topic="topic", + server="server") + self._pika_engine.rpc_queue_expiration = 12345 + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "RpcPikaIncomingMessage") + def test_declare_rpc_queue_bindings(self, rpc_pika_incoming_message_mock): + poller = pika_poller.RpcServicePikaPoller( + self._pika_engine, self._target, self._prefetch_count, + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], rpc_pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 6 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_True", + exchange_type='direct', + queue="topic_None_True", + queue_expiration=12345, + routing_key="topic_None_True" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_True", + exchange_type='direct', + queue="topic_server_True", + queue_expiration=12345, + routing_key="topic_server_True" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_True_True", + exchange_type='fanout', + queue="topic_server_True", + queue_expiration=12345, + routing_key='' + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_False", + exchange_type='direct', + queue="topic_None_False", + queue_expiration=12345, + routing_key="topic_None_False" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_False_False", + exchange_type='direct', + queue="topic_server_False", + queue_expiration=12345, + routing_key="topic_server_False" + ), + mock.call( + channel=self._poller_channel_mock, durable=False, + exchange="exchange_topic_True_False", + exchange_type='fanout', + queue="topic_server_False", + queue_expiration=12345, + routing_key='' + ), + )) + + +class RpcReplyServicePikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._prefetch_count = 123 + self._exchange = "rpc_reply_exchange" + self._queue = "rpc_reply_queue" + + self._pika_engine.rpc_reply_retry_delay = 12132543456 + + self._pika_engine.rpc_queue_expiration = 12345 + self._pika_engine.rpc_reply_retry_attempts = 3 + + def test_start(self): + poller = pika_poller.RpcReplyPikaPoller( + self._pika_engine, self._exchange, self._queue, + self._prefetch_count, + ) + + poller.start() + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + def test_declare_rpc_reply_queue_binding(self): + poller = pika_poller.RpcReplyPikaPoller( + self._pika_engine, self._exchange, self._queue, + self._prefetch_count, + ) + + poller.start() + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 1 + ) + + declare_queue_binding_by_channel_mock.assert_called_once_with( + channel=self._poller_channel_mock, durable=False, + exchange='rpc_reply_exchange', exchange_type='direct', + queue='rpc_reply_queue', queue_expiration=12345, + routing_key='rpc_reply_queue' + ) + + +class NotificationPikaPollerTestCase(unittest.TestCase): + def setUp(self): + self._pika_engine = mock.Mock() + self._poller_connection_mock = mock.Mock() + self._poller_channel_mock = mock.Mock() + self._poller_connection_mock.channel.return_value = ( + self._poller_channel_mock + ) + self._pika_engine.create_connection.return_value = ( + self._poller_connection_mock + ) + + self._prefetch_count = 123 + self._target_and_priorities = ( + ( + mock.Mock(exchange="exchange1", topic="topic1", + server="server1"), 1 + ), + ( + mock.Mock(exchange="exchange1", topic="topic1"), 2 + ), + ( + mock.Mock(exchange="exchange2", topic="topic2",), 1 + ), + ) + self._pika_engine.notification_persistence = object() + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "PikaIncomingMessage") + def test_declare_notification_queue_bindings_default_queue( + self, pika_incoming_message_mock): + poller = pika_poller.NotificationPikaPoller( + self._pika_engine, self._target_and_priorities, None, + self._prefetch_count, + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 3 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="topic1.1", + queue_expiration=None, + routing_key="topic1.1" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="topic1.2", + queue_expiration=None, + routing_key="topic1.2" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange2", + exchange_type='direct', + queue="topic2.1", + queue_expiration=None, + routing_key="topic2.1" + ) + )) + + @mock.patch("oslo_messaging._drivers.pika_driver.pika_message." + "PikaIncomingMessage") + def test_declare_notification_queue_bindings_custom_queue( + self, pika_incoming_message_mock): + poller = pika_poller.NotificationPikaPoller( + self._pika_engine, self._target_and_priorities, + "custom_queue_name", self._prefetch_count + ) + self._poller_connection_mock.process_data_events.side_effect = ( + lambda time_limit: poller._on_message_with_ack_callback( + None, None, None, None + ) + ) + + poller.start() + res = poller.poll() + + self.assertEqual(len(res), 1) + + self.assertEqual(res[0], pika_incoming_message_mock.return_value) + + self.assertTrue(self._pika_engine.create_connection.called) + self.assertTrue(self._poller_connection_mock.channel.called) + + declare_queue_binding_by_channel_mock = ( + self._pika_engine.declare_queue_binding_by_channel + ) + + self.assertEqual( + declare_queue_binding_by_channel_mock.call_count, 3 + ) + + declare_queue_binding_by_channel_mock.assert_has_calls(( + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic1.1" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange1", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic1.2" + ), + mock.call( + channel=self._poller_channel_mock, + durable=self._pika_engine.notification_persistence, + exchange="exchange2", + exchange_type='direct', + queue="custom_queue_name", + queue_expiration=None, + routing_key="topic2.1" + ) + )) diff --git a/requirements.txt b/requirements.txt index b0efa1852..bce63e3cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,6 +35,8 @@ PyYAML>=3.1.0 # we set the amqp version to ensure heartbeat works amqp>=1.4.0 kombu>=3.0.7 +pika>=0.10.0 +pika-pool>=0.1.3 # middleware oslo.middleware>=3.0.0 # Apache-2.0 diff --git a/setup-test-env-pika.sh b/setup-test-env-pika.sh new file mode 100755 index 000000000..5fe189555 --- /dev/null +++ b/setup-test-env-pika.sh @@ -0,0 +1,32 @@ +#!/bin/bash +set -e + +. tools/functions.sh + +DATADIR=$(mktemp -d /tmp/OSLOMSG-RABBIT.XXXXX) +trap "clean_exit $DATADIR" EXIT + +export RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 +export RABBITMQ_NODE_PORT=65123 +export RABBITMQ_NODENAME=oslomsg-test@localhost +export RABBITMQ_LOG_BASE=$DATADIR +export RABBITMQ_MNESIA_BASE=$DATADIR +export RABBITMQ_PID_FILE=$DATADIR/pid +export HOME=$DATADIR + +# NOTE(sileht): We directly use the rabbitmq scripts +# to avoid distribution check, like running as root/rabbitmq +# enforcing. +export PATH=/usr/lib/rabbitmq/bin/:$PATH + + +mkfifo ${DATADIR}/out +rabbitmq-server &> ${DATADIR}/out & +wait_for_line "Starting broker... completed" "ERROR:" ${DATADIR}/out + +rabbitmqctl add_user oslomsg oslosecret +rabbitmqctl set_permissions "oslomsg" ".*" ".*" ".*" + + +export TRANSPORT_URL=pika://oslomsg:oslosecret@127.0.0.1:65123// +$* diff --git a/setup.cfg b/setup.cfg index b45466ca9..d09e6e685 100644 --- a/setup.cfg +++ b/setup.cfg @@ -37,6 +37,7 @@ oslo.messaging.drivers = # This is just for internal testing fake = oslo_messaging._drivers.impl_fake:FakeDriver + pika = oslo_messaging._drivers.impl_pika:PikaDriver oslo.messaging.executors = aioeventlet = oslo_messaging._executors.impl_aioeventlet:AsyncioEventletExecutor diff --git a/tox.ini b/tox.ini index b0d346351..6e1f019a1 100644 --- a/tox.ini +++ b/tox.ini @@ -27,6 +27,9 @@ commands = python setup.py build_sphinx [testenv:py27-func-rabbit] commands = {toxinidir}/setup-test-env-rabbit.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' +[testenv:py27-func-pika] +commands = {toxinidir}/setup-test-env-pika.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' + [testenv:py27-func-amqp1] setenv = TRANSPORT_URL=amqp://stackqpid:secretqpid@127.0.0.1:65123// # NOTE(flaper87): This gate job run on fedora21 for now.