Merge "Provide the executor 'wait' function a timeout and use it"

This commit is contained in:
Jenkins 2015-10-15 08:02:32 +00:00 committed by Gerrit Code Review
commit 947ccd7794
4 changed files with 86 additions and 31 deletions

View File

@ -30,12 +30,18 @@ class ExecutorBase(object):
@abc.abstractmethod @abc.abstractmethod
def start(self): def start(self):
"Start polling for incoming messages." """Start polling for incoming messages."""
@abc.abstractmethod @abc.abstractmethod
def stop(self): def stop(self):
"Stop polling for messages." """Stop polling for messages."""
@abc.abstractmethod @abc.abstractmethod
def wait(self): def wait(self, timeout=None):
"Wait until the executor has stopped polling." """Wait until the executor has stopped polling.
If a timeout is provided, and it is not ``None`` then this method will
wait up to that amount of time for its components to finish, if not
all components finish in the alloted time, then false will be returned
otherwise true will be returned.
"""

View File

@ -26,13 +26,17 @@ class FakeBlockingThread(object):
self._target() self._target()
@staticmethod @staticmethod
def join(): def join(timeout=None):
pass pass
@staticmethod @staticmethod
def stop(): def stop():
pass pass
@staticmethod
def is_alive():
return False
class BlockingExecutor(impl_pooledexecutor.PooledExecutor): class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
"""A message executor which blocks the current thread. """A message executor which blocks the current thread.

View File

@ -20,6 +20,7 @@ import threading
from futurist import waiters from futurist import waiters
from oslo_config import cfg from oslo_config import cfg
from oslo_utils import excutils from oslo_utils import excutils
from oslo_utils import timeutils
from oslo_messaging._executors import base from oslo_messaging._executors import base
@ -116,16 +117,32 @@ class PooledExecutor(base.ExecutorBase):
self._tombstone.set() self._tombstone.set()
self.listener.stop() self.listener.stop()
def wait(self): def wait(self, timeout=None):
# TODO(harlowja): this method really needs a timeout. with timeutils.StopWatch(duration=timeout) as w:
if self._poller is not None: poller = self._poller
self._tombstone.wait() if poller is not None:
self._poller.join() self._tombstone.wait(w.leftover(return_none=True))
self._poller = None if not self._tombstone.is_set():
if self._executor is not None: return False
with self._mutator: poller.join(w.leftover(return_none=True))
incomplete_fs = list(self._incomplete) if poller.is_alive():
self._incomplete.clear() return False
if incomplete_fs: self._poller = None
self._wait_for_all(incomplete_fs) executor = self._executor
self._executor = None if executor is not None:
with self._mutator:
incomplete_fs = list(self._incomplete)
if incomplete_fs:
(done, not_done) = self._wait_for_all(
incomplete_fs,
timeout=w.leftover(return_none=True))
with self._mutator:
for fut in done:
try:
self._incomplete.remove(fut)
except ValueError:
pass
if not_done:
return False
self._executor = None
return True

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
import threading import threading
# eventlet 0.16 with monkey patching does not work yet on Python 3, # eventlet 0.16 with monkey patching does not work yet on Python 3,
@ -71,9 +72,9 @@ class TestExecutor(test_utils.BaseTestCase):
thread = threading.Thread(target=target, args=(executor,)) thread = threading.Thread(target=target, args=(executor,))
thread.daemon = True thread.daemon = True
thread.start() thread.start()
thread.join(timeout=30) return thread
def test_executor_dispatch(self): def _create_dispatcher(self):
if impl_aioeventlet is not None: if impl_aioeventlet is not None:
aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor aioeventlet_class = impl_aioeventlet.AsyncioEventletExecutor
else: else:
@ -110,11 +111,13 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value=simple_coroutine('result')) endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
event = eventlet.event.Event() event = eventlet.event.Event()
else: else:
def run_executor(executor): def run_executor(executor):
executor.start() executor.start()
executor.wait() executor.wait()
endpoint = mock.MagicMock(return_value='result') endpoint = mock.MagicMock(return_value='result')
event = None
class Dispatcher(object): class Dispatcher(object):
def __init__(self, endpoint): def __init__(self, endpoint):
@ -139,27 +142,52 @@ class TestExecutor(test_utils.BaseTestCase):
self.callback, self.callback,
executor_callback) executor_callback)
listener = mock.Mock(spec=['poll', 'stop']) return Dispatcher(endpoint), endpoint, event, run_executor
dispatcher = Dispatcher(endpoint)
executor = self.executor(self.conf, listener, dispatcher)
def test_slow_wait(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'}) incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None): def fake_poll(timeout=None):
if is_aioeventlet: time.sleep(0.1)
if listener.poll.call_count == 1: if listener.poll.call_count == 10:
return incoming_message if event is not None:
event.wait() event.wait()
executor.stop() executor.stop()
else: else:
if listener.poll.call_count == 1: return incoming_message
return incoming_message
executor.stop()
listener.poll.side_effect = fake_poll listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
self.assertFalse(executor.wait(timeout=0.1))
thread.join()
self.assertTrue(executor.wait())
self._run_in_thread(run_executor, executor) def test_dead_wait(self):
dispatcher, _endpoint, _event, _run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
executor.stop()
self.assertTrue(executor.wait())
def test_executor_dispatch(self):
dispatcher, endpoint, event, run_executor = self._create_dispatcher()
listener = mock.Mock(spec=['poll', 'stop'])
executor = self.executor(self.conf, listener, dispatcher)
incoming_message = mock.MagicMock(ctxt={}, message={'payload': 'data'})
def fake_poll(timeout=None):
if listener.poll.call_count == 1:
return incoming_message
if event is not None:
event.wait()
executor.stop()
listener.poll.side_effect = fake_poll
thread = self._run_in_thread(run_executor, executor)
thread.join()
endpoint.assert_called_once_with({}, {'payload': 'data'}) endpoint.assert_called_once_with({}, {'payload': 'data'})
self.assertEqual(dispatcher.result, 'result') self.assertEqual(dispatcher.result, 'result')