Merge "Prevent rabbit from raising unexpected exceptions"
This commit is contained in:
commit
6f992dfc7f
oslo_messaging
@ -807,9 +807,11 @@ class Connection(object):
|
||||
ret, channel = autoretry_method()
|
||||
self._set_current_channel(channel)
|
||||
return ret
|
||||
except kombu.exceptions.OperationalError as exc:
|
||||
LOG.debug("Received recoverable error from kombu:",
|
||||
exc_info=True)
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
# NOTE(sileht): we must reraise this without
|
||||
# trigger error_callback
|
||||
raise
|
||||
except Exception as exc:
|
||||
error_callback and error_callback(exc)
|
||||
self._set_current_channel(None)
|
||||
# NOTE(sileht): number of retry exceeded and the connection
|
||||
@ -821,13 +823,6 @@ class Connection(object):
|
||||
'tries: %(err_str)s') % info
|
||||
LOG.error(msg)
|
||||
raise exceptions.MessageDeliveryFailure(msg)
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
# NOTE(sileht): we must reraise this without
|
||||
# trigger error_callback
|
||||
raise
|
||||
except Exception as exc:
|
||||
error_callback and error_callback(exc)
|
||||
raise
|
||||
|
||||
def _set_current_channel(self, new_channel):
|
||||
"""Change the channel to use.
|
||||
|
@ -30,6 +30,7 @@ import oslo_messaging
|
||||
from oslo_messaging._drivers import amqpdriver
|
||||
from oslo_messaging._drivers import common as driver_common
|
||||
from oslo_messaging._drivers import impl_rabbit as rabbit_driver
|
||||
from oslo_messaging.exceptions import MessageDeliveryFailure
|
||||
from oslo_messaging.tests import utils as test_utils
|
||||
from six.moves import mock
|
||||
|
||||
@ -285,6 +286,20 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
|
||||
try_send(e_active)
|
||||
self.assertIn('foobar', conn._declared_exchanges)
|
||||
|
||||
def test_send_exception_remap(self):
|
||||
bad_exc = Exception("Non-oslo.messaging exception")
|
||||
transport = oslo_messaging.get_transport(self.conf,
|
||||
'kombu+memory:////')
|
||||
exchange_mock = mock.Mock()
|
||||
with transport._driver._get_connection(
|
||||
driver_common.PURPOSE_SEND) as pool_conn:
|
||||
conn = pool_conn.connection
|
||||
with mock.patch('kombu.messaging.Producer.publish',
|
||||
side_effect=bad_exc):
|
||||
self.assertRaises(MessageDeliveryFailure,
|
||||
conn._ensure_publishing,
|
||||
conn._publish, exchange_mock, 'msg')
|
||||
|
||||
|
||||
class TestRabbitConsume(test_utils.BaseTestCase):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user