diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py
index f673b9c06..6108e9519 100644
--- a/oslo_messaging/_drivers/impl_zmq.py
+++ b/oslo_messaging/_drivers/impl_zmq.py
@@ -35,7 +35,7 @@ from stevedore import driver
 
 from oslo_messaging._drivers import base
 from oslo_messaging._drivers import common as rpc_common
-from oslo_messaging._executors import base as executor_base  # FIXME(markmc)
+from oslo_messaging._executors import impl_pooledexecutor  # FIXME(markmc)
 from oslo_messaging._i18n import _, _LE, _LW
 from oslo_messaging._drivers import pool
 
@@ -1001,7 +1001,7 @@ class ZmqDriver(base.BaseDriver):
         if not zmq:
             raise ImportError("Failed to import eventlet.green.zmq")
         conf.register_opts(zmq_opts)
-        conf.register_opts(executor_base._pool_opts)
+        conf.register_opts(impl_pooledexecutor._pool_opts)
         conf.register_opts(base.base_opts)
 
         super(ZmqDriver, self).__init__(conf, url, default_exchange,
diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py
index 2cc667e27..6fbc15377 100644
--- a/oslo_messaging/_executors/base.py
+++ b/oslo_messaging/_executors/base.py
@@ -14,19 +14,15 @@
 
 import abc
 
-from oslo_config import cfg
 import six
 
-_pool_opts = [
-    cfg.IntOpt('rpc_thread_pool_size',
-               default=64,
-               help='Size of RPC thread pool.'),
-]
-
 
 @six.add_metaclass(abc.ABCMeta)
 class ExecutorBase(object):
 
+    # Executor can override how we run the application callback
+    _executor_callback = None
+
     def __init__(self, conf, listener, dispatcher):
         self.conf = conf
         self.listener = listener
@@ -43,11 +39,3 @@ class ExecutorBase(object):
     @abc.abstractmethod
     def wait(self):
         "Wait until the executor has stopped polling."
-
-
-class PooledExecutorBase(ExecutorBase):
-    """An executor that uses a rpc thread pool of a given size."""
-
-    def __init__(self, conf, listener, callback):
-        super(PooledExecutorBase, self).__init__(conf, listener, callback)
-        self.conf.register_opts(_pool_opts)
diff --git a/oslo_messaging/_executors/impl_aioeventlet.py b/oslo_messaging/_executors/impl_aioeventlet.py
index d0fc4aa31..8f67d7958 100644
--- a/oslo_messaging/_executors/impl_aioeventlet.py
+++ b/oslo_messaging/_executors/impl_aioeventlet.py
@@ -70,6 +70,4 @@ class AsyncioEventletExecutor(impl_eventlet.EventletExecutor):
             result = aioeventlet.yield_future(result, loop=self._loop)
         return result
 
-    def _dispatch(self, incoming):
-        ctx = self.dispatcher(incoming, self._coroutine_wrapper)
-        impl_eventlet.spawn_with(ctxt=ctx, pool=self._greenpool)
+    _executor_callback = _coroutine_wrapper
diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py
index 733403601..11ad81226 100644
--- a/oslo_messaging/_executors/impl_blocking.py
+++ b/oslo_messaging/_executors/impl_blocking.py
@@ -13,15 +13,28 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import logging
+import futurist
 
-from oslo_messaging._executors import base
-from oslo_messaging._i18n import _
-
-LOG = logging.getLogger(__name__)
+from oslo_messaging._executors import impl_pooledexecutor
 
 
-class BlockingExecutor(base.ExecutorBase):
+class FakeBlockingThread(object):
+    def __init__(self, target):
+        self._target = target
+
+    def start(self):
+        self._target()
+
+    @staticmethod
+    def join():
+        pass
+
+    @staticmethod
+    def stop():
+        pass
+
+
+class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
 
     """A message executor which blocks the current thread.
 
@@ -34,24 +47,5 @@ class BlockingExecutor(base.ExecutorBase):
     for simple demo programs.
     """
 
-    def __init__(self, conf, listener, dispatcher):
-        super(BlockingExecutor, self).__init__(conf, listener, dispatcher)
-        self._running = False
-
-    def start(self):
-        self._running = True
-        while self._running:
-            try:
-                incoming = self.listener.poll()
-                if incoming is not None:
-                    with self.dispatcher(incoming) as callback:
-                        callback()
-            except Exception:
-                LOG.exception(_("Unexpected exception occurred."))
-
-    def stop(self):
-        self._running = False
-        self.listener.stop()
-
-    def wait(self):
-        pass
+    _executor_cls = lambda __, ___: futurist.SynchronousExecutor()
+    _thread_cls = FakeBlockingThread
diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py
index 3333fe713..48771edc7 100644
--- a/oslo_messaging/_executors/impl_eventlet.py
+++ b/oslo_messaging/_executors/impl_eventlet.py
@@ -14,50 +14,17 @@
 #    under the License.
 
 import logging
-import sys
 
-import eventlet
 from eventlet.green import threading as greenthreading
-from eventlet import greenpool
-import greenlet
-from oslo_utils import excutils
+import futurist
 
-from oslo_messaging._executors import base
+from oslo_messaging._executors import impl_pooledexecutor
 from oslo_messaging import localcontext
 
 LOG = logging.getLogger(__name__)
 
 
-def spawn_with(ctxt, pool):
-    """This is the equivalent of a with statement
-    but with the content of the BLOCK statement executed
-    into a greenthread
-
-    exception path grab from:
-    http://www.python.org/dev/peps/pep-0343/
-    """
-
-    def complete(thread, exit):
-        exc = True
-        try:
-            try:
-                thread.wait()
-            except Exception:
-                exc = False
-                if not exit(*sys.exc_info()):
-                    raise
-        finally:
-            if exc:
-                exit(None, None, None)
-
-    callback = ctxt.__enter__()
-    thread = pool.spawn(callback)
-    thread.link(complete, ctxt.__exit__)
-
-    return thread
-
-
-class EventletExecutor(base.PooledExecutorBase):
+class EventletExecutor(impl_pooledexecutor.PooledExecutor):
 
     """A message executor which integrates with eventlet.
 
@@ -70,10 +37,6 @@ class EventletExecutor(base.PooledExecutorBase):
 
     def __init__(self, conf, listener, dispatcher):
         super(EventletExecutor, self).__init__(conf, listener, dispatcher)
-        self._thread = None
-        self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size)
-        self._running = False
-
         if not isinstance(localcontext._STORE, greenthreading.local):
             LOG.debug('eventlet executor in use but the threading module '
                       'has not been monkeypatched or has been '
@@ -82,39 +45,7 @@ class EventletExecutor(base.PooledExecutorBase):
                       'behavior. In the future, we will raise a '
                       'RuntimeException in this case.')
 
-    def _dispatch(self, incoming):
-        spawn_with(ctxt=self.dispatcher(incoming), pool=self._greenpool)
-
-    def start(self):
-        if self._thread is not None:
-            return
-
-        @excutils.forever_retry_uncaught_exceptions
-        def _executor_thread():
-            try:
-                while self._running:
-                    incoming = self.listener.poll()
-                    if incoming is not None:
-                        self._dispatch(incoming)
-            except greenlet.GreenletExit:
-                return
-
-        self._running = True
-        self._thread = eventlet.spawn(_executor_thread)
-
-    def stop(self):
-        if self._thread is None:
-            return
-        self._running = False
-        self.listener.stop()
-        self._thread.cancel()
-
-    def wait(self):
-        if self._thread is None:
-            return
-        self._greenpool.waitall()
-        try:
-            self._thread.wait()
-        except greenlet.GreenletExit:
-            pass
-        self._thread = None
+    _executor_cls = futurist.GreenThreadPoolExecutor
+    _lock_cls = greenthreading.Lock
+    _event_cls = greenthreading.Event
+    _thread_cls = greenthreading.Thread
diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py
new file mode 100644
index 000000000..7689bd14f
--- /dev/null
+++ b/oslo_messaging/_executors/impl_pooledexecutor.py
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import collections
+import threading
+
+from concurrent import futures
+from oslo.config import cfg
+from oslo.utils import excutils
+
+from oslo_messaging._executors import base
+
+_pool_opts = [
+    cfg.IntOpt('rpc_thread_pool_size',
+               default=64,
+               help='Size of RPC thread pool.'),
+]
+
+
+class PooledExecutor(base.ExecutorBase):
+    """A message executor which integrates with threads.
+
+    A message process that polls for messages from a dispatching thread and
+    on reception of an incoming message places the message to be processed in
+    a thread pool to be executed at a later time.
+    """
+
+    # NOTE(harlowja): if eventlet is being used and the thread module is monkey
+    # patched this should/is supposed to work the same as the eventlet based
+    # executor.
+
+    # NOTE(harlowja): Make it somewhat easy to change this via
+    # inheritance (since there does exist other executor types that could be
+    # used/tried here).
+    _executor_cls = futures.ThreadPoolExecutor
+    _event_cls = threading.Event
+    _lock_cls = threading.Lock
+    _thread_cls = threading.Thread
+
+    def __init__(self, conf, listener, dispatcher):
+        super(PooledExecutor, self).__init__(conf, listener, dispatcher)
+        self.conf.register_opts(_pool_opts)
+        self._poller = None
+        self._executor = None
+        self._tombstone = self._event_cls()
+        self._incomplete = collections.deque()
+        self._mutator = self._lock_cls()
+
+    @excutils.forever_retry_uncaught_exceptions
+    def _runner(self):
+        while not self._tombstone.is_set():
+            incoming = self.listener.poll()
+            if incoming is None:
+                continue
+            callback = self.dispatcher(incoming, self._executor_callback)
+            try:
+                fut = self._executor.submit(callback.run)
+            except RuntimeError:
+                # This is triggered when the executor has been shutdown...
+                #
+                # TODO(harlowja): should we put whatever we pulled off back
+                # since when this is thrown it means the executor has been
+                # shutdown already??
+                callback.done()
+                return
+            else:
+                with self._mutator:
+                    self._incomplete.append(fut)
+                # Run the other post processing of the callback when done...
+                fut.add_done_callback(lambda f: callback.done())
+
+    def start(self):
+        if self._executor is None:
+            self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
+        self._tombstone.clear()
+        if self._poller is None or not self._poller.is_alive():
+            self._poller = self._thread_cls(target=self._runner)
+            self._poller.daemon = True
+            self._poller.start()
+
+    def stop(self):
+        if self._executor is not None:
+            self._executor.shutdown(wait=False)
+        self._tombstone.set()
+        self.listener.stop()
+
+    def wait(self):
+        # TODO(harlowja): this method really needs a timeout.
+        if self._poller is not None:
+            self._tombstone.wait()
+            self._poller.join()
+            self._poller = None
+        if self._executor is not None:
+            with self._mutator:
+                incomplete_fs = list(self._incomplete)
+                self._incomplete.clear()
+            if incomplete_fs:
+                futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
+            self._executor = None
diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py
index ca8ebc790..9a4651aa2 100644
--- a/oslo_messaging/_executors/impl_thread.py
+++ b/oslo_messaging/_executors/impl_thread.py
@@ -14,118 +14,16 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import collections
-import functools
-import sys
-import threading
-
 from concurrent import futures
-from oslo_utils import excutils
-import six
 
-from oslo_messaging._executors import base
+from oslo_messaging._executors import impl_pooledexecutor
 
 
-class ThreadExecutor(base.PooledExecutorBase):
+class ThreadExecutor(impl_pooledexecutor.PooledExecutor):
     """A message executor which integrates with threads.
 
     A message process that polls for messages from a dispatching thread and
     on reception of an incoming message places the message to be processed in
     a thread pool to be executed at a later time.
     """
-
-    # NOTE(harlowja): if eventlet is being used and the thread module is monkey
-    # patched this should/is supposed to work the same as the eventlet based
-    # executor.
-
-    # NOTE(harlowja): Make it somewhat easy to change this via
-    # inheritance (since there does exist other executor types that could be
-    # used/tried here).
     _executor_cls = futures.ThreadPoolExecutor
-
-    def __init__(self, conf, listener, dispatcher):
-        super(ThreadExecutor, self).__init__(conf, listener, dispatcher)
-        self._poller = None
-        self._executor = None
-        self._tombstone = threading.Event()
-        self._incomplete = collections.deque()
-        self._mutator = threading.Lock()
-
-    def _completer(self, exit_method, fut):
-        """Completes futures."""
-        try:
-            exc = fut.exception()
-            if exc is not None:
-                exc_type = type(exc)
-                # Not available on < 3.x due to this being an added feature
-                # of pep-3134 (exception chaining and embedded tracebacks).
-                if six.PY3:
-                    exc_tb = exc.__traceback__
-                else:
-                    exc_tb = None
-                if not exit_method(exc_type, exc, exc_tb):
-                    six.reraise(exc_type, exc, tb=exc_tb)
-            else:
-                exit_method(None, None, None)
-        finally:
-            with self._mutator:
-                try:
-                    self._incomplete.remove(fut)
-                except ValueError:
-                    pass
-
-    @excutils.forever_retry_uncaught_exceptions
-    def _runner(self):
-        while not self._tombstone.is_set():
-            incoming = self.listener.poll()
-            if incoming is None:
-                continue
-            # This is hacky, needs to be fixed....
-            context = self.dispatcher(incoming)
-            enter_method = context.__enter__()
-            exit_method = context.__exit__
-            try:
-                fut = self._executor.submit(enter_method)
-            except RuntimeError:
-                # This is triggered when the executor has been shutdown...
-                #
-                # TODO(harlowja): should we put whatever we pulled off back
-                # since when this is thrown it means the executor has been
-                # shutdown already??
-                exit_method(*sys.exc_info())
-                return
-            else:
-                with self._mutator:
-                    self._incomplete.append(fut)
-                # Run the other half (__exit__) when done...
-                fut.add_done_callback(functools.partial(self._completer,
-                                                        exit_method))
-
-    def start(self):
-        if self._executor is None:
-            self._executor = self._executor_cls(self.conf.rpc_thread_pool_size)
-        self._tombstone.clear()
-        if self._poller is None or not self._poller.is_alive():
-            self._poller = threading.Thread(target=self._runner)
-            self._poller.daemon = True
-            self._poller.start()
-
-    def stop(self):
-        if self._executor is not None:
-            self._executor.shutdown(wait=False)
-        self._tombstone.set()
-        self.listener.stop()
-
-    def wait(self):
-        # TODO(harlowja): this method really needs a timeout.
-        if self._poller is not None:
-            self._tombstone.wait()
-            self._poller.join()
-            self._poller = None
-        if self._executor is not None:
-            with self._mutator:
-                incomplete_fs = list(self._incomplete)
-                self._incomplete.clear()
-            if incomplete_fs:
-                futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED)
-            self._executor = None
diff --git a/oslo_messaging/_utils.py b/oslo_messaging/_utils.py
index 532c98b8a..ddec6d7a7 100644
--- a/oslo_messaging/_utils.py
+++ b/oslo_messaging/_utils.py
@@ -13,6 +13,10 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import logging
+
+LOG = logging.getLogger(__name__)
+
 
 def version_is_compatible(imp_version, version):
     """Determine whether versions are compatible.
@@ -39,3 +43,54 @@ def version_is_compatible(imp_version, version):
             int(rev) > int(imp_rev)):  # Revision
         return False
     return True
