From 41e23c24368fe1fba8f1b754bd310b0c7965a042 Mon Sep 17 00:00:00 2001
From: Kenneth Giusti <kgiusti@gmail.com>
Date: Mon, 28 Nov 2016 16:04:27 -0500
Subject: [PATCH] [AMQP 1.0] Simplify the I/O event loop code

Leverage the fact that there is never more than one socket active to
simplify the event loop's I/O processing.  This removes the need to
look up active connections.  Also removes the need to query the return
values from select since there is only one socket to process. This
change improves RPC throughput under the simulator by up to 20%

As part of this change I've clarified the code's use of the pyngus
connection by renaming the connection property to something a bit more
specific.

Change-Id: If7c020bb0bd96490af78bc06659db10073b02417
---
 .../_drivers/amqp1_driver/controller.py       |  20 +--
 .../_drivers/amqp1_driver/eventloop.py        | 127 ++++++++++--------
 2 files changed, 79 insertions(+), 68 deletions(-)

diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py
index 007ca3a88..4a9361ceb 100644
--- a/oslo_messaging/_drivers/amqp1_driver/controller.py
+++ b/oslo_messaging/_drivers/amqp1_driver/controller.py
@@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler):
                             self.link_retry_delay, send_task.service)
             self._all_senders[key] = sender
             if self.reply_link and self.reply_link.active:
