Merge "Remove server queue creating if target's server is empty"

This commit is contained in:
Jenkins 2016-02-12 03:34:47 +00:00 committed by Gerrit Code Review
commit eb4df4ecd0
2 changed files with 25 additions and 21 deletions

View File

@ -548,7 +548,7 @@ class RpcPikaOutgoingMessage(PikaOutgoingMessage):
else:
self._do_send(
exchange=exchange, routing_key=queue, msg_dict=msg_dict,
msg_props=msg_props, confirm=True, mandatory=True,
msg_props=msg_props, confirm=True, mandatory=not target.fanout,
persistent=False, expiration_time=expiration_time,
retrier=retrier
)

View File

@ -263,34 +263,38 @@ class RpcServicePikaPoller(PikaPoller):
exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, False, no_ack
)
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, True, no_ack
)
queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, None, no_ack
)
server_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack
)
queues_to_consume[queue] = no_ack
queues_to_consume[server_queue] = no_ack
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, queue=queue,
routing_key=queue, exchange_type='direct', durable=False,
queue_expiration=queue_expiration
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, queue=server_queue,
routing_key=server_queue, exchange_type='direct',
queue_expiration=queue_expiration, durable=False
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=fanout_exchange, durable=False,
queue=server_queue, routing_key="", exchange_type='fanout',
queue_expiration=queue_expiration
)
queues_to_consume[queue] = no_ack
if self._target.server:
server_queue = self._pika_engine.get_rpc_queue_name(
self._target.topic, self._target.server, no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=exchange, durable=False,
queue=server_queue, routing_key=server_queue,
exchange_type='direct', queue_expiration=queue_expiration
)
queues_to_consume[server_queue] = no_ack
fanout_exchange = self._pika_engine.get_rpc_exchange_name(
self._target.exchange, self._target.topic, True, no_ack
)
self._pika_engine.declare_queue_binding_by_channel(
channel=self._channel, exchange=fanout_exchange,
queue=server_queue, routing_key="", exchange_type='fanout',
queue_expiration=queue_expiration, durable=False
)
return queues_to_consume