diff --git a/oslo_messaging/_drivers/amqp.py b/oslo_messaging/_drivers/amqp.py
index 2308b80d1..86f41adde 100644
--- a/oslo_messaging/_drivers/amqp.py
+++ b/oslo_messaging/_drivers/amqp.py
@@ -31,7 +31,6 @@ from oslo_config import cfg
 import six
 
 from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._drivers import pool
 
 deprecated_durable_opts = [
     cfg.DeprecatedOpt('amqp_durable_queues',
@@ -66,122 +65,6 @@ amqp_opts = [
 UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
-# NOTE(sileht): Even if rabbit has only one Connection class,
-# this connection can be used for two purposes:
-# * wait and receive amqp messages (only do read stuffs on the socket)
-# * send messages to the broker (only do write stuffs on the socket)
-# The code inside a connection class is not concurrency safe.
-# Using one Connection class instance for doing both, will result
-# of eventlet complaining of multiple greenthreads that read/write the
-# same fd concurrently... because 'send' and 'listen' run in different
-# greenthread.
-# So, a connection cannot be shared between thread/greenthread and
-# this two variables permit to define the purpose of the connection
-# to allow drivers to add special handling if needed (like heatbeat).
-# amqp drivers create 3 kind of connections:
-# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
-# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
-# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
-#   to wait replies of rpc call
-PURPOSE_LISTEN = 'listen'
-PURPOSE_SEND = 'send'
-
-
-class ConnectionPool(pool.Pool):
-    """Class that implements a Pool of Connections."""
-    def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
-        self.connection_cls = connection_cls
-        self.conf = conf
-        self.url = url
-        super(ConnectionPool, self).__init__(rpc_conn_pool_size)
-        self.reply_proxy = None
-
-    # TODO(comstud): Timeout connections not used in a while
-    def create(self, purpose=None):
-        if purpose is None:
-            purpose = PURPOSE_SEND
-        LOG.debug('Pool creating new connection')
-        return self.connection_cls(self.conf, self.url, purpose)
-
-    def empty(self):
-        for item in self.iter_free():
-            item.close()
-
-
-class ConnectionContext(rpc_common.Connection):
-    """The class that is actually returned to the create_connection() caller.
-
-    This is essentially a wrapper around Connection that supports 'with'.
-    It can also return a new Connection, or one from a pool.
-
-    The function will also catch when an instance of this class is to be
-    deleted.  With that we can return Connections to the pool on exceptions
-    and so forth without making the caller be responsible for catching them.
-    If possible the function makes sure to return a connection to the pool.
-    """
-
-    def __init__(self, connection_pool, purpose):
-        """Create a new connection, or get one from the pool."""
-        self.connection = None
-        self.connection_pool = connection_pool
-        pooled = purpose == PURPOSE_SEND
-        if pooled:
-            self.connection = connection_pool.get()
-        else:
-            # a non-pooled connection is requested, so create a new connection
-            self.connection = connection_pool.create(purpose)
-        self.pooled = pooled
-        self.connection.pooled = pooled
-
-    def __enter__(self):
-        """When with ConnectionContext() is used, return self."""
-        return self
-
-    def _done(self):
-        """If the connection came from a pool, clean it up and put it back.
-        If it did not come from a pool, close it.
-        """
-        if self.connection:
-            if self.pooled:
-                # Reset the connection so it's ready for the next caller
-                # to grab from the pool
-                try:
-                    self.connection.reset()
-                except Exception:
-                    LOG.exception("Fail to reset the connection, drop it")
-                    try:
-                        self.connection.close()
-                    except Exception:
-                        pass
-                    self.connection = self.connection_pool.create()
-                finally:
-                    self.connection_pool.put(self.connection)
-            else:
-                try:
-                    self.connection.close()
-                except Exception:
-                    pass
-            self.connection = None
-
-    def __exit__(self, exc_type, exc_value, tb):
-        """End of 'with' statement.  We're done here."""
-        self._done()
-
-    def __del__(self):
-        """Caller is done with this connection.  Make sure we cleaned up."""
-        self._done()
-
-    def close(self):
-        """Caller is done with this connection."""
-        self._done()
-
-    def __getattr__(self, key):
-        """Proxy all other calls to the Connection instance."""
-        if self.connection:
-            return getattr(self.connection, key)
-        else:
-            raise rpc_common.InvalidRPCConnectionReuse()
-
 
 class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call."""
diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py
index d3405086a..420587c45 100644
--- a/oslo_messaging/_drivers/amqpdriver.py
+++ b/oslo_messaging/_drivers/amqpdriver.py
@@ -100,7 +100,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
             return
 
         with self.listener.driver._get_connection(
-                rpc_amqp.PURPOSE_SEND) as conn:
+                rpc_common.PURPOSE_SEND) as conn:
             if self.listener.driver.send_single_reply:
                 self._send_reply(conn, reply, failure, log_failure=log_failure,
                                  ending=True)
@@ -366,9 +366,9 @@ class AMQPDriverBase(base.BaseDriver):
     def _get_exchange(self, target):
         return target.exchange or self._default_exchange
 
-    def _get_connection(self, purpose=rpc_amqp.PURPOSE_SEND):
-        return rpc_amqp.ConnectionContext(self._connection_pool,
-                                          purpose=purpose)
+    def _get_connection(self, purpose=rpc_common.PURPOSE_SEND):
+        return rpc_common.ConnectionContext(self._connection_pool,
+                                            purpose=purpose)
 
     def _get_reply_q(self):
         with self._reply_q_lock:
@@ -377,7 +377,7 @@ class AMQPDriverBase(base.BaseDriver):
 
             reply_q = 'reply_' + uuid.uuid4().hex
 
-            conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
+            conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
             self._waiter = ReplyWaiter(reply_q, conn,
                                        self._allowed_remote_exmods)
@@ -422,7 +422,7 @@ class AMQPDriverBase(base.BaseDriver):
             log_msg = "CAST unique_id: %s " % unique_id
 
         try:
-            with self._get_connection(rpc_amqp.PURPOSE_SEND) as conn:
+            with self._get_connection(rpc_common.PURPOSE_SEND) as conn:
                 if notify:
                     exchange = self._get_exchange(target)
                     log_msg += "NOTIFY exchange '%(exchange)s'" \
@@ -468,7 +468,7 @@ class AMQPDriverBase(base.BaseDriver):
                           envelope=(version == 2.0), notify=True, retry=retry)
 
     def listen(self, target):
-        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
+        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
         listener = AMQPListener(self, conn)
 
@@ -484,7 +484,7 @@ class AMQPDriverBase(base.BaseDriver):
         return listener
 
     def listen_for_notifications(self, targets_and_priorities, pool):
-        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)
+        conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
 
         listener = AMQPListener(self, conn)
         for target, priority in targets_and_priorities:
diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py
index 78bdd9239..02c04805d 100644
--- a/oslo_messaging/_drivers/common.py
+++ b/oslo_messaging/_drivers/common.py
@@ -348,3 +348,99 @@ class DecayingTimer(object):
         if left <= 0 and timeout_callback is not None:
             timeout_callback(*args, **kwargs)
         return left if maximum is None else min(left, maximum)
+
+
+# NOTE(sileht): Even if rabbit has only one Connection class,
+# this connection can be used for two purposes:
+# * wait and receive amqp messages (only do read stuffs on the socket)
+# * send messages to the broker (only do write stuffs on the socket)
+# The code inside a connection class is not concurrency safe.
+# Using one Connection class instance for doing both, will result
+# of eventlet complaining of multiple greenthreads that read/write the
+# same fd concurrently... because 'send' and 'listen' run in different
+# greenthread.
+# So, a connection cannot be shared between thread/greenthread and
+# this two variables permit to define the purpose of the connection
+# to allow drivers to add special handling if needed (like heatbeat).
+# amqp drivers create 3 kind of connections:
+# * driver.listen*(): each call create a new 'PURPOSE_LISTEN' connection
+# * driver.send*(): a pool of 'PURPOSE_SEND' connections is used
+# * driver internally have another 'PURPOSE_LISTEN' connection dedicated
+#   to wait replies of rpc call
+PURPOSE_LISTEN = 'listen'
+PURPOSE_SEND = 'send'
+
+
+class ConnectionContext(Connection):
+    """The class that is actually returned to the create_connection() caller.
+
+    This is essentially a wrapper around Connection that supports 'with'.
+    It can also return a new Connection, or one from a pool.
+
+    The function will also catch when an instance of this class is to be
+    deleted.  With that we can return Connections to the pool on exceptions
+    and so forth without making the caller be responsible for catching them.
+    If possible the function makes sure to return a connection to the pool.
+    """
+
+    def __init__(self, connection_pool, purpose):
+        """Create a new connection, or get one from the pool."""
+        self.connection = None
+        self.connection_pool = connection_pool
+        pooled = purpose == PURPOSE_SEND
+        if pooled:
+            self.connection = connection_pool.get()
+        else:
+            # a non-pooled connection is requested, so create a new connection
+            self.connection = connection_pool.create(purpose)
+        self.pooled = pooled
+        self.connection.pooled = pooled
+
+    def __enter__(self):
+        """When with ConnectionContext() is used, return self."""
+        return self
+
+    def _done(self):
+        """If the connection came from a pool, clean it up and put it back.
+        If it did not come from a pool, close it.
+        """
+        if self.connection:
+            if self.pooled:
+                # Reset the connection so it's ready for the next caller
+                # to grab from the pool
+                try:
+                    self.connection.reset()
+                except Exception:
+                    LOG.exception("Fail to reset the connection, drop it")
+                    try:
+                        self.connection.close()
+                    except Exception:
+                        pass
+                    self.connection = self.connection_pool.create()
+                finally:
+                    self.connection_pool.put(self.connection)
+            else:
+                try:
+                    self.connection.close()
+                except Exception:
+                    pass
+            self.connection = None
+
+    def __exit__(self, exc_type, exc_value, tb):
+        """End of 'with' statement.  We're done here."""
+        self._done()
+
+    def __del__(self):
+        """Caller is done with this connection.  Make sure we cleaned up."""
+        self._done()
+
+    def close(self):
+        """Caller is done with this connection."""
+        self._done()
+
+    def __getattr__(self, key):
+        """Proxy all other calls to the Connection instance."""
+        if self.connection:
+            return getattr(self.connection, key)
+        else:
+            raise InvalidRPCConnectionReuse()
diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 58cd3cd4d..d96a39838 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -37,6 +37,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
 from oslo_messaging._drivers import amqpdriver
 from oslo_messaging._drivers import base
 from oslo_messaging._drivers import common as rpc_common
+from oslo_messaging._drivers import pool
 from oslo_messaging._i18n import _
 from oslo_messaging._i18n import _LE
 from oslo_messaging._i18n import _LI
@@ -456,7 +457,7 @@ class Connection(object):
         # NOTE(sileht): if purpose is PURPOSE_LISTEN
         # we don't need the lock because we don't
         # have a heartbeat thread
-        if purpose == rpc_amqp.PURPOSE_SEND:
+        if purpose == rpc_common.PURPOSE_SEND:
             self._connection_lock = ConnectionLock()
         else:
             self._connection_lock = DummyConnectionLock()
@@ -496,7 +497,7 @@ class Connection(object):
         # the consume code does the heartbeat stuff
         # we don't need a thread
         self._heartbeat_thread = None
-        if purpose == rpc_amqp.PURPOSE_SEND:
+        if purpose == rpc_common.PURPOSE_SEND:
             self._heartbeat_start()
 
         LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s '
@@ -1159,7 +1160,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
         conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
         conf.register_opts(base.base_opts, group=opt_group)
 
-        connection_pool = rpc_amqp.ConnectionPool(
+        connection_pool = pool.ConnectionPool(
             conf, conf.oslo_messaging_rabbit.rpc_conn_pool_size,
             url, Connection)
 
diff --git a/oslo_messaging/_drivers/pool.py b/oslo_messaging/_drivers/pool.py
index e689d678a..699ce5c10 100644
--- a/oslo_messaging/_drivers/pool.py
+++ b/oslo_messaging/_drivers/pool.py
@@ -17,8 +17,13 @@ import abc
 import collections
 import threading
 
+from oslo_log import log as logging
 import six
 
+from oslo_messaging._drivers import common
+
+LOG = logging.getLogger(__name__)
+
 
 @six.add_metaclass(abc.ABCMeta)
 class Pool(object):
@@ -86,3 +91,24 @@ class Pool(object):
     @abc.abstractmethod
     def create(self):
         """Construct a new item."""
+
+
+class ConnectionPool(Pool):
+    """Class that implements a Pool of Connections."""
+    def __init__(self, conf, rpc_conn_pool_size, url, connection_cls):
+        self.connection_cls = connection_cls
+        self.conf = conf
+        self.url = url
+        super(ConnectionPool, self).__init__(rpc_conn_pool_size)
+        self.reply_proxy = None
+
+    # TODO(comstud): Timeout connections not used in a while
+    def create(self, purpose=None):
+        if purpose is None:
+            purpose = common.PURPOSE_SEND
+        LOG.debug('Pool creating new connection')
+        return self.connection_cls(self.conf, self.url, purpose)
+
+    def empty(self):
+        for item in self.iter_free():
+            item.close()
diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py
index db1a8c3df..63ce0662a 100644
--- a/oslo_messaging/tests/drivers/test_impl_rabbit.py
+++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py
@@ -177,7 +177,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
     def test_send_with_timeout(self, fake_publish):
         transport = oslo_messaging.get_transport(self.conf,
                                                  'kombu+memory:////')
-        with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
+        with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
             conn = pool_conn.connection
             conn._publish(mock.Mock(), 'msg', routing_key='routing_key',
                           timeout=1)
@@ -187,7 +187,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
     def test_send_no_timeout(self, fake_publish):
         transport = oslo_messaging.get_transport(self.conf,
                                                  'kombu+memory:////')
-        with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
+        with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
             conn = pool_conn.connection
             conn._publish(mock.Mock(), 'msg', routing_key='routing_key')
         fake_publish.assert_called_with('msg', expiration=None)
@@ -207,7 +207,7 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
             type='topic',
             passive=False)
 
-        with transport._driver._get_connection(amqp.PURPOSE_SEND) as pool_conn:
+        with transport._driver._get_connection(driver_common.PURPOSE_SEND) as pool_conn:
             conn = pool_conn.connection
             exc = conn.connection.channel_errors[0]
 
@@ -240,7 +240,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
                                                  'kombu+memory:////')
         self.addCleanup(transport.cleanup)
         deadline = time.time() + 6
-        with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
+        with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
             self.assertRaises(driver_common.Timeout,
                               conn.consume, timeout=3)
 
@@ -259,7 +259,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
         transport = oslo_messaging.get_transport(self.conf,
                                                  'kombu+memory:////')
         self.addCleanup(transport.cleanup)
-        with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
+        with transport._driver._get_connection(driver_common.PURPOSE_LISTEN) as conn:
             channel = conn.connection.channel
             with mock.patch('kombu.connection.Connection.connected',
                             new_callable=mock.PropertyMock,
@@ -902,7 +902,7 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
         # starting from the first broker in the list
         url = oslo_messaging.TransportURL.parse(self.conf, None)
         self.connection = rabbit_driver.Connection(self.conf, url,
-                                                   amqp.PURPOSE_SEND)
+                                                   driver_common.PURPOSE_SEND)
         self.addCleanup(self.connection.close)
 
     def test_ensure_four_retry(self):