Merge "rabbit: redeclare consumers when ack/requeue fail"
This commit is contained in:
commit
cab3f20c36
oslo_messaging
@ -885,6 +885,13 @@ class Connection(object):
|
||||
exc)
|
||||
|
||||
def _consume():
|
||||
# NOTE(sileht): in case the acknowledgement or requeue of a
|
||||
# message fail, the kombu transport can be disconnected
|
||||
# In this case, we must redeclare our consumers, so raise
|
||||
# a recoverable error to trigger the reconnection code.
|
||||
if not self.connection.connected:
|
||||
raise self.connection.recoverable_connection_errors[0]
|
||||
|
||||
if self._new_consumers:
|
||||
for tag, consumer in enumerate(self._consumers):
|
||||
if consumer in self._new_consumers:
|
||||
|
@ -208,6 +208,21 @@ class TestRabbitConsume(test_utils.BaseTestCase):
|
||||
conn.reset()
|
||||
self.assertEqual(channel, conn.channel)
|
||||
|
||||
def test_connection_ack_have_disconnected_kombu_connection(self):
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
self.addCleanup(transport.cleanup)
|
||||
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
|
||||
).connection
|
||||
channel = conn.channel
|
||||
with mock.patch('kombu.connection.Connection.connected',
|
||||
new_callable=mock.PropertyMock,
|
||||
return_value=False):
|
||||
self.assertRaises(driver_common.Timeout,
|
||||
conn.consume, timeout=0.01)
|
||||
# Ensure a new channel have been setuped
|
||||
self.assertNotEqual(channel, conn.channel)
|
||||
|
||||
|
||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user