Merge "Fix consuming from unbound reply queue"

This commit is contained in:
Jenkins 2016-09-12 17:01:34 +00:00 committed by Gerrit Code Review
commit 5bf96fe761

@ -277,6 +277,7 @@ class Consumer(object):
rabbit_queue_ttl) rabbit_queue_ttl)
self.queue = None self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange( self.exchange = kombu.entity.Exchange(
name=exchange_name, name=exchange_name,
type=type, type=type,
@ -285,6 +286,7 @@ class Consumer(object):
def declare(self, conn): def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect.""" """Re-declare the queue after a rabbit (re)connect."""
self.queue = kombu.entity.Queue( self.queue = kombu.entity.Queue(
name=self.queue_name, name=self.queue_name,
channel=conn.channel, channel=conn.channel,
@ -308,17 +310,41 @@ class Consumer(object):
self.queue.declare() self.queue.declare()
else: else:
raise raise
self._declared_on = conn.channel
def consume(self, tag): def consume(self, conn, tag):
"""Actually declare the consumer on the amqp channel. This will """Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the start the flow of messages from the queue. Using the
Connection.consume() will process the messages, Connection.consume() will process the messages,
calling the appropriate callback. calling the appropriate callback.
""" """
self.queue.consume(callback=self._callback, # Ensure we are on the correct channel before consuming
consumer_tag=six.text_type(tag), if conn.channel != self._declared_on:
nowait=self.nowait) self.declare(conn)
try:
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
except conn.connection.channel_errors as exc:
# We retries once because of some races that we can
# recover before informing the deployer
# bugs.launchpad.net/oslo.messaging/+bug/1581148
# bugs.launchpad.net/oslo.messaging/+bug/1609766
# bugs.launchpad.net/neutron/+bug/1318721
# At any channel error, the RabbitMQ closes
# the channel, but the amqp-lib quietly re-open
# it. So, we must reset all tags and declare
# all consumers again.
conn._new_tags = set(conn._consumers.values())
if exc.code == 404:
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
else:
raise
def cancel(self, tag): def cancel(self, tag):
LOG.trace('ConsumerBase.cancel: canceling %s', tag) LOG.trace('ConsumerBase.cancel: canceling %s', tag)
@ -754,8 +780,6 @@ class Connection(object):
self.set_transport_socket_timeout() self.set_transport_socket_timeout()
self._set_current_channel(new_channel) self._set_current_channel(new_channel)
for consumer in self._consumers:
consumer.declare(self)
LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on ' LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
'%(hostname)s:%(port)s via [%(transport)s] client' '%(hostname)s:%(port)s via [%(transport)s] client'
@ -832,6 +856,8 @@ class Connection(object):
if self.purpose == rpc_common.PURPOSE_LISTEN: if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel) self._set_qos(new_channel)
self._producer = kombu.messaging.Producer(new_channel) self._producer = kombu.messaging.Producer(new_channel)
for consumer in self._consumers:
consumer.declare(self)
def _set_qos(self, channel): def _set_qos(self, channel):
"""Set QoS prefetch count on the channel""" """Set QoS prefetch count on the channel"""
@ -1042,31 +1068,11 @@ class Connection(object):
if not self.connection.connected: if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0] raise self.connection.recoverable_connection_errors[0]
consume_max_retries = 2 while self._new_tags:
while self._new_tags and consume_max_retries:
for consumer, tag in self._consumers.items(): for consumer, tag in self._consumers.items():
if tag in self._new_tags: if tag in self._new_tags:
try: consumer.consume(self, tag=tag)
consumer.consume(tag=tag) self._new_tags.remove(tag)
self._new_tags.remove(tag)
except self.connection.channel_errors as exc:
# NOTE(kbespalov): during the interval between
# a queue declaration and consumer declaration
# the queue can disappear. In this case
# we must redeclare queue and try to re-consume.
# More details is here:
# bugs.launchpad.net/oslo.messaging/+bug/1581148
if exc.code == 404 and consume_max_retries:
consumer.declare(self)
# NOTE(kbespalov): the broker closes a channel
# at any channel error. The py-amqp catches
# this situation and re-open a new channel.
# So, we must re-declare all consumers again.
self._new_tags = set(self._consumers.values())
consume_max_retries -= 1
break
else:
raise
poll_timeout = (self._poll_timeout if timeout is None poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout)) else min(timeout, self._poll_timeout))