Ensure callback variable capture + cleanup is done correctly

It appears the the callback variable that was being called on
future done was not the right one, due to the lambda capture
mechanism referring to a lazy variable which would potentially
be a different callback by the time the future would finish so
make sure we capture the right one and ensure the future has access
to it.

This adds a helper method that submission will go through
to ensure that the callback variable is correctly captured
in the created lambda and also ensures that the incomplete futures
list is cleaned up (when the future is done).

The notification listener tests use now eventlet to show up this
issue that doesn't occur with blocking executor.

Closes-bug: #1474943

Closes-bug: #1475307

Change-Id: I23e393d504662532572b5b344b87387be6d7bcb1
This commit is contained in:
Joshua Harlow 2015-07-15 10:26:06 -07:00
parent 8a9b5d44ae
commit 02a3a39814
2 changed files with 43 additions and 17 deletions
oslo_messaging

@ -65,6 +65,31 @@ class PooledExecutor(base.ExecutorBase):
self._incomplete = collections.deque()
self._mutator = self._lock_cls()
def _do_submit(self, callback):
def _on_done(fut):
with self._mutator:
try:
self._incomplete.remove(fut)
except ValueError:
pass
callback.done()
try:
fut = self._executor.submit(callback.run)
except RuntimeError:
# This is triggered when the executor has been shutdown...
#
# TODO(harlowja): should we put whatever we pulled off back
# since when this is thrown it means the executor has been
# shutdown already??
callback.done()
return False
else:
with self._mutator:
self._incomplete.append(fut)
# Run the other post processing of the callback when done...
fut.add_done_callback(_on_done)
return True
@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while not self._tombstone.is_set():
@ -72,21 +97,9 @@ class PooledExecutor(base.ExecutorBase):
if incoming is None:
continue
callback = self.dispatcher(incoming, self._executor_callback)
try:
fut = self._executor.submit(callback.run)
except RuntimeError:
# This is triggered when the executor has been shutdown...
#
# TODO(harlowja): should we put whatever we pulled off back
# since when this is thrown it means the executor has been
# shutdown already??
callback.done()
return
else:
with self._mutator:
self._incomplete.append(fut)
# Run the other post processing of the callback when done...
fut.add_done_callback(lambda f: callback.done())
was_submitted = self._do_submit(callback)
if not was_submitted:
break
def start(self):
if self._executor is None:

@ -31,17 +31,30 @@ class RestartableServerThread(object):
def __init__(self, server):
self.server = server
self.thread = None
self._started = threading.Event()
self._tombstone = threading.Event()
def start(self):
self._tombstone.clear()
if self.thread is None:
self.thread = threading.Thread(target=self.server.start)
self._started.clear()
self.thread = threading.Thread(target=self._target)
self.thread.daemon = True
self.thread.start()
self._started.wait()
def _target(self):
self.server.start()
self._started.set()
self._tombstone.wait()
def stop(self):
if self.thread is not None:
# Check start() does nothing with a running listener
self.server.start()
self._tombstone.set()
if self.thread is not None:
self.server.stop()
self.server.wait()
self.thread.join(timeout=15)
@ -101,7 +114,7 @@ class ListenerSetupMixin(object):
tracker_name, self.ThreadTracker())
listener = oslo_messaging.get_notification_listener(
transport, targets=targets, endpoints=[tracker] + endpoints,
allow_requeue=True, pool=pool)
allow_requeue=True, pool=pool, executor='eventlet')
thread = RestartableServerThread(listener)
tracker.start(thread)