diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 9c44465d0..d35693827 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -294,8 +294,8 @@ class Consumer(object):
             queue_arguments=self.queue_arguments)
 
         try:
-            LOG.trace('ConsumerBase.declare: '
-                      'queue %s', self.queue_name)
+            LOG.debug('[%s] Queue.declare: %s',
+                      conn.connection_id, self.queue_name)
             self.queue.declare()
         except conn.connection.channel_errors as exc:
             # NOTE(jrosenboom): This exception may be triggered by a race
@@ -537,6 +537,10 @@ class Connection(object):
         else:
             self._connection_lock = DummyConnectionLock()
 
+        self.connection_id = str(uuid.uuid4())
+        self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
+                                  os.getpid(),
+                                  self.connection_id)
         self.connection = kombu.connection.Connection(
             self._url, ssl=self._fetch_ssl_params(),
             login_method=self.login_method,
@@ -544,17 +548,21 @@ class Connection(object):
             failover_strategy=self.kombu_failover_strategy,
             transport_options={
                 'confirm_publish': True,
-                'client_properties': {'capabilities': {
-                    'authentication_failure_close': True,
-                    'connection.blocked': True,
-                    'consumer_cancel_notify': True}},
+                'client_properties': {
+                    'capabilities': {
+                        'authentication_failure_close': True,
+                        'connection.blocked': True,
+                        'consumer_cancel_notify': True
+                    },
+                    'connection_name': self.name},
                 'on_blocked': self._on_connection_blocked,
                 'on_unblocked': self._on_connection_unblocked,
             },
         )
 
-        LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s',
-                  self.connection.info())
+        LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
+                  ' %(hostname)s:%(port)s',
+                  self._get_connection_info())
 
         # NOTE(sileht): kombu recommend to run heartbeat_check every
         # seconds, but we use a lock around the kombu connection
@@ -579,9 +587,10 @@ class Connection(object):
         if purpose == rpc_common.PURPOSE_SEND:
             self._heartbeat_start()
 
-        LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
-                  'via [%(transport)s] client',
-                  self.connection.info())
+        LOG.debug('[%(connection_id)s] Connected to AMQP server on '
+                  '%(hostname)s:%(port)s via [%(transport)s] client with'
+                  ' port %(client_port)s.',
+                  self._get_connection_info())
 
         # NOTE(sileht): value chosen according the best practice from kombu
         # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
@@ -697,7 +706,8 @@ class Connection(object):
             retry = None
 
         def on_error(exc, interval):
-            LOG.debug("Received recoverable error from kombu:",
+            LOG.debug("[%s] Received recoverable error from kombu:"
+                      % self.connection_id,
                       exc_info=True)
 
             recoverable_error_callback and recoverable_error_callback(exc)
@@ -707,16 +717,19 @@ class Connection(object):
                         else interval)
 
             info = {'err_str': exc, 'sleep_time': interval}
-            info.update(self.connection.info())
+            info.update(self._get_connection_info())
 
             if 'Socket closed' in six.text_type(exc):
-                LOG.error(_LE('AMQP server %(hostname)s:%(port)s closed'
+                LOG.error(_LE('[%(connection_id)s] AMQP server'
+                              ' %(hostname)s:%(port)s closed'
                               ' the connection. Check login credentials:'
                               ' %(err_str)s'), info)
             else:
-                LOG.error(_LE('AMQP server on %(hostname)s:%(port)s is '
-                              'unreachable: %(err_str)s. Trying again in '
-                              '%(sleep_time)d seconds.'), info)
+                LOG.error(_LE('[%(connection_id)s] AMQP server on '
+                              '%(hostname)s:%(port)s is unreachable: '
+                              '%(err_str)s. Trying again in '
+                              '%(sleep_time)d seconds. Client port: '
+                              '%(client_port)s'), info)
 
             # XXX(nic): when reconnecting to a RabbitMQ cluster
             # with mirrored queues in use, the attempt to release the
@@ -743,9 +756,10 @@ class Connection(object):
             for consumer in self._consumers:
                 consumer.declare(self)
 
-            LOG.info(_LI('Reconnected to AMQP server on '
-                         '%(hostname)s:%(port)s via [%(transport)s] client'),
-                     self.connection.info())
+            LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
+                         '%(hostname)s:%(port)s via [%(transport)s] client'
+                         'with port %(client_port)s.'),
+                     self._get_connection_info())
 
         def execute_method(channel):
             self._set_current_channel(channel)
@@ -885,7 +899,8 @@ class Connection(object):
             sock = self.channel.connection.sock
         except AttributeError as e:
             # Level is set to debug because otherwise we would spam the logs
-            LOG.debug('Failed to get socket attribute: %s' % str(e))
+            LOG.debug('[%s] Failed to get socket attribute: %s'
+                      % (self.connection_id, str(e)))
         else:
             sock.settimeout(timeout)
             # TCP_USER_TIMEOUT is not defined on Windows and Mac OS X
@@ -1141,6 +1156,15 @@ class Connection(object):
         with self._connection_lock:
             self.ensure(method, retry=retry, error_callback=_error_callback)
 
+    def _get_connection_info(self):
+        info = self.connection.info()
+        client_port = None
+        if self.channel and hasattr(self.channel.connection, 'sock'):
+            client_port = self.channel.connection.sock.getsockname()[1]
+        info.update({'client_port': client_port,
+                     'connection_id': self.connection_id})
+        return info
+
     def _publish(self, exchange, msg, routing_key=None, timeout=None):
         """Publish a message."""
 
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index ac8e9cbb1..4717ad7fd 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -189,16 +189,16 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
                                                  'kombu+memory:////')
         self.addCleanup(transport.cleanup)
 
-        transport._driver._get_connection()
+        connection = transport._driver._get_connection()
         connection_klass.assert_called_once_with(
             'memory:///', transport_options={
                 'client_properties': {
                     'capabilities': {
                         'connection.blocked': True,
                         'consumer_cancel_notify': True,
-                        'authentication_failure_close': True
-                    }
-                },
+                        'authentication_failure_close': True,
+                    },
+                    'connection_name': connection.name},
                 'confirm_publish': True,
                 'on_blocked': mock.ANY,
                 'on_unblocked': mock.ANY},