From e3c5b99959a7f52dab4deb5b6fdd67083110db87 Mon Sep 17 00:00:00 2001
From: Mark McLoughlin <markmc@redhat.com>
Date: Tue, 23 Jul 2013 17:28:42 +0100
Subject: [PATCH] Port away from some eventlet infrastructure

Add a simple object pool implementation for our connection pool, in
place of eventlet.pools.Pool.

Also use threading.Lock in place of eventlet.Semaphore.

There are still some eventlet modules imported by the code, but we can
avoid using them at runtime and clean things up later. We can't remove
them now or it'll cause pep8 failures.

Change-Id: I380408d1321802de813de541cd0a2d4305c3627c
---
 oslo/messaging/_drivers/amqp.py | 22 ++++----
 oslo/messaging/_drivers/pool.py | 91 +++++++++++++++++++++++++++++++++
 2 files changed, 101 insertions(+), 12 deletions(-)
 create mode 100644 oslo/messaging/_drivers/pool.py

diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
index 6065267f4..b946e88e4 100644
--- a/oslo/messaging/_drivers/amqp.py
+++ b/oslo/messaging/_drivers/amqp.py
@@ -28,15 +28,15 @@ AMQP, but is deprecated and predates this code.
 import collections
 import inspect
 import sys
+import threading
 import uuid
 
 from eventlet import greenpool
-from eventlet import pools
 from eventlet import queue
-from eventlet import semaphore
 from oslo.config import cfg
 
 from oslo.messaging._drivers import common as rpc_common
+from oslo.messaging._drivers import pool
 from oslo.messaging.openstack.common import excutils
 from oslo.messaging.openstack.common.gettextutils import _  # noqa
 from oslo.messaging.openstack.common import local
@@ -58,14 +58,12 @@ UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
 
-class Pool(pools.Pool):
+class ConnectionPool(pool.Pool):
     """Class that implements a Pool of Connections."""
-    def __init__(self, conf, connection_cls, *args, **kwargs):
+    def __init__(self, conf, connection_cls):
         self.connection_cls = connection_cls
         self.conf = conf
-        kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
-        kwargs.setdefault("order_as_stack", True)
-        super(Pool, self).__init__(*args, **kwargs)
+        super(ConnectionPool, self).__init__(self.conf.rpc_conn_pool_size)
         self.reply_proxy = None
 
     # TODO(comstud): Timeout connections not used in a while
@@ -74,8 +72,8 @@ class Pool(pools.Pool):
         return self.connection_cls(self.conf)
 
     def empty(self):
-        while self.free_items:
-            self.get().close()
+        for item in self.iter_free:
+            item.close()
         # Force a new connection pool to be created.
         # Note that this was added due to failing unit test cases. The issue
         # is the above "while loop" gets all the cached connections from the
@@ -88,14 +86,14 @@ class Pool(pools.Pool):
         self.connection_cls.pool = None
 
 
-_pool_create_sem = semaphore.Semaphore()
+_pool_create_sem = threading.Lock()
 
 
 def get_connection_pool(conf, connection_cls):
     with _pool_create_sem:
         # Make sure only one thread tries to create the connection pool.
         if not connection_cls.pool:
-            connection_cls.pool = Pool(conf, connection_cls)
+            connection_cls.pool = ConnectionPool(conf, connection_cls)
     return connection_cls.pool
 
 
@@ -517,7 +515,7 @@ def create_connection(conf, new, connection_pool):
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
-_reply_proxy_create_sem = semaphore.Semaphore()
+_reply_proxy_create_sem = threading.Lock()
 
 
 def multicall(conf, context, topic, msg, timeout, connection_pool):
diff --git a/oslo/messaging/_drivers/pool.py b/oslo/messaging/_drivers/pool.py
new file mode 100644
index 000000000..b7877d005
--- /dev/null
+++ b/oslo/messaging/_drivers/pool.py
@@ -0,0 +1,91 @@
+
+# Copyright 2013 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import abc
+import collections
+import threading
+
+
+class Pool(object):
+
+    """A thread-safe object pool.
+
+    Modelled after the eventlet.pools.Pool interface, but designed to be safe
+    when using native threads without the GIL.
+
+    Resizing is not supported.
+    """
+
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, max_size=4):
+        super(Pool, self).__init__()
+
+        self._max_size = max_size
+        self._current_size = 0
+        self._cond = threading.Condition()
+
+        self._items = collections.deque()
+
+    def put(self, item):
+        """Return an item to the pool."""
+        with self._cond:
+            self._items.appendleft(item)
+            self._cond.notify()
+
+    def get(self, only_free=False):
+        """Return an item from the pool, when one is available.
+
+        This may cause the calling thread to block.
+
+        :param only_free: if True, return None if no free item available
+        :type only_free: bool
+        """
+        with self._cond:
+            while True:
+                try:
+                    return self._items.popleft()
+                except IndexError:
+                    if only_free:
+                        return None
+
+                if self._current_size < self._max_size:
+                    self._current_size += 1
+                    break
+
+                # FIXME(markmc): timeout needed to allow keyboard interrupt
+                # http://bugs.python.org/issue8844
+                self._cond.wait(timeout=1)
+
+        # We've grabbed a slot and dropped the lock, now do the creation
+        try:
+            return self.create()
+        except Exception:
+            with self._cond:
+                self._current_size -= 1
+            raise
+
+    def iter_free(self):
+        """Iterate over free items."""
+        with self._cond:
+            while True:
+                try:
+                    yield self._items.popleft()
+                except IndexError:
+                    break
+
+    @abc.abstractmethod
+    def create(self):
+        """Construct a new item."""