diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index b59818f5c..b788c47f4 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -14,28 +14,57 @@ # under the License. import futurist +import threading from oslo_messaging._executors import impl_pooledexecutor +from oslo_utils import timeutils class FakeBlockingThread(object): + '''A minimal implementation of threading.Thread which does not create a + thread or start executing the target when start() is called. Instead, the + caller must explicitly execute the non-blocking thread.execute() method + after start() has been called. + ''' + def __init__(self, target): self._target = target + self._running = False + self._running_cond = threading.Condition() def start(self): - self._target() + if self._running: + # Not a user error. No need to translate. + raise RuntimeError('FakeBlockingThread already started') - @staticmethod - def join(timeout=None): - pass + with self._running_cond: + self._running = True + self._running_cond.notify_all() - @staticmethod - def stop(): - pass + def join(self, timeout=None): + with timeutils.StopWatch(duration=timeout) as w, self._running_cond: + while self._running: + self._running_cond.wait(w.leftover(return_none=True)) - @staticmethod - def is_alive(): - return False + # Thread.join() does not raise an exception on timeout. It is + # the caller's responsibility to check is_alive(). + if w.expired(): + return + + def is_alive(self): + return self._running + + def execute(self): + if not self._running: + # Not a user error. No need to translate. + raise RuntimeError('FakeBlockingThread not started') + + try: + self._target() + finally: + with self._running_cond: + self._running = False + self._running_cond.notify_all() class BlockingExecutor(impl_pooledexecutor.PooledExecutor): @@ -52,3 +81,22 @@ class BlockingExecutor(impl_pooledexecutor.PooledExecutor): _executor_cls = lambda __, ___: futurist.SynchronousExecutor() _thread_cls = FakeBlockingThread + + def __init__(self, *args, **kwargs): + super(BlockingExecutor, self).__init__(*args, **kwargs) + + def execute(self): + '''Explicitly run the executor in the current context.''' + # NOTE(mdbooth): Splitting start into start and execute for the + # blocking executor closes a potential race. On a non-blocking + # executor, calling start performs some initialisation synchronously + # before starting the executor and returning control to the caller. In + # the non-blocking caller there was no externally visible boundary + # between the completion of initialisation and the start of execution, + # meaning the caller cannot indicate to another thread that + # initialisation is complete. With the split, the start call for the + # blocking executor becomes analogous to the non-blocking case, + # indicating that initialisation is complete. The caller can then + # synchronously call execute. + if self._poller is not None: + self._poller.execute() diff --git a/oslo_messaging/server.py b/oslo_messaging/server.py index 02bae191a..491ccbf52 100644 --- a/oslo_messaging/server.py +++ b/oslo_messaging/server.py @@ -140,12 +140,15 @@ class MessageHandlingServer(service.ServiceBase): listener = self.dispatcher._listen(self.transport) except driver_base.TransportDriverError as ex: raise ServerListenError(self.target, ex) - self._running = True self._executor_obj = self._executor_cls(self.conf, listener, self.dispatcher) self._executor_obj.start() + self._running = True self._state_cond.notify_all() + if self.executor == 'blocking': + self._executor_obj.execute() + def stop(self): """Stop handling incoming messages. diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 007d3ac6a..1e175fdf8 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -81,6 +81,12 @@ class TestExecutor(test_utils.BaseTestCase): aioeventlet_class = None is_aioeventlet = (self.executor == aioeventlet_class) + if impl_blocking is not None: + blocking_class = impl_blocking.BlockingExecutor + else: + blocking_class = None + is_blocking = (self.executor == blocking_class) + if is_aioeventlet: policy = aioeventlet.EventLoopPolicy() trollius.set_event_loop_policy(policy) @@ -110,8 +116,15 @@ class TestExecutor(test_utils.BaseTestCase): endpoint = mock.MagicMock(return_value=simple_coroutine('result')) event = eventlet.event.Event() - else: + elif is_blocking: + def run_executor(executor): + executor.start() + executor.execute() + executor.wait() + endpoint = mock.MagicMock(return_value='result') + event = None + else: def run_executor(executor): executor.start() executor.wait() diff --git a/oslo_messaging/tests/rpc/test_server.py b/oslo_messaging/tests/rpc/test_server.py index 9a2b53b24..258dacb24 100644 --- a/oslo_messaging/tests/rpc/test_server.py +++ b/oslo_messaging/tests/rpc/test_server.py @@ -27,22 +27,38 @@ load_tests = testscenarios.load_tests_apply_scenarios class ServerSetupMixin(object): - class Server(object): + class Server(threading.Thread): def __init__(self, transport, topic, server, endpoint, serializer): + self.controller = ServerSetupMixin.ServerController() target = oslo_messaging.Target(topic=topic, server=server) - self._server = oslo_messaging.get_rpc_server(transport, - target, - [endpoint, self], - serializer=serializer) + self.server = oslo_messaging.get_rpc_server(transport, + target, + [endpoint, + self.controller], + serializer=serializer) + + super(ServerSetupMixin.Server, self).__init__() + self.daemon = True + + def wait(self): + # Wait for the executor to process the stop message, indicating all + # test messages have been processed + self.controller.stopped.wait() + + # Check start() does nothing with a running server + self.server.start() + self.server.stop() + self.server.wait() + + def run(self): + self.server.start() + + class ServerController(object): + def __init__(self): + self.stopped = threading.Event() def stop(self, ctxt): - # Check start() does nothing with a running server - self._server.start() - self._server.stop() - self._server.wait() - - def start(self): - self._server.start() + self.stopped.set() class TestSerializer(object): @@ -72,13 +88,14 @@ class ServerSetupMixin(object): thread.daemon = True thread.start() - return thread + return server - def _stop_server(self, client, server_thread, topic=None): + def _stop_server(self, client, server, topic=None): if topic is not None: client = client.prepare(topic=topic) client.cast({}, 'stop') - server_thread.join(timeout=30) + server.wait() + def _setup_client(self, transport, topic='testtopic'): return oslo_messaging.RPCClient(transport,