diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 007ca3a88..4a9361ceb 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler):
                             self.link_retry_delay, send_task.service)
             self._all_senders[key] = sender
             if self.reply_link and self.reply_link.active:
-                sender.attach(self._socket_connection.connection,
+                sender.attach(self._socket_connection.pyngus_conn,
                               self.reply_link, self.addresser)
         self._active_senders.add(key)
         sender.send_message(send_task)
@@ -901,7 +901,7 @@ class Controller(pyngus.ConnectionEventHandler):
             self._servers[key] = servers
         servers[subscribe_task._subscriber_id] = server
         if self._active:
-            server.attach(self._socket_connection.connection,
+            server.attach(self._socket_connection.pyngus_conn,
                           self.addresser)
 
     # commands executed on the processor (eventloop) via 'wakeup()':
@@ -980,7 +980,7 @@ class Controller(pyngus.ConnectionEventHandler):
             self._detach_senders()
             self._detach_servers()
             self.reply_link.detach()
-            self._socket_connection.connection.close()
+            self._socket_connection.pyngus_conn.close()
         else:
             # don't wait for a close from the remote, may never happen
             self.processor.shutdown()
@@ -996,7 +996,7 @@ class Controller(pyngus.ConnectionEventHandler):
                  {'hostname': self.hosts.current.hostname,
                   'port': self.hosts.current.port})
         for sender in itervalues(self._all_senders):
-            sender.attach(self._socket_connection.connection,
+            sender.attach(self._socket_connection.pyngus_conn,
                           self.reply_link, self.addresser)
 
     def _reply_link_down(self):
@@ -1005,7 +1005,7 @@ class Controller(pyngus.ConnectionEventHandler):
         if not self._closing:
             self._detach_senders()
             self._detach_servers()
-            self._socket_connection.connection.close()
+            self._socket_connection.pyngus_conn.close()
             # once closed, _handle_connection_loss() will initiate reconnect
 
     # callback from eventloop on socket error
@@ -1022,7 +1022,7 @@ class Controller(pyngus.ConnectionEventHandler):
         """This is a Pyngus callback, invoked by Pyngus when a non-recoverable
         error occurs on the connection.
         """
-        if connection is not self._socket_connection.connection:
+        if connection is not self._socket_connection.pyngus_conn:
             # pyngus bug: ignore failure callback on destroyed connections
             return
         LOG.debug("AMQP Connection failure: %s", error)
@@ -1042,9 +1042,9 @@ class Controller(pyngus.ConnectionEventHandler):
         self.addresser = self.addresser_factory(props)
         for servers in itervalues(self._servers):
             for server in itervalues(servers):
-                server.attach(self._socket_connection.connection,
+                server.attach(self._socket_connection.pyngus_conn,
                               self.addresser)
-        self.reply_link = Replies(self._socket_connection.connection,
+        self.reply_link = Replies(self._socket_connection.pyngus_conn,
                                   self._reply_link_ready,
                                   self._reply_link_down,
                                   self._reply_credit)
@@ -1075,7 +1075,7 @@ class Controller(pyngus.ConnectionEventHandler):
         self._detach_senders()
         self._detach_servers()
         self.reply_link.detach()
-        self._socket_connection.connection.close()
+        self._socket_connection.pyngus_conn.close()
 
     def sasl_done(self, connection, pn_sasl, outcome):
         """This is a Pyngus callback invoked when the SASL handshake
@@ -1189,4 +1189,4 @@ class Controller(pyngus.ConnectionEventHandler):
     def _active(self):
         # Is the connection up
         return (self._socket_connection
-                and self._socket_connection.connection.active)
+                and self._socket_connection.pyngus_conn.active)
diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
index 71dce0827..0f3b5da02 100644
--- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py
+++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
@@ -22,6 +22,7 @@ protocol specific intelligence is provided by the Controller and executed on
 the background thread via callables.
 """
 
+import collections
 import errno
 import heapq
 import logging
@@ -34,7 +35,6 @@ import threading
 import uuid
 
 import pyngus
-from six import moves
 
 from oslo_messaging._i18n import _LE, _LI, _LW
 LOG = logging.getLogger(__name__)
@@ -54,44 +54,44 @@ class _SocketConnection(object):
     def __init__(self, name, container, properties, handler):
         self.name = name
         self.socket = None
+        self.pyngus_conn = None
         self._properties = properties
         # The handler is a pyngus ConnectionEventHandler, which is invoked by
         # pyngus on connection-related events (active, closed, error, etc).
         # Currently it is the Controller object.
         self._handler = handler
         self._container = container
-        self.connection = None
 
     def fileno(self):
         """Allows use of a _SocketConnection in a select() call.
         """
         return self.socket.fileno()
 
-    def read(self):
-        """Called when socket is read-ready."""
+    def read_socket(self):
+        """Called to read from the socket."""
         while True:
             try:
-                rc = pyngus.read_socket_input(self.connection, self.socket)
-                self.connection.process(now())
+                rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
+                self.pyngus_conn.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
-                self.connection.close_input()
-                self.connection.close_output()
+                self.pyngus_conn.close_input()
+                self.pyngus_conn.close_output()
                 self._handler.socket_error(str(e))
                 return pyngus.Connection.EOS
 
-    def write(self):
-        """Called when socket is write-ready."""
+    def write_socket(self):
+        """Called to write to the socket."""
         while True:
             try:
-                rc = pyngus.write_socket_output(self.connection, self.socket)
-                self.connection.process(now())
+                rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
+                self.pyngus_conn.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
-                self.connection.close_output()
-                self.connection.close_input()
+                self.pyngus_conn.close_output()
+                self.pyngus_conn.close_input()
                 self._handler.socket_error(str(e))
                 return pyngus.Connection.EOS
 
@@ -127,15 +127,16 @@ class _SocketConnection(object):
                 props['x-username'] = host.username
                 props['x-password'] = host.password or ""
 
-        c = self._container.create_connection(self.name, self._handler, props)
-        c.user_context = self
-        self.connection = c
+        self.pyngus_conn = self._container.create_connection(self.name,
+                                                             self._handler,
+                                                             props)
+        self.pyngus_conn.user_context = self
 
         if pyngus.VERSION < (2, 0, 0):
             # older versions of pyngus requires manual SASL configuration:
             # determine the proper SASL mechanism: PLAIN if a username/password
             # is present, else ANONYMOUS
-            pn_sasl = self.connection.pn_sasl
+            pn_sasl = self.pyngus_conn.pn_sasl
             if host.username:
                 password = host.password if host.password else ""
                 pn_sasl.plain(host.username, password)
@@ -143,7 +144,7 @@ class _SocketConnection(object):
                 pn_sasl.mechanisms("ANONYMOUS")
                 pn_sasl.client()
 
-        self.connection.open()
+        self.pyngus_conn.open()
 
     def reset(self, name=None):
         """Clean up the current state, expect 'connect()' to be recalled
@@ -151,9 +152,9 @@ class _SocketConnection(object):
         """
         # note well: since destroy() is called on the connection, do not invoke
         # this method from a pyngus callback!
-        if self.connection:
-            self.connection.destroy()
-            self.connection = None
+        if self.pyngus_conn:
+            self.pyngus_conn.destroy()
+            self.pyngus_conn = None
         self.close()
         if name:
             self.name = name
@@ -239,30 +240,36 @@ class Requests(object):
     loop.
     """
     def __init__(self):
-        self._requests = moves.queue.Queue(maxsize=10)
+        self._requests = collections.deque()
         self._wakeup_pipe = os.pipe()
+        self._pipe_ready = False  # prevents blocking on an empty pipe
+        self._pipe_lock = threading.Lock()
 
     def wakeup(self, request=None):
         """Enqueue a callable to be executed by the eventloop, and force the
         eventloop thread to wake up from select().
         """
-        if request:
-            self._requests.put(request)
-        os.write(self._wakeup_pipe[1], b'!')
+        with self._pipe_lock:
+            if request:
+                self._requests.append(request)
+            if not self._pipe_ready:
+                self._pipe_ready = True
+                os.write(self._wakeup_pipe[1], b'!')
 
     def fileno(self):
         """Allows this request queue to be used by select()."""
         return self._wakeup_pipe[0]
 
-    def read(self):
+    def process_requests(self):
         """Invoked by the eventloop thread, execute each queued callable."""
-        os.read(self._wakeup_pipe[0], 512)
-        # first pop of all current tasks
-        requests = []
-        while not self._requests.empty():
-            requests.append(self._requests.get())
-        # then process them, this allows callables to re-register themselves to
-        # be run on the next iteration of the I/O loop
+        with self._pipe_lock:
+            if not self._pipe_ready:
+                return
+            self._pipe_ready = False
+            os.read(self._wakeup_pipe[0], 512)
+            requests = self._requests
+            self._requests = collections.deque()
+
         for r in requests:
             r()
 
@@ -279,6 +286,8 @@ class Thread(threading.Thread):
         # delayed callables (only used on this thread for now):
         self._scheduler = Scheduler()
 
+        self._connection = None
+
         # Configure a container
         if container_name is None:
             container_name = ("openstack.org/om/container/%s/%s/%s/%s" %
@@ -334,6 +343,7 @@ class Thread(threading.Thread):
         sc = _SocketConnection(key, self._container,
                                properties, handler=handler)
         sc.connect(host)
+        self._connection = sc
         return sc
 
     def run(self):
@@ -342,18 +352,22 @@ class Thread(threading.Thread):
                   self._container.name)
 
         while not self._shutdown:
-            readers, writers, timers = self._container.need_processing()
 
-            readfds = [c.user_context for c in readers]
-            # additionally, always check for readability of pipe we
-            # are using to wakeup processing thread by other threads
-            readfds.append(self._requests)
-            writefds = [c.user_context for c in writers]
+            readfds = [self._requests]
+            writefds = []
+            deadline = self._scheduler._next_deadline
+
+            pyngus_conn = self._connection and self._connection.pyngus_conn
+            if pyngus_conn:
+                if pyngus_conn.needs_input:
+                    readfds.append(self._connection)
+                if pyngus_conn.has_output:
+                    writefds.append(self._connection)
+                if pyngus_conn.deadline:
+                    deadline = (pyngus_conn.deadline if not deadline else
+                                min(deadline, pyngus_conn.deadline))
 
             # force select to return in time to service the next expiring timer
-            d1 = self._scheduler._next_deadline
-            d2 = timers[0].deadline if timers else None
-            deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
             if deadline:
                 _now = now()
                 timeout = 0 if deadline <= _now else (deadline - _now)
@@ -362,7 +376,7 @@ class Thread(threading.Thread):
 
             # and now we wait...
             try:
-                results = select.select(readfds, writefds, [], timeout)
+                select.select(readfds, writefds, [], timeout)
             except select.error as serror:
                 if serror[0] == errno.EINTR:
                     LOG.warning(_LW("ignoring interrupt from select(): %s"),
@@ -370,20 +384,17 @@ class Thread(threading.Thread):
                     continue
                 raise  # assuming fatal...
 
-            readable, writable, ignore = results
-
-            for r in readable:
-                r.read()
-
-            if timers:
-                _now = now()
-                for t in timers:
-                    if t.deadline > _now:
-                        break
-                    t.process(_now)
-
-            for w in writable:
-                w.write()
+            # Ignore the select return value - simply poll the socket for I/O.
+            # Testing shows that polling improves latency over checking the
+            # lists returned by select()
+            self._requests.process_requests()
+            if pyngus_conn:
+                self._connection.read_socket()
+                if pyngus_conn.deadline:
+                    _now = now()
+                    if pyngus_conn.deadline <= _now:
+                        pyngus_conn.process(_now)
+                self._connection.write_socket()
 
             self._scheduler._process()  # run any deferred requests