Merge "cleanup connection pool return"
This commit is contained in:
commit
dabdc26a1a
oslo_messaging
@ -134,8 +134,16 @@ class ConnectionContext(rpc_common.Connection):
|
|||||||
if self.pooled:
|
if self.pooled:
|
||||||
# Reset the connection so it's ready for the next caller
|
# Reset the connection so it's ready for the next caller
|
||||||
# to grab from the pool
|
# to grab from the pool
|
||||||
self.connection.reset()
|
try:
|
||||||
self.connection_pool.put(self.connection)
|
self.connection.reset()
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("Fail to reset the connection, drop it")
|
||||||
|
try:
|
||||||
|
self.connection.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
self.connection_pool.put(self.connection)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
self.connection.close()
|
self.connection.close()
|
||||||
|
@ -884,8 +884,15 @@ class Connection(object):
|
|||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset a connection so it can be used again."""
|
"""Reset a connection so it can be used again."""
|
||||||
|
recoverable_errors = (self.connection.recoverable_channel_errors +
|
||||||
|
self.connection.recoverable_connection_errors)
|
||||||
|
|
||||||
with self._connection_lock:
|
with self._connection_lock:
|
||||||
self._set_current_channel(self.connection.channel())
|
try:
|
||||||
|
self._set_current_channel(self.connection.channel())
|
||||||
|
except recoverable_errors:
|
||||||
|
self._set_current_channel(None)
|
||||||
|
self.ensure_connection()
|
||||||
self.consumers = []
|
self.consumers = []
|
||||||
self.consumer_num = itertools.count(1)
|
self.consumer_num = itertools.count(1)
|
||||||
|
|
||||||
|
@ -183,6 +183,19 @@ class TestRabbitIterconsume(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
self.assertEqual(0, int(deadline - time.time()))
|
self.assertEqual(0, int(deadline - time.time()))
|
||||||
|
|
||||||
|
def test_connection_reset_always_succeed(self):
|
||||||
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
'kombu+memory:////')
|
||||||
|
self.addCleanup(transport.cleanup)
|
||||||
|
channel = mock.Mock()
|
||||||
|
conn = transport._driver._get_connection(amqp.PURPOSE_LISTEN
|
||||||
|
).connection
|
||||||
|
conn.connection.recoverable_channel_errors = (IOError,)
|
||||||
|
with mock.patch.object(conn.connection, 'channel',
|
||||||
|
side_effect=[IOError, IOError, channel]):
|
||||||
|
conn.reset()
|
||||||
|
self.assertEqual(channel, conn.channel)
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user