Don't create a new channel in RabbitMQ Connection.reset()
Current implementation of RabbitMQ driver in in Connection.reset() change the channel to use and create a new channel for it. This happens after the each message send. There no big need to create a new channel each time, so we can cancel all consumer queues, instead of creating a channel in reset(). Test test_connection_reset_always_succeed() removed, because we are not create channel on reset() anymore. Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com> Change-Id: Ie164840e6c055b01525b13aabdb8b9c7f5d1b98b
This commit is contained in:
parent
baddce34a2
commit
c1c0af2069
oslo_messaging
@ -231,6 +231,9 @@ class Consumer(object):
|
||||
consumer_tag=six.text_type(tag),
|
||||
nowait=self.nowait)
|
||||
|
||||
def cancel(self, tag):
|
||||
self.queue.cancel(six.text_type(tag))
|
||||
|
||||
def _callback(self, message):
|
||||
"""Call callback with deserialized message.
|
||||
|
||||
@ -689,11 +692,12 @@ class Connection(object):
|
||||
|
||||
with self._connection_lock:
|
||||
try:
|
||||
self._set_current_channel(self.connection.channel())
|
||||
for tag, consumer in enumerate(self._consumers):
|
||||
consumer.cancel(tag=tag)
|
||||
except recoverable_errors:
|
||||
self._set_current_channel(None)
|
||||
self.ensure_connection()
|
||||
self._consumers = []
|
||||
self._consumers = []
|
||||
|
||||
def _heartbeat_supported_and_enabled(self):
|
||||
if self.driver_conf.heartbeat_timeout_threshold <= 0:
|
||||
|
@ -257,18 +257,6 @@ class TestRabbitConsume(test_utils.BaseTestCase):
|
||||
|
||||
self.assertEqual(0, int(deadline - time.time()))
|
||||
|
||||
def test_connection_reset_always_succeed(self):
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
self.addCleanup(transport.cleanup)
|
||||
channel = mock.Mock()
|
||||
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
|
||||
conn.connection.connection.recoverable_channel_errors = (IOError,)
|
||||
with mock.patch.object(conn.connection.connection, 'channel',
|
||||
side_effect=[IOError, IOError, channel]):
|
||||
conn.connection.reset()
|
||||
self.assertEqual(channel, conn.connection.channel)
|
||||
|
||||
def test_connection_ack_have_disconnected_kombu_connection(self):
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
|
Loading…
x
Reference in New Issue
Block a user