diff --git a/doc/source/executors.rst b/doc/source/executors.rst new file mode 100644 index 000000000..892290535 --- /dev/null +++ b/doc/source/executors.rst @@ -0,0 +1,15 @@ +--------- +Executors +--------- + +.. automodule:: oslo.messaging._executors + +.. currentmodule:: oslo.messaging + +============== +Executor types +============== + +Executors are providing the way an incoming message will be dispatched so that +the message can be used for meaningful work. Different types of executors are +supported, each with its own set of restrictions and capabilities. diff --git a/doc/source/index.rst b/doc/source/index.rst index 5c58e8534..6e4681d12 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -11,6 +11,7 @@ Contents :maxdepth: 1 transport + executors target server rpcclient diff --git a/oslo/messaging/_cmd/zmq_receiver.py b/oslo/messaging/_cmd/zmq_receiver.py index 2521ae495..07c9b10c3 100644 --- a/oslo/messaging/_cmd/zmq_receiver.py +++ b/oslo/messaging/_cmd/zmq_receiver.py @@ -23,11 +23,11 @@ import sys from oslo.config import cfg from oslo.messaging._drivers import impl_zmq -from oslo.messaging._executors import impl_eventlet # FIXME(markmc) +from oslo.messaging._executors import base # FIXME(markmc) CONF = cfg.CONF CONF.register_opts(impl_zmq.zmq_opts) -CONF.register_opts(impl_eventlet._eventlet_opts) +CONF.register_opts(base._pool_opts) def main(): diff --git a/oslo/messaging/_drivers/impl_zmq.py b/oslo/messaging/_drivers/impl_zmq.py index 5537e2cdd..e432c0427 100644 --- a/oslo/messaging/_drivers/impl_zmq.py +++ b/oslo/messaging/_drivers/impl_zmq.py @@ -31,7 +31,7 @@ from six import moves from oslo.config import cfg from oslo.messaging._drivers import base from oslo.messaging._drivers import common as rpc_common -from oslo.messaging._executors import impl_eventlet # FIXME(markmc) +from oslo.messaging._executors import base as executor_base # FIXME(markmc) from oslo.messaging._i18n import _, _LE from oslo.serialization import jsonutils from oslo.utils import excutils @@ -857,7 +857,7 @@ class ZmqDriver(base.BaseDriver): if not zmq: raise ImportError("Failed to import eventlet.green.zmq") conf.register_opts(zmq_opts) - conf.register_opts(impl_eventlet._eventlet_opts) + conf.register_opts(executor_base._pool_opts) super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) diff --git a/oslo/messaging/_executors/base.py b/oslo/messaging/_executors/base.py index 8019017b3..d60d656e9 100644 --- a/oslo/messaging/_executors/base.py +++ b/oslo/messaging/_executors/base.py @@ -16,6 +16,14 @@ import abc import six +from oslo.config import cfg + +_pool_opts = [ + cfg.IntOpt('rpc_thread_pool_size', + default=64, + help='Size of RPC thread pool.'), +] + @six.add_metaclass(abc.ABCMeta) class ExecutorBase(object): @@ -36,3 +44,11 @@ class ExecutorBase(object): @abc.abstractmethod def wait(self): "Wait until the executor has stopped polling." + + +class PooledExecutorBase(ExecutorBase): + """An executor that uses a rpc thread pool of a given size.""" + + def __init__(self, conf, listener, callback): + super(PooledExecutorBase, self).__init__(conf, listener, callback) + self.conf.register_opts(_pool_opts) diff --git a/oslo/messaging/_executors/impl_eventlet.py b/oslo/messaging/_executors/impl_eventlet.py index 2eaecaf53..3d11457e0 100644 --- a/oslo/messaging/_executors/impl_eventlet.py +++ b/oslo/messaging/_executors/impl_eventlet.py @@ -21,19 +21,12 @@ from eventlet.green import threading as greenthreading from eventlet import greenpool import greenlet -from oslo.config import cfg from oslo.messaging._executors import base from oslo.messaging import localcontext from oslo.utils import excutils LOG = logging.getLogger(__name__) -_eventlet_opts = [ - cfg.IntOpt('rpc_thread_pool_size', - default=64, - help='Size of RPC greenthread pool.'), -] - def spawn_with(ctxt, pool): """This is the equivalent of a with statement @@ -64,7 +57,7 @@ def spawn_with(ctxt, pool): return thread -class EventletExecutor(base.ExecutorBase): +class EventletExecutor(base.PooledExecutorBase): """A message executor which integrates with eventlet. @@ -77,7 +70,6 @@ class EventletExecutor(base.ExecutorBase): def __init__(self, conf, listener, dispatcher): super(EventletExecutor, self).__init__(conf, listener, dispatcher) - self.conf.register_opts(_eventlet_opts) self._thread = None self._greenpool = greenpool.GreenPool(self.conf.rpc_thread_pool_size) self._running = False diff --git a/oslo/messaging/_executors/impl_thread.py b/oslo/messaging/_executors/impl_thread.py new file mode 100644 index 000000000..57c03bbe0 --- /dev/null +++ b/oslo/messaging/_executors/impl_thread.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools +import sys +import threading + +from concurrent import futures +import six + +from oslo.messaging._executors import base +from oslo.utils import excutils + + +class ThreadExecutor(base.PooledExecutorBase): + """A message executor which integrates with threads. + + A message process that polls for messages from a dispatching thread and + on reception of an incoming message places the message to be processed in + a thread pool to be executed at a later time. + """ + + # NOTE(harlowja): if eventlet is being used and the thread module is monkey + # patched this should/is supposed to work the same as the eventlet based + # executor. + + # NOTE(harlowja): Make it somewhat easy to change this via + # inheritance (since there does exist other executor types that could be + # used/tried here). + _executor_cls = futures.ThreadPoolExecutor + + def __init__(self, conf, listener, dispatcher): + super(ThreadExecutor, self).__init__(conf, listener, dispatcher) + self._poller = None + self._executor = None + self._tombstone = threading.Event() + self._incomplete = collections.deque() + self._mutator = threading.Lock() + + def _completer(self, exit_method, fut): + """Completes futures.""" + try: + exc = fut.exception() + if exc is not None: + exc_type = type(exc) + # Not available on < 3.x due to this being an added feature + # of pep-3134 (exception chaining and embedded tracebacks). + if six.PY3: + exc_tb = exc.__traceback__ + else: + exc_tb = None + if not exit_method(exc_type, exc, exc_tb): + six.reraise(exc_type, exc, tb=exc_tb) + else: + exit_method(None, None, None) + finally: + with self._mutator: + try: + self._incomplete.remove(fut) + except ValueError: + pass + + @excutils.forever_retry_uncaught_exceptions + def _runner(self): + while not self._tombstone.is_set(): + incoming = self.listener.poll() + if incoming is None: + continue + # This is hacky, needs to be fixed.... + context = self.dispatcher(incoming) + enter_method = context.__enter__() + exit_method = context.__exit__ + try: + fut = self._executor.submit(enter_method) + except RuntimeError: + # This is triggered when the executor has been shutdown... + # + # TODO(harlowja): should we put whatever we pulled off back + # since when this is thrown it means the executor has been + # shutdown already?? + exit_method(*sys.exc_info()) + return + else: + with self._mutator: + self._incomplete.append(fut) + # Run the other half (__exit__) when done... + fut.add_done_callback(functools.partial(self._completer, + exit_method)) + + def start(self): + if self._executor is None: + self._executor = self._executor_cls(self.conf.rpc_thread_pool_size) + self._tombstone.clear() + if self._poller is None or not self._poller.is_alive(): + self._poller = threading.Thread(target=self._runner) + self._poller.daemon = True + self._poller.start() + + def stop(self): + if self._executor is not None: + self._executor.shutdown(wait=False) + self._tombstone.set() + self.listener.stop() + + def wait(self): + # TODO(harlowja): this method really needs a timeout. + if self._poller is not None: + self._tombstone.wait() + self._poller.join() + self._poller = None + if self._executor is not None: + with self._mutator: + incomplete_fs = list(self._incomplete) + self._incomplete.clear() + if incomplete_fs: + futures.wait(incomplete_fs, return_when=futures.ALL_COMPLETED) + self._executor = None diff --git a/oslo/messaging/opts.py b/oslo/messaging/opts.py index b101a5833..41264c239 100644 --- a/oslo/messaging/opts.py +++ b/oslo/messaging/opts.py @@ -28,7 +28,7 @@ from oslo.messaging._drivers import matchmaker from oslo.messaging._drivers import matchmaker_redis from oslo.messaging._drivers import matchmaker_ring from oslo.messaging._drivers.protocols.amqp import opts as amqp_opts -from oslo.messaging._executors import impl_eventlet +from oslo.messaging._executors import base from oslo.messaging.notify import notifier from oslo.messaging.rpc import client from oslo.messaging import transport @@ -39,7 +39,7 @@ _global_opt_lists = [ impl_rabbit.rabbit_opts, impl_zmq.zmq_opts, matchmaker.matchmaker_opts, - impl_eventlet._eventlet_opts, + base._pool_opts, notifier._notifier_opts, client._client_opts, transport._transport_opts, diff --git a/requirements.txt b/requirements.txt index fa19517f7..e52ebc333 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,6 @@ kombu>=2.5.0 # middleware oslo.middleware>=0.1.0 # Apache-2.0 + +# for the futures based executor +futures>=2.1.6 diff --git a/tests/executors/test_executor.py b/tests/executors/test_executor.py index 445e2283b..6e0376680 100644 --- a/tests/executors/test_executor.py +++ b/tests/executors/test_executor.py @@ -30,16 +30,19 @@ try: from oslo.messaging._executors import impl_eventlet except ImportError: impl_eventlet = None +from oslo.messaging._executors import impl_thread from tests import utils as test_utils load_tests = testscenarios.load_tests_apply_scenarios class TestExecutor(test_utils.BaseTestCase): - @classmethod def generate_scenarios(cls): - impl = [('blocking', dict(executor=impl_blocking.BlockingExecutor))] + impl = [ + ('blocking', dict(executor=impl_blocking.BlockingExecutor)), + ('threaded', dict(executor=impl_thread.ThreadExecutor)), + ] if impl_eventlet is not None: impl.append( ('eventlet', dict(executor=impl_eventlet.EventletExecutor)))