diff --git a/oslo_messaging/_executors/base.py b/oslo_messaging/_executors/base.py index 6fbc15377..7749c0087 100644 --- a/oslo_messaging/_executors/base.py +++ b/oslo_messaging/_executors/base.py @@ -30,12 +30,18 @@ class ExecutorBase(object): @abc.abstractmethod def start(self): - "Start polling for incoming messages." + """Start polling for incoming messages.""" @abc.abstractmethod def stop(self): - "Stop polling for messages." + """Stop polling for messages.""" @abc.abstractmethod - def wait(self): - "Wait until the executor has stopped polling." + def wait(self, timeout=None): + """Wait until the executor has stopped polling. + + If a timeout is provided, and it is not ``None`` then this method will + wait up to that amount of time for its components to finish, if not + all components finish in the alloted time, then false will be returned + otherwise true will be returned. + """ diff --git a/oslo_messaging/_executors/impl_blocking.py b/oslo_messaging/_executors/impl_blocking.py index cc40edd56..b59818f5c 100644 --- a/oslo_messaging/_executors/impl_blocking.py +++ b/oslo_messaging/_executors/impl_blocking.py @@ -26,13 +26,17 @@ class FakeBlockingThread(object): self._target() @staticmethod - def join(): + def join(timeout=None): pass @staticmethod def stop(): pass + @staticmethod + def is_alive(): + return False + class BlockingExecutor(impl_pooledexecutor.PooledExecutor): """A message executor which blocks the current thread. diff --git a/oslo_messaging/_executors/impl_pooledexecutor.py b/oslo_messaging/_executors/impl_pooledexecutor.py index 598229c07..c0837701c 100644 --- a/oslo_messaging/_executors/impl_pooledexecutor.py +++ b/oslo_messaging/_executors/impl_pooledexecutor.py @@ -20,6 +20,7 @@ import threading from futurist import waiters from oslo_config import cfg from oslo_utils import excutils +from oslo_utils import timeutils from oslo_messaging._executors import base @@ -116,16 +117,32 @@ class PooledExecutor(base.ExecutorBase): self._tombstone.set() self.listener.stop() - def wait(self): - # TODO(harlowja): this method really needs a timeout. - if self._poller is not None: - self._tombstone.wait() - self._poller.join() - self._poller = None - if self._executor is not None: - with self._mutator: - incomplete_fs = list(self._incomplete) - self._incomplete.clear() - if incomplete_fs: - self._wait_for_all(incomplete_fs) - self._executor = None + def wait(self, timeout=None): + with timeutils.StopWatch(duration=timeout) as w: + poller = self._poller + if poller is not None: + self._tombstone.wait(w.leftover(return_none=True)) + if not self._tombstone.is_set(): + return False + poller.join(w.leftover(return_none=True)) + if poller.is_alive(): + return False + self._poller = None + executor = self._executor + if executor is not None: + with self._mutator: + incomplete_fs = list(self._incomplete) + if incomplete_fs: + (done, not_done) = self._wait_for_all( + incomplete_fs, + timeout=w.leftover(return_none=True)) + with self._mutator: + for fut in done: + try: + self._incomplete.remove(fut) + except ValueError: + pass + if not_done: + return False + self._executor = None + return True diff --git a/oslo_messaging/tests/executors/test_executor.py b/oslo_messaging/tests/executors/test_executor.py index 3a6b00d9c..007d3ac6a 100644 --- a/oslo_messaging/tests/executors/test_executor.py +++ b/oslo_messaging/tests/executors/test_executor.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import time import threading # eventlet 0.16 with monkey patching does not work yet on Python 3, @@ -71,9 +72,9 @@ class TestExecutor(test_utils.BaseTestCase): thread = threading.Thread(target=target, args=(executor,)) thread.daemon = True thread.start() - thread.join(timeout=30) + return thread - def test_executor_dispatch(self): + def _create_dispatcher(self): if impl_aioeventlet is not None: aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor else: @@ -110,11 +111,13 @@ class TestExecutor(test_utils.BaseTestCase): endpoint = mock.MagicMock(return_value=simple_coroutine('result')) event = eventlet.event.Event() else: + def run_executor(executor): executor.start() executor.wait() endpoint = mock.MagicMock(return_value='result') + event = None class Dispatcher(object): def __init__(self, endpoint): @@ -139,27 +142,52 @@ class TestExecutor(test_utils.BaseTestCase): self.callback, executor_callback) - listener = mock.Mock(spec=['poll', 'stop']) - dispatcher = Dispatcher(endpoint) - executor = self.executor(self.conf, listener, dispatcher) + return Dispatcher(endpoint), endpoint, event, run_executor + def test_slow_wait(self): + dispatcher, endpoint, event, run_executor = self._create_dispatcher() + listener = mock.Mock(spec=['poll', 'stop']) + executor = self.executor(self.conf, listener, dispatcher) incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) def fake_poll(timeout=None): - if is_aioeventlet: - if listener.poll.call_count == 1: - return incoming_message - event.wait() + time.sleep(0.1) + if listener.poll.call_count == 10: + if event is not None: + event.wait() executor.stop() else: - if listener.poll.call_count == 1: - return incoming_message - executor.stop() + return incoming_message listener.poll.side_effect = fake_poll + thread = self._run_in_thread(run_executor, executor) + self.assertFalse(executor.wait(timeout=0.1)) + thread.join() + self.assertTrue(executor.wait()) - self._run_in_thread(run_executor, executor) + def test_dead_wait(self): + dispatcher, _endpoint, _event, _run_executor = self._create_dispatcher() + listener = mock.Mock(spec=['poll', 'stop']) + executor = self.executor(self.conf, listener, dispatcher) + executor.stop() + self.assertTrue(executor.wait()) + def test_executor_dispatch(self): + dispatcher, endpoint, event, run_executor = self._create_dispatcher() + listener = mock.Mock(spec=['poll', 'stop']) + executor = self.executor(self.conf, listener, dispatcher) + incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) + + def fake_poll(timeout=None): + if listener.poll.call_count == 1: + return incoming_message + if event is not None: + event.wait() + executor.stop() + + listener.poll.side_effect = fake_poll + thread = self._run_in_thread(run_executor, executor) + thread.join() endpoint.assert_called_once_with({}, {'payload': 'data'}) self.assertEqual(dispatcher.result, 'result')