+
+
+class DispatcherExecutorContext(object):
+    """Dispatcher executor context helper
+
+    A dispatcher can have work to do before and after the dispatch of the
+    request in the main server thread while the dispatcher itself can be
+    done in its own thread.
+
+    The executor can use the helper like this:
+
+        callback = dispatcher(incoming)
+        callback.prepare()
+        thread = MyWhateverThread()
+        thread.on_done(callback.done)
+        thread.run(callback.run)
+
+    """
+    def __init__(self, incoming, dispatch, executor_callback=None,
+                 post=None):
+        self._result = None
+        self._incoming = incoming
+        self._dispatch = dispatch
+        self._post = post
+        self._executor_callback = executor_callback
+
+    def run(self):
+        """The incoming message dispath itself
+
+        Can be run in an other thread/greenlet/corotine if the executor is
+        able to do it.
+        """
+        try:
+            self._result = self._dispatch(self._incoming,
+                                          self._executor_callback)
+        except Exception:
+            msg = 'The dispatcher method must catches all exceptions'
+            LOG.exception(msg)
+            raise RuntimeError(msg)
+
+    def done(self):
+        """Callback after the incoming message have been dispathed
+
+        Should be runned in the main executor thread/greenlet/corotine
+        """
+        # FIXME(sileht): this is not currently true, this works only because
+        # the driver connection used for polling write on the wire only to
+        # ack/requeue message, but what if one day, the driver do something
+        # else
+        if self._post is not None:
+            self._post(self._incoming, self._result)
diff --git a/oslo_messaging/notify/dispatcher.py b/oslo_messaging/notify/dispatcher.py
index 6214b301a..46d53035e 100644
--- a/oslo_messaging/notify/dispatcher.py
+++ b/oslo_messaging/notify/dispatcher.py
@@ -14,11 +14,11 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import contextlib
 import itertools
 import logging
 import sys
 
