diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 4a9361ceb..49aba929e 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -293,7 +293,7 @@ class Sender(pyngus.SenderEventHandler):
         if self._link:
             self._link.close()
 
-    def reset(self):
+    def reset(self, reason="Link reset"):
         """Called by the controller on connection failover. Release all link
         resources, abort any in-flight messages, and check the retry limit on
         all pending send requests.
@@ -304,16 +304,16 @@ class Sender(pyngus.SenderEventHandler):
         if self._link:
             self._link.destroy()
             self._link = None
-        self._abort_unacked("Link reset")
-        self._check_retry_limit()
+        self._abort_unacked(reason)
+        self._check_retry_limit(reason)
 
-    def destroy(self):
+    def destroy(self, reason="Link destroyed"):
         """Destroy the sender and all pending messages.  Called on driver
         shutdown.
         """
         LOG.debug("Sender %s destroyed", self._address)
-        self.reset()
-        self._abort_pending("Link destroyed")
+        self.reset(reason)
+        self._abort_pending(reason)
 
     def send_message(self, send_task):
         """Send a message out the link.
@@ -354,21 +354,24 @@ class Sender(pyngus.SenderEventHandler):
         # sender_closed() will be called once the link completes closing
 
     def sender_closed(self, sender_link):
-        self._abort_unacked("Sender closed")
-        if self._connection:
-            # still attached, so attempt to restart the link
-            self._check_retry_limit()
-            self._scheduler.defer(self._reopen_link, self._delay)
+        self._handle_sender_closed()
 
     def sender_failed(self, sender_link, error):
         """Protocol error occurred."""
         LOG.warning(_LW("sender %(addr)s failed error=%(error)s"),
                     {'addr': self._address, 'error': error})
-        self.sender_closed(sender_link)
+        self._handle_sender_closed(str(error))
 
     # end Pyngus callbacks
 
-    def _check_retry_limit(self):
+    def _handle_sender_closed(self, reason="Sender closed"):
+        self._abort_unacked(reason)
+        if self._connection:
+            # still attached, so attempt to restart the link
+            self._check_retry_limit(reason)
+            self._scheduler.defer(self._reopen_link, self._delay)
+
+    def _check_retry_limit(self, reason):
         # Called on recoverable connection or link failure.  Remove any pending
         # sends that have exhausted their retry count:
         expired = set()
@@ -377,7 +380,7 @@ class Sender(pyngus.SenderEventHandler):
                 send_task.retry -= 1
                 if send_task.retry <= 0:
                     expired.add(send_task)
-                    send_task._on_error("Send retries exhausted")
+                    send_task._on_error("Message send failed: %s" % reason)
         while expired:
             self._pending_sends.remove(expired.pop())
 
@@ -813,7 +816,7 @@ class Controller(pyngus.ConnectionEventHandler):
         self._closing = False
         # only schedule one outstanding reconnect attempt at a time
         self._reconnecting = False
-        self._delay = 1  # seconds between retries
+        self._delay = self.conn_retry_interval  # seconds between retries
         # prevent queuing up multiple requests to run _process_tasks()
         self._process_tasks_scheduled = False
         self._process_tasks_lock = threading.Lock()
@@ -843,7 +846,7 @@ class Controller(pyngus.ConnectionEventHandler):
             self.processor.wakeup(self._start_shutdown)
             LOG.debug("Waiting for eventloop to exit")
             self.processor.join(timeout)
-            self._hard_reset()
+            self._hard_reset("Shutting down")
             for sender in itervalues(self._all_senders):
                 sender.destroy()
             self._all_senders.clear()
@@ -1013,7 +1016,7 @@ class Controller(pyngus.ConnectionEventHandler):
     def socket_error(self, error):
         """Called by eventloop when a socket error occurs."""
         LOG.error(_LE("Socket failure: %s"), error)
-        self._handle_connection_loss()
+        self._handle_connection_loss(str(error))
 
     # Pyngus connection event callbacks (and their helpers), all invoked from
     # the eventloop thread:
@@ -1026,7 +1029,7 @@ class Controller(pyngus.ConnectionEventHandler):
             # pyngus bug: ignore failure callback on destroyed connections
             return
         LOG.debug("AMQP Connection failure: %s", error)
