Merge "Added trace logging for debuggability"

This commit is contained in:
Jenkins 2015-08-22 01:47:50 +00:00 committed by Gerrit Code Review
commit d0d08bd406

@ -15,7 +15,6 @@
import collections import collections
import contextlib import contextlib
import functools import functools
import logging
import os import os
import socket import socket
import ssl import ssl
@ -28,6 +27,7 @@ import kombu.connection
import kombu.entity import kombu.entity
import kombu.messaging import kombu.messaging
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import netutils from oslo_utils import netutils
import six import six
from six.moves.urllib import parse from six.moves.urllib import parse
@ -175,12 +175,15 @@ class RabbitMessage(dict):
def __init__(self, raw_message): def __init__(self, raw_message):
super(RabbitMessage, self).__init__( super(RabbitMessage, self).__init__(
rpc_common.deserialize_msg(raw_message.payload)) rpc_common.deserialize_msg(raw_message.payload))
LOG.trace('RabbitMessage.Init: message %s', self)
self._raw_message = raw_message self._raw_message = raw_message
def acknowledge(self): def acknowledge(self):
LOG.trace('RabbitMessage.acknowledge: message %s', self)
self._raw_message.ack() self._raw_message.ack()
def requeue(self): def requeue(self):
LOG.trace('RabbitMessage.requeue: message %s', self)
self._raw_message.requeue() self._raw_message.requeue()
@ -221,6 +224,8 @@ class Consumer(object):
queue_arguments=self.queue_arguments) queue_arguments=self.queue_arguments)
try: try:
LOG.trace('ConsumerBase.declare: '
'queue %s', self.queue_name)
self.queue.declare() self.queue.declare()
except conn.connection.channel_errors as exc: except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race # NOTE(jrosenboom): This exception may be triggered by a race
@ -245,6 +250,7 @@ class Consumer(object):
nowait=self.nowait) nowait=self.nowait)
def cancel(self, tag): def cancel(self, tag):
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
self.queue.cancel(six.text_type(tag)) self.queue.cancel(six.text_type(tag))
def _callback(self, message): def _callback(self, message):
@ -637,6 +643,8 @@ class Connection(object):
# should sufficient, because the underlying kombu transport # should sufficient, because the underlying kombu transport
# connection object freed. # connection object freed.
if self.kombu_reconnect_delay > 0: if self.kombu_reconnect_delay > 0:
LOG.trace('Delaying reconnect for %1.1f seconds ...',
self.kombu_reconnect_delay)
time.sleep(self.kombu_reconnect_delay) time.sleep(self.kombu_reconnect_delay)
def on_reconnection(new_channel): def on_reconnection(new_channel):
@ -982,6 +990,11 @@ class Connection(object):
# disconnect us, so raise timeout earlier ourself # disconnect us, so raise timeout earlier ourself
transport_timeout = heartbeat_timeout transport_timeout = heartbeat_timeout
log_info = {'msg': msg,
'who': exchange or 'default',
'key': routing_key}
LOG.trace('Connection._publish: sending message %(msg)s to'
' %(who)s with routing key %(key)s', log_info)
with self._transport_socket_timeout(transport_timeout): with self._transport_socket_timeout(transport_timeout):
producer.publish(msg, expiration=expiration) producer.publish(msg, expiration=expiration)
@ -1018,6 +1031,10 @@ class Connection(object):
name=routing_key, name=routing_key,
routing_key=routing_key, routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) queue_arguments=_get_queue_arguments(self.rabbit_ha_queues))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
'declare queue %(key)s on %(exchange)s exchange', log_info)
queue.declare() queue.declare()
self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)