+from oslo_messaging import _utils as utils
 from oslo_messaging import localcontext
 from oslo_messaging import serializer as msg_serializer
 
@@ -68,14 +68,15 @@ class NotificationDispatcher(object):
         return transport._listen_for_notifications(self._targets_priorities,
                                                    pool=self.pool)
 
-    @contextlib.contextmanager
     def __call__(self, incoming, executor_callback=None):
-        result_wrapper = []
+        return utils.DispatcherExecutorContext(
+            incoming, self._dispatch_and_handle_error,
+            executor_callback=executor_callback,
+            post=self._post_dispatch)
 
-        yield lambda: result_wrapper.append(
-            self._dispatch_and_handle_error(incoming, executor_callback))
-
-        if result_wrapper[0] == NotificationResult.HANDLED:
+    @staticmethod
+    def _post_dispatch(incoming, result):
+        if result == NotificationResult.HANDLED:
             incoming.acknowledge()
         else:
             incoming.requeue()
diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py
index 5911b69d7..1f065b5ea 100644
--- a/oslo_messaging/opts.py
+++ b/oslo_messaging/opts.py
@@ -29,7 +29,7 @@ from oslo_messaging._drivers import matchmaker
 from oslo_messaging._drivers import matchmaker_redis
 from oslo_messaging._drivers import matchmaker_ring
 from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
