Merge "Pika: fix sending fanout messages"
This commit is contained in:
commit
9ecac96e2e
oslo_messaging
@ -214,7 +214,7 @@ class PikaDriver(base.BaseDriver):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if target.fanout:
|
if target.fanout:
|
||||||
return self.cast_all_servers(
|
return self.cast_all_workers(
|
||||||
exchange, target.topic, ctxt, message, expiration_time,
|
exchange, target.topic, ctxt, message, expiration_time,
|
||||||
retrier
|
retrier
|
||||||
)
|
)
|
||||||
@ -249,7 +249,7 @@ class PikaDriver(base.BaseDriver):
|
|||||||
|
|
||||||
return reply.result
|
return reply.result
|
||||||
|
|
||||||
def cast_all_servers(self, exchange, topic, ctxt, message, expiration_time,
|
def cast_all_workers(self, exchange, topic, ctxt, message, expiration_time,
|
||||||
retrier=None):
|
retrier=None):
|
||||||
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
msg = pika_drv_msg.PikaOutgoingMessage(self._pika_engine, message,
|
||||||
ctxt)
|
ctxt)
|
||||||
@ -257,7 +257,7 @@ class PikaDriver(base.BaseDriver):
|
|||||||
msg.send(
|
msg.send(
|
||||||
exchange=exchange,
|
exchange=exchange,
|
||||||
routing_key=self._pika_engine.get_rpc_queue_name(
|
routing_key=self._pika_engine.get_rpc_queue_name(
|
||||||
topic, "all_servers", retrier is None
|
topic, "all_workers", retrier is None
|
||||||
),
|
),
|
||||||
mandatory=False,
|
mandatory=False,
|
||||||
expiration_time=expiration_time,
|
expiration_time=expiration_time,
|
||||||
|
@ -25,6 +25,8 @@ from pika import credentials as pika_credentials
|
|||||||
import pika_pool
|
import pika_pool
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
import uuid
|
||||||
|
|
||||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -429,17 +431,21 @@ class PikaEngine(object):
|
|||||||
return exchange or self.default_rpc_exchange
|
return exchange or self.default_rpc_exchange
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_rpc_queue_name(topic, server, no_ack):
|
def get_rpc_queue_name(topic, server, no_ack, worker=False):
|
||||||
"""Returns RabbitMQ queue name for given rpc request
|
"""Returns RabbitMQ queue name for given rpc request
|
||||||
|
|
||||||
:param topic: String, oslo.messaging target's topic
|
:param topic: String, oslo.messaging target's topic
|
||||||
:param server: String, oslo.messaging target's server
|
:param server: String, oslo.messaging target's server
|
||||||
:param no_ack: Boolean, use message delivery with acknowledges or not
|
:param no_ack: Boolean, use message delivery with acknowledges or not
|
||||||
|
:param worker: Boolean, use queue by single worker only or not
|
||||||
|
|
||||||
:return: String, RabbitMQ exchange name
|
:return: String, RabbitMQ queue name
|
||||||
"""
|
"""
|
||||||
queue_parts = ["no_ack" if no_ack else "with_ack", topic]
|
queue_parts = ["no_ack" if no_ack else "with_ack", topic]
|
||||||
if server is not None:
|
if server is not None:
|
||||||
queue_parts.append(server)
|
queue_parts.append(server)
|
||||||
|
if worker:
|
||||||
|
queue_parts.append("worker")
|
||||||
|
queue_parts.append(uuid.uuid4().hex)
|
||||||
queue = '.'.join(queue_parts)
|
queue = '.'.join(queue_parts)
|
||||||
return queue
|
return queue
|
||||||
|
@ -323,16 +323,24 @@ class RpcServicePikaPoller(PikaPoller):
|
|||||||
queue=server_queue, routing_key=server_queue,
|
queue=server_queue, routing_key=server_queue,
|
||||||
exchange_type='direct', queue_expiration=queue_expiration
|
exchange_type='direct', queue_expiration=queue_expiration
|
||||||
)
|
)
|
||||||
all_servers_routing_key = self._pika_engine.get_rpc_queue_name(
|
queues_to_consume.append(
|
||||||
self._target.topic, "all_servers", no_ack
|
{"queue_name": server_queue, "no_ack": no_ack,
|
||||||
|
"consumer_tag": None}
|
||||||
|
)
|
||||||
|
|
||||||
|
worker_queue = self._pika_engine.get_rpc_queue_name(
|
||||||
|
self._target.topic, self._target.server, no_ack, True
|
||||||
|
)
|
||||||
|
all_workers_routing_key = self._pika_engine.get_rpc_queue_name(
|
||||||
|
self._target.topic, "all_workers", no_ack
|
||||||
)
|
)
|
||||||
self._pika_engine.declare_queue_binding_by_channel(
|
self._pika_engine.declare_queue_binding_by_channel(
|
||||||
channel=self._channel, exchange=exchange, durable=False,
|
channel=self._channel, exchange=exchange, durable=False,
|
||||||
queue=server_queue, routing_key=all_servers_routing_key,
|
queue=worker_queue, routing_key=all_workers_routing_key,
|
||||||
exchange_type='direct', queue_expiration=queue_expiration
|
exchange_type='direct', queue_expiration=queue_expiration
|
||||||
)
|
)
|
||||||
queues_to_consume.append(
|
queues_to_consume.append(
|
||||||
{"queue_name": server_queue, "no_ack": no_ack,
|
{"queue_name": worker_queue, "no_ack": no_ack,
|
||||||
"consumer_tag": None}
|
"consumer_tag": None}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -228,9 +228,8 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
|
|
||||||
self._pika_engine.get_rpc_queue_name.side_effect = (
|
self._pika_engine.get_rpc_queue_name.side_effect = (
|
||||||
lambda topic, server, no_ack: "_".join(
|
lambda topic, server, no_ack, worker=False:
|
||||||
[topic, str(server), str(no_ack)]
|
"_".join([topic, str(server), str(no_ack), str(worker)])
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
self._pika_engine.get_rpc_exchange_name.side_effect = (
|
self._pika_engine.get_rpc_exchange_name.side_effect = (
|
||||||
@ -277,49 +276,49 @@ class RpcServicePikaPollerTestCase(unittest.TestCase):
|
|||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_None_True",
|
queue="topic_None_True_False",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key="topic_None_True"
|
routing_key="topic_None_True_False"
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_server_True",
|
queue="topic_server_True_False",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key="topic_server_True"
|
routing_key="topic_server_True_False"
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_server_True",
|
queue="topic_server_True_True",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key="topic_all_servers_True"
|
routing_key="topic_all_workers_True_False"
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_None_False",
|
queue="topic_None_False_False",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key="topic_None_False"
|
routing_key="topic_None_False_False"
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_server_False",
|
queue="topic_server_False_False",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key='topic_server_False'
|
routing_key='topic_server_False_False'
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
channel=self._poller_channel_mock, durable=False,
|
channel=self._poller_channel_mock, durable=False,
|
||||||
exchange="exchange",
|
exchange="exchange",
|
||||||
exchange_type='direct',
|
exchange_type='direct',
|
||||||
queue="topic_server_False",
|
queue="topic_server_False_True",
|
||||||
queue_expiration=12345,
|
queue_expiration=12345,
|
||||||
routing_key='topic_all_servers_False'
|
routing_key='topic_all_workers_False_False'
|
||||||
)
|
)
|
||||||
))
|
))
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user