rabbit: fix consumers declaration

When a consumer is declared after we have started to consume
amqp, its queue is never consumed.

This fixes that.

Closes bug: #1450342

Change-Id: I9f2e7d83283504dfe762ac88384efde0f7b52d47
This commit is contained in:
Mehdi Abaakouk 2015-05-01 00:09:18 +02:00
parent b737a92afd
commit 2c3c8a3a55
3 changed files with 22 additions and 28 deletions

View File

@ -466,7 +466,6 @@ class Connection(object):
pools = {} pools = {}
def __init__(self, conf, url, purpose): def __init__(self, conf, url, purpose):
self.consumers = []
self.conf = conf self.conf = conf
self.driver_conf = self.conf.oslo_messaging_rabbit self.driver_conf = self.conf.oslo_messaging_rabbit
self.max_retries = self.driver_conf.rabbit_max_retries self.max_retries = self.driver_conf.rabbit_max_retries
@ -525,7 +524,8 @@ class Connection(object):
self._initial_pid = os.getpid() self._initial_pid = os.getpid()
self.do_consume = True self._consumers = []
self._new_consumers = []
self._consume_loop_stopped = False self._consume_loop_stopped = False
self.channel = None self.channel = None
@ -705,7 +705,7 @@ class Connection(object):
a new channel, we use it the reconfigure our consumers. a new channel, we use it the reconfigure our consumers.
""" """
self._set_current_channel(new_channel) self._set_current_channel(new_channel)
for consumer in self.consumers: for consumer in self._consumers:
consumer.declare(new_channel) consumer.declare(new_channel)
LOG.info(_LI('Reconnected to AMQP server on ' LOG.info(_LI('Reconnected to AMQP server on '
@ -780,7 +780,7 @@ class Connection(object):
except recoverable_errors: except recoverable_errors:
self._set_current_channel(None) self._set_current_channel(None)
self.ensure_connection() self.ensure_connection()
self.consumers = [] self._consumers = []
def _heartbeat_supported_and_enabled(self): def _heartbeat_supported_and_enabled(self):
if self.driver_conf.heartbeat_timeout_threshold <= 0: if self.driver_conf.heartbeat_timeout_threshold <= 0:
@ -859,7 +859,8 @@ class Connection(object):
def _declare_consumer(): def _declare_consumer():
consumer.declare(self.channel) consumer.declare(self.channel)
self.consumers.append(consumer) self._consumers.append(consumer)
self._new_consumers.append(consumer)
return consumer return consumer
with self._connection_lock: with self._connection_lock:
@ -877,7 +878,7 @@ class Connection(object):
raise rpc_common.Timeout() raise rpc_common.Timeout()
def _recoverable_error_callback(exc): def _recoverable_error_callback(exc):
self.do_consume = True self._new_consumers = self._consumers
timer.check_return(_raise_timeout, exc) timer.check_return(_raise_timeout, exc)
def _error_callback(exc): def _error_callback(exc):
@ -886,10 +887,11 @@ class Connection(object):
exc) exc)
def _consume(): def _consume():
if self.do_consume: if self._new_consumers:
for tag, consumer in enumerate(self.consumers): for tag, consumer in enumerate(self._consumers):
consumer.consume(tag=tag) if consumer in self._new_consumers:
self.do_consume = False consumer.consume(tag=tag)
self._new_consumers = []
poll_timeout = (self._poll_timeout if timeout is None poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout)) else min(timeout, self._poll_timeout))

View File

@ -179,12 +179,10 @@ class TestRabbitConsume(test_utils.BaseTestCase):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////') 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
deadline = time.time() + 3 deadline = time.time() + 6
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
# FIXME(sileht): the deadline should be 6 seconds, not 3 self.assertRaises(driver_common.Timeout,
# consuming with no consumer have never worked conn.consume, timeout=3)
# https://bugs.launchpad.net/oslo.messaging/+bug/1450342
# conn.consume(timeout=3)
# kombu memory transport doesn't really raise error # kombu memory transport doesn't really raise error
# so just simulate a real driver behavior # so just simulate a real driver behavior
@ -192,10 +190,8 @@ class TestRabbitConsume(test_utils.BaseTestCase):
conn.declare_fanout_consumer("notif.info", lambda msg: True) conn.declare_fanout_consumer("notif.info", lambda msg: True)
with mock.patch('kombu.connection.Connection.drain_events', with mock.patch('kombu.connection.Connection.drain_events',
side_effect=IOError): side_effect=IOError):
try: self.assertRaises(driver_common.Timeout,
conn.consume(timeout=3) conn.consume, timeout=3)
except driver_common.Timeout:
pass
self.assertEqual(0, int(deadline - time.time())) self.assertEqual(0, int(deadline - time.time()))

View File

@ -87,12 +87,10 @@ class TestRabbitConsume(test_utils.BaseTestCase):
def test_consume_timeout(self): def test_consume_timeout(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////') transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
deadline = time.time() + 3 deadline = time.time() + 6
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
# FIXME(sileht): the deadline should be 6 seconds, not 3 self.assertRaises(driver_common.Timeout,
# consuming with no consumer have never worked conn.consume, timeout=3)
# https://bugs.launchpad.net/oslo.messaging/+bug/1450342
# conn.consume(timeout=3)
# kombu memory transport doesn't really raise error # kombu memory transport doesn't really raise error
# so just simulate a real driver behavior # so just simulate a real driver behavior
@ -100,10 +98,8 @@ class TestRabbitConsume(test_utils.BaseTestCase):
conn.declare_fanout_consumer("notif.info", lambda msg: True) conn.declare_fanout_consumer("notif.info", lambda msg: True)
with mock.patch('kombu.connection.Connection.drain_events', with mock.patch('kombu.connection.Connection.drain_events',
side_effect=IOError): side_effect=IOError):
try: self.assertRaises(driver_common.Timeout,
conn.consume(timeout=3) conn.consume, timeout=3)
except driver_common.Timeout:
pass
self.assertEqual(0, int(deadline - time.time())) self.assertEqual(0, int(deadline - time.time()))