From c1c0af206944ee283509fbf49a932ff623d78a0c Mon Sep 17 00:00:00 2001 From: Victor Sergeyev <vsergeyev@mirantis.com> Date: Tue, 12 May 2015 14:25:47 +0300 Subject: [PATCH] Don't create a new channel in RabbitMQ Connection.reset() Current implementation of RabbitMQ driver in in Connection.reset() change the channel to use and create a new channel for it. This happens after the each message send. There no big need to create a new channel each time, so we can cancel all consumer queues, instead of creating a channel in reset(). Test test_connection_reset_always_succeed() removed, because we are not create channel on reset() anymore. Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com> Change-Id: Ie164840e6c055b01525b13aabdb8b9c7f5d1b98b --- oslo_messaging/_drivers/impl_rabbit.py | 8 ++++++-- oslo_messaging/tests/drivers/test_impl_rabbit.py | 12 ------------ 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 8c696cb7e..75041cd8a 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -231,6 +231,9 @@ class Consumer(object): consumer_tag=six.text_type(tag), nowait=self.nowait) + def cancel(self, tag): + self.queue.cancel(six.text_type(tag)) + def _callback(self, message): """Call callback with deserialized message. @@ -689,11 +692,12 @@ class Connection(object): with self._connection_lock: try: - self._set_current_channel(self.connection.channel()) + for tag, consumer in enumerate(self._consumers): + consumer.cancel(tag=tag) except recoverable_errors: self._set_current_channel(None) self.ensure_connection() - self._consumers = [] + self._consumers = [] def _heartbeat_supported_and_enabled(self): if self.driver_conf.heartbeat_timeout_threshold <= 0: diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 1e942a9cf..48ffffca6 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -257,18 +257,6 @@ class TestRabbitConsume(test_utils.BaseTestCase): self.assertEqual(0, int(deadline - time.time())) - def test_connection_reset_always_succeed(self): - transport = oslo_messaging.get_transport(self.conf, - 'kombu+memory:////') - self.addCleanup(transport.cleanup) - channel = mock.Mock() - with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - conn.connection.connection.recoverable_channel_errors = (IOError,) - with mock.patch.object(conn.connection.connection, 'channel', - side_effect=[IOError, IOError, channel]): - conn.connection.reset() - self.assertEqual(channel, conn.connection.channel) - def test_connection_ack_have_disconnected_kombu_connection(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////')