From c719dc306bdd9fac0e07f83cadf9efe496dfb568 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin <markmc@redhat.com> Date: Tue, 23 Jul 2013 18:06:59 +0100 Subject: [PATCH] Don't call consume() each time iterconsume() is called We're going to be using iterconsume(limit=1) and this basically seems to be broken right now because you get an error if you call consume() multiple times on the same connection. Set the 'do_consume' flag at connection time rather than on entry into iterconsume(). Change-Id: I988e4074ae0e267384931d6e1994e9cbe5248196 --- oslo/messaging/_drivers/impl_rabbit.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 1ab593ce6..202db86a0 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -469,6 +469,7 @@ class Connection(object): self.memory_transport = self.conf.fake_rabbit self.connection = None + self.do_consume = None self.reconnect() def _fetch_ssl_params(self): @@ -518,6 +519,7 @@ class Connection(object): if self.memory_transport: # Kludge to speed up tests. self.connection.transport.polling_interval = 0.0 + self.do_consume = True self.consumer_num = itertools.count(1) self.connection.connect() self.channel = self.connection.channel() @@ -645,8 +647,6 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" - info = {'do_consume': True} - def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.debug(_('Timed out waiting for RPC response: %s') % @@ -655,16 +655,16 @@ class Connection(object): else: LOG.exception(_('Failed to consume message from queue: %s') % str(exc)) - info['do_consume'] = True + self.do_consume = True def _consume(): - if info['do_consume']: + if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) - info['do_consume'] = False + self.do_consume = False return self.connection.drain_events(timeout=timeout) for iteration in itertools.count(0):