Merge "rabbit: fix exception path in queue redeclaration"
This commit is contained in:
commit
a5451eed35
@ -194,11 +194,11 @@ class Consumer(object):
|
|||||||
durable=self.durable,
|
durable=self.durable,
|
||||||
auto_delete=self.auto_delete)
|
auto_delete=self.auto_delete)
|
||||||
|
|
||||||
def declare(self, channel):
|
def declare(self, conn):
|
||||||
"""Re-declare the queue after a rabbit (re)connect."""
|
"""Re-declare the queue after a rabbit (re)connect."""
|
||||||
self.queue = kombu.entity.Queue(
|
self.queue = kombu.entity.Queue(
|
||||||
name=self.queue_name,
|
name=self.queue_name,
|
||||||
channel=channel,
|
channel=conn.channel,
|
||||||
exchange=self.exchange,
|
exchange=self.exchange,
|
||||||
durable=self.durable,
|
durable=self.durable,
|
||||||
auto_delete=self.auto_delete,
|
auto_delete=self.auto_delete,
|
||||||
@ -207,18 +207,16 @@ class Consumer(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
except Exception as e:
|
except conn.connection.channel_errors as exc:
|
||||||
# NOTE: This exception may be triggered by a race condition.
|
# NOTE(jrosenboom): This exception may be triggered by a race
|
||||||
# Simply retrying will solve the error most of the time and
|
# condition. Simply retrying will solve the error most of the time
|
||||||
# should work well enough as a workaround until the race condition
|
# and should work well enough as a workaround until the race
|
||||||
# itself can be fixed.
|
# condition itself can be fixed.
|
||||||
# TODO(jrosenboom): In order to be able to match the Exception
|
|
||||||
# more specifically, we have to refactor ConsumerBase to use
|
|
||||||
# 'channel_errors' of the kombu connection object that
|
|
||||||
# has created the channel.
|
|
||||||
# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
|
# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
|
||||||
LOG.error(_("Declaring queue failed with (%s), retrying"), e)
|
if exc.code == 404:
|
||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
def consume(self, tag):
|
def consume(self, tag):
|
||||||
"""Actually declare the consumer on the amqp channel. This will
|
"""Actually declare the consumer on the amqp channel. This will
|
||||||
@ -706,7 +704,7 @@ class Connection(object):
|
|||||||
"""
|
"""
|
||||||
self._set_current_channel(new_channel)
|
self._set_current_channel(new_channel)
|
||||||
for consumer in self._consumers:
|
for consumer in self._consumers:
|
||||||
consumer.declare(new_channel)
|
consumer.declare(self)
|
||||||
|
|
||||||
LOG.info(_LI('Reconnected to AMQP server on '
|
LOG.info(_LI('Reconnected to AMQP server on '
|
||||||
'%(hostname)s:%(port)d'),
|
'%(hostname)s:%(port)d'),
|
||||||
@ -858,7 +856,7 @@ class Connection(object):
|
|||||||
"%(err_str)s"), log_info)
|
"%(err_str)s"), log_info)
|
||||||
|
|
||||||
def _declare_consumer():
|
def _declare_consumer():
|
||||||
consumer.declare(self.channel)
|
consumer.declare(self)
|
||||||
self._consumers.append(consumer)
|
self._consumers.append(consumer)
|
||||||
self._new_consumers.append(consumer)
|
self._new_consumers.append(consumer)
|
||||||
return consumer
|
return consumer
|
||||||
|
Loading…
x
Reference in New Issue
Block a user