diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 1ab593ce6..202db86a0 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -469,6 +469,7 @@ class Connection(object): self.memory_transport = self.conf.fake_rabbit self.connection = None + self.do_consume = None self.reconnect() def _fetch_ssl_params(self): @@ -518,6 +519,7 @@ class Connection(object): if self.memory_transport: # Kludge to speed up tests. self.connection.transport.polling_interval = 0.0 + self.do_consume = True self.consumer_num = itertools.count(1) self.connection.connect() self.channel = self.connection.channel() @@ -645,8 +647,6 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" - info = {'do_consume': True} - def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.debug(_('Timed out waiting for RPC response: %s') % @@ -655,16 +655,16 @@ class Connection(object): else: LOG.exception(_('Failed to consume message from queue: %s') % str(exc)) - info['do_consume'] = True + self.do_consume = True def _consume(): - if info['do_consume']: + if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) - info['do_consume'] = False + self.do_consume = False return self.connection.drain_events(timeout=timeout) for iteration in itertools.count(0):