diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index eb6d11ab6..352fb534e 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -335,12 +335,15 @@ class Consumer(object): # bugs.launchpad.net/oslo.messaging/+bug/1609766 # bugs.launchpad.net/neutron/+bug/1318721 + # 406 error code relates to messages that are doubled ack'd + # At any channel error, the RabbitMQ closes # the channel, but the amqp-lib quietly re-open # it. So, we must reset all tags and declare # all consumers again. conn._new_tags = set(conn._consumers.values()) - if exc.code == 404: + if exc.code == 404 or (exc.code == 406 and + exc.method_name == 'Basic.ack'): self.declare(conn) self.queue.consume(callback=self._callback, consumer_tag=six.text_type(tag), @@ -593,6 +596,24 @@ class Connection(object): ' %(hostname)s:%(port)s', self._get_connection_info()) + # FIXME(gordc): wrapper to catch both kombu v3 and v4 errors + # remove this and only catch OperationalError when >4.0.0 + if hasattr(kombu.exceptions, 'OperationalError'): + self.recoverable_errors = kombu.exceptions.OperationalError + else: + # NOTE(sileht): Some dummy driver like the in-memory one doesn't + # have notion of recoverable connection, so we must raise the + # original exception like kombu does in this case. + has_modern_errors = hasattr( + self.connection.transport, 'recoverable_connection_errors', + ) + if has_modern_errors: + self.recoverable_errors = ( + self.connection.recoverable_channel_errors + + self.connection.recoverable_connection_errors) + else: + self.recoverable_errors = () + # NOTE(sileht): kombu recommend to run heartbeat_check every # seconds, but we use a lock around the kombu connection # so, to not lock to much this lock to most of the time do nothing @@ -707,7 +728,7 @@ class Connection(object): # NOTE(sileht): we reset the channel and ensure # the kombu underlying connection works self._set_current_channel(None) - self.ensure(method=lambda: self.connection.connection) + self.ensure(method=self.connection.connect) self.set_transport_socket_timeout() def ensure(self, method, retry=None, @@ -792,19 +813,6 @@ class Connection(object): self._set_current_channel(channel) method() - # NOTE(sileht): Some dummy driver like the in-memory one doesn't - # have notion of recoverable connection, so we must raise the original - # exception like kombu does in this case. - has_modern_errors = hasattr( - self.connection.transport, 'recoverable_connection_errors', - ) - if has_modern_errors: - recoverable_errors = ( - self.connection.recoverable_channel_errors + - self.connection.recoverable_connection_errors) - else: - recoverable_errors = () - try: autoretry_method = self.connection.autoretry( execute_method, channel=self.channel, @@ -817,7 +825,7 @@ class Connection(object): ret, channel = autoretry_method() self._set_current_channel(channel) return ret - except recoverable_errors as exc: + except self.recoverable_errors as exc: LOG.debug("Received recoverable error from kombu:", exc_info=True) error_callback and error_callback(exc) @@ -883,13 +891,11 @@ class Connection(object): def reset(self): """Reset a connection so it can be used again.""" - recoverable_errors = (self.connection.recoverable_channel_errors + - self.connection.recoverable_connection_errors) with self._connection_lock: try: for consumer, tag in self._consumers.items(): consumer.cancel(tag=tag) - except recoverable_errors: + except self.recoverable_errors: self.ensure_connection() self._consumers.clear() self._active_tags.clear() @@ -987,10 +993,6 @@ class Connection(object): while not self._heartbeat_exit_event.is_set(): with self._connection_lock.for_heartbeat(): - recoverable_errors = ( - self.connection.recoverable_channel_errors + - self.connection.recoverable_connection_errors) - try: try: self._heartbeat_check() @@ -1004,7 +1006,7 @@ class Connection(object): self.connection.drain_events(timeout=0.001) except socket.timeout: pass - except recoverable_errors as exc: + except self.recoverable_errors as exc: LOG.info(_LI("A recoverable connection/channel error " "occurred, trying to reconnect: %s"), exc) self.ensure_connection() @@ -1091,6 +1093,12 @@ class Connection(object): except socket.timeout as exc: poll_timeout = timer.check_return( _raise_timeout, exc, maximum=self._poll_timeout) + except self.connection.channel_errors as exc: + if exc.code == 406 and exc.method_name == 'Basic.ack': + # NOTE(gordc): occasionally multiple workers will grab + # same message and acknowledge it. if it happens, meh. + raise self.connection.recoverable_channel_errors[0] + raise with self._connection_lock: self.ensure(_consume, @@ -1172,7 +1180,8 @@ class Connection(object): def _get_connection_info(self): info = self.connection.info() client_port = None - if self.channel and hasattr(self.channel.connection, 'sock'): + if (self.channel and hasattr(self.channel.connection, 'sock') + and self.channel.connection.sock): client_port = self.channel.connection.sock.getsockname()[1] info.update({'client_port': client_port, 'connection_id': self.connection_id}) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index f3ddef60a..6ab452e36 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -24,9 +24,7 @@ import kombu import kombu.transport.memory from oslo_config import cfg from oslo_serialization import jsonutils -from oslo_utils import versionutils from oslotest import mockpatch -import pkg_resources import testscenarios import oslo_messaging @@ -106,7 +104,7 @@ class TestHeartbeat(test_utils.BaseTestCase): def test_test_heartbeat_sent_connection_fail(self): self._do_test_heartbeat_sent( - heartbeat_side_effect=kombu.exceptions.ConnectionError, + heartbeat_side_effect=kombu.exceptions.OperationalError, info='A recoverable connection/channel error occurred, ' 'trying to reconnect: %s') @@ -219,23 +217,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase): conn._publish(exchange_mock, 'msg', routing_key='routing_key', timeout=1) - # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since - # version 3.0.25, so do conversion according to kombu version. - # TODO(gcb) remove this workaround when all supported branches - # with requirement kombu >=3.0.25 - kombu_version = pkg_resources.get_distribution('kombu').version - if versionutils.is_compatible('3.0.25', kombu_version): - fake_publish.assert_called_with( - 'msg', expiration=1, - exchange=exchange_mock, - compression=self.conf.oslo_messaging_rabbit.kombu_compression, - routing_key='routing_key') - else: - fake_publish.assert_called_with( - 'msg', expiration=1000, - exchange=exchange_mock, - compression=self.conf.oslo_messaging_rabbit.kombu_compression, - routing_key='routing_key') + fake_publish.assert_called_with( + 'msg', expiration=1, + exchange=exchange_mock, + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + routing_key='routing_key') @mock.patch('kombu.messaging.Producer.publish') def test_send_no_timeout(self, fake_publish): @@ -279,7 +265,8 @@ class TestRabbitPublisher(test_utils.BaseTestCase): with mock.patch('kombu.transport.virtual.Channel.close'): # Ensure the exchange does not exists - self.assertRaises(exc, try_send, e_passive) + self.assertRaises(oslo_messaging.MessageDeliveryFailure, + try_send, e_passive) # Create it try_send(e_active) # Ensure it creates it @@ -287,12 +274,14 @@ class TestRabbitPublisher(test_utils.BaseTestCase): with mock.patch('kombu.messaging.Producer.publish', side_effect=exc): - # Ensure the exchange is already in cache - self.assertIn('foobar', conn._declared_exchanges) - # Reset connection - self.assertRaises(exc, try_send, e_passive) - # Ensure the cache is empty - self.assertEqual(0, len(conn._declared_exchanges)) + with mock.patch('kombu.transport.virtual.Channel.close'): + # Ensure the exchange is already in cache + self.assertIn('foobar', conn._declared_exchanges) + # Reset connection + self.assertRaises(oslo_messaging.MessageDeliveryFailure, + try_send, e_passive) + # Ensure the cache is empty + self.assertEqual(0, len(conn._declared_exchanges)) try_send(e_active) self.assertIn('foobar', conn._declared_exchanges) @@ -336,7 +325,7 @@ class TestRabbitConsume(test_utils.BaseTestCase): conn.connection.connection.recoverable_connection_errors = () conn.connection.connection.recoverable_channel_errors = () self.assertEqual(1, declare.call_count) - conn.connection.connection.transport.drain_events = mock.Mock() + conn.connection.connection.drain_events = mock.Mock() # Ensure that a queue will be re-declared if the consume method # of kombu.Queue raise amqp.NotFound conn.consume() @@ -360,7 +349,7 @@ class TestRabbitConsume(test_utils.BaseTestCase): IOError,) conn.connection.connection.recoverable_channel_errors = () self.assertEqual(1, declare.call_count) - conn.connection.connection.transport.drain_events = mock.Mock() + conn.connection.connection.drain_events = mock.Mock() # Ensure that a queue will be re-declared after # 'queue not found' exception despite on connection error. conn.consume() @@ -963,10 +952,6 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): heartbeat_timeout_threshold=0, group="oslo_messaging_rabbit") - self.kombu_connect = mock.Mock() - self.useFixture(mockpatch.Patch( - 'kombu.connection.Connection.connect', - side_effect=self.kombu_connect)) self.useFixture(mockpatch.Patch( 'kombu.connection.Connection.connection')) self.useFixture(mockpatch.Patch( @@ -976,6 +961,10 @@ class RpcKombuHATestCase(test_utils.BaseTestCase): url = oslo_messaging.TransportURL.parse(self.conf, None) self.connection = rabbit_driver.Connection(self.conf, url, driver_common.PURPOSE_SEND) + self.kombu_connect = mock.Mock() + self.useFixture(mockpatch.Patch( + 'kombu.connection.Connection.connect', + side_effect=self.kombu_connect)) self.addCleanup(self.connection.close) def test_ensure_four_retry(self): diff --git a/requirements.txt b/requirements.txt index 7ef1b538c..b1a992e1f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,8 +27,9 @@ PyYAML>=3.10.0 # MIT # rabbit driver is the default # we set the amqp version to ensure heartbeat works -amqp<2.0,>=1.4.0 # LGPL -kombu<4.0.0,>=3.0.25 # BSD +# FIXME(gordc): bump to amqp2 and kombu4 once requirements updated +amqp>=1.4.0 # LGPL +kombu>=3.0.25 # BSD pika>=0.10.0 # BSD pika-pool>=0.1.3 # BSD diff --git a/tools/tox_install.sh b/tools/tox_install.sh index 97a198da4..48ccf9688 100755 --- a/tools/tox_install.sh +++ b/tools/tox_install.sh @@ -27,5 +27,8 @@ pip install -c$localfile openstack-requirements edit-constraints $localfile -- $CLIENT_NAME pip install -c$localfile -U $* +# NOTE(gordc): temporary override since kombu capped at <4.0.0 +pip install -U 'amqp>=2.0.0' +pip install -U 'kombu>=4.0.0' exit $? diff --git a/tox.ini b/tox.ini index beab5fa6a..5b0365dcb 100644 --- a/tox.ini +++ b/tox.ini @@ -32,12 +32,16 @@ commands = python setup.py build_sphinx setenv = {[testenv]setenv} TRANSPORT_DRIVER=rabbit + amqp>=2.0.0 + kombu>=4.0.0 commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}' [testenv:py35-func-rabbit] setenv = {[testenv]setenv} TRANSPORT_DRIVER=rabbit + amqp>=2.0.0 + kombu>=4.0.0 basepython = python3.5 commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'