diff --git a/oslo/messaging/_drivers/amqp.py b/oslo/messaging/_drivers/amqp.py
index d6ee03a17..8c073bc5e 100644
--- a/oslo/messaging/_drivers/amqp.py
+++ b/oslo/messaging/_drivers/amqp.py
@@ -24,19 +24,14 @@ uses AMQP, but is deprecated and predates this code.
 """
 
 import collections
-import inspect
 import logging
-import sys
 import threading
 import uuid
 
-from eventlet import greenpool
-from eventlet import queue
 from oslo.config import cfg
 
 from oslo.messaging._drivers import common as rpc_common
 from oslo.messaging._drivers import pool
-from oslo.messaging.openstack.common import excutils
 
 # FIXME(markmc): remove this
 _ = lambda s: s
@@ -338,275 +333,5 @@ def _add_unique_id(msg):
     LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
 
 
-class _ThreadPoolWithWait(object):
-    """Base class for a delayed invocation manager.
-
-    Used by the Connection class to start up green threads
-    to handle incoming messages.
-    """
-
-    def __init__(self, conf, connection_pool):
-        self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
-        self.connection_pool = connection_pool
-        self.conf = conf
-
-    def wait(self):
-        """Wait for all callback threads to exit."""
-        self.pool.waitall()
-
-
-class CallbackWrapper(_ThreadPoolWithWait):
-    """Wraps a straight callback.
-
-    Allows it to be invoked in a green thread.
-    """
-
-    def __init__(self, conf, callback, connection_pool):
-        """Initiates CallbackWrapper object.
-
-        :param conf: cfg.CONF instance
-        :param callback: a callable (probably a function)
-        :param connection_pool: connection pool as returned by
-                                get_connection_pool()
-        """
-        super(CallbackWrapper, self).__init__(
-            conf=conf,
-            connection_pool=connection_pool,
-        )
-        self.callback = callback
-
-    def __call__(self, message_data):
-        self.pool.spawn_n(self.callback, message_data)
-
-
-class ProxyCallback(_ThreadPoolWithWait):
-    """Calls methods on a proxy object based on method and args."""
-
-    def __init__(self, conf, proxy, connection_pool):
-        super(ProxyCallback, self).__init__(
-            conf=conf,
-            connection_pool=connection_pool,
-        )
-        self.proxy = proxy
-        self.msg_id_cache = _MsgIdCache()
-
-    def __call__(self, message_data):
-        """Consumer callback to call a method on a proxy object.
-
-        Parses the message for validity and fires off a thread to call the
-        proxy object method.
-
-        Message data should be a dictionary with two keys:
-            method: string representing the method to call
-            args: dictionary of arg: value
-
-        Example: {'method': 'echo', 'args': {'value': 42}}
-
-        """
-        # It is important to clear the context here, because at this point
-        # the previous context is stored in local.store.context
-        #if hasattr(local.store, 'context'):
-        #    del local.store.context
-        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
-        self.msg_id_cache.check_duplicate_message(message_data)
-        ctxt = unpack_context(self.conf, message_data)
-        method = message_data.get('method')
-        args = message_data.get('args', {})
-        version = message_data.get('version')
-        namespace = message_data.get('namespace')
-        if not method:
-            LOG.warn(_('no method for message: %s') % message_data)
-            ctxt.reply(_('No method for message: %s') % message_data,
-                       connection_pool=self.connection_pool)
-            return
-        self.pool.spawn_n(self._process_data, ctxt, version, method,
-                          namespace, args)
-
-    def _process_data(self, ctxt, version, method, namespace, args):
-        """Process a message in a new thread.
-
-        If the proxy object we have has a dispatch method
-        (see rpc.dispatcher.RpcDispatcher), pass it the version,
-        method, and args and let it dispatch as appropriate.  If not, use
-        the old behavior of magically calling the specified method on the
-        proxy we have here.
-        """
-        ctxt.update_store()
-        try:
-            rval = self.proxy.dispatch(ctxt, version, method, namespace,
-                                       **args)
-            # Check if the result was a generator
-            if inspect.isgenerator(rval):
-                for x in rval:
-                    ctxt.reply(x, None, connection_pool=self.connection_pool)
-            else:
-                ctxt.reply(rval, None, connection_pool=self.connection_pool)
-            # This final None tells multicall that it is done.
-            ctxt.reply(ending=True, connection_pool=self.connection_pool)
-        except rpc_common.ClientException as e:
-            LOG.debug(_('Expected exception during message handling (%s)') %
-                      e._exc_info[1])
-            ctxt.reply(None, e._exc_info,
-                       connection_pool=self.connection_pool,
-                       log_failure=False)
-        except Exception:
-            # sys.exc_info() is deleted by LOG.exception().
-            exc_info = sys.exc_info()
-            LOG.error(_('Exception during message handling'),
-                      exc_info=exc_info)
-            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
-
-
-class MulticallProxyWaiter(object):
-    def __init__(self, conf, msg_id, timeout, connection_pool):
-        self._msg_id = msg_id
-        self._timeout = timeout or conf.rpc_response_timeout
-        self._reply_proxy = connection_pool.reply_proxy
-        self._done = False
-        self._got_ending = False
-        self._conf = conf
-        self._dataqueue = queue.LightQueue()
-        # Add this caller to the reply proxy's call_waiters
-        self._reply_proxy.add_call_waiter(self, self._msg_id)
-        self.msg_id_cache = _MsgIdCache()
-
-    def put(self, data):
-        self._dataqueue.put(data)
-
-    def done(self):
-        if self._done:
-            return
-        self._done = True
-        # Remove this caller from reply proxy's call_waiters
-        self._reply_proxy.del_call_waiter(self._msg_id)
-
-    def _process_data(self, data):
-        result = None
-        self.msg_id_cache.check_duplicate_message(data)
-        if data['failure']:
-            failure = data['failure']
-            result = rpc_common.deserialize_remote_exception(self._conf,
-                                                             failure)
-        elif data.get('ending', False):
-            self._got_ending = True
-        else:
-            result = data['result']
-        return result
-
-    def __iter__(self):
-        """Return a result until we get a reply with an 'ending' flag."""
-        if self._done:
-            raise StopIteration
-        while True:
-            try:
-                data = self._dataqueue.get(timeout=self._timeout)
-                result = self._process_data(data)
-            except queue.Empty:
-                self.done()
-                raise rpc_common.Timeout()
-            except Exception:
-                with excutils.save_and_reraise_exception():
-                    self.done()
-            if self._got_ending:
-                self.done()
-                raise StopIteration
-            if isinstance(result, Exception):
-                self.done()
-                raise result
-            yield result
-
-
-def create_connection(conf, new, connection_pool):
-    """Create a connection."""
-    return ConnectionContext(conf, connection_pool, pooled=not new)
-
-
-_reply_proxy_create_sem = threading.Lock()
-
-
-def multicall(conf, context, topic, msg, timeout, connection_pool):
-    """Make a call that returns multiple times."""
-    LOG.debug(_('Making synchronous call on %s ...'), topic)
-    msg_id = uuid.uuid4().hex
-    msg.update({'_msg_id': msg_id})
-    LOG.debug(_('MSG_ID is %s') % (msg_id))
-    _add_unique_id(msg)
-    pack_context(msg, context)
-
-    with _reply_proxy_create_sem:
-        if not connection_pool.reply_proxy:
-            connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
-    msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
-    wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
-    with ConnectionContext(conf, connection_pool) as conn:
-        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
-    return wait_msg
-
-
-def call(conf, context, topic, msg, timeout, connection_pool):
-    """Sends a message on a topic and wait for a response."""
-    rv = multicall(conf, context, topic, msg, timeout, connection_pool)
-    # NOTE(vish): return the last result from the multicall
-    rv = list(rv)
-    if not rv:
-        return
-    return rv[-1]
-
-
-def cast(conf, context, topic, msg, connection_pool):
-    """Sends a message on a topic without waiting for a response."""
-    LOG.debug(_('Making asynchronous cast on %s...'), topic)
-    _add_unique_id(msg)
-    pack_context(msg, context)
-    with ConnectionContext(conf, connection_pool) as conn:
-        conn.topic_send(topic, rpc_common.serialize_msg(msg))
-
-
-def fanout_cast(conf, context, topic, msg, connection_pool):
-    """Sends a message on a fanout exchange without waiting for a response."""
-    LOG.debug(_('Making asynchronous fanout cast...'))
-    _add_unique_id(msg)
-    pack_context(msg, context)
-    with ConnectionContext(conf, connection_pool) as conn:
-        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-
-
-def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
-    """Sends a message on a topic to a specific server."""
-    _add_unique_id(msg)
-    pack_context(msg, context)
-    with ConnectionContext(conf, connection_pool, pooled=False,
-                           server_params=server_params) as conn:
-        conn.topic_send(topic, rpc_common.serialize_msg(msg))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg,
-                          connection_pool):
-    """Sends a message on a fanout exchange to a specific server."""
-    _add_unique_id(msg)
-    pack_context(msg, context)
-    with ConnectionContext(conf, connection_pool, pooled=False,
-                           server_params=server_params) as conn:
-        conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-
-
-def notify(conf, context, topic, msg, connection_pool, envelope):
-    """Sends a notification event on a topic."""
-    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
-              dict(event_type=msg.get('event_type'),
-                   topic=topic))
-    _add_unique_id(msg)
-    pack_context(msg, context)
-    with ConnectionContext(conf, connection_pool) as conn:
-        if envelope:
-            msg = rpc_common.serialize_msg(msg)
-        conn.notify_send(topic, msg)
-
-
-def cleanup(connection_pool):
-    if connection_pool:
-        connection_pool.empty()
-
-
 def get_control_exchange(conf):
     return conf.control_exchange
diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py
index cac4d7b4a..b3717b9a4 100644
--- a/oslo/messaging/_drivers/impl_qpid.py
+++ b/oslo/messaging/_drivers/impl_qpid.py
@@ -698,124 +698,6 @@ class Connection(object):
             self.consumer_thread = eventlet.spawn(_consumer_thread)
         return self.consumer_thread
 
-    def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object."""
-        proxy_cb = rpc_amqp.ProxyCallback(
-            self.conf, proxy,
-            rpc_amqp.get_connection_pool(self.conf, Connection))
-        self.proxy_callbacks.append(proxy_cb)
-
-        if fanout:
-            consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
-        else:
-            consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb)
-
-        self._register_consumer(consumer)
-
-        return consumer
-
-    def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object."""
-        proxy_cb = rpc_amqp.ProxyCallback(
-            self.conf, proxy,
-            rpc_amqp.get_connection_pool(self.conf, Connection))
-        self.proxy_callbacks.append(proxy_cb)
-
-        consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
-                                 name=pool_name)
-
-        self._register_consumer(consumer)
-
-        return consumer
-
-    def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
-        """Register as a member of a group of consumers for a given topic from
-        the specified exchange.
-
-        Exactly one member of a given pool will receive each message.
-
-        A message will be delivered to multiple pools, if more than
-        one is created.
-        """
-        callback_wrapper = rpc_amqp.CallbackWrapper(
-            conf=self.conf,
-            callback=callback,
-            connection_pool=rpc_amqp.get_connection_pool(self.conf,
-                                                         Connection),
-        )
-        self.proxy_callbacks.append(callback_wrapper)
-
-        consumer = TopicConsumer(conf=self.conf,
-                                 session=self.session,
-                                 topic=topic,
-                                 callback=callback_wrapper,
-                                 name=pool_name,
-                                 exchange_name=exchange_name)
-
-        self._register_consumer(consumer)
-        return consumer
-
-
-def create_connection(conf, new=True):
-    """Create a connection."""
-    return rpc_amqp.create_connection(
-        conf, new,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def multicall(conf, context, topic, msg, timeout=None):
-    """Make a call that returns multiple times."""
-    return rpc_amqp.multicall(
-        conf, context, topic, msg, timeout,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def call(conf, context, topic, msg, timeout=None):
-    """Sends a message on a topic and wait for a response."""
-    return rpc_amqp.call(
-        conf, context, topic, msg, timeout,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast(conf, context, topic, msg):
-    """Sends a message on a topic without waiting for a response."""
-    return rpc_amqp.cast(
-        conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast(conf, context, topic, msg):
-    """Sends a message on a fanout exchange without waiting for a response."""
-    return rpc_amqp.fanout_cast(
-        conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast_to_server(conf, context, server_params, topic, msg):
-    """Sends a message on a topic to a specific server."""
-    return rpc_amqp.cast_to_server(
-        conf, context, server_params, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg):
-    """Sends a message on a fanout exchange to a specific server."""
-    return rpc_amqp.fanout_cast_to_server(
-        conf, context, server_params, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def notify(conf, context, topic, msg, envelope):
-    """Sends a notification event on a topic."""
-    return rpc_amqp.notify(conf, context, topic, msg,
-                           rpc_amqp.get_connection_pool(conf, Connection),
-                           envelope)
-
-
-def cleanup():
-    return rpc_amqp.cleanup(Connection.pool)
-
 
 class QpidDriver(amqpdriver.AMQPDriverBase):
 
diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py
index 6f4c9904f..c0084f92e 100644
--- a/oslo/messaging/_drivers/impl_rabbit.py
+++ b/oslo/messaging/_drivers/impl_rabbit.py
@@ -741,111 +741,6 @@ class Connection(object):
             self.consumer_thread = eventlet.spawn(_consumer_thread)
         return self.consumer_thread
 
-    def create_consumer(self, topic, proxy, fanout=False):
-        """Create a consumer that calls a method in a proxy object."""
-        proxy_cb = rpc_amqp.ProxyCallback(
-            self.conf, proxy,
-            rpc_amqp.get_connection_pool(self.conf, Connection))
-        self.proxy_callbacks.append(proxy_cb)
-
-        if fanout:
-            self.declare_fanout_consumer(topic, proxy_cb)
-        else:
-            self.declare_topic_consumer(topic, proxy_cb)
-
-    def create_worker(self, topic, proxy, pool_name):
-        """Create a worker that calls a method in a proxy object."""
-        proxy_cb = rpc_amqp.ProxyCallback(
-            self.conf, proxy,
-            rpc_amqp.get_connection_pool(self.conf, Connection))
-        self.proxy_callbacks.append(proxy_cb)
-        self.declare_topic_consumer(topic, proxy_cb, pool_name)
-
-    def join_consumer_pool(self, callback, pool_name, topic,
-                           exchange_name=None):
-        """Register as a member of a group of consumers for a given topic from
-        the specified exchange.
-
-        Exactly one member of a given pool will receive each message.
-
-        A message will be delivered to multiple pools, if more than
-        one is created.
-        """
-        callback_wrapper = rpc_amqp.CallbackWrapper(
-            conf=self.conf,
-            callback=callback,
-            connection_pool=rpc_amqp.get_connection_pool(self.conf,
-                                                         Connection),
-        )
-        self.proxy_callbacks.append(callback_wrapper)
-        self.declare_topic_consumer(
-            queue_name=pool_name,
-            topic=topic,
-            exchange_name=exchange_name,
-            callback=callback_wrapper,
-        )
-
-
-def create_connection(conf, new=True):
-    """Create a connection."""
-    return rpc_amqp.create_connection(
-        conf, new,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def multicall(conf, context, topic, msg, timeout=None):
-    """Make a call that returns multiple times."""
-    return rpc_amqp.multicall(
-        conf, context, topic, msg, timeout,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def call(conf, context, topic, msg, timeout=None):
-    """Sends a message on a topic and wait for a response."""
-    return rpc_amqp.call(
-        conf, context, topic, msg, timeout,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast(conf, context, topic, msg):
-    """Sends a message on a topic without waiting for a response."""
-    return rpc_amqp.cast(
-        conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast(conf, context, topic, msg):
-    """Sends a message on a fanout exchange without waiting for a response."""
-    return rpc_amqp.fanout_cast(
-        conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def cast_to_server(conf, context, server_params, topic, msg):
-    """Sends a message on a topic to a specific server."""
-    return rpc_amqp.cast_to_server(
-        conf, context, server_params, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def fanout_cast_to_server(conf, context, server_params, topic, msg):
-    """Sends a message on a fanout exchange to a specific server."""
-    return rpc_amqp.fanout_cast_to_server(
-        conf, context, server_params, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection))
-
-
-def notify(conf, context, topic, msg, envelope):
-    """Sends a notification event on a topic."""
-    return rpc_amqp.notify(
-        conf, context, topic, msg,
-        rpc_amqp.get_connection_pool(conf, Connection),
-        envelope)
-
-
-def cleanup():
-    return rpc_amqp.cleanup(Connection.pool)
-
 
 class RabbitDriver(amqpdriver.AMQPDriverBase):