diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index d10ac2247..dcb2b7c02 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -201,12 +201,12 @@ class ConsumerBase(object):
             # Simply retrying will solve the error most of the time and
             # should work well enough as a workaround until the race condition
             # itself can be fixed.
-            # TODO(jrosenboom): In order to be able to match the Execption
+            # TODO(jrosenboom): In order to be able to match the Exception
             # more specifically, we have to refactor ConsumerBase to use
             # 'channel_errors' of the kombu connection object that
             # has created the channel.
             # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
-            LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
+            LOG.error(_("Declaring queue failed with (%s), retrying"), e)
             self.queue.declare()
 
     def _callback_handler(self, message, callback):
@@ -750,10 +750,10 @@ class Connection(object):
         return False
 
     def ensure_connection(self):
-        self.ensure(error_callback=None,
-                    method=lambda: True)
+        self.ensure(method=lambda: True)
 
-    def ensure(self, error_callback, method, retry=None,
+    def ensure(self, method, retry=None,
+               recoverable_error_callback=None, error_callback=None,
                timeout_is_error=True):
         """Will retry up to retry number of times.
         retry = None means use the value of rabbit_max_retries
@@ -778,7 +778,10 @@ class Connection(object):
             retry = None
 
         def on_error(exc, interval):
-            error_callback and error_callback(exc)
+            LOG.debug(_("Received recoverable error from kombu:"),
+                      exc_info=True)
+
+            recoverable_error_callback and recoverable_error_callback(exc)
 
             interval = (self.driver_conf.kombu_reconnect_delay + interval
                         if self.driver_conf.kombu_reconnect_delay > 0
@@ -788,13 +791,13 @@ class Connection(object):
             info.update(self.connection.info())
 
             if 'Socket closed' in six.text_type(exc):
-                LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
-                            ' the connection. Check login credentials:'
-                            ' %(err_str)s'), info)
+                LOG.error(_LE('AMQP server %(hostname)s:%(port)d closed'
+                              ' the connection. Check login credentials:'
+                              ' %(err_str)s'), info)
             else:
-                LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
-                            'unreachable: %(err_str)s. Trying again in '
-                            '%(sleep_time)d seconds.'), info)
+                LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
+                              'unreachable: %(err_str)s. Trying again in '
+                              '%(sleep_time)d seconds.'), info)
 
             # XXX(nic): when reconnecting to a RabbitMQ cluster
             # with mirrored queues in use, the attempt to release the
@@ -843,6 +846,9 @@ class Connection(object):
             self._set_current_channel(channel)
             return ret
         except recoverable_errors as exc:
+            LOG.debug(_("Received recoverable error from kombu:"),
+                      exc_info=True)
+            error_callback and error_callback(exc)
             self._set_current_channel(None)
             # NOTE(sileht): number of retry exceeded and the connection
             # is still broken
@@ -855,6 +861,9 @@ class Connection(object):
                         'retry': retry}
             LOG.error(msg)
             raise exceptions.MessageDeliveryFailure(msg)
+        except Exception as exc:
+            error_callback and error_callback(exc)
+            raise
 
     def _set_current_channel(self, new_channel):
         """Change the channel to use.
@@ -960,7 +969,8 @@ class Connection(object):
             return consumer
 
         with self._connection_lock:
-            return self.ensure(_connect_error, _declare_consumer)
+            return self.ensure(_declare_consumer,
+                               error_callback=_connect_error)
 
     def iterconsume(self, limit=None, timeout=None):
         """Return an iterator that will consume from all queues/consumers.
@@ -975,9 +985,12 @@ class Connection(object):
             LOG.debug('Timed out waiting for RPC response: %s', exc)
             raise rpc_common.Timeout()
 
-        def _error_callback(exc):
+        def _recoverable_error_callback(exc):
             self.do_consume = True
             timer.check_return(_raise_timeout, exc)
+
+        def _error_callback(exc):
+            _recoverable_error_callback(exc)
             LOG.exception(_('Failed to consume message from queue: %s'),
                           exc)
 
@@ -1009,16 +1022,28 @@ class Connection(object):
         for iteration in itertools.count(0):
             if limit and iteration >= limit:
                 raise StopIteration
-            yield self.ensure(_error_callback, _consume)
+            yield self.ensure(
+                _consume,
+                recoverable_error_callback=_recoverable_error_callback,
+                error_callback=_error_callback)
+
+    @staticmethod
+    def _log_publisher_send_error(topic, exc):
+        log_info = {'topic': topic, 'err_str': exc}
+        LOG.exception(_("Failed to publish message to topic "
+                        "'%(topic)s': %(err_str)s"), log_info)
+
+    default_marker = object()
 
     def publisher_send(self, cls, topic, msg, timeout=None, retry=None,
-                       **kwargs):
+                       error_callback=default_marker, **kwargs):
         """Send to a publisher based on the publisher class."""
 
-        def _error_callback(exc):
-            log_info = {'topic': topic, 'err_str': exc}
-            LOG.exception(_("Failed to publish message to topic "
-                          "'%(topic)s': %(err_str)s"), log_info)
+        def _default_error_callback(exc):
+            self._log_publisher_send_error(topic, exc)
+
+        if error_callback is self.default_marker:
+            error_callback = _default_error_callback
 
         def _publish():
             publisher = cls(self.driver_conf, self.channel, topic=topic,
@@ -1026,7 +1051,7 @@ class Connection(object):
             publisher.send(msg, timeout)
 
         with self._connection_lock:
-            self.ensure(_error_callback, _publish, retry=retry)
+            self.ensure(_publish, retry=retry, error_callback=error_callback)
 
     def declare_direct_consumer(self, topic, callback):
         """Create a 'direct' queue.
@@ -1058,7 +1083,9 @@ class Connection(object):
 
         while True:
             try:
-                self.publisher_send(DirectPublisher, msg_id, msg)
+                self.publisher_send(DirectPublisher, msg_id, msg,
+                                    error_callback=None)
+                return
             except self.connection.channel_errors as exc:
                 # NOTE(noelbk/sileht):
                 # If rabbit dies, the consumer can be disconnected before the
@@ -1073,8 +1100,11 @@ class Connection(object):
                                  "exist yet, retrying...") % msg_id)
                     time.sleep(1)
                     continue