-                sender.attach(self._socket_connection.connection,
+                sender.attach(self._socket_connection.pyngus_conn,
                               self.reply_link, self.addresser)
         self._active_senders.add(key)
         sender.send_message(send_task)
@@ -901,7 +901,7 @@ class Controller(pyngus.ConnectionEventHandler):
             self._servers[key] = servers
         servers[subscribe_task._subscriber_id] = server
         if self._active:
-            server.attach(self._socket_connection.connection,
+            server.attach(self._socket_connection.pyngus_conn,
                           self.addresser)
 
     # commands executed on the processor (eventloop) via 'wakeup()':
@@ -980,7 +980,7 @@ class Controller(pyngus.ConnectionEventHandler):
             self._detach_senders()
             self._detach_servers()
             self.reply_link.detach()
-            self._socket_connection.connection.close()
+            self._socket_connection.pyngus_conn.close()
         else:
             # don't wait for a close from the remote, may never happen
             self.processor.shutdown()
@@ -996,7 +996,7 @@ class Controller(pyngus.ConnectionEventHandler):
                  {'hostname': self.hosts.current.hostname,
                   'port': self.hosts.current.port})
         for sender in itervalues(self._all_senders):
-            sender.attach(self._socket_connection.connection,
+            sender.attach(self._socket_connection.pyngus_conn,
                           self.reply_link, self.addresser)
 
     def _reply_link_down(self):
@@ -1005,7 +1005,7 @@ class Controller(pyngus.ConnectionEventHandler):
         if not self._closing:
             self._detach_senders()
             self._detach_servers()
-            self._socket_connection.connection.close()
+            self._socket_connection.pyngus_conn.close()
             # once closed, _handle_connection_loss() will initiate reconnect
 
     # callback from eventloop on socket error
@@ -1022,7 +1022,7 @@ class Controller(pyngus.ConnectionEventHandler):
         """This is a Pyngus callback, invoked by Pyngus when a non-recoverable
         error occurs on the connection.
         """
-        if connection is not self._socket_connection.connection:
+        if connection is not self._socket_connection.pyngus_conn:
             # pyngus bug: ignore failure callback on destroyed connections
             return
         LOG.debug("AMQP Connection failure: %s", error)
@@ -1042,9 +1042,9 @@ class Controller(pyngus.ConnectionEventHandler):
         self.addresser = self.addresser_factory(props)
         for servers in itervalues(self._servers):
             for server in itervalues(servers):
-                server.attach(self._socket_connection.connection,
+                server.attach(self._socket_connection.pyngus_conn,
                               self.addresser)
-        self.reply_link = Replies(self._socket_connection.connection,
+        self.reply_link = Replies(self._socket_connection.pyngus_conn,
                                   self._reply_link_ready,
                                   self._reply_link_down,
                                   self._reply_credit)
@@ -1075,7 +1075,7 @@ class Controller(pyngus.ConnectionEventHandler):
         self._detach_senders()
         self._detach_servers()
         self.reply_link.detach()
-        self._socket_connection.connection.close()
+        self._socket_connection.pyngus_conn.close()
 
     def sasl_done(self, connection, pn_sasl, outcome):
         """This is a Pyngus callback invoked when the SASL handshake
@@ -1189,4 +1189,4 @@ class Controller(pyngus.ConnectionEventHandler):
     def _active(self):
         # Is the connection up
         return (self._socket_connection
-                and self._socket_connection.connection.active)
+                and self._socket_connection.pyngus_conn.active)
diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
index 71dce0827..0f3b5da02 100644
--- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py
+++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py
@@ -22,6 +22,7 @@ protocol specific intelligence is provided by the Controller and executed on
 the background thread via callables.
 """
 
+import collections
 import errno
 import heapq
 import logging
@@ -34,7 +35,6 @@ import threading
 import uuid
 
 import pyngus
-from six import moves
 
 from oslo_messaging._i18n import _LE, _LI, _LW
 LOG = logging.getLogger(__name__)
@@ -54,44 +54,44 @@ class _SocketConnection(object):
     def __init__(self, name, container, properties, handler):
         self.name = name
         self.socket = None
+        self.pyngus_conn = None
         self._properties = properties
         # The handler is a pyngus ConnectionEventHandler, which is invoked by
         # pyngus on connection-related events (active, closed, error, etc).
         # Currently it is the Controller object.
         self._handler = handler
         self._container = container
-        self.connection = None
 
     def fileno(self):
         """Allows use of a _SocketConnection in a select() call.
         """
         return self.socket.fileno()
 
-    def read(self):
-        """Called when socket is read-ready."""
+    def read_socket(self):
+        """Called to read from the socket."""
         while True:
             try:
-                rc = pyngus.read_socket_input(self.connection, self.socket)
-                self.connection.process(now())
+                rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
+                self.pyngus_conn.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
-                self.connection.close_input()
-                self.connection.close_output()
+                self.pyngus_conn.close_input()
+                self.pyngus_conn.close_output()
                 self._handler.socket_error(str(e))
                 return pyngus.Connection.EOS
 
-    def write(self):
-        """Called when socket is write-ready."""
+    def write_socket(self):
+        """Called to write to the socket."""
         while True:
             try:
-                rc = pyngus.write_socket_output(self.connection, self.socket)
-                self.connection.process(now())
+                rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
+                self.pyngus_conn.process(now())
                 return rc
             except (socket.timeout, socket.error) as e:
                 # pyngus handles EAGAIN/EWOULDBLOCK and EINTER
-                self.connection.close_output()
-                self.connection.close_input()
+                self.pyngus_conn.close_output()
+                self.pyngus_conn.close_input()
                 self._handler.socket_error(str(e))
                 return pyngus.Connection.EOS
 
@@ -127,15 +127,16 @@ class _SocketConnection(object):
                 props['x-username'] = host.username
                 props['x-password'] = host.password or ""
 
-        c = self._container.create_connection(self.name, self._handler, props)
-        c.user_context = self
-        self.connection = c
+        self.pyngus_conn = self._container.create_connection(self.name,
+                                                             self._handler,
+                                                             props)
+        self.pyngus_conn.user_context = self
 
         if pyngus.VERSION < (2, 0, 0):
             # older versions of pyngus requires manual SASL configuration:
             # determine the proper SASL mechanism: PLAIN if a username/password
             # is present, else ANONYMOUS
-            pn_sasl = self.connection.pn_sasl
+            pn_sasl = self.pyngus_conn.pn_sasl
             if host.username:
                 password = host.password if host.password else ""
                 pn_sasl.plain(host.username, password)
@@ -143,7 +144,7 @@ class _SocketConnection(object):
                 pn_sasl.mechanisms("ANONYMOUS")
                 pn_sasl.client()
 
-        self.connection.open()
+        self.pyngus_conn.open()
 
     def reset(self, name=None):
         """Clean up the current state, expect 'connect()' to be recalled
@@ -151,9 +152,9 @@ class _SocketConnection(object):
         """
         # note well: since destroy() is called on the connection, do not invoke
         # this method from a pyngus callback!
-        if self.connection:
-            self.connection.destroy()
-            self.connection = None
+        if self.pyngus_conn:
+            self.pyngus_conn.destroy()
+            self.pyngus_conn = None
         self.close()
         if name:
             self.name = name
@@ -239,30 +240,36 @@ class Requests(object):
     loop.
     """
     def __init__(self):
-        self._requests = moves.queue.Queue(maxsize=10)
+        self._requests = collections.deque()
         self._wakeup_pipe = os.pipe()
+        self._pipe_ready = False  # prevents blocking on an empty pipe
+        self._pipe_lock = threading.Lock()
 
     def wakeup(self, request=None):
         """Enqueue a callable to be executed by the eventloop, and force the
         eventloop thread to wake up from select().
         """
-        if request:
-            self._requests.put(request)
-        os.write(self._wakeup_pipe[1], b'!')
+        with self._pipe_lock:
+            if request:
+                self._requests.append(request)
+            if not self._pipe_ready:
+                self._pipe_ready = True
+                os.write(self._wakeup_pipe[1], b'!')
 
     def fileno(self):
         """Allows this request queue to be used by select()."""
         return self._wakeup_pipe[0]
 
-    def read(self):
+    def process_requests(self):
         """Invoked by the eventloop thread, execute each queued callable."""
-        os.read(self._wakeup_pipe[0], 512)
-        # first pop of all current tasks
-        requests = []
-        while not self._requests.empty():
-            requests.append(self._requests.get())
-        # then process them, this allows callables to re-register themselves to
-        # be run on the next iteration of the I/O loop
+        with self._pipe_lock:
+            if not self._pipe_ready:
+                return
+            self._pipe_ready = False
+            os.read(self._wakeup_pipe[0], 512)
+            requests = self._requests
+            self._requests = collections.deque()
+
         for r in requests:
             r()
 
@@ -279,6 +286,8 @@ class Thread(threading.Thread):
         # delayed callables (only used on this thread for now):
         self._scheduler = Scheduler()
 
+        self._connection = None
+
         # Configure a container
         if container_name is None:
             container_name = ("openstack.org/om/container/%s/%s/%s/%s" %
@@ -334,6 +343,7 @@ class Thread(threading.Thread):
         sc = _SocketConnection(key, self._container,
                                properties, handler=handler)
         sc.connect(host)
+        self._connection = sc
         return sc
 
     def run(self):
@@ -342,18 +352,22 @@ class Thread(threading.Thread):
                   self._container.name)
 
         while not self._shutdown:
-            readers, writers, timers = self._container.need_processing()
 
-            readfds = [c.user_context for c in readers]
-            # additionally, always check for readability of pipe we
-            # are using to wakeup processing thread by other threads
-            readfds.append(self._requests)
-            writefds = [c.user_context for c in writers]
+            readfds = [self._requests]
+            writefds = []
+            deadline = self._scheduler._next_deadline
+
+            pyngus_conn = self._connection and self._connection.pyngus_conn
+            if pyngus_conn:
+                if pyngus_conn.needs_input:
+                    readfds.append(self._connection)
+                if pyngus_conn.has_output:
+                    writefds.append(self._connection)
+                if pyngus_conn.deadline:
+                    deadline = (pyngus_conn.deadline if not deadline else
+                                min(deadline, pyngus_conn.deadline))
 
             # force select to return in time to service the next expiring timer
-            d1 = self._scheduler._next_deadline
-            d2 = timers[0].deadline if timers else None
-            deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
             if deadline:
                 _now = now()
                 timeout = 0 if deadline <= _now else (deadline - _now)
@@ -362,7 +376,7 @@ class Thread(threading.Thread):
 
             # and now we wait...
             try:
-                results = select.select(readfds, writefds, [], timeout)
+                select.select(readfds, writefds, [], timeout)
             except select.error as serror:
                 if serror[0] == errno.EINTR:
                     LOG.warning(_LW("ignoring interrupt from select(): %s"),
@@ -370,20 +384,17 @@ class Thread(threading.Thread):
                     continue
                 raise  # assuming fatal...
 
-            readable, writable, ignore = results
-
-            for r in readable:
-                r.read()
-
-            if timers:
-                _now = now()
-                for t in timers:
-                    if t.deadline > _now:
-                        break
-                    t.process(_now)
-
-            for w in writable:
-                w.write()
+            # Ignore the select return value - simply poll the socket for I/O.
+            # Testing shows that polling improves latency over checking the
+            # lists returned by select()
+            self._requests.process_requests()
+            if pyngus_conn:
+                self._connection.read_socket()
+                if pyngus_conn.deadline:
+                    _now = now()
+                    if pyngus_conn.deadline <= _now:
+                        pyngus_conn.process(_now)
+                self._connection.write_socket()
 
             self._scheduler._process()  # run any deferred requests