diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index 11ad81226..cc40edd56 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -35,7 +35,6 @@ class FakeBlockingThread(object): class BlockingExecutor(impl_pooledexecutor.PooledExecutor): - """A message executor which blocks the current thread. The blocking executor's start() method functions as a request processing diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py index 48771edc7..093a389ca 100644 --- a/oslo_messaging/_executors/impl_eventlet.py +++ b/oslo_messaging/_executors/impl_eventlet.py @@ -25,7 +25,6 @@ LOG = logging.getLogger(__name__) class EventletExecutor(impl_pooledexecutor.PooledExecutor): - """A message executor which integrates with eventlet. This is an executor which polls for incoming messages from a greenthread diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py index 7689bd14f..1a8af1af5 100644 --- a/oslo_messaging/_executors/impl_pooledexecutor.py +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -15,6 +15,7 @@ # under the License. import collections +import functools import threading from concurrent import futures @@ -31,25 +32,30 @@ _pool_opts = [ class PooledExecutor(base.ExecutorBase): - """A message executor which integrates with threads. + """A message executor which integrates with some async executor. - A message process that polls for messages from a dispatching thread and - on reception of an incoming message places the message to be processed in - a thread pool to be executed at a later time. + This will create a message thread that polls for messages from a + dispatching thread and on reception of an incoming message places the + message to be processed into a async executor to be executed at a later + time. """ - # NOTE(harlowja): if eventlet is being used and the thread module is monkey - # patched this should/is supposed to work the same as the eventlet based - # executor. - - # NOTE(harlowja): Make it somewhat easy to change this via - # inheritance (since there does exist other executor types that could be - # used/tried here). - _executor_cls = futures.ThreadPoolExecutor + # These may be overridden by subclasses (and implemented using whatever + # objects make most sense for the provided async execution model). _event_cls = threading.Event _lock_cls = threading.Lock + + # Pooling and dispatching (executor submission) will happen from a + # thread created from this class/function. _thread_cls = threading.Thread + # This one **must** be overridden by a subclass. + _executor_cls = None + + # Blocking function that should wait for all provided futures to finish. + _wait_for_all = functools.partial(futures.wait, + return_when=futures.ALL_COMPLETED) + def __init__(self, conf, listener, dispatcher): super(PooledExecutor, self).__init__(conf, listener, dispatcher) self.conf.register_opts(_pool_opts) @@ -108,5 +114,5 @@ class PooledExecutor(base.ExecutorBase): incomplete_fs = list(self._incomplete) self._incomplete.clear() if incomplete_fs: - futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) + self._wait_for_all(incomplete_fs) self._executor = None diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py index 9a4651aa2..13cc9c563 100644 --- a/oslo_messaging/_executors/impl_thread.py +++ b/oslo_messaging/_executors/impl_thread.py @@ -14,7 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -from concurrent import futures +import futurist from oslo_messaging._executors import impl_pooledexecutor @@ -26,4 +26,5 @@ class ThreadExecutor(impl_pooledexecutor.PooledExecutor): on reception of an incoming message places the message to be processed in a thread pool to be executed at a later time. """ - _executor_cls = futures.ThreadPoolExecutor + + _executor_cls = futurist.ThreadPoolExecutor