cleanup connection pool return
This change ensures that connections that fail to return to the pool are cleanly closed and exception raised are not returned to the caller. For rabbit, we also try to reconnection in case of connection failure, before dropping the connection. Closes-bug: #1433458 Change-Id: Ic714db7b8be9df8b6935a903732c60aaea0bc404
This commit is contained in:
parent
2d1a019427
commit
0dff20b8b9
oslo_messaging
@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection):
|
||||
if self.pooled:
|
||||
# Reset the connection so it's ready for the next caller
|
||||
# to grab from the pool
|
||||
self.connection.reset()
|
||||
self.connection_pool.put(self.connection)
|
||||
try:
|
||||
self.connection.reset()
|
||||
except Exception:
|
||||
LOG.exception("Fail to reset the connection, drop it")
|
||||
try:
|
||||
self.connection.close()
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
self.connection_pool.put(self.connection)
|
||||
else:
|
||||
try:
|
||||
self.connection.close()
|
||||
|
@ -884,8 +884,15 @@ class Connection(object):
|
||||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again."""
|
||||
recoverable_errors = (self.connection.recoverable_channel_errors +
|
||||
self.connection.recoverable_connection_errors)
|
||||
|
||||
with self._connection_lock:
|
||||
self._set_current_channel(self.connection.channel())
|
||||
try:
|
||||
self._set_current_channel(self.connection.channel())
|
||||
except recoverable_errors:
|
||||
self._set_current_channel(None)
|
||||
self.ensure_connection()
|
||||
self.consumers = []
|
||||
self.consumer_num = itertools.count(1)
|
||||
|
||||
|
@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
|
||||
|
||||
self.assertEqual(0, int(deadline - time.time()))
|
||||
|
||||
def test_connection_reset_always_succeed(self):
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
self.addCleanup(transport.cleanup)
|
||||
channel = mock.Mock()
|
||||
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
|
||||
).connection
|
||||
conn.connection.recoverable_channel_errors = (IOError,)
|
||||
with mock.patch.object(conn.connection, 'channel',
|
||||
side_effect=[IOError, IOError, channel]):
|
||||
conn.reset()
|
||||
self.assertEqual(channel, conn.channel)
|
||||
|
||||
|
||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user