-from oslo_messaging._executors import base
+from oslo_messaging._executors import impl_pooledexecutor
 from oslo_messaging.notify import notifier
 from oslo_messaging.rpc import client
 from oslo_messaging import transport
@@ -38,7 +38,7 @@ _global_opt_lists = [
     drivers_base.base_opts,
     impl_zmq.zmq_opts,
     matchmaker.matchmaker_opts,
-    base._pool_opts,
+    impl_pooledexecutor._pool_opts,
     notifier._notifier_opts,
     client._client_opts,
     transport._transport_opts,
diff --git a/oslo_messaging/rpc/dispatcher.py b/oslo_messaging/rpc/dispatcher.py
index 0277d8e3a..e679cebef 100644
--- a/oslo_messaging/rpc/dispatcher.py
+++ b/oslo_messaging/rpc/dispatcher.py
@@ -24,7 +24,6 @@ __all__ = [
     'ExpectedException',
 ]
 
-import contextlib
 import logging
 import sys
 
@@ -130,10 +129,11 @@ class RPCDispatcher(object):
             result = func(ctxt, **new_args)
         return self.serializer.serialize_entity(ctxt, result)
 
-    @contextlib.contextmanager
     def __call__(self, incoming, executor_callback=None):
         incoming.acknowledge()
-        yield lambda: self._dispatch_and_reply(incoming, executor_callback)
+        return utils.DispatcherExecutorContext(
+            incoming, self._dispatch_and_reply,
+            executor_callback=executor_callback)
 
     def _dispatch_and_reply(self, incoming, executor_callback):
         try:
diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py
index 221f4931a..3a6b00d9c 100644
--- a/oslo_messaging/tests/executors/test_executor.py
+++ b/oslo_messaging/tests/executors/test_executor.py
@@ -14,7 +14,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import contextlib
 import threading
 
 # eventlet 0.16 with monkey patching does not work yet on Python 3,
@@ -28,7 +27,6 @@ try:
 except ImportError:
     eventlet = None
 import testscenarios
-import testtools
 try:
     import trollius
 except ImportError:
@@ -45,6 +43,7 @@ try:
 except ImportError:
     impl_eventlet = None
 from oslo_messaging._executors import impl_thread
+from oslo_messaging import _utils as utils
 from oslo_messaging.tests import utils as test_utils
 from six.moves import mock
 
@@ -106,7 +105,6 @@ class TestExecutor(test_utils.BaseTestCase):
 
             @trollius.coroutine
             def simple_coroutine(value):
-                yield None
                 raise trollius.Return(value)
 
             endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
@@ -123,30 +121,29 @@ class TestExecutor(test_utils.BaseTestCase):
                 self.endpoint = endpoint
                 self.result = "not set"
 
-            @contextlib.contextmanager
-            def __call__(self, incoming, executor_callback=None):
-                if executor_callback is not None:
-                    def callback():
-                        result = executor_callback(self.endpoint,
-                                                   incoming.ctxt,
-                                                   incoming.message)
-                        self.result = result
-                        return result
-                    yield callback
-                    event.send()
+            def callback(self, incoming, executor_callback):
+                if executor_callback is None:
+                    result = self.endpoint(incoming.ctxt,
+                                           incoming.message)
                 else:
-                    def callback():
-                        result = self.endpoint(incoming.ctxt, incoming.message)
-                        self.result = result
-                        return result
-                    yield callback
+                    result = executor_callback(self.endpoint,
+                                               incoming.ctxt,
+                                               incoming.message)
+                if is_aioeventlet:
+                    event.send()
+                self.result = result
+                return result
 
-        listener = mock.Mock(spec=['poll'])
+            def __call__(self, incoming, executor_callback=None):
+                return utils.DispatcherExecutorContext(incoming,
+                                                       self.callback,
+                                                       executor_callback)
+
+        listener = mock.Mock(spec=['poll', 'stop'])
         dispatcher = Dispatcher(endpoint)
         executor = self.executor(self.conf, listener, dispatcher)
 
-        incoming_message = mock.MagicMock(ctxt={},
-                                          message={'payload': 'data'})
+        incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
 
         def fake_poll(timeout=None):
             if is_aioeventlet:
@@ -167,60 +164,3 @@ class TestExecutor(test_utils.BaseTestCase):
         self.assertEqual(dispatcher.result, 'result')
 
 TestExecutor.generate_scenarios()
-
-
-class ExceptedException(Exception):
-    pass
-
-
-class EventletContextManagerSpawnTest(test_utils.BaseTestCase):
-    @testtools.skipIf(impl_eventlet is None, "Eventlet not available")
-    def setUp(self):
-        super(EventletContextManagerSpawnTest, self).setUp()
-        self.before = mock.Mock()
-        self.callback = mock.Mock()
-        self.after = mock.Mock()
-        self.exception_call = mock.Mock()
-
-        @contextlib.contextmanager
-        def context_mgr():
-            self.before()
-            try:
-                yield lambda: self.callback()
-            except ExceptedException:
-                self.exception_call()
-            self.after()
-
-        self.mgr = context_mgr()
-
-    def test_normal_run(self):
-        thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
-        thread.wait()
-        self.assertEqual(1, self.before.call_count)
-        self.assertEqual(1, self.callback.call_count)
-        self.assertEqual(1, self.after.call_count)
-        self.assertEqual(0, self.exception_call.call_count)
-
-    def test_excepted_exception(self):
-        self.callback.side_effect = ExceptedException
-        thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
-        try:
-            thread.wait()
-        except ExceptedException:
-            pass
-        self.assertEqual(1, self.before.call_count)
-        self.assertEqual(1, self.callback.call_count)
-        self.assertEqual(1, self.after.call_count)
-        self.assertEqual(1, self.exception_call.call_count)
-
-    def test_unexcepted_exception(self):
-        self.callback.side_effect = Exception
-        thread = impl_eventlet.spawn_with(self.mgr, pool=eventlet)
-        try:
-            thread.wait()
-        except Exception:
-            pass
-        self.assertEqual(1, self.before.call_count)
-        self.assertEqual(1, self.callback.call_count)
-        self.assertEqual(0, self.after.call_count)
-        self.assertEqual(0, self.exception_call.call_count)
diff --git a/oslo_messaging/tests/notify/test_dispatcher.py b/oslo_messaging/tests/notify/test_dispatcher.py
index ee86491a0..f0da90d89 100644
--- a/oslo_messaging/tests/notify/test_dispatcher.py
+++ b/oslo_messaging/tests/notify/test_dispatcher.py
@@ -107,8 +107,9 @@ class TestDispatcher(test_utils.BaseTestCase):
                          sorted(dispatcher._targets_priorities))
 
         incoming = mock.Mock(ctxt={}, message=msg)
