Use a condition (and/or a dummy one) instead of a lock

Instead of spinning in the wait() method, use a condition and block
until stopping has actually happened; stop() then calls notify_all()
to release any waiting threads.

Closes-Bug: #1505730

Change-Id: I3cfbe1bf02d451e379b1dcc23dacb0139c03be76
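For context, a minimal sketch of the pattern this change adopts, using assumed simplified names rather than the actual MessageHandlingServer code: wait() blocks on a threading.Condition and is woken by the notify_all() issued from stop(), instead of repeatedly re-acquiring a lock and polling.

import threading


class _SketchServer(object):
    """Hypothetical, simplified stand-in for the real server class."""

    def __init__(self):
        self._state_cond = threading.Condition()
        self._running = False

    def start(self):
        with self._state_cond:
            self._running = True
            self._state_cond.notify_all()

    def stop(self):
        with self._state_cond:
            self._running = False
            # Wake every thread currently blocked in wait().
            self._state_cond.notify_all()

    def wait(self):
        with self._state_cond:
            # Block instead of spin-polling; Condition.wait() releases the
            # underlying lock while sleeping and re-acquires it on wakeup.
            while self._running:
                self._state_cond.wait(1.0)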
Mehdi Abaakouk, 2015-10-13 18:30:50 +02:00 (committed by Joshua Harlow)
commit c68266b36b, parent 68a67f8dc7
3 changed files with 64 additions and 39 deletions
oslo_messaging

@@ -116,6 +116,29 @@ def fetch_current_thread_functor():
     return lambda: threading.current_thread()
 
 
+class DummyCondition(object):
+    def acquire(self):
+        pass
+
+    def notify(self):
+        pass
+
+    def notify_all(self):
+        pass
+
+    def wait(self, timeout=None):
+        pass
+
+    def release(self):
+        pass
+
+    def __enter__(self):
+        self.acquire()
+
+    def __exit__(self, type, value, traceback):
+        self.release()
+
+
 class DummyLock(object):
     def acquire(self):
         pass
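The DummyCondition added above mirrors threading.Condition's interface with no-op methods, so the blocking executor (which runs in the caller's thread and needs no real synchronization) can reuse the same with/wait/notify code paths. A rough usage sketch, assuming the class from this hunk:

# Both objects expose the same surface, so calling code stays identical.
cond = DummyCondition()      # no-op variant used for the blocking executor
with cond:                   # __enter__/__exit__ call the no-op acquire/release
    cond.notify_all()        # does nothing
    cond.wait(timeout=1.0)   # returns immediately instead of blocking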

@@ -27,6 +27,7 @@ import logging
 import threading
 
 from oslo_service import service
+from oslo_utils import timeutils
 from stevedore import driver
 
 from oslo_messaging._drivers import base as driver_base
@@ -98,9 +99,11 @@ class MessageHandlingServer(service.ServiceBase):
         # is fully started. Except for the blocking executor that have
         # start() that doesn't return
         if self.executor != "blocking":
-            self._state_lock = threading.Lock()
+            self._state_cond = threading.Condition()
+            self._dummy_cond = False
         else:
-            self._state_lock = _utils.DummyLock()
+            self._state_cond = _utils.DummyCondition()
+            self._dummy_cond = True
 
         try:
             mgr = driver.DriverManager('oslo.messaging.executors',
@@ -130,16 +133,18 @@ class MessageHandlingServer(service.ServiceBase):
         """
         if self._executor is not None:
             return
-        try:
-            listener = self.dispatcher._listen(self.transport)
-        except driver_base.TransportDriverError as ex:
-            raise ServerListenError(self.target, ex)
-
-        with self._state_lock:
+        with self._state_cond:
+            if self._executor is not None:
+                return
+            try:
+                listener = self.dispatcher._listen(self.transport)
+            except driver_base.TransportDriverError as ex:
+                raise ServerListenError(self.target, ex)
             self._running = True
             self._executor = self._executor_cls(self.conf, listener,
                                                 self.dispatcher)
             self._executor.start()
+            self._state_cond.notify_all()
 
     def stop(self):
         """Stop handling incoming messages.
@@ -149,10 +154,11 @@ class MessageHandlingServer(service.ServiceBase):
         some messages, and underlying driver resources associated to this
         server are still in use. See 'wait' for more details.
         """
-        with self._state_lock:
+        with self._state_cond:
             if self._executor is not None:
                 self._running = False
                 self._executor.stop()
+                self._state_cond.notify_all()
 
     def wait(self):
         """Wait for message processing to complete.
@@ -164,21 +170,37 @@ class MessageHandlingServer(service.ServiceBase):
         Once it's finished, the underlying driver resources associated to this
         server are released (like closing useless network connections).
         """
-        with self._state_lock:
+        with self._state_cond:
             if self._running:
-                # NOTE(dims): Need to change this to raise RuntimeError after
-                # verifying/fixing other openstack projects (like Neutron)
-                # work ok with this change
                 LOG.warn(_LW("wait() should be called after stop() as it "
                              "waits for existing messages to finish "
                              "processing"))
-
-            if self._executor is not None:
-                self._executor.wait()
-                # Close listener connection after processing all messages
-                self._executor.listener.cleanup()
-
+                w = timeutils.StopWatch()
+                w.start()
+                while self._running:
+                    # NOTE(harlowja): 1.0 seconds was mostly chosen at
+                    # random, but it seems like a reasonable value to
+                    # use to avoid spamming the logs with too much
+                    # information.
+                    self._state_cond.wait(1.0)
+                    if self._running and not self._dummy_cond:
+                        LOG.warn(
+                            _LW("wait() should have been called"
                                " after stop() as wait() waits for existing"
                                " messages to finish processing, it has"
                                " been %0.2f seconds and stop() still has"
                                " not been called"), w.elapsed())
+            executor = self._executor
             self._executor = None
+        if executor is not None:
+            # We are the lucky calling thread to wait on the executor to
+            # actually finish.
+            try:
+                executor.wait()
+            finally:
+                # Close listener connection after processing all messages
+                executor.listener.cleanup()
+                executor = None
 
     def reset(self):
         """Reset service.

@@ -130,26 +130,6 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
         self.assertIsNone(server._executor)
         self.assertEqual(1, listener.cleanup.call_count)
 
-    @mock.patch('oslo_messaging._executors.impl_pooledexecutor.'
-                'PooledExecutor.wait')
-    def test_server_invalid_wait_running_server(self, mock_wait):
-        transport = oslo_messaging.get_transport(self.conf, url='fake:')
-        target = oslo_messaging.Target(topic='foo', server='bar')
-        endpoints = [object()]
-        serializer = object()
-        server = oslo_messaging.get_rpc_server(transport, target, endpoints,
-                                               serializer=serializer,
-                                               executor='eventlet')
-        self.addCleanup(server.wait)
-        self.addCleanup(server.stop)
-        server.start()
-        with mock.patch('logging.Logger.warn') as warn:
-            server.wait()
-            warn.assert_called_with('wait() should be called after '
-                                    'stop() as it waits for existing '
-                                    'messages to finish processing')
-
     def test_no_target_server(self):
         transport = oslo_messaging.get_transport(self.conf, url='fake:')