diff --git a/oslo_messaging/_cmd/zmq_broker.py b/oslo_messaging/_cmd/zmq_broker.py index 08f3d0aca..82c1580af 100644 --- a/oslo_messaging/_cmd/zmq_broker.py +++ b/oslo_messaging/_cmd/zmq_broker.py @@ -20,11 +20,11 @@ from oslo_config import cfg from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.zmq_driver.broker import zmq_broker -from oslo_messaging._executors import impl_pooledexecutor +from oslo_messaging import server CONF = cfg.CONF CONF.register_opts(impl_zmq.zmq_opts) -CONF.register_opts(impl_pooledexecutor._pool_opts) +CONF.register_opts(server._pool_opts) CONF.rpc_zmq_native = True diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 8c1a836ba..3196a39d1 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -24,8 +24,8 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client import zmq_client from oslo_messaging._drivers.zmq_driver.server import zmq_server from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging._i18n import _LE +from oslo_messaging import server RPCException = rpc_common.RPCException @@ -160,7 +160,7 @@ class ZmqDriver(base.BaseDriver): raise ImportError(_LE("ZeroMQ is not available!")) conf.register_opts(zmq_opts) - conf.register_opts(impl_pooledexecutor._pool_opts) + conf.register_opts(server._pool_opts) conf.register_opts(base.base_opts) self.conf = conf self.allowed_remote_exmods = allowed_remote_exmods diff --git a/oslo_messaging/_drivers/protocols/amqp/driver.py b/oslo_messaging/_drivers/protocols/amqp/driver.py index c0e01bdc5..04feb2de1 100644 --- a/oslo_messaging/_drivers/protocols/amqp/driver.py +++ b/oslo_messaging/_drivers/protocols/amqp/driver.py @@ -20,6 +20,7 @@ messaging protocol. The driver sends messages and creates subscriptions via 'tasks' that are performed on its behalf via the controller module. """ +import collections import logging import os import threading @@ -27,7 +28,7 @@ import time from oslo_serialization import jsonutils from oslo_utils import importutils -from six import moves +from oslo_utils import timeutils from oslo_messaging._drivers import base from oslo_messaging._drivers import common @@ -114,18 +115,49 @@ class ProtonIncomingMessage(base.RpcIncomingMessage): pass +class Queue(object): + def __init__(self): + self._queue = collections.deque() + self._lock = threading.Lock() + self._pop_wake_condition = threading.Condition(self._lock) + self._started = True + + def put(self, item): + with self._lock: + self._queue.appendleft(item) + self._pop_wake_condition.notify() + + def pop(self, timeout): + with timeutils.StopWatch(timeout) as stop_watcher: + with self._lock: + while len(self._queue) == 0: + if stop_watcher.expired() or not self._started: + return None + self._pop_wake_condition.wait( + stop_watcher.leftover(return_none=True) + ) + return self._queue.pop() + + def stop(self): + with self._lock: + self._started = False + self._pop_wake_condition.notify_all() + + class ProtonListener(base.Listener): def __init__(self, driver): super(ProtonListener, self).__init__(driver.prefetch_size) self.driver = driver - self.incoming = moves.queue.Queue() + self.incoming = Queue() + + def stop(self): + self.incoming.stop() @base.batch_poll_helper def poll(self, timeout=None): - try: - message = self.incoming.get(True, timeout) - except moves.queue.Empty: - return + message = self.incoming.pop(timeout) + if message is None: + return None request, ctxt = unmarshal_request(message) LOG.debug("Returning incoming message") return ProtonIncomingMessage(self, ctxt, request, message) diff --git a/oslo_messaging/_executors/__init__.py b/oslo_messaging/_executors/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py deleted file mode 100644 index 56cded112..000000000 --- a/oslo_messaging/_executors/base.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2013 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - - -@six.add_metaclass(abc.ABCMeta) -class ExecutorBase(object): - - def __init__(self, conf, listener, dispatcher): - self.conf = conf - self.listener = listener - self.dispatcher = dispatcher - - @abc.abstractmethod - def start(self, override_pool_size=None): - """Start polling for incoming messages.""" - - @abc.abstractmethod - def stop(self): - """Stop polling for messages.""" - - @abc.abstractmethod - def wait(self, timeout=None): - """Wait until the executor has stopped polling. - - If a timeout is provided, and it is not ``None`` then this method will - wait up to that amount of time for its components to finish, if not - all components finish in the alloted time, then false will be returned - otherwise true will be returned. - """ diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py deleted file mode 100644 index b788c47f4..000000000 --- a/oslo_messaging/_executors/impl_blocking.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# Copyright 2013 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import futurist -import threading - -from oslo_messaging._executors import impl_pooledexecutor -from oslo_utils import timeutils - - -class FakeBlockingThread(object): - '''A minimal implementation of threading.Thread which does not create a - thread or start executing the target when start() is called. Instead, the - caller must explicitly execute the non-blocking thread.execute() method - after start() has been called. - ''' - - def __init__(self, target): - self._target = target - self._running = False - self._running_cond = threading.Condition() - - def start(self): - if self._running: - # Not a user error. No need to translate. - raise RuntimeError('FakeBlockingThread already started') - - with self._running_cond: - self._running = True - self._running_cond.notify_all() - - def join(self, timeout=None): - with timeutils.StopWatch(duration=timeout) as w, self._running_cond: - while self._running: - self._running_cond.wait(w.leftover(return_none=True)) - - # Thread.join() does not raise an exception on timeout. It is - # the caller's responsibility to check is_alive(). - if w.expired(): - return - - def is_alive(self): - return self._running - - def execute(self): - if not self._running: - # Not a user error. No need to translate. - raise RuntimeError('FakeBlockingThread not started') - - try: - self._target() - finally: - with self._running_cond: - self._running = False - self._running_cond.notify_all() - - -class BlockingExecutor(impl_pooledexecutor.PooledExecutor): - """A message executor which blocks the current thread. - - The blocking executor's start() method functions as a request processing - loop - i.e. it blocks, processes messages and only returns when stop() is - called from a dispatched method. - - Method calls are dispatched in the current thread, so only a single method - call can be executing at once. This executor is likely to only be useful - for simple demo programs. - """ - - _executor_cls = lambda __, ___: futurist.SynchronousExecutor() - _thread_cls = FakeBlockingThread - - def __init__(self, *args, **kwargs): - super(BlockingExecutor, self).__init__(*args, **kwargs) - - def execute(self): - '''Explicitly run the executor in the current context.''' - # NOTE(mdbooth): Splitting start into start and execute for the - # blocking executor closes a potential race. On a non-blocking - # executor, calling start performs some initialisation synchronously - # before starting the executor and returning control to the caller. In - # the non-blocking caller there was no externally visible boundary - # between the completion of initialisation and the start of execution, - # meaning the caller cannot indicate to another thread that - # initialisation is complete. With the split, the start call for the - # blocking executor becomes analogous to the non-blocking case, - # indicating that initialisation is complete. The caller can then - # synchronously call execute. - if self._poller is not None: - self._poller.execute() diff --git a/oslo_messaging/_executors/impl_eventlet.py b/oslo_messaging/_executors/impl_eventlet.py deleted file mode 100644 index a73ed15cb..000000000 --- a/oslo_messaging/_executors/impl_eventlet.py +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# Copyright 2013 New Dream Network, LLC (DreamHost) -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from eventlet.green import threading as greenthreading -import futurist - -from oslo_messaging._executors import impl_pooledexecutor -from oslo_utils import eventletutils - - -class EventletExecutor(impl_pooledexecutor.PooledExecutor): - """A message executor which integrates with eventlet. - - This is an executor which polls for incoming messages from a greenthread - and dispatches each message in its own greenthread powered async - executor. - - The stop() method kills the message polling greenthread and the wait() - method waits for all executor maintained greenthreads to complete. - """ - - def __init__(self, conf, listener, dispatcher): - super(EventletExecutor, self).__init__(conf, listener, dispatcher) - eventletutils.warn_eventlet_not_patched( - expected_patched_modules=['thread'], - what="the 'oslo.messaging eventlet executor'") - - _executor_cls = futurist.GreenThreadPoolExecutor - _lock_cls = greenthreading.Lock - _event_cls = greenthreading.Event - _thread_cls = greenthreading.Thread diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py deleted file mode 100644 index d3f73a104..000000000 --- a/oslo_messaging/_executors/impl_pooledexecutor.py +++ /dev/null @@ -1,155 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import threading - -from futurist import waiters -from oslo_config import cfg -from oslo_utils import excutils -from oslo_utils import timeutils - -from oslo_messaging._executors import base - -_pool_opts = [ - cfg.IntOpt('executor_thread_pool_size', - default=64, - deprecated_name="rpc_thread_pool_size", - help='Size of executor thread pool.'), -] - - -class PooledExecutor(base.ExecutorBase): - """A message executor which integrates with some executor. - - This will create a message thread that polls for messages from a - dispatching thread and on reception of an incoming message places the - message to be processed into a async executor to be executed at a later - time. - """ - - # These may be overridden by subclasses (and implemented using whatever - # objects make most sense for the provided async execution model). - _event_cls = threading.Event - _lock_cls = threading.Lock - - # Pooling and dispatching (executor submission) will happen from a - # thread created from this class/function. - _thread_cls = threading.Thread - - # This one **must** be overridden by a subclass. - _executor_cls = None - - # Blocking function that should wait for all provided futures to finish. - _wait_for_all = staticmethod(waiters.wait_for_all) - - def __init__(self, conf, listener, dispatcher): - super(PooledExecutor, self).__init__(conf, listener, dispatcher) - self.conf.register_opts(_pool_opts) - self._poller = None - self._executor = None - self._tombstone = self._event_cls() - self._incomplete = collections.deque() - self._mutator = self._lock_cls() - - def _do_submit(self, callback): - def _on_done(fut): - with self._mutator: - try: - self._incomplete.remove(fut) - except ValueError: - pass - callback.done() - try: - fut = self._executor.submit(callback.run) - except RuntimeError: - # This is triggered when the executor has been shutdown... - # - # TODO(harlowja): should we put whatever we pulled off back - # since when this is thrown it means the executor has been - # shutdown already?? - callback.done() - return False - else: - with self._mutator: - self._incomplete.append(fut) - # Run the other post processing of the callback when done... - fut.add_done_callback(_on_done) - return True - - @excutils.forever_retry_uncaught_exceptions - def _runner(self): - while not self._tombstone.is_set(): - incoming = self.listener.poll( - timeout=self.dispatcher.batch_timeout, - prefetch_size=self.dispatcher.batch_size) - - if not incoming: - continue - callback = self.dispatcher(incoming) - was_submitted = self._do_submit(callback) - if not was_submitted: - break - - def start(self, override_pool_size=None): - if self._executor is None: - if override_pool_size is not None and int(override_pool_size) < 1: - raise ValueError('The thread pool size should be a positive ' - 'value.') - self._executor = self._executor_cls( - override_pool_size if override_pool_size else - self.conf.executor_thread_pool_size) - self._tombstone.clear() - if self._poller is None or not self._poller.is_alive(): - self._poller = self._thread_cls(target=self._runner) - self._poller.daemon = True - self._poller.start() - - def stop(self): - if self._executor is not None: - self._executor.shutdown(wait=False) - self._tombstone.set() - self.listener.stop() - - def wait(self, timeout=None): - with timeutils.StopWatch(duration=timeout) as w: - poller = self._poller - if poller is not None: - self._tombstone.wait(w.leftover(return_none=True)) - if not self._tombstone.is_set(): - return False - poller.join(w.leftover(return_none=True)) - if poller.is_alive(): - return False - self._poller = None - executor = self._executor - if executor is not None: - with self._mutator: - incomplete_fs = list(self._incomplete) - if incomplete_fs: - (done, not_done) = self._wait_for_all( - incomplete_fs, - timeout=w.leftover(return_none=True)) - with self._mutator: - for fut in done: - try: - self._incomplete.remove(fut) - except ValueError: - pass - if not_done: - return False - self._executor = None - return True diff --git a/oslo_messaging/_executors/impl_thread.py b/oslo_messaging/_executors/impl_thread.py deleted file mode 100644 index 13cc9c563..000000000 --- a/oslo_messaging/_executors/impl_thread.py +++ /dev/null @@ -1,30 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import futurist - -from oslo_messaging._executors import impl_pooledexecutor - - -class ThreadExecutor(impl_pooledexecutor.PooledExecutor): - """A message executor which integrates with threads. - - A message process that polls for messages from a dispatching thread and - on reception of an incoming message places the message to be processed in - a thread pool to be executed at a later time. - """ - - _executor_cls = futurist.ThreadPoolExecutor diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 2ee0318da..eb0878d71 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -22,19 +22,21 @@ import itertools from oslo_messaging._drivers import amqp from oslo_messaging._drivers import base as drivers_base +from oslo_messaging._drivers import impl_pika from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis -from oslo_messaging._executors import impl_pooledexecutor from oslo_messaging.notify import notifier from oslo_messaging.rpc import client +from oslo_messaging import server from oslo_messaging import transport + _global_opt_lists = [ drivers_base.base_opts, impl_zmq.zmq_opts, - impl_pooledexecutor._pool_opts, + server._pool_opts, client._client_opts, transport._transport_opts, ] @@ -44,8 +46,10 @@ _opts = [ ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_notifications', notifier._notifier_opts), - ('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts, - impl_rabbit.rabbit_opts))), + ('oslo_messaging_rabbit', list( + itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts, + impl_pika.pika_opts, impl_pika.pika_pool_opts, + impl_pika.notification_opts, impl_pika.rpc_opts))), ] diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 7f520c8ba..b702c079d 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -29,7 +29,10 @@ import logging import threading import traceback +from oslo_config import cfg from oslo_service import service +from oslo_utils import eventletutils +from oslo_utils import excutils from oslo_utils import timeutils from stevedore import driver @@ -44,6 +47,14 @@ LOG = logging.getLogger(__name__) DEFAULT_LOG_AFTER = 30 +_pool_opts = [ + cfg.IntOpt('executor_thread_pool_size', + default=64, + deprecated_name="rpc_thread_pool_size", + help='Size of executor thread pool.'), +] + + class MessagingServerError(exceptions.MessagingException): """Base class for all MessageHandlingServer exceptions.""" @@ -311,24 +322,33 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): :type executor: str """ self.conf = transport.conf + self.conf.register_opts(_pool_opts) self.transport = transport self.dispatcher = dispatcher - self.executor = executor + self.executor_type = executor + + self.listener = None try: mgr = driver.DriverManager('oslo.messaging.executors', - self.executor) + self.executor_type) except RuntimeError as ex: - raise ExecutorLoadFailure(self.executor, ex) + raise ExecutorLoadFailure(self.executor_type, ex) self._executor_cls = mgr.driver - self._executor_obj = None + + self._work_executor = None + self._poll_executor = None self._started = False super(MessageHandlingServer, self).__init__() + def _submit_work(self, callback): + fut = self._work_executor.submit(callback.run) + fut.add_done_callback(lambda f: callback.done()) + @ordered(reset_after='stop') def start(self, override_pool_size=None): """Start handling incoming messages. @@ -354,18 +374,28 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): self._started = True try: - listener = self.dispatcher._listen(self.transport) + self.listener = self.dispatcher._listen(self.transport) except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) - executor = self._executor_cls(self.conf, listener, self.dispatcher) - executor.start(override_pool_size=override_pool_size) - self._executor_obj = executor - if self.executor == 'blocking': - # N.B. This will be executed unlocked and unordered, so - # we can't rely on the value of self._executor_obj when this runs. - # We explicitly pass the local variable. - return lambda: executor.execute() + executor_opts = {} + + if self.executor_type == "threading": + executor_opts["max_workers"] = ( + override_pool_size or self.conf.executor_thread_pool_size + ) + elif self.executor_type == "eventlet": + eventletutils.warn_eventlet_not_patched( + expected_patched_modules=['thread'], + what="the 'oslo.messaging eventlet executor'") + executor_opts["max_workers"] = ( + override_pool_size or self.conf.executor_thread_pool_size + ) + + self._work_executor = self._executor_cls(**executor_opts) + self._poll_executor = self._executor_cls(**executor_opts) + + return lambda: self._poll_executor.submit(self._runner) @ordered(after='start') def stop(self): @@ -376,7 +406,30 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): some messages, and underlying driver resources associated to this server are still in use. See 'wait' for more details. """ - self._executor_obj.stop() + self.listener.stop() + self._started = False + + @excutils.forever_retry_uncaught_exceptions + def _runner(self): + while self._started: + incoming = self.listener.poll( + timeout=self.dispatcher.batch_timeout, + prefetch_size=self.dispatcher.batch_size) + + if incoming: + self._submit_work(self.dispatcher(incoming)) + + # listener is stopped but we need to process all already consumed + # messages + while True: + incoming = self.listener.poll( + timeout=self.dispatcher.batch_timeout, + prefetch_size=self.dispatcher.batch_size) + + if incoming: + self._submit_work(self.dispatcher(incoming)) + else: + return @ordered(after='stop') def wait(self): @@ -389,12 +442,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner): Once it's finished, the underlying driver resources associated to this server are released (like closing useless network connections). """ - try: - self._executor_obj.wait() - finally: - # Close listener connection after processing all messages - self._executor_obj.listener.cleanup() - self._executor_obj = None + self._poll_executor.shutdown(wait=True) + self._work_executor.shutdown(wait=True) + + # Close listener connection after processing all messages + self.listener.cleanup() def reset(self): """Reset service. diff --git a/oslo_messaging/tests/executors/__init__.py b/oslo_messaging/tests/executors/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py deleted file mode 100644 index bba37b747..000000000 --- a/oslo_messaging/tests/executors/test_executor.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# Copyright 2013 eNovance -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import threading -import time - -from six.moves import mock -import testscenarios - -from oslo_messaging._executors import impl_blocking -try: - from oslo_messaging._executors import impl_eventlet -except ImportError: - impl_eventlet = None -from oslo_messaging._executors import impl_thread -from oslo_messaging import dispatcher as dispatcher_base -from oslo_messaging.tests import utils as test_utils - -load_tests = testscenarios.load_tests_apply_scenarios - - -class TestExecutor(test_utils.BaseTestCase): - @classmethod - def generate_scenarios(cls): - impl = [ - ('blocking', dict(executor=impl_blocking.BlockingExecutor)), - ('threaded', dict(executor=impl_thread.ThreadExecutor)), - ] - if impl_eventlet is not None: - impl.append( - ('eventlet', dict(executor=impl_eventlet.EventletExecutor))) - cls.scenarios = testscenarios.multiply_scenarios(impl) - - @staticmethod - def _run_in_thread(target, executor): - thread = threading.Thread(target=target, args=(executor,)) - thread.daemon = True - thread.start() - return thread - - def _create_dispatcher(self): - if impl_blocking is not None: - blocking_class = impl_blocking.BlockingExecutor - else: - blocking_class = None - is_blocking = (self.executor == blocking_class) - - if is_blocking: - def run_executor(executor): - executor.start() - executor.execute() - executor.wait() - - endpoint = mock.MagicMock(return_value='result') - event = None - else: - def run_executor(executor): - executor.start() - executor.wait() - - endpoint = mock.MagicMock(return_value='result') - event = None - - class Dispatcher(dispatcher_base.DispatcherBase): - def __init__(self, endpoint): - self.endpoint = endpoint - self.result = "not set" - - def _listen(self, transport): - pass - - def callback(self, incoming): - result = self.endpoint(incoming.ctxt, - incoming.message) - self.result = result - return result - - def __call__(self, incoming): - return dispatcher_base.DispatcherExecutorContext( - incoming[0], self.callback) - - return Dispatcher(endpoint), endpoint, event, run_executor - - def test_slow_wait(self): - dispatcher, endpoint, event, run_executor = self._create_dispatcher() - listener = mock.Mock(spec=['poll', 'stop']) - executor = self.executor(self.conf, listener, dispatcher) - incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) - - def fake_poll(timeout=None, prefetch_size=1): - time.sleep(0.1) - if listener.poll.call_count == 10: - if event is not None: - event.wait() - executor.stop() - else: - return incoming_message - - listener.poll.side_effect = fake_poll - thread = self._run_in_thread(run_executor, executor) - self.assertFalse(executor.wait(timeout=0.1)) - thread.join() - self.assertTrue(executor.wait()) - - def test_dead_wait(self): - (dispatcher, _endpoint, _event, - _run_executor) = self._create_dispatcher() - listener = mock.Mock(spec=['poll', 'stop']) - executor = self.executor(self.conf, listener, dispatcher) - executor.stop() - self.assertTrue(executor.wait()) - - def test_executor_dispatch(self): - dispatcher, endpoint, event, run_executor = self._create_dispatcher() - listener = mock.Mock(spec=['poll', 'stop']) - executor = self.executor(self.conf, listener, dispatcher) - incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) - - def fake_poll(timeout=None, prefetch_size=1): - if listener.poll.call_count == 1: - return [incoming_message] - if event is not None: - event.wait() - executor.stop() - - listener.poll.side_effect = fake_poll - thread = self._run_in_thread(run_executor, executor) - thread.join() - endpoint.assert_called_once_with({}, {'payload': 'data'}) - self.assertEqual(dispatcher.result, 'result') - -TestExecutor.generate_scenarios() diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index ac169126b..d802218d0 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -320,7 +320,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # order between events with different categories is not guaranteed received = {} for expected in events: - e = listener.events.get(timeout=0.5) + e = listener.events.get(timeout=1) received[e[0]] = e for expected in events: diff --git a/oslo_messaging/tests/notify/test_listener.py b/oslo_messaging/tests/notify/test_listener.py index 59ed7b5ad..8300e8419 100644 --- a/oslo_messaging/tests/notify/test_listener.py +++ b/oslo_messaging/tests/notify/test_listener.py @@ -146,7 +146,7 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): self.assertIsInstance(listener.dispatcher, dispatcher.NotificationDispatcher) self.assertIs(listener.dispatcher.endpoints, endpoints) - self.assertEqual('blocking', listener.executor) + self.assertEqual('blocking', listener.executor_type) def test_no_target_topic(self): transport = msg_notifier.get_notification_transport( diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 5b747a561..effa640f2 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -127,7 +127,7 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): self.assertIsInstance(server.dispatcher, oslo_messaging.RPCDispatcher) self.assertIs(server.dispatcher.endpoints, endpoints) self.assertIs(server.dispatcher.serializer, serializer) - self.assertEqual('blocking', server.executor) + self.assertEqual('blocking', server.executor_type) def test_server_wait_method(self): transport = oslo_messaging.get_transport(self.conf, url='fake:') @@ -136,11 +136,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): serializer = object() class MagicMockIgnoreArgs(mock.MagicMock): - '''MagicMock ignores arguments. + """MagicMock ignores arguments. A MagicMock which can never misinterpret the arguments passed to it during construction. - ''' + """ def __init__(self, *args, **kwargs): super(MagicMockIgnoreArgs, self).__init__() @@ -149,15 +149,16 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): serializer=serializer) # Mocking executor server._executor_cls = MagicMockIgnoreArgs + server.listener = MagicMockIgnoreArgs() + server.dispatcher = MagicMockIgnoreArgs() # Here assigning executor's listener object to listener variable # before calling wait method, because in wait method we are # setting executor to None. server.start() - listener = server._executor_obj.listener + listener = server.listener server.stop() # call server wait method server.wait() - self.assertIsNone(server._executor_obj) self.assertEqual(1, listener.cleanup.call_count) def test_no_target_server(self): @@ -539,7 +540,7 @@ class TestServerLocking(test_utils.BaseTestCase): super(TestServerLocking, self).setUp(conf=cfg.ConfigOpts()) def _logmethod(name): - def method(self): + def method(self, *args, **kwargs): with self._lock: self._calls.append(name) return method @@ -553,13 +554,8 @@ class TestServerLocking(test_utils.BaseTestCase): self.listener = mock.MagicMock() executors.append(self) - def start(self, override_pool_size=None): - with self._lock: - self._calls.append('start') - - stop = _logmethod('stop') - wait = _logmethod('wait') - execute = _logmethod('execute') + submit = _logmethod('submit') + shutdown = _logmethod('shutdown') self.executors = executors @@ -574,11 +570,10 @@ class TestServerLocking(test_utils.BaseTestCase): self.server.stop() self.server.wait() - self.assertEqual(len(self.executors), 1) - executor = self.executors[0] - self.assertEqual(executor._calls, - ['start', 'execute', 'stop', 'wait']) - self.assertTrue(executor.listener.cleanup.called) + self.assertEqual(len(self.executors), 2) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) + self.assertTrue(self.server.listener.cleanup.called) def test_reversed_order(self): # Test that if we call wait, stop, start, these will be correctly @@ -596,10 +591,9 @@ class TestServerLocking(test_utils.BaseTestCase): self.server.wait() - self.assertEqual(len(self.executors), 1) - executor = self.executors[0] - self.assertEqual(executor._calls, - ['start', 'execute', 'stop', 'wait']) + self.assertEqual(len(self.executors), 2) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) def test_wait_for_running_task(self): # Test that if 2 threads call a method simultaneously, both will wait, @@ -611,16 +605,16 @@ class TestServerLocking(test_utils.BaseTestCase): running_event = threading.Event() done_event = threading.Event() - runner = [None] + _runner = [None] class SteppingFakeExecutor(self.server._executor_cls): - def start(self, override_pool_size=None): + def __init__(self, *args, **kwargs): # Tell the test which thread won the race - runner[0] = eventlet.getcurrent() + _runner[0] = eventlet.getcurrent() running_event.set() start_event.wait() - super(SteppingFakeExecutor, self).start() + super(SteppingFakeExecutor, self).__init__(*args, **kwargs) done_event.set() finish_event.wait() @@ -632,7 +626,7 @@ class TestServerLocking(test_utils.BaseTestCase): # Wait until one of the threads starts running running_event.wait() - runner = runner[0] + runner = _runner[0] waiter = start2 if runner == start1 else start2 waiter_finished = threading.Event() @@ -640,18 +634,16 @@ class TestServerLocking(test_utils.BaseTestCase): # At this point, runner is running start(), and waiter() is waiting for # it to complete. runner has not yet logged anything. - self.assertEqual(1, len(self.executors)) - executor = self.executors[0] - - self.assertEqual(executor._calls, []) + self.assertEqual(0, len(self.executors)) self.assertFalse(waiter_finished.is_set()) # Let the runner log the call start_event.set() done_event.wait() - # We haven't signalled completion yet, so execute shouldn't have run - self.assertEqual(executor._calls, ['start']) + # We haven't signalled completion yet, so submit shouldn't have run + self.assertEqual(1, len(self.executors)) + self.assertEqual(self.executors[0]._calls, []) self.assertFalse(waiter_finished.is_set()) # Let the runner complete @@ -662,7 +654,9 @@ class TestServerLocking(test_utils.BaseTestCase): # Check that both threads have finished, start was only called once, # and execute ran self.assertTrue(waiter_finished.is_set()) - self.assertEqual(executor._calls, ['start', 'execute']) + self.assertEqual(2, len(self.executors)) + self.assertEqual(self.executors[0]._calls, []) + self.assertEqual(self.executors[1]._calls, ['submit']) def test_start_stop_wait_stop_wait(self): # Test that we behave correctly when calling stop/wait more than once. @@ -674,11 +668,10 @@ class TestServerLocking(test_utils.BaseTestCase): self.server.stop() self.server.wait() - self.assertEqual(len(self.executors), 1) - executor = self.executors[0] - self.assertEqual(executor._calls, - ['start', 'execute', 'stop', 'wait']) - self.assertTrue(executor.listener.cleanup.called) + self.assertEqual(len(self.executors), 2) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) + self.assertTrue(self.server.listener.cleanup.called) def test_state_wrapping(self): # Test that we behave correctly if a thread waits, and the server state @@ -712,8 +705,9 @@ class TestServerLocking(test_utils.BaseTestCase): complete_waiting_callback.wait() # The server should have started, but stop should not have been called - self.assertEqual(1, len(self.executors)) - self.assertEqual(self.executors[0]._calls, ['start', 'execute']) + self.assertEqual(2, len(self.executors)) + self.assertEqual(self.executors[0]._calls, []) + self.assertEqual(self.executors[1]._calls, ['submit']) self.assertFalse(thread1_finished.is_set()) self.server.stop() @@ -721,19 +715,20 @@ class TestServerLocking(test_utils.BaseTestCase): # We should have gone through all the states, and thread1 should still # be waiting - self.assertEqual(1, len(self.executors)) - self.assertEqual(self.executors[0]._calls, ['start', 'execute', - 'stop', 'wait']) + self.assertEqual(2, len(self.executors)) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) self.assertFalse(thread1_finished.is_set()) # Start again self.server.start() - # We should now record 2 executors - self.assertEqual(2, len(self.executors)) - self.assertEqual(self.executors[0]._calls, ['start', 'execute', - 'stop', 'wait']) - self.assertEqual(self.executors[1]._calls, ['start', 'execute']) + # We should now record 4 executors (2 for each server) + self.assertEqual(4, len(self.executors)) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) + self.assertEqual(self.executors[2]._calls, []) + self.assertEqual(self.executors[3]._calls, ['submit']) self.assertFalse(thread1_finished.is_set()) # Allow thread1 to complete @@ -742,10 +737,11 @@ class TestServerLocking(test_utils.BaseTestCase): # thread1 should now have finished, and stop should not have been # called again on either the first or second executor - self.assertEqual(2, len(self.executors)) - self.assertEqual(self.executors[0]._calls, ['start', 'execute', - 'stop', 'wait']) - self.assertEqual(self.executors[1]._calls, ['start', 'execute']) + self.assertEqual(4, len(self.executors)) + self.assertEqual(self.executors[0]._calls, ['shutdown']) + self.assertEqual(self.executors[1]._calls, ['submit', 'shutdown']) + self.assertEqual(self.executors[2]._calls, []) + self.assertEqual(self.executors[3]._calls, ['submit']) self.assertTrue(thread1_finished.is_set()) @mock.patch.object(server_module, 'DEFAULT_LOG_AFTER', 1) @@ -810,24 +806,24 @@ class TestServerLocking(test_utils.BaseTestCase): # Start the server, which will also instantiate an executor self.server.start() - - stop_called = threading.Event() + self.server.stop() + shutdown_called = threading.Event() # Patch the executor's stop method to be very slow - def slow_stop(): - stop_called.set() + def slow_shutdown(wait): + shutdown_called.set() eventlet.sleep(10) - self.executors[0].stop = slow_stop + self.executors[0].shutdown = slow_shutdown - # Call stop in a new thread - thread = eventlet.spawn(self.server.stop) + # Call wait in a new thread + thread = eventlet.spawn(self.server.wait) # Wait until the thread is in the slow stop method - stop_called.wait() + shutdown_called.wait() - # Call stop again in the main thread with a timeout + # Call wait again in the main thread with a timeout self.assertRaises(server_module.TaskTimeout, - self.server.stop, timeout=1) + self.server.wait, timeout=1) thread.kill() @mock.patch.object(server_module, 'LOG') diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index d5fcf7ff1..1daa11d3f 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -17,7 +17,7 @@ import testtools import mock -from oslo_messaging._executors import impl_thread +from oslo_messaging import server try: from oslo_messaging import opts except ImportError: @@ -59,6 +59,8 @@ class OptsTestCase(test_utils.BaseTestCase): self._test_list_opts(result) def test_defaults(self): - impl_thread.ThreadExecutor(self.conf, mock.Mock(), mock.Mock()) + transport = mock.Mock() + transport.conf = self.conf + server.MessageHandlingServer(transport, mock.Mock()) opts.set_defaults(self.conf, executor_thread_pool_size=100) self.assertEqual(100, self.conf.executor_thread_pool_size) diff --git a/setup.cfg b/setup.cfg index ee8881582..bc58d750a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -40,9 +40,9 @@ oslo.messaging.drivers = pika = oslo_messaging._drivers.impl_pika:PikaDriver oslo.messaging.executors = - blocking = oslo_messaging._executors.impl_blocking:BlockingExecutor - eventlet = oslo_messaging._executors.impl_eventlet:EventletExecutor - threading = oslo_messaging._executors.impl_thread:ThreadExecutor + blocking = futurist:SynchronousExecutor + eventlet = futurist:GreenThreadPoolExecutor + threading = futurist:ThreadPoolExecutor oslo.messaging.notify.drivers = messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver