rabbit: Improves logging

For all transport errors, we first got a log of a failure with a
backtrace like 'Fail to publish message on topic' or 'fail to consume
messages' and then another message like 'AMQP is unreachable'.

But in most case, we retry to consume/publish messages when the reason
is a connection lost.

So, now we don't log the failure message anymore in case of connection lost,
but just message related to rabbit deconnection/reconnection.
The error message and the backtrace are only logged in case of we really fail to
publish a message.

Change-Id: Ifa1b04b348b347bad0b2abec3e759a2ce7815b86
This commit is contained in:
Mehdi Abaakouk 2015-03-18 07:59:19 +01:00
parent b9e134d7e9
commit 2d1a019427
3 changed files with 59 additions and 29 deletions
oslo_messaging
tests/drivers

@ -201,12 +201,12 @@ class ConsumerBase(object):
# Simply retrying will solve the error most of the time and
# should work well enough as a workaround until the race condition
# itself can be fixed.
# TODO(jrosenboom): In order to be able to match the Execption
# TODO(jrosenboom): In order to be able to match the Exception
# more specifically, we have to refactor ConsumerBase to use
# 'channel_errors' of the kombu connection object that
# has created the channel.
# See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
LOG.error(_("Declaring queue failed with (%s), retrying"), e)
self.queue.declare()
def _callback_handler(self, message, callback):
@ -750,10 +750,10 @@ class Connection(object):
return False
def ensure_connection(self):
self.ensure(error_callback=None,
method=lambda: True)
self.ensure(method=lambda: True)
def ensure(self, error_callback, method, retry=None,
def ensure(self, method, retry=None,
recoverable_error_callback=None, error_callback=None,
timeout_is_error=True):
"""Will retry up to retry number of times.
retry = None means use the value of rabbit_max_retries
@ -778,7 +778,10 @@ class Connection(object):
retry = None
def on_error(exc, interval):
error_callback and error_callback(exc)
LOG.debug(_("Received recoverable error from kombu:"),
exc_info=True)
recoverable_error_callback and recoverable_error_callback(exc)
interval = (self.driver_conf.kombu_reconnect_delay + interval
if self.driver_conf.kombu_reconnect_delay > 0
@ -788,13 +791,13 @@ class Connection(object):
info.update(self.connection.info())
if 'Socket closed' in six.text_type(exc):
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
LOG.error(_LE('AMQP server %(hostname)s:%(port)d closed'
' the connection. Check login credentials:'
' %(err_str)s'), info)
else:
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.'), info)
# XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the
@ -843,6 +846,9 @@ class Connection(object):
self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
LOG.debug(_("Received recoverable error from kombu:"),
exc_info=True)
error_callback and error_callback(exc)
self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
@ -855,6 +861,9 @@ class Connection(object):
'retry': retry}
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
except Exception as exc:
error_callback and error_callback(exc)
raise
def _set_current_channel(self, new_channel):
"""Change the channel to use.
@ -960,7 +969,8 @@ class Connection(object):
return consumer
with self._connection_lock:
return self.ensure(_connect_error, _declare_consumer)
return self.ensure(_declare_consumer,
error_callback=_connect_error)
def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers.
@ -975,9 +985,12 @@ class Connection(object):
LOG.debug('Timed out waiting for RPC response: %s', exc)
raise rpc_common.Timeout()
def _error_callback(exc):
def _recoverable_error_callback(exc):
self.do_consume = True
timer.check_return(_raise_timeout, exc)
def _error_callback(exc):
_recoverable_error_callback(exc)
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
@ -1009,16 +1022,28 @@ class Connection(object):
for iteration in itertools.count(0):
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
yield self.ensure(
_consume,
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
@staticmethod
def _log_publisher_send_error(topic, exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
default_marker = object()
def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
**kwargs):
error_callback=default_marker, **kwargs):
"""Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': exc}
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
def _default_error_callback(exc):
self._log_publisher_send_error(topic, exc)
if error_callback is self.default_marker:
error_callback = _default_error_callback
def _publish():
publisher = cls(self.driver_conf, self.channel, topic=topic,
@ -1026,7 +1051,7 @@ class Connection(object):
publisher.send(msg, timeout)
with self._connection_lock:
self.ensure(_error_callback, _publish, retry=retry)
self.ensure(_publish, retry=retry, error_callback=error_callback)
def declare_direct_consumer(self, topic, callback):
"""Create a 'direct' queue.
@ -1058,7 +1083,9 @@ class Connection(object):
while True:
try:
self.publisher_send(DirectPublisher, msg_id, msg)
self.publisher_send(DirectPublisher, msg_id, msg,
error_callback=None)
return
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
@ -1073,8 +1100,11 @@ class Connection(object):
"exist yet, retrying...") % msg_id)
time.sleep(1)
continue
self._log_publisher_send_error(msg_id, exc)
raise
except Exception as exc:
self._log_publisher_send_error(msg_id, exc)
raise
return
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message."""

@ -781,7 +781,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=4)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
@ -789,7 +789,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=1)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
@ -797,7 +797,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)

@ -726,7 +726,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_four_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=4)
self.assertEqual(5, self.kombu_connect.call_count)
self.assertEqual(6, mock_callback.call_count)
@ -734,7 +734,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_one_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=1)
self.assertEqual(2, self.kombu_connect.call_count)
self.assertEqual(3, mock_callback.call_count)
@ -742,7 +742,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
def test_ensure_no_retry(self):
mock_callback = mock.Mock(side_effect=IOError)
self.assertRaises(messaging.MessageDeliveryFailure,
self.connection.ensure, None, mock_callback,
self.connection.ensure, mock_callback,
retry=0)
self.assertEqual(1, self.kombu_connect.call_count)
self.assertEqual(2, mock_callback.call_count)