Merge "Fix a race calling blocking MessageHandlingServer.start()"

This commit is contained in:
Jenkins 2015-10-21 16:36:33 +00:00 committed by Gerrit Code Review
commit 22e574ff03
4 changed files with 108 additions and 27 deletions
oslo_messaging

@ -14,28 +14,57 @@
# under the License.
import futurist
import threading
from oslo_messaging._executors import impl_pooledexecutor
from oslo_utils import timeutils
class FakeBlockingThread(object):
'''A minimal implementation of threading.Thread which does not create a
thread or start executing the target when start() is called. Instead, the
caller must explicitly execute the non-blocking thread.execute() method
after start() has been called.
'''
def __init__(self, target):
self._target = target
self._running = False
self._running_cond = threading.Condition()
def start(self):
self._target()
if self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread already started')
@staticmethod
def join(timeout=None):
pass
with self._running_cond:
self._running = True
self._running_cond.notify_all()
@staticmethod
def stop():
pass
def join(self, timeout=None):
with timeutils.StopWatch(duration=timeout) as w, self._running_cond:
while self._running:
self._running_cond.wait(w.leftover(return_none=True))
@staticmethod
def is_alive():
return False
# Thread.join() does not raise an exception on timeout. It is
# the caller's responsibility to check is_alive().
if w.expired():
return
def is_alive(self):
return self._running
def execute(self):
if not self._running:
# Not a user error. No need to translate.
raise RuntimeError('FakeBlockingThread not started')
try:
self._target()
finally:
with self._running_cond:
self._running = False
self._running_cond.notify_all()
class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
@ -52,3 +81,22 @@ class BlockingExecutor(impl_pooledexecutor.PooledExecutor):
_executor_cls = lambda __, ___: futurist.SynchronousExecutor()
_thread_cls = FakeBlockingThread
def __init__(self, *args, **kwargs):
super(BlockingExecutor, self).__init__(*args, **kwargs)
def execute(self):
'''Explicitly run the executor in the current context.'''
# NOTE(mdbooth): Splitting start into start and execute for the
# blocking executor closes a potential race. On a non-blocking
# executor, calling start performs some initialisation synchronously
# before starting the executor and returning control to the caller. In
# the non-blocking caller there was no externally visible boundary
# between the completion of initialisation and the start of execution,
# meaning the caller cannot indicate to another thread that
# initialisation is complete. With the split, the start call for the
# blocking executor becomes analogous to the non-blocking case,
# indicating that initialisation is complete. The caller can then
# synchronously call execute.
if self._poller is not None:
self._poller.execute()

@ -140,12 +140,15 @@ class MessageHandlingServer(service.ServiceBase):
listener = self.dispatcher._listen(self.transport)
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
self._running = True
self._executor_obj = self._executor_cls(self.conf, listener,
self.dispatcher)
self._executor_obj.start()
self._running = True
self._state_cond.notify_all()
if self.executor == 'blocking':
self._executor_obj.execute()
def stop(self):
"""Stop handling incoming messages.

@ -81,6 +81,12 @@ class TestExecutor(test_utils.BaseTestCase):
aioeventlet_class = None
is_aioeventlet = (self.executor == aioeventlet_class)
if impl_blocking is not None:
blocking_class = impl_blocking.BlockingExecutor
else:
blocking_class = None
is_blocking = (self.executor == blocking_class)
if is_aioeventlet:
policy = aioeventlet.EventLoopPolicy()
trollius.set_event_loop_policy(policy)
@ -110,8 +116,15 @@ class TestExecutor(test_utils.BaseTestCase):
endpoint = mock.MagicMock(return_value=simple_coroutine('result'))
event = eventlet.event.Event()
else:
elif is_blocking:
def run_executor(executor):
executor.start()
executor.execute()
executor.wait()
endpoint = mock.MagicMock(return_value='result')
event = None
else:
def run_executor(executor):
executor.start()
executor.wait()

@ -27,22 +27,38 @@ load_tests = testscenarios.load_tests_apply_scenarios
class ServerSetupMixin(object):
class Server(object):
class Server(threading.Thread):
def __init__(self, transport, topic, server, endpoint, serializer):
self.controller = ServerSetupMixin.ServerController()
target = oslo_messaging.Target(topic=topic, server=server)
self._server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint, self],
serializer=serializer)
self.server = oslo_messaging.get_rpc_server(transport,
target,
[endpoint,
self.controller],
serializer=serializer)
super(ServerSetupMixin.Server, self).__init__()
self.daemon = True
def wait(self):
# Wait for the executor to process the stop message, indicating all
# test messages have been processed
self.controller.stopped.wait()
# Check start() does nothing with a running server
self.server.start()
self.server.stop()
self.server.wait()
def run(self):
self.server.start()
class ServerController(object):
def __init__(self):
self.stopped = threading.Event()
def stop(self, ctxt):
# Check start() does nothing with a running server
self._server.start()
self._server.stop()
self._server.wait()
def start(self):
self._server.start()
self.stopped.set()
class TestSerializer(object):
@ -72,13 +88,14 @@ class ServerSetupMixin(object):
thread.daemon = True
thread.start()
return thread
return server
def _stop_server(self, client, server_thread, topic=None):
def _stop_server(self, client, server, topic=None):
if topic is not None:
client = client.prepare(topic=topic)
client.cast({}, 'stop')
server_thread.join(timeout=30)
server.wait()
def _setup_client(self, transport, topic='testtopic'):
return oslo_messaging.RPCClient(transport,