diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 73512742f..08cb20517 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -194,11 +194,11 @@ class Consumer(object): durable=self.durable, auto_delete=self.auto_delete) - def declare(self, channel): + def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" self.queue = kombu.entity.Queue( name=self.queue_name, - channel=channel, + channel=conn.channel, exchange=self.exchange, durable=self.durable, auto_delete=self.auto_delete, @@ -207,18 +207,16 @@ class Consumer(object): try: self.queue.declare() - except Exception as e: - # NOTE: This exception may be triggered by a race condition. - # Simply retrying will solve the error most of the time and - # should work well enough as a workaround until the race condition - # itself can be fixed. - # TODO(jrosenboom): In order to be able to match the Exception - # more specifically, we have to refactor ConsumerBase to use - # 'channel_errors' of the kombu connection object that - # has created the channel. + except conn.connection.channel_errors as exc: + # NOTE(jrosenboom): This exception may be triggered by a race + # condition. Simply retrying will solve the error most of the time + # and should work well enough as a workaround until the race + # condition itself can be fixed. # See https://bugs.launchpad.net/neutron/+bug/1318721 for details. - LOG.error(_("Declaring queue failed with (%s), retrying"), e) - self.queue.declare() + if exc.code == 404: + self.queue.declare() + else: + raise def consume(self, tag): """Actually declare the consumer on the amqp channel. This will @@ -706,7 +704,7 @@ class Connection(object): """ self._set_current_channel(new_channel) for consumer in self._consumers: - consumer.declare(new_channel) + consumer.declare(self) LOG.info(_LI('Reconnected to AMQP server on ' '%(hostname)s:%(port)d'), @@ -858,7 +856,7 @@ class Connection(object): "%(err_str)s"), log_info) def _declare_consumer(): - consumer.declare(self.channel) + consumer.declare(self) self._consumers.append(consumer) self._new_consumers.append(consumer) return consumer