-        with dispatcher(incoming) as callback:
-            callback()
+        callback = dispatcher(incoming)
+        callback.run()
+        callback.done()
 
         # check endpoint callbacks are called or not
         for i, endpoint_methods in enumerate(self.endpoints):
@@ -143,8 +144,9 @@ class TestDispatcher(test_utils.BaseTestCase):
         msg['priority'] = 'what???'
         dispatcher = notify_dispatcher.NotificationDispatcher(
             [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
-        with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
-            callback()
+        callback = dispatcher(mock.Mock(ctxt={}, message=msg))
+        callback.run()
+        callback.done()
         mylog.warning.assert_called_once_with('Unknown priority "%s"',
                                               'what???')
 
@@ -244,8 +246,9 @@ class TestDispatcherFilter(test_utils.BaseTestCase):
                    'timestamp': '2014-03-03 18:21:04.369234',
                    'message_id': '99863dda-97f0-443a-a0c1-6ed317b7fd45'}
         incoming = mock.Mock(ctxt=self.context, message=message)
-        with dispatcher(incoming) as callback:
-            callback()
+        callback = dispatcher(incoming)
+        callback.run()
+        callback.done()
 
         if self.match:
             self.assertEqual(1, endpoint.info.call_count)
diff --git a/oslo_messaging/tests/rpc/test_dispatcher.py b/oslo_messaging/tests/rpc/test_dispatcher.py
index 03181d1d0..f81be0b9c 100644
--- a/oslo_messaging/tests/rpc/test_dispatcher.py
+++ b/oslo_messaging/tests/rpc/test_dispatcher.py
@@ -133,8 +133,9 @@ class TestDispatcher(test_utils.BaseTestCase):
         incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
         incoming.reply.side_effect = check_reply
 
-        with dispatcher(incoming) as callback:
-            callback()
+        callback = dispatcher(incoming)
+        callback.run()
+        callback.done()
 
         for n, endpoint in enumerate(endpoints):
             for method_name in ['foo', 'bar']:
diff --git a/requirements.txt b/requirements.txt
index 19651eea0..960e6a5b5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,6 +4,7 @@
 
 pbr<2.0,>=0.11
 
+futurist>=0.1.1 # Apache-2.0
 oslo.config>=1.11.0 # Apache-2.0
 oslo.context>=0.2.0 # Apache-2.0
 oslo.log>=1.2.0  # Apache-2.0
diff --git a/tests/notify/test_dispatcher.py b/tests/notify/test_dispatcher.py
index 5c61840a8..061c29bff 100644
--- a/tests/notify/test_dispatcher.py
+++ b/tests/notify/test_dispatcher.py
@@ -107,8 +107,9 @@ class TestDispatcherScenario(test_utils.BaseTestCase):
                          sorted(dispatcher._targets_priorities))
 
         incoming = mock.Mock(ctxt={}, message=msg)