+                self._log_publisher_send_error(msg_id, exc)
+                raise
+            except Exception as exc:
+                self._log_publisher_send_error(msg_id, exc)
                 raise
-            return
 
     def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
         """Send a 'topic' message."""
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index b4e691de2..81c2fe6fc 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -781,7 +781,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_four_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(oslo_messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=4)
         self.assertEqual(5, self.kombu_connect.call_count)
         self.assertEqual(6, mock_callback.call_count)
@@ -789,7 +789,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_one_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(oslo_messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=1)
         self.assertEqual(2, self.kombu_connect.call_count)
         self.assertEqual(3, mock_callback.call_count)
@@ -797,7 +797,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_no_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(oslo_messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=0)
         self.assertEqual(1, self.kombu_connect.call_count)
         self.assertEqual(2, mock_callback.call_count)
diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py
index 783afd855..d383babb7 100644
--- a/tests/drivers/test_impl_rabbit.py
+++ b/tests/drivers/test_impl_rabbit.py
@@ -726,7 +726,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_four_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=4)
         self.assertEqual(5, self.kombu_connect.call_count)
         self.assertEqual(6, mock_callback.call_count)
@@ -734,7 +734,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_one_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=1)
         self.assertEqual(2, self.kombu_connect.call_count)
         self.assertEqual(3, mock_callback.call_count)
@@ -742,7 +742,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
     def test_ensure_no_retry(self):
         mock_callback = mock.Mock(side_effect=IOError)
         self.assertRaises(messaging.MessageDeliveryFailure,
-                          self.connection.ensure, None, mock_callback,
+                          self.connection.ensure, mock_callback,
                           retry=0)
         self.assertEqual(1, self.kombu_connect.call_count)
         self.assertEqual(2, mock_callback.call_count)