Don't call consume() each time iterconsume() is called
We're going to be using iterconsume(limit=1) and this basically seems to be broken right now because you get an error if you call consume() multiple times on the same connection. Set the 'do_consume' flag at connection time rather than on entry into iterconsume(). Change-Id: I988e4074ae0e267384931d6e1994e9cbe5248196
This commit is contained in:
parent
8ccb5741d2
commit
c719dc306b
@ -469,6 +469,7 @@ class Connection(object):
|
|||||||
self.memory_transport = self.conf.fake_rabbit
|
self.memory_transport = self.conf.fake_rabbit
|
||||||
|
|
||||||
self.connection = None
|
self.connection = None
|
||||||
|
self.do_consume = None
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
def _fetch_ssl_params(self):
|
def _fetch_ssl_params(self):
|
||||||
@ -518,6 +519,7 @@ class Connection(object):
|
|||||||
if self.memory_transport:
|
if self.memory_transport:
|
||||||
# Kludge to speed up tests.
|
# Kludge to speed up tests.
|
||||||
self.connection.transport.polling_interval = 0.0
|
self.connection.transport.polling_interval = 0.0
|
||||||
|
self.do_consume = True
|
||||||
self.consumer_num = itertools.count(1)
|
self.consumer_num = itertools.count(1)
|
||||||
self.connection.connect()
|
self.connection.connect()
|
||||||
self.channel = self.connection.channel()
|
self.channel = self.connection.channel()
|
||||||
@ -645,8 +647,6 @@ class Connection(object):
|
|||||||
def iterconsume(self, limit=None, timeout=None):
|
def iterconsume(self, limit=None, timeout=None):
|
||||||
"""Return an iterator that will consume from all queues/consumers."""
|
"""Return an iterator that will consume from all queues/consumers."""
|
||||||
|
|
||||||
info = {'do_consume': True}
|
|
||||||
|
|
||||||
def _error_callback(exc):
|
def _error_callback(exc):
|
||||||
if isinstance(exc, socket.timeout):
|
if isinstance(exc, socket.timeout):
|
||||||
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
||||||
@ -655,16 +655,16 @@ class Connection(object):
|
|||||||
else:
|
else:
|
||||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||||
str(exc))
|
str(exc))
|
||||||
info['do_consume'] = True
|
self.do_consume = True
|
||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
if info['do_consume']:
|
if self.do_consume:
|
||||||
queues_head = self.consumers[:-1] # not fanout.
|
queues_head = self.consumers[:-1] # not fanout.
|
||||||
queues_tail = self.consumers[-1] # fanout
|
queues_tail = self.consumers[-1] # fanout
|
||||||
for queue in queues_head:
|
for queue in queues_head:
|
||||||
queue.consume(nowait=True)
|
queue.consume(nowait=True)
|
||||||
queues_tail.consume(nowait=False)
|
queues_tail.consume(nowait=False)
|
||||||
info['do_consume'] = False
|
self.do_consume = False
|
||||||
return self.connection.drain_events(timeout=timeout)
|
return self.connection.drain_events(timeout=timeout)
|
||||||
|
|
||||||
for iteration in itertools.count(0):
|
for iteration in itertools.count(0):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user