-        with dispatcher(incoming) as callback:
-            callback()
+        callback = dispatcher(incoming)
+        callback.run()
+        callback.done()
 
         # check endpoint callbacks are called or not
         for i, endpoint_methods in enumerate(self.endpoints):
@@ -146,8 +147,9 @@ class TestDispatcher(test_utils.BaseTestCase):
         msg['priority'] = 'what???'
         dispatcher = notify_dispatcher.NotificationDispatcher(
             [mock.Mock()], [mock.Mock()], None, allow_requeue=True, pool=None)
-        with dispatcher(mock.Mock(ctxt={}, message=msg)) as callback:
-            callback()
+        callback = dispatcher(mock.Mock(ctxt={}, message=msg))
+        callback.run()
+        callback.done()
         mylog.warning.assert_called_once_with('Unknown priority "%s"',
                                               'what???')
 
@@ -165,7 +167,8 @@ class TestDispatcher(test_utils.BaseTestCase):
 
         incoming = mock.Mock(ctxt={}, message=msg)
         executor_callback = mock.Mock()
-        with dispatcher(incoming, executor_callback) as callback:
-            callback()
+        callback = dispatcher(incoming, executor_callback)
+        callback.run()
+        callback.done()
         self.assertTrue(executor_callback.called)
         self.assertEqual(executor_callback.call_args[0][0], endpoint_method)
diff --git a/tests/rpc/test_dispatcher.py b/tests/rpc/test_dispatcher.py
index 64181f026..acd87bf5f 100644
--- a/tests/rpc/test_dispatcher.py
+++ b/tests/rpc/test_dispatcher.py
@@ -120,8 +120,9 @@ class TestDispatcher(test_utils.BaseTestCase):
         incoming = mock.Mock(ctxt=self.ctxt, message=self.msg)
         incoming.reply.side_effect = check_reply
 
-        with dispatcher(incoming) as callback:
-            callback()
+        callback = dispatcher(incoming)
+        callback.run()
+        callback.done()
 
         for n, endpoint in enumerate(endpoints):
             for method_name in ['foo', 'bar']: