diff --git a/oslo_messaging/_drivers/impl_pika.py b/oslo_messaging/_drivers/impl_pika.py index 9f47102d5..25047e107 100644 --- a/oslo_messaging/_drivers/impl_pika.py +++ b/oslo_messaging/_drivers/impl_pika.py @@ -214,7 +214,7 @@ class PikaDriver(base.BaseDriver): ) if target.fanout: - return self.cast_all_servers( + return self.cast_all_workers( exchange, target.topic, ctxt, message, expiration_time, retrier ) @@ -249,7 +249,7 @@ class PikaDriver(base.BaseDriver): return reply.result - def cast_all_servers(self, exchange, topic, ctxt, message, expiration_time, + def cast_all_workers(self, exchange, topic, ctxt, message, expiration_time, retrier=None): msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message, ctxt) @@ -257,7 +257,7 @@ class PikaDriver(base.BaseDriver): msg.send( exchange=exchange, routing_key=self._pika_engine.get_rpc_queue_name( - topic, "all_servers", retrier is None + topic, "all_workers", retrier is None ), mandatory=False, expiration_time=expiration_time, diff --git a/oslo_messaging/_drivers/pika_driver/pika_engine.py b/oslo_messaging/_drivers/pika_driver/pika_engine.py index f5593c4e6..c09936576 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_engine.py +++ b/oslo_messaging/_drivers/pika_driver/pika_engine.py @@ -25,6 +25,8 @@ from pika import credentials as pika_credentials import pika_pool import six +import uuid + from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc LOG = logging.getLogger(__name__) @@ -429,17 +431,21 @@ class PikaEngine(object): return exchange or self.default_rpc_exchange @staticmethod - def get_rpc_queue_name(topic, server, no_ack): + def get_rpc_queue_name(topic, server, no_ack, worker=False): """Returns RabbitMQ queue name for given rpc request :param topic: String, oslo.messaging target's topic :param server: String, oslo.messaging target's server :param no_ack: Boolean, use message delivery with acknowledges or not + :param worker: Boolean, use queue by single worker only or not - :return: String, RabbitMQ exchange name + :return: String, RabbitMQ queue name """ queue_parts = ["no_ack" if no_ack else "with_ack", topic] if server is not None: queue_parts.append(server) + if worker: + queue_parts.append("worker") + queue_parts.append(uuid.uuid4().hex) queue = '.'.join(queue_parts) return queue diff --git a/oslo_messaging/_drivers/pika_driver/pika_poller.py b/oslo_messaging/_drivers/pika_driver/pika_poller.py index cc2d6e983..a5279fa8e 100644 --- a/oslo_messaging/_drivers/pika_driver/pika_poller.py +++ b/oslo_messaging/_drivers/pika_driver/pika_poller.py @@ -323,16 +323,24 @@ class RpcServicePikaPoller(PikaPoller): queue=server_queue, routing_key=server_queue, exchange_type='direct', queue_expiration=queue_expiration ) - all_servers_routing_key = self._pika_engine.get_rpc_queue_name( - self._target.topic, "all_servers", no_ack + queues_to_consume.append( + {"queue_name": server_queue, "no_ack": no_ack, + "consumer_tag": None} + ) + + worker_queue = self._pika_engine.get_rpc_queue_name( + self._target.topic, self._target.server, no_ack, True + ) + all_workers_routing_key = self._pika_engine.get_rpc_queue_name( + self._target.topic, "all_workers", no_ack ) self._pika_engine.declare_queue_binding_by_channel( channel=self._channel, exchange=exchange, durable=False, - queue=server_queue, routing_key=all_servers_routing_key, + queue=worker_queue, routing_key=all_workers_routing_key, exchange_type='direct', queue_expiration=queue_expiration ) queues_to_consume.append( - {"queue_name": server_queue, "no_ack": no_ack, + {"queue_name": worker_queue, "no_ack": no_ack, "consumer_tag": None} ) diff --git a/oslo_messaging/tests/drivers/pika/test_poller.py b/oslo_messaging/tests/drivers/pika/test_poller.py index 17ba3b708..1209d131a 100644 --- a/oslo_messaging/tests/drivers/pika/test_poller.py +++ b/oslo_messaging/tests/drivers/pika/test_poller.py @@ -228,9 +228,8 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): ) self._pika_engine.get_rpc_queue_name.side_effect = ( - lambda topic, server, no_ack: "_".join( - [topic, str(server), str(no_ack)] - ) + lambda topic, server, no_ack, worker=False: + "_".join([topic, str(server), str(no_ack), str(worker)]) ) self._pika_engine.get_rpc_exchange_name.side_effect = ( @@ -277,49 +276,49 @@ class RpcServicePikaPollerTestCase(unittest.TestCase): channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_None_True", + queue="topic_None_True_False", queue_expiration=12345, - routing_key="topic_None_True" + routing_key="topic_None_True_False" ), mock.call( channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_server_True", + queue="topic_server_True_False", queue_expiration=12345, - routing_key="topic_server_True" + routing_key="topic_server_True_False" ), mock.call( channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_server_True", + queue="topic_server_True_True", queue_expiration=12345, - routing_key="topic_all_servers_True" + routing_key="topic_all_workers_True_False" ), mock.call( channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_None_False", + queue="topic_None_False_False", queue_expiration=12345, - routing_key="topic_None_False" + routing_key="topic_None_False_False" ), mock.call( channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_server_False", + queue="topic_server_False_False", queue_expiration=12345, - routing_key='topic_server_False' + routing_key='topic_server_False_False' ), mock.call( channel=self._poller_channel_mock, durable=False, exchange="exchange", exchange_type='direct', - queue="topic_server_False", + queue="topic_server_False_True", queue_expiration=12345, - routing_key='topic_all_servers_False' + routing_key='topic_all_workers_False_False' ) ))