diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index c001ae4c8..35b824fcc 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -807,9 +807,11 @@ class Connection(object): ret, channel = autoretry_method() self._set_current_channel(channel) return ret - except kombu.exceptions.OperationalError as exc: - LOG.debug("Received recoverable error from kombu:", - exc_info=True) + except rpc_amqp.AMQPDestinationNotFound: + # NOTE(sileht): we must reraise this without + # trigger error_callback + raise + except Exception as exc: error_callback and error_callback(exc) self._set_current_channel(None) # NOTE(sileht): number of retry exceeded and the connection @@ -821,13 +823,6 @@ class Connection(object): 'tries: %(err_str)s') % info LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) - except rpc_amqp.AMQPDestinationNotFound: - # NOTE(sileht): we must reraise this without - # trigger error_callback - raise - except Exception as exc: - error_callback and error_callback(exc) - raise def _set_current_channel(self, new_channel): """Change the channel to use. diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 94be527b6..1ba3d6324 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -30,6 +30,7 @@ import oslo_messaging from oslo_messaging._drivers import amqpdriver from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import impl_rabbit as rabbit_driver +from oslo_messaging.exceptions import MessageDeliveryFailure from oslo_messaging.tests import utils as test_utils from six.moves import mock @@ -285,6 +286,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase): try_send(e_active) self.assertIn('foobar', conn._declared_exchanges) + def test_send_exception_remap(self): + bad_exc = Exception("Non-oslo.messaging exception") + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + exchange_mock = mock.Mock() + with transport._driver._get_connection( + driver_common.PURPOSE_SEND) as pool_conn: + conn = pool_conn.connection + with mock.patch('kombu.messaging.Producer.publish', + side_effect=bad_exc): + self.assertRaises(MessageDeliveryFailure, + conn._ensure_publishing, + conn._publish, exchange_mock, 'msg') + class TestRabbitConsume(test_utils.BaseTestCase):