From 0dff20b8b979d902d855aa1c0ae0f4b9398a3116 Mon Sep 17 00:00:00 2001
From: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Date: Wed, 18 Mar 2015 08:31:32 +0100
Subject: [PATCH] cleanup connection pool return

This change ensures that connections that fail to return to the pool are
cleanly closed and exception raised are not returned to the caller.

For rabbit, we also try to reconnection in case of connection failure,
before dropping the connection.

Closes-bug: #1433458

Change-Id: Ic714db7b8be9df8b6935a903732c60aaea0bc404
---
 oslo_messaging/_drivers/amqp.py                  | 12 ++++++++++--
 oslo_messaging/_drivers/impl_rabbit.py           |  9 ++++++++-
 oslo_messaging/tests/drivers/test_impl_rabbit.py | 13 +++++++++++++
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 0d3dc5194..0b8cafaa6 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection):
             if self.pooled:
                 # Reset the connection so it's ready for the next caller
                 # to grab from the pool
-                self.connection.reset()
-                self.connection_pool.put(self.connection)
+                try:
+                    self.connection.reset()
+                except Exception:
+                    LOG.exception("Fail to reset the connection, drop it")
+                    try:
+                        self.connection.close()
+                    except Exception:
+                        pass
+                else:
+                    self.connection_pool.put(self.connection)
             else:
                 try:
                     self.connection.close()
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index dcb2b7c02..8f2dc7d22 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -884,8 +884,15 @@ class Connection(object):
 
     def reset(self):
         """Reset a connection so it can be used again."""
+        recoverable_errors = (self.connection.recoverable_channel_errors +
+                              self.connection.recoverable_connection_errors)
+
         with self._connection_lock:
-            self._set_current_channel(self.connection.channel())
+            try:
+                self._set_current_channel(self.connection.channel())
+            except recoverable_errors:
+                self._set_current_channel(None)
+                self.ensure_connection()
         self.consumers = []
         self.consumer_num = itertools.count(1)
 
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index 81c2fe6fc..e59843d52 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
 
         self.assertEqual(0, int(deadline - time.time()))
 
+    def test_connection_reset_always_succeed(self):
+        transport = oslo_messaging.get_transport(self.conf,
+                                                 'kombu+memory:////')
+        self.addCleanup(transport.cleanup)
+        channel = mock.Mock()
+        conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
+                                                 ).connection
+        conn.connection.recoverable_channel_errors = (IOError,)
+        with mock.patch.object(conn.connection, 'channel',
+                               side_effect=[IOError, IOError, channel]):
+            conn.reset()
+            self.assertEqual(channel, conn.channel)
+
 
 class TestRabbitTransportURL(test_utils.BaseTestCase):