diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py index 0d3dc5194..0b8cafaa6 100644 --- a/oslo_messaging/_drivers/amqp.py +++ b/oslo_messaging/_drivers/amqp.py @@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection): if self.pooled: # Reset the connection so it's ready for the next caller # to grab from the pool - self.connection.reset() - self.connection_pool.put(self.connection) + try: + self.connection.reset() + except Exception: + LOG.exception("Fail to reset the connection, drop it") + try: + self.connection.close() + except Exception: + pass + else: + self.connection_pool.put(self.connection) else: try: self.connection.close() diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 15273a693..b24e67a01 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -884,8 +884,15 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" + recoverable_errors = (self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + with self._connection_lock: - self._set_current_channel(self.connection.channel()) + try: + self._set_current_channel(self.connection.channel()) + except recoverable_errors: + self._set_current_channel(None) + self.ensure_connection() self.consumers = [] self.consumer_num = itertools.count(1) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 883012542..3f3145f29 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) + def test_connection_reset_always_succeed(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + channel = mock.Mock() + conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN + ).connection + conn.connection.recoverable_channel_errors = (IOError,) + with mock.patch.object(conn.connection, 'channel', + side_effect=[IOError, IOError, channel]): + conn.reset() + self.assertEqual(channel, conn.channel) + class TestRabbitTransportURL(test_utils.BaseTestCase):