-        self._handle_connection_loss()
+        self._handle_connection_loss(str(error))
 
     def connection_active(self, connection):
         """This is a Pyngus callback, invoked by Pyngus when the connection to
@@ -1048,7 +1051,7 @@ class Controller(pyngus.ConnectionEventHandler):
                                   self._reply_link_ready,
                                   self._reply_link_down,
                                   self._reply_credit)
-        self._delay = 1
+        self._delay = self.conn_retry_interval   # reset
         # schedule periodic maintenance of sender links
         self._link_maint_timer = self.processor.defer(self._purge_sender_links,
                                                       self._link_maint_timeout)
@@ -1061,7 +1064,7 @@ class Controller(pyngus.ConnectionEventHandler):
         """
         LOG.debug("AMQP connection closed.")
         # if the driver isn't being shutdown, failover and reconnect
-        self._handle_connection_loss()
+        self._handle_connection_loss("AMQP connection closed.")
 
     def connection_remote_closed(self, connection, reason):
         """This is a Pyngus callback, invoked by Pyngus when the peer has
@@ -1089,13 +1092,14 @@ class Controller(pyngus.ConnectionEventHandler):
                   {'hostname': self.hosts.current.hostname,
                    'port': self.hosts.current.port,
                    'username': self.hosts.current.username})
-        # connection failure will be handled later
+        # pyngus will invoke connection_failed() eventually
 
-    def _handle_connection_loss(self):
+    def _handle_connection_loss(self, reason):
         """The connection to the messaging service has been lost.  Try to
         reestablish the connection/failover if not shutting down the driver.
         """
         self.addresser = None
+        self._socket_connection.close()
         if self._closing:
             # we're in the middle of shutting down the driver anyways,
             # just consider it done:
@@ -1107,31 +1111,33 @@ class Controller(pyngus.ConnectionEventHandler):
                 self._reconnecting = True
                 LOG.info(_LI("delaying reconnect attempt for %d seconds"),
                          self._delay)
-                self.processor.defer(self._do_reconnect, self._delay)
-                self._delay = min(self._delay * 2, 60)
+                self.processor.defer(lambda: self._do_reconnect(reason),
+                                     self._delay)
+                self._delay = min(self._delay * self.conn_retry_backoff,
+                                  self.conn_retry_interval_max)
             if self._link_maint_timer:
                 self._link_maint_timer.cancel()
                 self._link_maint_timer = None
 
-    def _do_reconnect(self):
+    def _do_reconnect(self, reason):
         """Invoked on connection/socket failure, failover and re-connect to the
         messaging service.
         """
         self._reconnecting = False
         if not self._closing:
-            self._hard_reset()
+            self._hard_reset(reason)
             host = self.hosts.next()
             LOG.info(_LI("Reconnecting to: %(hostname)s:%(port)s"),
                      {'hostname': host.hostname, 'port': host.port})
             self._socket_connection.connect(host)
 
-    def _hard_reset(self):
+    def _hard_reset(self, reason):
         """Reset the controller to its pre-connection state"""
         # note well: since this method destroys the connection, it cannot be
         # invoked directly from a pyngus callback.  Use processor.defer() to
         # run this method on the main loop instead.
         for sender in self._purged_senders:
-            sender.destroy()
+            sender.destroy(reason)
         del self._purged_senders[:]
         self._active_senders.clear()
         unused = []
@@ -1140,10 +1146,10 @@ class Controller(pyngus.ConnectionEventHandler):
             if sender.pending_messages == 0:
                 unused.append(key)
             else:
-                sender.reset()
+                sender.reset(reason)
                 self._active_senders.add(key)
         for key in unused:
-            self._all_senders[key].destroy()
+            self._all_senders[key].destroy(reason)
             del self._all_senders[key]
         for servers in itervalues(self._servers):
             for server in itervalues(servers):
@@ -1170,7 +1176,7 @@ class Controller(pyngus.ConnectionEventHandler):
         if not self._closing:
             # destroy links that have already been closed
             for sender in self._purged_senders:
-                sender.destroy()
+                sender.destroy("Idle link purged")
             del self._purged_senders[:]
 
             # determine next set to purge
diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
index 0f3b5da02..c2e16fbe0 100644
--- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py
+++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
@@ -69,31 +69,27 @@ class _SocketConnection(object):
 
     def read_socket(self):
         """Called to read from the socket."""
-        while True:
+        if self.socket:
             try:
-                rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
+                pyngus.read_socket_input(self.pyngus_conn, self.socket)
                 self.pyngus_conn.process(now())
-                return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
                 self.pyngus_conn.close_input()
                 self.pyngus_conn.close_output()
                 self._handler.socket_error(str(e))
-                return pyngus.Connection.EOS
 
     def write_socket(self):
         """Called to write to the socket."""
-        while True:
+        if self.socket:
             try:
-                rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
+                pyngus.write_socket_output(self.pyngus_conn, self.socket)
                 self.pyngus_conn.process(now())
-                return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
                 self.pyngus_conn.close_output()
                 self.pyngus_conn.close_input()
                 self._handler.socket_error(str(e))
-                return pyngus.Connection.EOS
 
     def connect(self, host):
         """Connect to host and start the AMQP protocol."""
@@ -358,7 +354,7 @@ class Thread(threading.Thread):
             deadline = self._scheduler._next_deadline
 
             pyngus_conn = self._connection and self._connection.pyngus_conn
-            if pyngus_conn:
+            if pyngus_conn and self._connection.socket:
                 if pyngus_conn.needs_input:
                     readfds.append(self._connection)
                 if pyngus_conn.has_output:
@@ -388,13 +384,12 @@ class Thread(threading.Thread):
             # Testing shows that polling improves latency over checking the
             # lists returned by select()
             self._requests.process_requests()
-            if pyngus_conn:
-                self._connection.read_socket()
-                if pyngus_conn.deadline:
-                    _now = now()
-                    if pyngus_conn.deadline <= _now:
-                        pyngus_conn.process(_now)
-                self._connection.write_socket()
+            self._connection.read_socket()
+            if pyngus_conn and pyngus_conn.deadline:
+                _now = now()
+                if pyngus_conn.deadline <= _now:
+                    pyngus_conn.process(_now)
+            self._connection.write_socket()
 
             self._scheduler._process()  # run any deferred requests
 
diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py
index dd1f25bac..12db37a60 100644
--- a/oslo_messaging/tests/drivers/test_amqp_driver.py
+++ b/oslo_messaging/tests/drivers/test_amqp_driver.py
@@ -674,12 +674,12 @@ class TestAuthentication(test_utils.BaseTestCase):
         target = oslo_messaging.Target(topic="test-topic")
         _ListenerThread(
             driver.listen(target, None, None)._poll_style_listener, 1)
-        self.assertRaises(oslo_messaging.MessagingTimeout,
+        self.assertRaises(oslo_messaging.MessageDeliveryFailure,
                           driver.send,
                           target, {"context": True},
                           {"method": "echo"},
                           wait_for_reply=True,
-                          timeout=2.0)
+                          retry=2)
         driver.cleanup()
 
 
@@ -771,7 +771,6 @@ mech_list: ${mechs}
         """Verify that a bad password given in TransportHost is
         rejected by the broker.
         """
-
         addr = "amqp://joe:badpass@%s:%d" % (self._broker.host,
                                              self._broker.port)
         url = oslo_messaging.TransportURL.parse(self.conf, addr)
@@ -779,12 +778,15 @@ mech_list: ${mechs}
         target = oslo_messaging.Target(topic="test-topic")
         _ListenerThread(
             driver.listen(target, None, None)._poll_style_listener, 1)
-        self.assertRaises(oslo_messaging.MessagingTimeout,
-                          driver.send,
-                          target, {"context": True},
-                          {"method": "echo"},
-                          wait_for_reply=True,
-                          timeout=2.0)
+        try:
+            driver.send(target, {"context": True}, {"method": "echo"},
+                        wait_for_reply=True, retry=2)
+        except oslo_messaging.MessageDeliveryFailure as e:
+            # verify the exception indicates the failure was an authentication
+            # error
+            self.assertTrue('amqp:unauthorized-access' in str(e))
+        else:
+            self.assertIsNone("Expected authentication failure")
         driver.cleanup()
 
     def test_authentication_bad_mechs(self):
@@ -800,12 +802,12 @@ mech_list: ${mechs}
         target = oslo_messaging.Target(topic="test-topic")
         _ListenerThread(
             driver.listen(target, None, None)._poll_style_listener, 1)
-        self.assertRaises(oslo_messaging.MessagingTimeout,
+        self.assertRaises(oslo_messaging.MessageDeliveryFailure,
                           driver.send,
                           target, {"context": True},
                           {"method": "echo"},
                           wait_for_reply=True,
-                          timeout=2.0)
+                          retry=0)
         driver.cleanup()
 
     def test_authentication_default_username(self):