From 415db68b67368d7c8aa550e7108122200816e665 Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com> Date: Tue, 5 May 2015 10:29:22 +0200 Subject: [PATCH] rabbit: redeclare consumers when ack/requeue fail In case the acknowledgement or requeue of a message fail, the kombu transport can be disconnected In this case, we must redeclare our consumers. This changes fixes that. This have no tests because the kombu memory transport we use in our tests cannot be in disconnected state. Closes-bug: #1448650 Change-Id: I5991a4cf827411bc27c857561d97461212a17f40 --- oslo_messaging/_drivers/impl_rabbit.py | 7 +++++++ oslo_messaging/tests/drivers/test_impl_rabbit.py | 15 +++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 08cb20517..2d4f7558b 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -885,6 +885,13 @@ class Connection(object): exc) def _consume(): + # NOTE(sileht): in case the acknowledgement or requeue of a + # message fail, the kombu transport can be disconnected + # In this case, we must redeclare our consumers, so raise + # a recoverable error to trigger the reconnection code. + if not self.connection.connected: + raise self.connection.recoverable_connection_errors[0] + if self._new_consumers: for tag, consumer in enumerate(self._consumers): if consumer in self._new_consumers: diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index c346019ee..945c5c018 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -208,6 +208,21 @@ class TestRabbitConsume(test_utils.BaseTestCase): conn.reset() self.assertEqual(channel, conn.channel) + def test_connection_ack_have_disconnected_kombu_connection(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN + ).connection + channel = conn.channel + with mock.patch('kombu.connection.Connection.connected', + new_callable=mock.PropertyMock, + return_value=False): + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=0.01) + # Ensure a new channel have been setuped + self.assertNotEqual(channel, conn.channel) + class TestRabbitTransportURL(test_utils.BaseTestCase):