diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
new file mode 100644
index 000000000..1afd2abd0
--- /dev/null
+++ b/oslo/messaging/_drivers/amqp.py
@@ -0,0 +1,610 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 - 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+"""
+Shared code between AMQP based openstack.common.rpc implementations.
+
+The code in this module is shared between the rpc implemenations based on AMQP.
+Specifically, this includes impl_kombu and impl_qpid.  impl_carrot also uses
+AMQP, but is deprecated and predates this code.
+"""
+
+import collections
+import inspect
+import sys
+import uuid
+
+from eventlet import greenpool
+from eventlet import pools
+from eventlet import queue
+from eventlet import semaphore
+from oslo.config import cfg
+
+from openstack.common import excutils
+from openstack.common.gettextutils import _  # noqa
+from openstack.common import local
+from openstack.common import log as logging
+from openstack.common.rpc import common as rpc_common
+
+
+amqp_opts = [
+    cfg.BoolOpt('amqp_durable_queues',
+                default=False,
+                deprecated_name='rabbit_durable_queues',
+                deprecated_group='DEFAULT',
+                help='Use durable queues in amqp.'),
+    cfg.BoolOpt('amqp_auto_delete',
+                default=False,
+                help='Auto-delete queues in amqp.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
+
+UNIQUE_ID = '_unique_id'
+LOG = logging.getLogger(__name__)
+
+
+class Pool(pools.Pool):
+    """Class that implements a Pool of Connections."""
+    def __init__(self, conf, connection_cls, *args, **kwargs):
+        self.connection_cls = connection_cls
+        self.conf = conf
+        kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
+        kwargs.setdefault("order_as_stack", True)
+        super(Pool, self).__init__(*args, **kwargs)
+        self.reply_proxy = None
+
+    # TODO(comstud): Timeout connections not used in a while
+    def create(self):
+        LOG.debug(_('Pool creating new connection'))
+        return self.connection_cls(self.conf)
+
+    def empty(self):
+        while self.free_items:
+            self.get().close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the tearDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
+
+
+_pool_create_sem = semaphore.Semaphore()
+
+
+def get_connection_pool(conf, connection_cls):
+    with _pool_create_sem:
+        # Make sure only one thread tries to create the connection pool.
+        if not connection_cls.pool:
+            connection_cls.pool = Pool(conf, connection_cls)
+    return connection_cls.pool
+
+
+class ConnectionContext(rpc_common.Connection):
+    """The class that is actually returned to the create_connection() caller.
+
+    This is essentially a wrapper around Connection that supports 'with'.
+    It can also return a new Connection, or one from a pool.
+
+    The function will also catch when an instance of this class is to be
+    deleted.  With that we can return Connections to the pool on exceptions
+    and so forth without making the caller be responsible for catching them.
+    If possible the function makes sure to return a connection to the pool.
+    """
+
+    def __init__(self, conf, connection_pool, pooled=True, server_params=None):
+        """Create a new connection, or get one from the pool."""
+        self.connection = None
+        self.conf = conf
+        self.connection_pool = connection_pool
+        if pooled:
+            self.connection = connection_pool.get()
+        else:
+            self.connection = connection_pool.connection_cls(
+                conf,
+                server_params=server_params)
+        self.pooled = pooled
+
+    def __enter__(self):
+        """When with ConnectionContext() is used, return self."""
+        return self
+
+    def _done(self):
+        """If the connection came from a pool, clean it up and put it back.
+        If it did not come from a pool, close it.
+        """
+        if self.connection:
+            if self.pooled:
+                # Reset the connection so it's ready for the next caller
+                # to grab from the pool
+                self.connection.reset()
+                self.connection_pool.put(self.connection)
+            else:
+                try:
+                    self.connection.close()
+                except Exception:
+                    pass
+            self.connection = None
+
+    def __exit__(self, exc_type, exc_value, tb):
+        """End of 'with' statement.  We're done here."""
+        self._done()
+
+    def __del__(self):
+        """Caller is done with this connection.  Make sure we cleaned up."""
+        self._done()
+
+    def close(self):
+        """Caller is done with this connection."""
+        self._done()
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        self.connection.create_consumer(topic, proxy, fanout)
+
+    def create_worker(self, topic, proxy, pool_name):
+        self.connection.create_worker(topic, proxy, pool_name)
+
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name,
+                           ack_on_error=True):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name,
+                                           ack_on_error)
+
+    def consume_in_thread(self):
+        self.connection.consume_in_thread()
+
+    def __getattr__(self, key):
+        """Proxy all other calls to the Connection instance."""
+        if self.connection:
+            return getattr(self.connection, key)
+        else:
+            raise rpc_common.InvalidRPCConnectionReuse()
+
+
+class ReplyProxy(ConnectionContext):
+    """Connection class for RPC replies / callbacks."""
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshhold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
+                       ', message : %(data)s'), {'msg_id': msg_id,
+                                                 'data': message_data})
+            LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshhold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshhold)
+            self._num_call_waiters_wrn_threshhold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
+    """Sends a reply or an error on the channel signified by msg_id.
+
+    Failure should be a sys.exc_info() tuple.
+
+    """
+    with ConnectionContext(conf, connection_pool) as conn:
+        if failure:
+            failure = rpc_common.serialize_remote_exception(failure,
+                                                            log_failure)
+
+        msg = {'result': reply, 'failure': failure}
+        if ending:
+            msg['ending'] = True
+        _add_unique_id(msg)
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibilty.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+
+
+class RpcContext(rpc_common.CommonRpcContext):
+    """Context that supports replying to a rpc.call."""
+    def __init__(self, **kwargs):
+        self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
+        self.conf = kwargs.pop('conf')
+        super(RpcContext, self).__init__(**kwargs)
+
+    def deepcopy(self):
+        values = self.to_dict()
+        values['conf'] = self.conf
+        values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
+        return self.__class__(**values)
+
+    def reply(self, reply=None, failure=None, ending=False,
+              connection_pool=None, log_failure=True):
+        if self.msg_id:
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
+            if ending:
+                self.msg_id = None
+
+
+def unpack_context(conf, msg):
+    """Unpack context from msg."""
+    context_dict = {}
+    for key in list(msg.keys()):
+        # NOTE(vish): Some versions of python don't like unicode keys
+        #             in kwargs.
+        key = str(key)
+        if key.startswith('_context_'):
+            value = msg.pop(key)
+            context_dict[key[9:]] = value
+    context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
+    context_dict['conf'] = conf
+    ctx = RpcContext.from_dict(context_dict)
+    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+    return ctx
+
+
+def pack_context(msg, context):
+    """Pack context into msg.
+
+    Values for message keys need to be less than 255 chars, so we pull
+    context out into a bunch of separate keys. If we want to support
+    more arguments in rabbit messages, we may want to do the same
+    for args at some point.
+
+    """
+    context_d = dict([('_context_%s' % key, value)
+                      for (key, value) in context.to_dict().iteritems()])
+    msg.update(context_d)
+
+
+class _MsgIdCache(object):
+    """This class checks any duplicate messages."""
+
+    # NOTE: This value is considered can be a configuration item, but
+    #       it is not necessary to change its value in most cases,
+    #       so let this value as static for now.
+    DUP_MSG_CHECK_SIZE = 16
+
+    def __init__(self, **kwargs):
+        self.prev_msgids = collections.deque([],
+                                             maxlen=self.DUP_MSG_CHECK_SIZE)
+
+    def check_duplicate_message(self, message_data):
+        """AMQP consumers may read same message twice when exceptions occur
+           before ack is returned. This method prevents doing it.
+        """
+        if UNIQUE_ID in message_data:
+            msg_id = message_data[UNIQUE_ID]
+            if msg_id not in self.prev_msgids:
+                self.prev_msgids.append(msg_id)
+            else:
+                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+    """Add unique_id for checking duplicate messages."""
+    unique_id = uuid.uuid4().hex
+    msg.update({UNIQUE_ID: unique_id})
+    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
+class _ThreadPoolWithWait(object):
+    """Base class for a delayed invocation manager.
+
+    Used by the Connection class to start up green threads
+    to handle incoming messages.
+    """
+
+    def __init__(self, conf, connection_pool):
+        self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
+        self.connection_pool = connection_pool
+        self.conf = conf
+
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+    """Wraps a straight callback.
+
+    Allows it to be invoked in a green thread.
+    """
+
+    def __init__(self, conf, callback, connection_pool):
+        """Initiates CallbackWrapper object.
+
+        :param conf: cfg.CONF instance
+        :param callback: a callable (probably a function)
+        :param connection_pool: connection pool as returned by
+                                get_connection_pool()
+        """
+        super(CallbackWrapper, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.callback = callback
+
+    def __call__(self, message_data):
+        self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, conf, proxy, connection_pool):
+        super(ProxyCallback, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.proxy = proxy
+        self.msg_id_cache = _MsgIdCache()
+
+    def __call__(self, message_data):
+        """Consumer callback to call a method on a proxy object.
+
+        Parses the message for validity and fires off a thread to call the
+        proxy object method.
+
+        Message data should be a dictionary with two keys:
+            method: string representing the method to call
+            args: dictionary of arg: value
+
+        Example: {'method': 'echo', 'args': {'value': 42}}
+
+        """
+        # It is important to clear the context here, because at this point
+        # the previous context is stored in local.store.context
+        if hasattr(local.store, 'context'):
+            del local.store.context
+        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        self.msg_id_cache.check_duplicate_message(message_data)
+        ctxt = unpack_context(self.conf, message_data)
+        method = message_data.get('method')
+        args = message_data.get('args', {})
+        version = message_data.get('version')
+        namespace = message_data.get('namespace')
+        if not method:
+            LOG.warn(_('no method for message: %s') % message_data)
+            ctxt.reply(_('No method for message: %s') % message_data,
+                       connection_pool=self.connection_pool)
+            return
+        self.pool.spawn_n(self._process_data, ctxt, version, method,
+                          namespace, args)
+
+    def _process_data(self, ctxt, version, method, namespace, args):
+        """Process a message in a new thread.
+
+        If the proxy object we have has a dispatch method
+        (see rpc.dispatcher.RpcDispatcher), pass it the version,
+        method, and args and let it dispatch as appropriate.  If not, use
+        the old behavior of magically calling the specified method on the
+        proxy we have here.
+        """
+        ctxt.update_store()
+        try:
+            rval = self.proxy.dispatch(ctxt, version, method, namespace,
+                                       **args)
+            # Check if the result was a generator
+            if inspect.isgenerator(rval):
+                for x in rval:
+                    ctxt.reply(x, None, connection_pool=self.connection_pool)
+            else:
+                ctxt.reply(rval, None, connection_pool=self.connection_pool)
+            # This final None tells multicall that it is done.
+            ctxt.reply(ending=True, connection_pool=self.connection_pool)
+        except rpc_common.ClientException as e:
+            LOG.debug(_('Expected exception during message handling (%s)') %
+                      e._exc_info[1])
+            ctxt.reply(None, e._exc_info,
+                       connection_pool=self.connection_pool,
+                       log_failure=False)
+        except Exception:
+            # sys.exc_info() is deleted by LOG.exception().
+            exc_info = sys.exc_info()
+            LOG.error(_('Exception during message handling'),
+                      exc_info=exc_info)
+            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
+
+
+class MulticallProxyWaiter(object):
+    def __init__(self, conf, msg_id, timeout, connection_pool):
+        self._msg_id = msg_id
+        self._timeout = timeout or conf.rpc_response_timeout
+        self._reply_proxy = connection_pool.reply_proxy
+        self._done = False
+        self._got_ending = False
+        self._conf = conf
+        self._dataqueue = queue.LightQueue()
+        # Add this caller to the reply proxy's call_waiters
+        self._reply_proxy.add_call_waiter(self, self._msg_id)
+        self.msg_id_cache = _MsgIdCache()
+
+    def put(self, data):
+        self._dataqueue.put(data)
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        # Remove this caller from reply proxy's call_waiters
+        self._reply_proxy.del_call_waiter(self._msg_id)
+
+    def _process_data(self, data):
+        result = None
+        self.msg_id_cache.check_duplicate_message(data)
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(self._conf,
+                                                             failure)
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            result = data['result']
+        return result
+
+    def __iter__(self):
+        """Return a result until we get a reply with an 'ending' flag."""
+        if self._done:
+            raise StopIteration
+        while True:
+            try:
+                data = self._dataqueue.get(timeout=self._timeout)
+                result = self._process_data(data)
+            except queue.Empty:
+                self.done()
+                raise rpc_common.Timeout()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
+
+
+def create_connection(conf, new, connection_pool):
+    """Create a connection."""
+    return ConnectionContext(conf, connection_pool, pooled=not new)
+
+
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
+def multicall(conf, context, topic, msg, timeout, connection_pool):
+    """Make a call that returns multiple times."""
+    LOG.debug(_('Making synchronous call on %s ...'), topic)
+    msg_id = uuid.uuid4().hex
+    msg.update({'_msg_id': msg_id})
+    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    _add_unique_id(msg)
+    pack_context(msg, context)
+
+    with _reply_proxy_create_sem:
+        if not connection_pool.reply_proxy:
+            connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+    msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+    wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+    with ConnectionContext(conf, connection_pool) as conn:
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    return wait_msg
+
+
+def call(conf, context, topic, msg, timeout, connection_pool):
+    """Sends a message on a topic and wait for a response."""
+    rv = multicall(conf, context, topic, msg, timeout, connection_pool)
+    # NOTE(vish): return the last result from the multicall
+    rv = list(rv)
+    if not rv:
+        return
+    return rv[-1]
+
+
+def cast(conf, context, topic, msg, connection_pool):
+    """Sends a message on a topic without waiting for a response."""
+    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    _add_unique_id(msg)
+    pack_context(msg, context)
+    with ConnectionContext(conf, connection_pool) as conn:
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
+
+
+def fanout_cast(conf, context, topic, msg, connection_pool):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    LOG.debug(_('Making asynchronous fanout cast...'))
+    _add_unique_id(msg)
+    pack_context(msg, context)
+    with ConnectionContext(conf, connection_pool) as conn:
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+
+
+def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
+    """Sends a message on a topic to a specific server."""
+    _add_unique_id(msg)
+    pack_context(msg, context)
+    with ConnectionContext(conf, connection_pool, pooled=False,
+                           server_params=server_params) as conn:
+        conn.topic_send(topic, rpc_common.serialize_msg(msg))
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg,
+                          connection_pool):
+    """Sends a message on a fanout exchange to a specific server."""
+    _add_unique_id(msg)
+    pack_context(msg, context)
+    with ConnectionContext(conf, connection_pool, pooled=False,
+                           server_params=server_params) as conn:
+        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+
+
+def notify(conf, context, topic, msg, connection_pool, envelope):
+    """Sends a notification event on a topic."""
+    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+              dict(event_type=msg.get('event_type'),
+                   topic=topic))
+    _add_unique_id(msg)
+    pack_context(msg, context)
+    with ConnectionContext(conf, connection_pool) as conn:
+        if envelope:
+            msg = rpc_common.serialize_msg(msg)
+        conn.notify_send(topic, msg)
+
+
+def cleanup(connection_pool):
+    if connection_pool:
+        connection_pool.empty()
+
+
+def get_control_exchange(conf):
+    return conf.control_exchange
diff --git a/oslo/messaging/_drivers/common.py b/oslo/messaging/_drivers/common.py
new file mode 100644
index 000000000..45bd7a83c
--- /dev/null
+++ b/oslo/messaging/_drivers/common.py
@@ -0,0 +1,509 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 United States Government as represented by the
+# Administrator of the National Aeronautics and Space Administration.
+# All Rights Reserved.
+# Copyright 2011 Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+import sys
+import traceback
+
+from oslo.config import cfg
+import six
+
+from openstack.common.gettextutils import _  # noqa
+from openstack.common import importutils
+from openstack.common import jsonutils
+from openstack.common import local
+from openstack.common import log as logging
+
+
+CONF = cfg.CONF
+LOG = logging.getLogger(__name__)
+
+
+'''RPC Envelope Version.
+
+This version number applies to the top level structure of messages sent out.
+It does *not* apply to the message payload, which must be versioned
+independently.  For example, when using rpc APIs, a version number is applied
+for changes to the API being exposed over rpc.  This version number is handled
+in the rpc proxy and dispatcher modules.
+
+This version number applies to the message envelope that is used in the
+serialization done inside the rpc layer.  See serialize_msg() and
+deserialize_msg().
+
+The current message format (version 2.0) is very simple.  It is:
+
+    {
+        'oslo.version': <RPC Envelope Version as a String>,
+        'oslo.message': <Application Message Payload, JSON encoded>
+    }
+
+Message format version '1.0' is just considered to be the messages we sent
+without a message envelope.
+
+So, the current message envelope just includes the envelope version.  It may
+eventually contain additional information, such as a signature for the message
+payload.
+
+We will JSON encode the application message payload.  The message envelope,
+which includes the JSON encoded application message body, will be passed down
+to the messaging libraries as a dict.
+'''
+_RPC_ENVELOPE_VERSION = '2.0'
+
+_VERSION_KEY = 'oslo.version'
+_MESSAGE_KEY = 'oslo.message'
+
+_REMOTE_POSTFIX = '_Remote'
+
+
+class RPCException(Exception):
+    msg_fmt = _("An unknown RPC related exception occurred.")
+
+    def __init__(self, message=None, **kwargs):
+        self.kwargs = kwargs
+
+        if not message:
+            try:
+                message = self.msg_fmt % kwargs
+
+            except Exception:
+                # kwargs doesn't match a variable in the message
+                # log the issue and the kwargs
+                LOG.exception(_('Exception in string format operation'))
+                for name, value in kwargs.iteritems():
+                    LOG.error("%s: %s" % (name, value))
+                # at least get the core message out if something happened
+                message = self.msg_fmt
+
+        super(RPCException, self).__init__(message)
+
+
+class RemoteError(RPCException):
+    """Signifies that a remote class has raised an exception.
+
+    Contains a string representation of the type of the original exception,
+    the value of the original exception, and the traceback.  These are
+    sent to the parent as a joined string so printing the exception
+    contains all of the relevant info.
+
+    """
+    msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
+
+    def __init__(self, exc_type=None, value=None, traceback=None):
+        self.exc_type = exc_type
+        self.value = value
+        self.traceback = traceback
+        super(RemoteError, self).__init__(exc_type=exc_type,
+                                          value=value,
+                                          traceback=traceback)
+
+
+class Timeout(RPCException):
+    """Signifies that a timeout has occurred.
+
+    This exception is raised if the rpc_response_timeout is reached while
+    waiting for a response from the remote side.
+    """
+    msg_fmt = _('Timeout while waiting on RPC response - '
+                'topic: "%(topic)s", RPC method: "%(method)s" '
+                'info: "%(info)s"')
+
+    def __init__(self, info=None, topic=None, method=None):
+        """Initiates Timeout object.
+
+        :param info: Extra info to convey to the user
+        :param topic: The topic that the rpc call was sent to
+        :param rpc_method_name: The name of the rpc method being
+                                called
+        """
+        self.info = info
+        self.topic = topic
+        self.method = method
+        super(Timeout, self).__init__(
+            None,
+            info=info or _('<unknown>'),
+            topic=topic or _('<unknown>'),
+            method=method or _('<unknown>'))
+
+
+class DuplicateMessageError(RPCException):
+    msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
+class InvalidRPCConnectionReuse(RPCException):
+    msg_fmt = _("Invalid reuse of an RPC connection.")
+
+
+class UnsupportedRpcVersion(RPCException):
+    msg_fmt = _("Specified RPC version, %(version)s, not supported by "
+                "this endpoint.")
+
+
+class UnsupportedRpcEnvelopeVersion(RPCException):
+    msg_fmt = _("Specified RPC envelope version, %(version)s, "
+                "not supported by this endpoint.")
+
+
+class RpcVersionCapError(RPCException):
+    msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
+
+
+class Connection(object):
+    """A connection, returned by rpc.create_connection().
+
+    This class represents a connection to the message bus used for rpc.
+    An instance of this class should never be created by users of the rpc API.
+    Use rpc.create_connection() instead.
+    """
+    def close(self):
+        """Close the connection.
+
+        This method must be called when the connection will no longer be used.
+        It will ensure that any resources associated with the connection, such
+        as a network connection, and cleaned up.
+        """
+        raise NotImplementedError()
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer on this connection.
+
+        A consumer is associated with a message queue on the backend message
+        bus.  The consumer will read messages from the queue, unpack them, and
+        dispatch them to the proxy object.  The contents of the message pulled
+        off of the queue will determine which method gets called on the proxy
+        object.
+
+        :param topic: This is a name associated with what to consume from.
+                      Multiple instances of a service may consume from the same
+                      topic. For example, all instances of nova-compute consume
+                      from a queue called "compute".  In that case, the
+                      messages will get distributed amongst the consumers in a
+                      round-robin fashion if fanout=False.  If fanout=True,
+                      every consumer associated with this topic will get a
+                      copy of every message.
+        :param proxy: The object that will handle all incoming messages.
+        :param fanout: Whether or not this is a fanout topic.  See the
+                       documentation for the topic parameter for some
+                       additional comments on this.
+        """
+        raise NotImplementedError()
+
+    def create_worker(self, topic, proxy, pool_name):
+        """Create a worker on this connection.
+
+        A worker is like a regular consumer of messages directed to a
+        topic, except that it is part of a set of such consumers (the
+        "pool") which may run in parallel. Every pool of workers will
+        receive a given message, but only one worker in the pool will
+        be asked to process it. Load is distributed across the members
+        of the pool in round-robin fashion.
+
+        :param topic: This is a name associated with what to consume from.
+                      Multiple instances of a service may consume from the same
+                      topic.
+        :param proxy: The object that will handle all incoming messages.
+        :param pool_name: String containing the name of the pool of workers
+        """
+        raise NotImplementedError()
+
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        """Register as a member of a group of consumers.
+
+        Uses given topic from the specified exchange.
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+
+        :param callback: Callable to be invoked for each message.
+        :type callback: callable accepting one argument
+        :param pool_name: The name of the consumer pool.
+        :type pool_name: str
+        :param topic: The routing topic for desired messages.
+        :type topic: str
+        :param exchange_name: The name of the message exchange where
+                              the client should attach. Defaults to
+                              the configured exchange.
+        :type exchange_name: str
+        """
+        raise NotImplementedError()
+
+    def consume_in_thread(self):
+        """Spawn a thread to handle incoming messages.
+
+        Spawn a thread that will be responsible for handling all incoming
+        messages for consumers that were set up on this connection.
+
+        Message dispatching inside of this is expected to be implemented in a
+        non-blocking manner.  An example implementation would be having this
+        thread pull messages in for all of the consumers, but utilize a thread
+        pool for dispatching the messages to the proxy objects.
+        """
+        raise NotImplementedError()
+
+
+def _safe_log(log_func, msg, msg_data):
+    """Sanitizes the msg_data field before logging."""
+    SANITIZE = ['_context_auth_token', 'auth_token', 'new_pass']
+
+    def _fix_passwords(d):
+        """Sanitizes the password fields in the dictionary."""
+        for k in d.iterkeys():
+            if k.lower().find('password') != -1:
+                d[k] = '<SANITIZED>'
+            elif k.lower() in SANITIZE:
+                d[k] = '<SANITIZED>'
+            elif isinstance(d[k], dict):
+                _fix_passwords(d[k])
+        return d
+
+    return log_func(msg, _fix_passwords(copy.deepcopy(msg_data)))
+
+
+def serialize_remote_exception(failure_info, log_failure=True):
+    """Prepares exception data to be sent over rpc.
+
+    Failure_info should be a sys.exc_info() tuple.
+
+    """
+    tb = traceback.format_exception(*failure_info)
+    failure = failure_info[1]
+    if log_failure:
+        LOG.error(_("Returning exception %s to caller"),
+                  six.text_type(failure))
+        LOG.error(tb)
+
+    kwargs = {}
+    if hasattr(failure, 'kwargs'):
+        kwargs = failure.kwargs
+
+    # NOTE(matiu): With cells, it's possible to re-raise remote, remote
+    # exceptions. Lets turn it back into the original exception type.
+    cls_name = str(failure.__class__.__name__)
+    mod_name = str(failure.__class__.__module__)
+    if (cls_name.endswith(_REMOTE_POSTFIX) and
+            mod_name.endswith(_REMOTE_POSTFIX)):
+        cls_name = cls_name[:-len(_REMOTE_POSTFIX)]
+        mod_name = mod_name[:-len(_REMOTE_POSTFIX)]
+
+    data = {
+        'class': cls_name,
+        'module': mod_name,
+        'message': six.text_type(failure),
+        'tb': tb,
+        'args': failure.args,
+        'kwargs': kwargs
+    }
+
+    json_data = jsonutils.dumps(data)
+
+    return json_data
+
+
+def deserialize_remote_exception(conf, data):
+    failure = jsonutils.loads(str(data))
+
+    trace = failure.get('tb', [])
+    message = failure.get('message', "") + "\n" + "\n".join(trace)
+    name = failure.get('class')
+    module = failure.get('module')
+
+    # NOTE(ameade): We DO NOT want to allow just any module to be imported, in
+    # order to prevent arbitrary code execution.
+    if module not in conf.allowed_rpc_exception_modules:
+        return RemoteError(name, failure.get('message'), trace)
+
+    try:
+        mod = importutils.import_module(module)
+        klass = getattr(mod, name)
+        if not issubclass(klass, Exception):
+            raise TypeError("Can only deserialize Exceptions")
+
+        failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
+    except (AttributeError, TypeError, ImportError):
+        return RemoteError(name, failure.get('message'), trace)
+
+    ex_type = type(failure)
+    str_override = lambda self: message
+    new_ex_type = type(ex_type.__name__ + _REMOTE_POSTFIX, (ex_type,),
+                       {'__str__': str_override, '__unicode__': str_override})
+    new_ex_type.__module__ = '%s%s' % (module, _REMOTE_POSTFIX)
+    try:
+        # NOTE(ameade): Dynamically create a new exception type and swap it in
+        # as the new type for the exception. This only works on user defined
+        # Exceptions and not core python exceptions. This is important because
+        # we cannot necessarily change an exception message so we must override
+        # the __str__ method.
+        failure.__class__ = new_ex_type
+    except TypeError:
+        # NOTE(ameade): If a core exception then just add the traceback to the
+        # first exception argument.
+        failure.args = (message,) + failure.args[1:]
+    return failure
+
+
+class CommonRpcContext(object):
+    def __init__(self, **kwargs):
+        self.values = kwargs
+
+    def __getattr__(self, key):
+        try:
+            return self.values[key]
+        except KeyError:
+            raise AttributeError(key)
+
+    def to_dict(self):
+        return copy.deepcopy(self.values)
+
+    @classmethod
+    def from_dict(cls, values):
+        return cls(**values)
+
+    def deepcopy(self):
+        return self.from_dict(self.to_dict())
+
+    def update_store(self):
+        local.store.context = self
+
+    def elevated(self, read_deleted=None, overwrite=False):
+        """Return a version of this context with admin flag set."""
+        # TODO(russellb) This method is a bit of a nova-ism.  It makes
+        # some assumptions about the data in the request context sent
+        # across rpc, while the rest of this class does not.  We could get
+        # rid of this if we changed the nova code that uses this to
+        # convert the RpcContext back to its native RequestContext doing
+        # something like nova.context.RequestContext.from_dict(ctxt.to_dict())
+
+        context = self.deepcopy()
+        context.values['is_admin'] = True
+
+        context.values.setdefault('roles', [])
+
+        if 'admin' not in context.values['roles']:
+            context.values['roles'].append('admin')
+
+        if read_deleted is not None:
+            context.values['read_deleted'] = read_deleted
+
+        return context
+
+
+class ClientException(Exception):
+    """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+    Merely instantiating it records the current exception information, which
+    will be passed back to the RPC client without exceptional logging.
+    """
+    def __init__(self):
+        self._exc_info = sys.exc_info()
+
+
+def catch_client_exception(exceptions, func, *args, **kwargs):
+    try:
+        return func(*args, **kwargs)
+    except Exception as e:
+        if type(e) in exceptions:
+            raise ClientException()
+        else:
+            raise
+
+
+def client_exceptions(*exceptions):
+    """Decorator for manager methods that raise expected exceptions.
+
+    Marking a Manager method with this decorator allows the declaration
+    of expected exceptions that the RPC layer should not consider fatal,
+    and not log as if they were generated in a real error scenario. Note
+    that this will cause listed exceptions to be wrapped in a
+    ClientException, which is used internally by the RPC layer.
+    """
+    def outer(func):
+        def inner(*args, **kwargs):
+            return catch_client_exception(exceptions, func, *args, **kwargs)
+        return inner
+    return outer
+
+
+def version_is_compatible(imp_version, version):
+    """Determine whether versions are compatible.
+
+    :param imp_version: The version implemented
+    :param version: The version requested by an incoming message.
+    """
+    version_parts = version.split('.')
+    imp_version_parts = imp_version.split('.')
+    if int(version_parts[0]) != int(imp_version_parts[0]):  # Major
+        return False
+    if int(version_parts[1]) > int(imp_version_parts[1]):  # Minor
+        return False
+    return True
+
+
+def serialize_msg(raw_msg):
+    # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
+    # information about this format.
+    msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
+           _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+
+    return msg
+
+
+def deserialize_msg(msg):
+    # NOTE(russellb): Hang on to your hats, this road is about to
+    # get a little bumpy.
+    #
+    # Robustness Principle:
+    #    "Be strict in what you send, liberal in what you accept."
+    #
+    # At this point we have to do a bit of guessing about what it
+    # is we just received.  Here is the set of possibilities:
+    #
+    # 1) We received a dict.  This could be 2 things:
+    #
+    #   a) Inspect it to see if it looks like a standard message envelope.
+    #      If so, great!
+    #
+    #   b) If it doesn't look like a standard message envelope, it could either
+    #      be a notification, or a message from before we added a message
+    #      envelope (referred to as version 1.0).
+    #      Just return the message as-is.
+    #
+    # 2) It's any other non-dict type.  Just return it and hope for the best.
+    #    This case covers return values from rpc.call() from before message
+    #    envelopes were used.  (messages to call a method were always a dict)
+
+    if not isinstance(msg, dict):
+        # See #2 above.
+        return msg
+
+    base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+    if not all(map(lambda key: key in msg, base_envelope_keys)):
+        #  See #1.b above.
+        return msg
+
+    # At this point we think we have the message envelope
+    # format we were expecting. (#1.a above)
+
+    if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+        raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
+
+    raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+
+    return raw_msg
diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py
new file mode 100644
index 000000000..99c461943
--- /dev/null
+++ b/oslo/messaging/_drivers/impl_qpid.py
@@ -0,0 +1,739 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack Foundation
+#    Copyright 2011 - 2012, Red Hat, Inc.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import functools
+import itertools
+import time
+import uuid
+
+import eventlet
+import greenlet
+from oslo.config import cfg
+
+from openstack.common import excutils
+from openstack.common.gettextutils import _  # noqa
+from openstack.common import importutils
+from openstack.common import jsonutils
+from openstack.common import log as logging
+from openstack.common.rpc import amqp as rpc_amqp
+from openstack.common.rpc import common as rpc_common
+
+qpid_codec = importutils.try_import("qpid.codec010")
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
+LOG = logging.getLogger(__name__)
+
+qpid_opts = [
+    cfg.StrOpt('qpid_hostname',
+               default='localhost',
+               help='Qpid broker hostname'),
+    cfg.IntOpt('qpid_port',
+               default=5672,
+               help='Qpid broker port'),
+    cfg.ListOpt('qpid_hosts',
+                default=['$qpid_hostname:$qpid_port'],
+                help='Qpid HA cluster host:port pairs'),
+    cfg.StrOpt('qpid_username',
+               default='',
+               help='Username for qpid connection'),
+    cfg.StrOpt('qpid_password',
+               default='',
+               help='Password for qpid connection',
+               secret=True),
+    cfg.StrOpt('qpid_sasl_mechanisms',
+               default='',
+               help='Space separated list of SASL mechanisms to use for auth'),
+    cfg.IntOpt('qpid_heartbeat',
+               default=60,
+               help='Seconds between connection keepalive heartbeats'),
+    cfg.StrOpt('qpid_protocol',
+               default='tcp',
+               help="Transport to use, either 'tcp' or 'ssl'"),
+    cfg.BoolOpt('qpid_tcp_nodelay',
+                default=True,
+                help='Disable Nagle algorithm'),
+]
+
+cfg.CONF.register_opts(qpid_opts)
+
+JSON_CONTENT_TYPE = 'application/json; charset=utf8'
+
+
+class ConsumerBase(object):
+    """Consumer base class."""
+
+    def __init__(self, session, callback, node_name, node_opts,
+                 link_name, link_opts):
+        """Declare a queue on an amqp session.
+
+        'session' is the amqp session to use
+        'callback' is the callback to call when messages are received
+        'node_name' is the first part of the Qpid address string, before ';'
+        'node_opts' will be applied to the "x-declare" section of "node"
+                    in the address string.
+        'link_name' goes into the "name" field of the "link" in the address
+                    string
+        'link_opts' will be applied to the "x-declare" section of "link"
+                    in the address string.
+        """
+        self.callback = callback
+        self.receiver = None
+        self.session = None
+
+        addr_opts = {
+            "create": "always",
+            "node": {
+                "type": "topic",
+                "x-declare": {
+                    "durable": True,
+                    "auto-delete": True,
+                },
+            },
+            "link": {
+                "name": link_name,
+                "durable": True,
+                "x-declare": {
+                    "durable": False,
+                    "auto-delete": True,
+                    "exclusive": False,
+                },
+            },
+        }
+        addr_opts["node"]["x-declare"].update(node_opts)
+        addr_opts["link"]["x-declare"].update(link_opts)
+
+        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+
+        self.connect(session)
+
+    def connect(self, session):
+        """Declare the reciever on connect."""
+        self._declare_receiver(session)
+
+    def reconnect(self, session):
+        """Re-declare the receiver after a qpid reconnect."""
+        self._declare_receiver(session)
+
+    def _declare_receiver(self, session):
+        self.session = session
+        self.receiver = session.receiver(self.address)
+        self.receiver.capacity = 1
+
+    def _unpack_json_msg(self, msg):
+        """Load the JSON data in msg if msg.content_type indicates that it
+           is necessary.  Put the loaded data back into msg.content and
+           update msg.content_type appropriately.
+
+        A Qpid Message containing a dict will have a content_type of
+        'amqp/map', whereas one containing a string that needs to be converted
+        back from JSON will have a content_type of JSON_CONTENT_TYPE.
+
+        :param msg: a Qpid Message object
+        :returns: None
+        """
+        if msg.content_type == JSON_CONTENT_TYPE:
+            msg.content = jsonutils.loads(msg.content)
+            msg.content_type = 'amqp/map'
+
+    def consume(self):
+        """Fetch the message and pass it to the callback object."""
+        message = self.receiver.fetch()
+        try:
+            self._unpack_json_msg(message)
+            msg = rpc_common.deserialize_msg(message.content)
+            self.callback(msg)
+        except Exception:
+            LOG.exception(_("Failed to process message... skipping it."))
+        finally:
+            # TODO(sandy): Need support for optional ack_on_error.
+            self.session.acknowledge(message)
+
+    def get_receiver(self):
+        return self.receiver
+
+    def get_node_name(self):
+        return self.address.split(';')[0]
+
+
+class DirectConsumer(ConsumerBase):
+    """Queue/consumer class for 'direct'."""
+
+    def __init__(self, conf, session, msg_id, callback):
+        """Init a 'direct' queue.
+
+        'session' is the amqp session to use
+        'msg_id' is the msg_id to listen on
+        'callback' is the callback to call when messages are received
+        """
+
+        super(DirectConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (msg_id, msg_id),
+            {"type": "direct"},
+            msg_id,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "exclusive": True,
+                "durable": conf.amqp_durable_queues,
+            })
+
+
+class TopicConsumer(ConsumerBase):
+    """Consumer class for 'topic'."""
+
+    def __init__(self, conf, session, topic, callback, name=None,
+                 exchange_name=None):
+        """Init a 'topic' queue.
+
+        :param session: the amqp session to use
+        :param topic: is the topic to listen on
+        :paramtype topic: str
+        :param callback: the callback to call when messages are received
+        :param name: optional queue name, defaults to topic
+        """
+
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        super(TopicConsumer, self).__init__(
+            session, callback,
+            "%s/%s" % (exchange_name, topic),
+            {}, name or topic,
+            {
+                "auto-delete": conf.amqp_auto_delete,
+                "durable": conf.amqp_durable_queues,
+            })
+
+
+class FanoutConsumer(ConsumerBase):
+    """Consumer class for 'fanout'."""
+
+    def __init__(self, conf, session, topic, callback):
+        """Init a 'fanout' queue.
+
+        'session' is the amqp session to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        """
+        self.conf = conf
+
+        super(FanoutConsumer, self).__init__(
+            session, callback,
+            "%s_fanout" % topic,
+            {"durable": False, "type": "fanout"},
+            "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+            {"exclusive": True})
+
+    def reconnect(self, session):
+        topic = self.get_node_name().rpartition('_fanout')[0]
+        params = {
+            'session': session,
+            'topic': topic,
+            'callback': self.callback,
+        }
+
+        self.__init__(conf=self.conf, **params)
+
+        super(FanoutConsumer, self).reconnect(session)
+
+
+class Publisher(object):
+    """Base Publisher class."""
+
+    def __init__(self, session, node_name, node_opts=None):
+        """Init the Publisher class with the exchange_name, routing_key,
+        and other options
+        """
+        self.sender = None
+        self.session = session
+
+        addr_opts = {
+            "create": "always",
+            "node": {
+                "type": "topic",
+                "x-declare": {
+                    "durable": False,
+                    # auto-delete isn't implemented for exchanges in qpid,
+                    # but put in here anyway
+                    "auto-delete": True,
+                },
+            },
+        }
+        if node_opts:
+            addr_opts["node"]["x-declare"].update(node_opts)
+
+        self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
+
+        self.reconnect(session)
+
+    def reconnect(self, session):
+        """Re-establish the Sender after a reconnection."""
+        self.sender = session.sender(self.address)
+
+    def _pack_json_msg(self, msg):
+        """Qpid cannot serialize dicts containing strings longer than 65535
+           characters.  This function dumps the message content to a JSON
+           string, which Qpid is able to handle.
+
+        :param msg: May be either a Qpid Message object or a bare dict.
+        :returns: A Qpid Message with its content field JSON encoded.
+        """
+        try:
+            msg.content = jsonutils.dumps(msg.content)
+        except AttributeError:
+            # Need to have a Qpid message so we can set the content_type.
+            msg = qpid_messaging.Message(jsonutils.dumps(msg))
+        msg.content_type = JSON_CONTENT_TYPE
+        return msg
+
+    def send(self, msg):
+        """Send a message."""
+        try:
+            # Check if Qpid can encode the message
+            check_msg = msg
+            if not hasattr(check_msg, 'content_type'):
+                check_msg = qpid_messaging.Message(msg)
+            content_type = check_msg.content_type
+            enc, dec = qpid_messaging.message.get_codec(content_type)
+            enc(check_msg.content)
+        except qpid_codec.CodecException:
+            # This means the message couldn't be serialized as a dict.
+            msg = self._pack_json_msg(msg)
+        self.sender.send(msg)
+
+
+class DirectPublisher(Publisher):
+    """Publisher class for 'direct'."""
+    def __init__(self, conf, session, msg_id):
+        """Init a 'direct' publisher."""
+        super(DirectPublisher, self).__init__(session, msg_id,
+                                              {"type": "Direct"})
+
+
+class TopicPublisher(Publisher):
+    """Publisher class for 'topic'."""
+    def __init__(self, conf, session, topic):
+        """init a 'topic' publisher.
+        """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(TopicPublisher, self).__init__(session,
+                                             "%s/%s" % (exchange_name, topic))
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'."""
+    def __init__(self, conf, session, topic):
+        """init a 'fanout' publisher.
+        """
+        super(FanoutPublisher, self).__init__(
+            session,
+            "%s_fanout" % topic, {"type": "fanout"})
+
+
+class NotifyPublisher(Publisher):
+    """Publisher class for notifications."""
+    def __init__(self, conf, session, topic):
+        """init a 'topic' publisher.
+        """
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(NotifyPublisher, self).__init__(session,
+                                              "%s/%s" % (exchange_name, topic),
+                                              {"durable": True})
+
+
+class Connection(object):
+    """Connection object."""
+
+    pool = None
+
+    def __init__(self, conf, server_params=None):
+        if not qpid_messaging:
+            raise ImportError("Failed to import qpid.messaging")
+
+        self.session = None
+        self.consumers = {}
+        self.consumer_thread = None
+        self.proxy_callbacks = []
+        self.conf = conf
+
+        if server_params and 'hostname' in server_params:
+            # NOTE(russellb) This enables support for cast_to_server.
+            server_params['qpid_hosts'] = [
+                '%s:%d' % (server_params['hostname'],
+                           server_params.get('port', 5672))
+            ]
+
+        params = {
+            'qpid_hosts': self.conf.qpid_hosts,
+            'username': self.conf.qpid_username,
+            'password': self.conf.qpid_password,
+        }
+        params.update(server_params or {})
+
+        self.brokers = params['qpid_hosts']
+        self.username = params['username']
+        self.password = params['password']
+        self.connection_create(self.brokers[0])
+        self.reconnect()
+
+    def connection_create(self, broker):
+        # Create the connection - this does not open the connection
+        self.connection = qpid_messaging.Connection(broker)
+
+        # Check if flags are set and if so set them for the connection
+        # before we call open
+        self.connection.username = self.username
+        self.connection.password = self.password
+
+        self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
+        # Reconnection is done by self.reconnect()
+        self.connection.reconnect = False
+        self.connection.heartbeat = self.conf.qpid_heartbeat
+        self.connection.transport = self.conf.qpid_protocol
+        self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
+
+    def _register_consumer(self, consumer):
+        self.consumers[str(consumer.get_receiver())] = consumer
+
+    def _lookup_consumer(self, receiver):
+        return self.consumers[str(receiver)]
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing sessions and queues."""
+        attempt = 0
+        delay = 1
+        while True:
+            # Close the session if necessary
+            if self.connection.opened():
+                try:
+                    self.connection.close()
+                except qpid_exceptions.ConnectionError:
+                    pass
+
+            broker = self.brokers[attempt % len(self.brokers)]
+            attempt += 1
+
+            try:
+                self.connection_create(broker)
+                self.connection.open()
+            except qpid_exceptions.ConnectionError as e:
+                msg_dict = dict(e=e, delay=delay)
+                msg = _("Unable to connect to AMQP server: %(e)s. "
+                        "Sleeping %(delay)s seconds") % msg_dict
+                LOG.error(msg)
+                time.sleep(delay)
+                delay = min(2 * delay, 60)
+            else:
+                LOG.info(_('Connected to AMQP server on %s'), broker)
+                break
+
+        self.session = self.connection.session()
+
+        if self.consumers:
+            consumers = self.consumers
+            self.consumers = {}
+
+            for consumer in consumers.itervalues():
+                consumer.reconnect(self.session)
+                self._register_consumer(consumer)
+
+            LOG.debug(_("Re-established AMQP queues"))
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except (qpid_exceptions.Empty,
+                    qpid_exceptions.ConnectionError) as e:
+                if error_callback:
+                    error_callback(e)
+                self.reconnect()
+
+    def close(self):
+        """Close/release this connection."""
+        self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
+        try:
+            self.connection.close()
+        except Exception:
+            # NOTE(dripton) Logging exceptions that happen during cleanup just
+            # causes confusion; there's really nothing useful we can do with
+            # them.
+            pass
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again."""
+        self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
+        self.session.close()
+        self.session = self.connection.session()
+        self.consumers = {}
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+                      "%(err_str)s") % log_info)
+
+        def _declare_consumer():
+            consumer = consumer_cls(self.conf, self.session, topic, callback)
+            self._register_consumer(consumer)
+            return consumer
+
+        return self.ensure(_connect_error, _declare_consumer)
+
+    def iterconsume(self, limit=None, timeout=None):
+        """Return an iterator that will consume from all queues/consumers."""
+
+        def _error_callback(exc):
+            if isinstance(exc, qpid_exceptions.Empty):
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
+                raise rpc_common.Timeout()
+            else:
+                LOG.exception(_('Failed to consume message from queue: %s') %
+                              str(exc))
+
+        def _consume():
+            nxt_receiver = self.session.next_receiver(timeout=timeout)
+            try:
+                self._lookup_consumer(nxt_receiver).consume()
+            except Exception:
+                LOG.exception(_("Error processing message.  Skipping it."))
+
+        for iteration in itertools.count(0):
+            if limit and iteration >= limit:
+                raise StopIteration
+            yield self.ensure(_error_callback, _consume)
+
+    def cancel_consumer_thread(self):
+        """Cancel a consumer thread."""
+        if self.consumer_thread is not None:
+            self.consumer_thread.kill()
+            try:
+                self.consumer_thread.wait()
+            except greenlet.GreenletExit:
+                pass
+            self.consumer_thread = None
+
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
+    def publisher_send(self, cls, topic, msg):
+        """Send to a publisher based on the publisher class."""
+
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.exception(_("Failed to publish message to topic "
+                          "'%(topic)s': %(err_str)s") % log_info)
+
+        def _publisher_send():
+            publisher = cls(self.conf, self.session, topic)
+            publisher.send(msg)
+
+        return self.ensure(_connect_error, _publisher_send)
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(functools.partial(TopicConsumer,
+                                                name=queue_name,
+                                                exchange_name=exchange_name,
+                                                ),
+                              topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer."""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message."""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg, timeout=None):
+        """Send a 'topic' message."""
+        #
+        # We want to create a message with attributes, e.g. a TTL. We
+        # don't really need to keep 'msg' in its JSON format any longer
+        # so let's create an actual qpid message here and get some
+        # value-add on the go.
+        #
+        # WARNING: Request timeout happens to be in the same units as
+        # qpid's TTL (seconds). If this changes in the future, then this
+        # will need to be altered accordingly.
+        #
+        qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+        self.publisher_send(TopicPublisher, topic, qpid_message)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message."""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic."""
+        self.publisher_send(NotifyPublisher, topic, msg)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers."""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object."""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
+
+        if fanout:
+            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
+        else:
+            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
+
+        self._register_consumer(consumer)
+
+        return consumer
+
+    def create_worker(self, topic, proxy, pool_name):
+        """Create a worker that calls a method in a proxy object."""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
+
+        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
+                                 name=pool_name)
+
+        self._register_consumer(consumer)
+
+        return consumer
+
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None, ack_on_error=True):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+
+        consumer = TopicConsumer(conf=self.conf,
+                                 session=self.session,
+                                 topic=topic,
+                                 callback=callback_wrapper,
+                                 name=pool_name,
+                                 exchange_name=exchange_name)
+
+        self._register_consumer(consumer)
+        return consumer
+
+
+def create_connection(conf, new=True):
+    """Create a connection."""
+    return rpc_amqp.create_connection(
+        conf, new,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+    return rpc_amqp.multicall(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def call(conf, context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    return rpc_amqp.call(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast(conf, context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    return rpc_amqp.cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast(conf, context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    return rpc_amqp.fanout_cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast_to_server(conf, context, server_params, topic, msg):
+    """Sends a message on a topic to a specific server."""
+    return rpc_amqp.cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
+    """Sends a message on a fanout exchange to a specific server."""
+    return rpc_amqp.fanout_cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def notify(conf, context, topic, msg, envelope):
+    """Sends a notification event on a topic."""
+    return rpc_amqp.notify(conf, context, topic, msg,
+                           rpc_amqp.get_connection_pool(conf, Connection),
+                           envelope)
+
+
+def cleanup():
+    return rpc_amqp.cleanup(Connection.pool)
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
new file mode 100644
index 000000000..6b1ae9388
--- /dev/null
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -0,0 +1,865 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+#    Copyright 2011 OpenStack Foundation
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import functools
+import itertools
+import socket
+import ssl
+import time
+import uuid
+
+import eventlet
+import greenlet
+import kombu
+import kombu.connection
+import kombu.entity
+import kombu.messaging
+from oslo.config import cfg
+
+from openstack.common import excutils
+from openstack.common.gettextutils import _  # noqa
+from openstack.common import network_utils
+from openstack.common.rpc import amqp as rpc_amqp
+from openstack.common.rpc import common as rpc_common
+from openstack.common import sslutils
+
+kombu_opts = [
+    cfg.StrOpt('kombu_ssl_version',
+               default='',
+               help='SSL version to use (valid only if SSL enabled). '
+                    'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
+                    'be available on some distributions'
+               ),
+    cfg.StrOpt('kombu_ssl_keyfile',
+               default='',
+               help='SSL key file (valid only if SSL enabled)'),
+    cfg.StrOpt('kombu_ssl_certfile',
+               default='',
+               help='SSL cert file (valid only if SSL enabled)'),
+    cfg.StrOpt('kombu_ssl_ca_certs',
+               default='',
+               help=('SSL certification authority file '
+                     '(valid only if SSL enabled)')),
+    cfg.StrOpt('rabbit_host',
+               default='localhost',
+               help='The RabbitMQ broker address where a single node is used'),
+    cfg.IntOpt('rabbit_port',
+               default=5672,
+               help='The RabbitMQ broker port where a single node is used'),
+    cfg.ListOpt('rabbit_hosts',
+                default=['$rabbit_host:$rabbit_port'],
+                help='RabbitMQ HA cluster host:port pairs'),
+    cfg.BoolOpt('rabbit_use_ssl',
+                default=False,
+                help='connect over SSL for RabbitMQ'),
+    cfg.StrOpt('rabbit_userid',
+               default='guest',
+               help='the RabbitMQ userid'),
+    cfg.StrOpt('rabbit_password',
+               default='guest',
+               help='the RabbitMQ password',
+               secret=True),
+    cfg.StrOpt('rabbit_virtual_host',
+               default='/',
+               help='the RabbitMQ virtual host'),
+    cfg.IntOpt('rabbit_retry_interval',
+               default=1,
+               help='how frequently to retry connecting with RabbitMQ'),
+    cfg.IntOpt('rabbit_retry_backoff',
+               default=2,
+               help='how long to backoff for between retries when connecting '
+                    'to RabbitMQ'),
+    cfg.IntOpt('rabbit_max_retries',
+               default=0,
+               help='maximum retries with trying to connect to RabbitMQ '
+                    '(the default of 0 implies an infinite retry count)'),
+    cfg.BoolOpt('rabbit_ha_queues',
+                default=False,
+                help='use H/A queues in RabbitMQ (x-ha-policy: all).'
+                     'You need to wipe RabbitMQ database when '
+                     'changing this option.'),
+
+]
+
+cfg.CONF.register_opts(kombu_opts)
+
+LOG = rpc_common.LOG
+
+
+def _get_queue_arguments(conf):
+    """Construct the arguments for declaring a queue.
+
+    If the rabbit_ha_queues option is set, we declare a mirrored queue
+    as described here:
+
+      http://www.rabbitmq.com/ha.html
+
+    Setting x-ha-policy to all means that the queue will be mirrored
+    to all nodes in the cluster.
+    """
+    return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
+
+
+class ConsumerBase(object):
+    """Consumer base class."""
+
+    def __init__(self, channel, callback, tag, **kwargs):
+        """Declare a queue on an amqp channel.
+
+        'channel' is the amqp channel to use
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        queue name, exchange name, and other kombu options are
+        passed in here as a dictionary.
+        """
+        self.callback = callback
+        self.tag = str(tag)
+        self.kwargs = kwargs
+        self.queue = None
+        self.ack_on_error = kwargs.get('ack_on_error', True)
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-declare the queue after a rabbit reconnect."""
+        self.channel = channel
+        self.kwargs['channel'] = channel
+        self.queue = kombu.entity.Queue(**self.kwargs)
+        self.queue.declare()
+
+    def _callback_handler(self, message, callback):
+        """Call callback with deserialized message.
+
+        Messages that are processed without exception are ack'ed.
+
+        If the message processing generates an exception, it will be
+        ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
+        Rejection is better than waiting for the message to timeout.
+        Rejected messages are immediately requeued.
+        """
+
+        ack_msg = False
+        try:
+            msg = rpc_common.deserialize_msg(message.payload)
+            callback(msg)
+            ack_msg = True
+        except Exception:
+            if self.ack_on_error:
+                ack_msg = True
+                LOG.exception(_("Failed to process message"
+                                " ... skipping it."))
+            else:
+                LOG.exception(_("Failed to process message"
+                                " ... will requeue."))
+        finally:
+            if ack_msg:
+                message.ack()
+            else:
+                message.reject()
+
+    def consume(self, *args, **kwargs):
+        """Actually declare the consumer on the amqp channel.  This will
+        start the flow of messages from the queue.  Using the
+        Connection.iterconsume() iterator will process the messages,
+        calling the appropriate callback.
+
+        If a callback is specified in kwargs, use that.  Otherwise,
+        use the callback passed during __init__()
+
+        If kwargs['nowait'] is True, then this call will block until
+        a message is read.
+
+        """
+
+        options = {'consumer_tag': self.tag}
+        options['nowait'] = kwargs.get('nowait', False)
+        callback = kwargs.get('callback', self.callback)
+        if not callback:
+            raise ValueError("No callback defined")
+
+        def _callback(raw_message):
+            message = self.channel.message_to_python(raw_message)
+            self._callback_handler(message, callback)
+
+        self.queue.consume(*args, callback=_callback, **options)
+
+    def cancel(self):
+        """Cancel the consuming from the queue, if it has started."""
+        try:
+            self.queue.cancel(self.tag)
+        except KeyError as e:
+            # NOTE(comstud): Kludge to get around a amqplib bug
+            if str(e) != "u'%s'" % self.tag:
+                raise
+        self.queue = None
+
+
+class DirectConsumer(ConsumerBase):
+    """Queue/consumer class for 'direct'."""
+
+    def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
+        """Init a 'direct' queue.
+
+        'channel' is the amqp channel to use
+        'msg_id' is the msg_id to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        # Default options
+        options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(name=msg_id,
+                                         type='direct',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(DirectConsumer, self).__init__(channel,
+                                             callback,
+                                             tag,
+                                             name=msg_id,
+                                             exchange=exchange,
+                                             routing_key=msg_id,
+                                             **options)
+
+
+class TopicConsumer(ConsumerBase):
+    """Consumer class for 'topic'."""
+
+    def __init__(self, conf, channel, topic, callback, tag, name=None,
+                 exchange_name=None, **kwargs):
+        """Init a 'topic' queue.
+
+        :param channel: the amqp channel to use
+        :param topic: the topic to listen on
+        :paramtype topic: str
+        :param callback: the callback to call when messages are received
+        :param tag: a unique ID for the consumer on the channel
+        :param name: optional queue name, defaults to topic
+        :paramtype name: str
+
+        Other kombu options may be passed as keyword arguments
+        """
+        # Default options
+        options = {'durable': conf.amqp_durable_queues,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': conf.amqp_auto_delete,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
+        exchange = kombu.entity.Exchange(name=exchange_name,
+                                         type='topic',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(TopicConsumer, self).__init__(channel,
+                                            callback,
+                                            tag,
+                                            name=name or topic,
+                                            exchange=exchange,
+                                            routing_key=topic,
+                                            **options)
+
+
+class FanoutConsumer(ConsumerBase):
+    """Consumer class for 'fanout'."""
+
+    def __init__(self, conf, channel, topic, callback, tag, **kwargs):
+        """Init a 'fanout' queue.
+
+        'channel' is the amqp channel to use
+        'topic' is the topic to listen on
+        'callback' is the callback to call when messages are received
+        'tag' is a unique ID for the consumer on the channel
+
+        Other kombu options may be passed
+        """
+        unique = uuid.uuid4().hex
+        exchange_name = '%s_fanout' % topic
+        queue_name = '%s_fanout_%s' % (topic, unique)
+
+        # Default options
+        options = {'durable': False,
+                   'queue_arguments': _get_queue_arguments(conf),
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
+                                         durable=options['durable'],
+                                         auto_delete=options['auto_delete'])
+        super(FanoutConsumer, self).__init__(channel, callback, tag,
+                                             name=queue_name,
+                                             exchange=exchange,
+                                             routing_key=topic,
+                                             **options)
+
+
+class Publisher(object):
+    """Base Publisher class."""
+
+    def __init__(self, channel, exchange_name, routing_key, **kwargs):
+        """Init the Publisher class with the exchange_name, routing_key,
+        and other options
+        """
+        self.exchange_name = exchange_name
+        self.routing_key = routing_key
+        self.kwargs = kwargs
+        self.reconnect(channel)
+
+    def reconnect(self, channel):
+        """Re-establish the Producer after a rabbit reconnection."""
+        self.exchange = kombu.entity.Exchange(name=self.exchange_name,
+                                              **self.kwargs)
+        self.producer = kombu.messaging.Producer(exchange=self.exchange,
+                                                 channel=channel,
+                                                 routing_key=self.routing_key)
+
+    def send(self, msg, timeout=None):
+        """Send a message."""
+        if timeout:
+            #
+            # AMQP TTL is in milliseconds when set in the header.
+            #
+            self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+        else:
+            self.producer.publish(msg)
+
+
+class DirectPublisher(Publisher):
+    """Publisher class for 'direct'."""
+    def __init__(self, conf, channel, msg_id, **kwargs):
+        """init a 'direct' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+
+        options = {'durable': False,
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
+                                              type='direct', **options)
+
+
+class TopicPublisher(Publisher):
+    """Publisher class for 'topic'."""
+    def __init__(self, conf, channel, topic, **kwargs):
+        """init a 'topic' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': conf.amqp_durable_queues,
+                   'auto_delete': conf.amqp_auto_delete,
+                   'exclusive': False}
+        options.update(kwargs)
+        exchange_name = rpc_amqp.get_control_exchange(conf)
+        super(TopicPublisher, self).__init__(channel,
+                                             exchange_name,
+                                             topic,
+                                             type='topic',
+                                             **options)
+
+
+class FanoutPublisher(Publisher):
+    """Publisher class for 'fanout'."""
+    def __init__(self, conf, channel, topic, **kwargs):
+        """init a 'fanout' publisher.
+
+        Kombu options may be passed as keyword args to override defaults
+        """
+        options = {'durable': False,
+                   'auto_delete': True,
+                   'exclusive': False}
+        options.update(kwargs)
+        super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
+                                              None, type='fanout', **options)
+
+
+class NotifyPublisher(TopicPublisher):
+    """Publisher class for 'notify'."""
+
+    def __init__(self, conf, channel, topic, **kwargs):
+        self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
+        self.queue_arguments = _get_queue_arguments(conf)
+        super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
+
+    def reconnect(self, channel):
+        super(NotifyPublisher, self).reconnect(channel)
+
+        # NOTE(jerdfelt): Normally the consumer would create the queue, but
+        # we do this to ensure that messages don't get dropped if the
+        # consumer is started after we do
+        queue = kombu.entity.Queue(channel=channel,
+                                   exchange=self.exchange,
+                                   durable=self.durable,
+                                   name=self.routing_key,
+                                   routing_key=self.routing_key,
+                                   queue_arguments=self.queue_arguments)
+        queue.declare()
+
+
+class Connection(object):
+    """Connection object."""
+
+    pool = None
+
+    def __init__(self, conf, server_params=None):
+        self.consumers = []
+        self.consumer_thread = None
+        self.proxy_callbacks = []
+        self.conf = conf
+        self.max_retries = self.conf.rabbit_max_retries
+        # Try forever?
+        if self.max_retries <= 0:
+            self.max_retries = None
+        self.interval_start = self.conf.rabbit_retry_interval
+        self.interval_stepping = self.conf.rabbit_retry_backoff
+        # max retry-interval = 30 seconds
+        self.interval_max = 30
+        self.memory_transport = False
+
+        if server_params is None:
+            server_params = {}
+        # Keys to translate from server_params to kombu params
+        server_params_to_kombu_params = {'username': 'userid'}
+
+        ssl_params = self._fetch_ssl_params()
+        params_list = []
+        for adr in self.conf.rabbit_hosts:
+            hostname, port = network_utils.parse_host_port(
+                adr, default_port=self.conf.rabbit_port)
+
+            params = {
+                'hostname': hostname,
+                'port': port,
+                'userid': self.conf.rabbit_userid,
+                'password': self.conf.rabbit_password,
+                'virtual_host': self.conf.rabbit_virtual_host,
+            }
+
+            for sp_key, value in server_params.iteritems():
+                p_key = server_params_to_kombu_params.get(sp_key, sp_key)
+                params[p_key] = value
+
+            if self.conf.fake_rabbit:
+                params['transport'] = 'memory'
+            if self.conf.rabbit_use_ssl:
+                params['ssl'] = ssl_params
+
+            params_list.append(params)
+
+        self.params_list = params_list
+
+        self.memory_transport = self.conf.fake_rabbit
+
+        self.connection = None
+        self.reconnect()
+
+    def _fetch_ssl_params(self):
+        """Handles fetching what ssl params should be used for the connection
+        (if any).
+        """
+        ssl_params = dict()
+
+        # http://docs.python.org/library/ssl.html - ssl.wrap_socket
+        if self.conf.kombu_ssl_version:
+            ssl_params['ssl_version'] = sslutils.validate_ssl_version(
+                self.conf.kombu_ssl_version)
+        if self.conf.kombu_ssl_keyfile:
+            ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile
+        if self.conf.kombu_ssl_certfile:
+            ssl_params['certfile'] = self.conf.kombu_ssl_certfile
+        if self.conf.kombu_ssl_ca_certs:
+            ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs
+            # We might want to allow variations in the
+            # future with this?
+            ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
+
+        if not ssl_params:
+            # Just have the default behavior
+            return True
+        else:
+            # Return the extended behavior
+            return ssl_params
+
+    def _connect(self, params):
+        """Connect to rabbit.  Re-establish any queues that may have
+        been declared before if we are reconnecting.  Exceptions should
+        be handled by the caller.
+        """
+        if self.connection:
+            LOG.info(_("Reconnecting to AMQP server on "
+                     "%(hostname)s:%(port)d") % params)
+            try:
+                self.connection.release()
+            except self.connection_errors:
+                pass
+            # Setting this in case the next statement fails, though
+            # it shouldn't be doing any network operations, yet.
+            self.connection = None
+        self.connection = kombu.connection.BrokerConnection(**params)
+        self.connection_errors = self.connection.connection_errors
+        if self.memory_transport:
+            # Kludge to speed up tests.
+            self.connection.transport.polling_interval = 0.0
+        self.consumer_num = itertools.count(1)
+        self.connection.connect()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        for consumer in self.consumers:
+            consumer.reconnect(self.channel)
+        LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
+                 params)
+
+    def reconnect(self):
+        """Handles reconnecting and re-establishing queues.
+        Will retry up to self.max_retries number of times.
+        self.max_retries = 0 means to retry forever.
+        Sleep between tries, starting at self.interval_start
+        seconds, backing off self.interval_stepping number of seconds
+        each attempt.
+        """
+
+        attempt = 0
+        while True:
+            params = self.params_list[attempt % len(self.params_list)]
+            attempt += 1
+            try:
+                self._connect(params)
+                return
+            except (IOError, self.connection_errors) as e:
+                pass
+            except Exception as e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+
+            log_info = {}
+            log_info['err_str'] = str(e)
+            log_info['max_retries'] = self.max_retries
+            log_info.update(params)
+
+            if self.max_retries and attempt == self.max_retries:
+                msg = _('Unable to connect to AMQP server on '
+                        '%(hostname)s:%(port)d after %(max_retries)d '
+                        'tries: %(err_str)s') % log_info
+                LOG.error(msg)
+                raise rpc_common.RPCException(msg)
+
+            if attempt == 1:
+                sleep_time = self.interval_start or 1
+            elif attempt > 1:
+                sleep_time += self.interval_stepping
+            if self.interval_max:
+                sleep_time = min(sleep_time, self.interval_max)
+
+            log_info['sleep_time'] = sleep_time
+            LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
+                        'unreachable: %(err_str)s. Trying again in '
+                        '%(sleep_time)d seconds.') % log_info)
+            time.sleep(sleep_time)
+
+    def ensure(self, error_callback, method, *args, **kwargs):
+        while True:
+            try:
+                return method(*args, **kwargs)
+            except (self.connection_errors, socket.timeout, IOError) as e:
+                if error_callback:
+                    error_callback(e)
+            except Exception as e:
+                # NOTE(comstud): Unfortunately it's possible for amqplib
+                # to return an error not covered by its transport
+                # connection_errors in the case of a timeout waiting for
+                # a protocol response.  (See paste link in LP888621)
+                # So, we check all exceptions for 'timeout' in them
+                # and try to reconnect in this case.
+                if 'timeout' not in str(e):
+                    raise
+                if error_callback:
+                    error_callback(e)
+            self.reconnect()
+
+    def get_channel(self):
+        """Convenience call for bin/clear_rabbit_queues."""
+        return self.channel
+
+    def close(self):
+        """Close/release this connection."""
+        self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
+        self.connection.release()
+        self.connection = None
+
+    def reset(self):
+        """Reset a connection so it can be used again."""
+        self.cancel_consumer_thread()
+        self.wait_on_proxy_callbacks()
+        self.channel.close()
+        self.channel = self.connection.channel()
+        # work around 'memory' transport bug in 1.1.3
+        if self.memory_transport:
+            self.channel._new_queue('ae.undeliver')
+        self.consumers = []
+
+    def declare_consumer(self, consumer_cls, topic, callback):
+        """Create a Consumer using the class that was passed in and
+        add it to our list of consumers
+        """
+
+        def _connect_error(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+                      "%(err_str)s") % log_info)
+
+        def _declare_consumer():
+            consumer = consumer_cls(self.conf, self.channel, topic, callback,
+                                    self.consumer_num.next())
+            self.consumers.append(consumer)
+            return consumer
+
+        return self.ensure(_connect_error, _declare_consumer)
+
+    def iterconsume(self, limit=None, timeout=None):
+        """Return an iterator that will consume from all queues/consumers."""
+
+        info = {'do_consume': True}
+
+        def _error_callback(exc):
+            if isinstance(exc, socket.timeout):
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
+                raise rpc_common.Timeout()
+            else:
+                LOG.exception(_('Failed to consume message from queue: %s') %
+                              str(exc))
+                info['do_consume'] = True
+
+        def _consume():
+            if info['do_consume']:
+                queues_head = self.consumers[:-1]  # not fanout.
+                queues_tail = self.consumers[-1]  # fanout
+                for queue in queues_head:
+                    queue.consume(nowait=True)
+                queues_tail.consume(nowait=False)
+                info['do_consume'] = False
+            return self.connection.drain_events(timeout=timeout)
+
+        for iteration in itertools.count(0):
+            if limit and iteration >= limit:
+                raise StopIteration
+            yield self.ensure(_error_callback, _consume)
+
+    def cancel_consumer_thread(self):
+        """Cancel a consumer thread."""
+        if self.consumer_thread is not None:
+            self.consumer_thread.kill()
+            try:
+                self.consumer_thread.wait()
+            except greenlet.GreenletExit:
+                pass
+            self.consumer_thread = None
+
+    def wait_on_proxy_callbacks(self):
+        """Wait for all proxy callback threads to exit."""
+        for proxy_cb in self.proxy_callbacks:
+            proxy_cb.wait()
+
+    def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
+        """Send to a publisher based on the publisher class."""
+
+        def _error_callback(exc):
+            log_info = {'topic': topic, 'err_str': str(exc)}
+            LOG.exception(_("Failed to publish message to topic "
+                          "'%(topic)s': %(err_str)s") % log_info)
+
+        def _publish():
+            publisher = cls(self.conf, self.channel, topic, **kwargs)
+            publisher.send(msg, timeout)
+
+        self.ensure(_error_callback, _publish)
+
+    def declare_direct_consumer(self, topic, callback):
+        """Create a 'direct' queue.
+        In nova's use, this is generally a msg_id queue used for
+        responses for call/multicall
+        """
+        self.declare_consumer(DirectConsumer, topic, callback)
+
+    def declare_topic_consumer(self, topic, callback=None, queue_name=None,
+                               exchange_name=None, ack_on_error=True):
+        """Create a 'topic' consumer."""
+        self.declare_consumer(functools.partial(TopicConsumer,
+                                                name=queue_name,
+                                                exchange_name=exchange_name,
+                                                ack_on_error=ack_on_error,
+                                                ),
+                              topic, callback)
+
+    def declare_fanout_consumer(self, topic, callback):
+        """Create a 'fanout' consumer."""
+        self.declare_consumer(FanoutConsumer, topic, callback)
+
+    def direct_send(self, msg_id, msg):
+        """Send a 'direct' message."""
+        self.publisher_send(DirectPublisher, msg_id, msg)
+
+    def topic_send(self, topic, msg, timeout=None):
+        """Send a 'topic' message."""
+        self.publisher_send(TopicPublisher, topic, msg, timeout)
+
+    def fanout_send(self, topic, msg):
+        """Send a 'fanout' message."""
+        self.publisher_send(FanoutPublisher, topic, msg)
+
+    def notify_send(self, topic, msg, **kwargs):
+        """Send a notify message on a topic."""
+        self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
+
+    def consume(self, limit=None):
+        """Consume from all queues/consumers."""
+        it = self.iterconsume(limit=limit)
+        while True:
+            try:
+                it.next()
+            except StopIteration:
+                return
+
+    def consume_in_thread(self):
+        """Consumer from all queues/consumers in a greenthread."""
+        @excutils.forever_retry_uncaught_exceptions
+        def _consumer_thread():
+            try:
+                self.consume()
+            except greenlet.GreenletExit:
+                return
+        if self.consumer_thread is None:
+            self.consumer_thread = eventlet.spawn(_consumer_thread)
+        return self.consumer_thread
+
+    def create_consumer(self, topic, proxy, fanout=False):
+        """Create a consumer that calls a method in a proxy object."""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
+
+        if fanout:
+            self.declare_fanout_consumer(topic, proxy_cb)
+        else:
+            self.declare_topic_consumer(topic, proxy_cb)
+
+    def create_worker(self, topic, proxy, pool_name):
+        """Create a worker that calls a method in a proxy object."""
+        proxy_cb = rpc_amqp.ProxyCallback(
+            self.conf, proxy,
+            rpc_amqp.get_connection_pool(self.conf, Connection))
+        self.proxy_callbacks.append(proxy_cb)
+        self.declare_topic_consumer(topic, proxy_cb, pool_name)
+
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None, ack_on_error=True):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+        """
+        callback_wrapper = rpc_amqp.CallbackWrapper(
+            conf=self.conf,
+            callback=callback,
+            connection_pool=rpc_amqp.get_connection_pool(self.conf,
+                                                         Connection),
+        )
+        self.proxy_callbacks.append(callback_wrapper)
+        self.declare_topic_consumer(
+            queue_name=pool_name,
+            topic=topic,
+            exchange_name=exchange_name,
+            callback=callback_wrapper,
+            ack_on_error=ack_on_error,
+        )
+
+
+def create_connection(conf, new=True):
+    """Create a connection."""
+    return rpc_amqp.create_connection(
+        conf, new,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def multicall(conf, context, topic, msg, timeout=None):
+    """Make a call that returns multiple times."""
+    return rpc_amqp.multicall(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def call(conf, context, topic, msg, timeout=None):
+    """Sends a message on a topic and wait for a response."""
+    return rpc_amqp.call(
+        conf, context, topic, msg, timeout,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast(conf, context, topic, msg):
+    """Sends a message on a topic without waiting for a response."""
+    return rpc_amqp.cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast(conf, context, topic, msg):
+    """Sends a message on a fanout exchange without waiting for a response."""
+    return rpc_amqp.fanout_cast(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def cast_to_server(conf, context, server_params, topic, msg):
+    """Sends a message on a topic to a specific server."""
+    return rpc_amqp.cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def fanout_cast_to_server(conf, context, server_params, topic, msg):
+    """Sends a message on a fanout exchange to a specific server."""
+    return rpc_amqp.fanout_cast_to_server(
+        conf, context, server_params, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection))
+
+
+def notify(conf, context, topic, msg, envelope):
+    """Sends a notification event on a topic."""
+    return rpc_amqp.notify(
+        conf, context, topic, msg,
+        rpc_amqp.get_connection_pool(conf, Connection),
+        envelope)
+
+
+def cleanup():
+    return rpc_amqp.cleanup(Connection.pool)