Add spawn_on

Now that spawn is using the default executor and returns a Future we can
have a spawn_on that takes the executor as a parameter so the
scatter-gather code path can use the same context passing logic but use
a separate executor.

This also change spawn() to use spawn_on() with the default executor.
And warns if the executor is busy an therefore the task is queued.

During testing this we discovered that SpawnIsSynchronous fixture made a
wrong assumption that eventlet.spawn(f) would raise the exception if f
raises. It does not. So now when we changed this fixture to the new
executor structure we fixed this test bug. There was two unit test
cases that depended on the wrong behavior they are adapted now.

Change-Id: I3bb40f5d2446dfd06253371d6abe8d3449b11265
Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
Balazs Gibizer
2025-04-24 16:16:04 +02:00
parent 81a03ab824
commit b215c6fee9
4 changed files with 98 additions and 33 deletions

View File

@@ -430,9 +430,9 @@ def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs):
for cell_mapping in cell_mappings:
with target_cell(context, cell_mapping) as cctxt:
future = executor.submit(
utils.pass_context_wrapper(gather_result),
cell_mapping.uuid, fn, cctxt, *args, **kwargs)
future = utils.spawn_on(
executor,
gather_result, cell_mapping.uuid, fn, cctxt, *args, **kwargs)
tasks[cell_mapping.uuid] = future
futurist.waiters.wait_for_all(tasks.values(), timeout)

View File

@@ -1294,37 +1294,24 @@ class IsolatedGreenPoolFixture(fixtures.Fixture):
)
class _FakeFuture(object):
def __init__(self, func, *args, **kwargs):
try:
self._result = func(*args, **kwargs)
self.raised = False
except Exception as e:
self.raised = True
self._result = e
def cancel(self, *args, **kwargs):
# This method doesn't make sense for a synchronous call, it's just
# defined to satisfy the interface.
pass
def add_done_callback(self, func):
func(self)
def result(self):
if self.raised:
raise self._result
return self._result
class SpawnIsSynchronousFixture(fixtures.Fixture):
"""Patch and restore the spawn_* utility methods to be synchronous"""
def setUp(self):
super(SpawnIsSynchronousFixture, self).setUp()
self.useFixture(fixtures.MonkeyPatch(
'nova.utils.spawn', _FakeFuture))
executor = futurist.SynchronousExecutor()
self.addCleanup(executor.shutdown)
def spawn(*args, **kwargs):
return executor.submit(*args, **kwargs)
# Just ignore the first arg that is the original executor instance
# and use our test internal synchronous executor.
def spawn_on(_, *args, **kwargs):
return executor.submit(*args, **kwargs)
self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn', spawn))
self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn_on', spawn_on))
class BannedDBSchemaOperations(fixtures.Fixture):

View File

@@ -17,6 +17,7 @@ import hashlib
import os
import os.path
import tempfile
import threading
from unittest import mock
import fixtures
@@ -1511,3 +1512,52 @@ class DefaultExecutorTestCase(test.NoDBTestCase):
utils.destroy_default_green_pool()
self.assertIsNone(utils.DEFAULT_GREEN_POOL)
self.assertFalse(executor.alive)
class SpawnOnTestCase(test.NoDBTestCase):
def test_spawn_on_submits_work(self):
executor = utils.get_scatter_gather_executor()
task = mock.MagicMock()
future = utils.spawn_on(executor, task, 13, foo='bar')
future.result()
task.assert_called_once_with(13, foo='bar')
@mock.patch.object(
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=True))
@mock.patch.object(utils.LOG, 'warning')
def test_spawn_on_warns_on_full_executor(self, mock_warning):
# Ensure we have executor for a single task only at a time
self.flags(cell_worker_thread_pool_size=1)
executor = utils.get_scatter_gather_executor()
work = threading.Event()
started = threading.Event()
# let the blocked tasks finish after the test case so that the leaked
# thread check is not triggered during cleanup
self.addCleanup(work.set)
def task():
started.set()
work.wait()
# Start two tasks that will wait, the first will execute the second
# will wait in the queue
utils.spawn_on(executor, task)
utils.spawn_on(executor, task)
# wait for the first task to consume the single executor thread
started.wait()
# start one more task to trigger the fullness check.
utils.spawn_on(executor, task)
# We expect that spawn_on will warn due to the second task being is
# waiting in the queue, and no idle worker thread exists.
mock_warning.assert_called_once_with(
'The %s pool does not have free threads so the task %s will be '
'queued. If this happens repeatedly then the size of the pool is '
'too small for the load or there are stuck threads filling the '
'pool.',
'nova.tests.unit.test_utils.SpawnOnTestCase.'
'test_spawn_on_warns_on_full_executor.cell_worker', task)

View File

@@ -675,7 +675,7 @@ def _serialize_profile_info():
return trace_info
def pass_context_wrapper(func):
def _pass_context_wrapper(func):
"""Generalised passthrough method
It will grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
@@ -705,7 +705,7 @@ def pass_context(runner, func, *args, **kwargs):
runner function
"""
return runner(pass_context_wrapper(func), *args, **kwargs)
return runner(_pass_context_wrapper(func), *args, **kwargs)
def spawn(func, *args, **kwargs) -> futurist.Future:
@@ -719,8 +719,36 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
context when using this method to spawn a new thread.
"""
return pass_context(
_get_default_green_pool().submit, func, *args, **kwargs)
return spawn_on(_get_default_green_pool(), func, *args, **kwargs)
def _executor_is_full(executor):
if concurrency_mode_threading():
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
# so that we can avoid accessing the internals of the executor
with executor._shutdown_lock:
idle_workers = len([w for w in executor._workers if w.idle]) > 0
queued_tasks = executor._work_queue.qsize() > 0
return queued_tasks and not idle_workers
return False
def spawn_on(executor, func, *args, **kwargs) -> futurist.Future:
"""Passthrough method to run func on a thread in a given executor.
It will also grab the context from the threadlocal store and add it to
the store on the new thread. This allows for continuity in logging the
context when using this method to spawn a new thread.
"""
if _executor_is_full(executor):
LOG.warning(
"The %s pool does not have free threads so the task %s will be "
"queued. If this happens repeatedly then the size of the pool is "
"too small for the load or there are stuck threads filling the "
"pool.", executor.name, func)
return pass_context(executor.submit, func, *args, **kwargs)
def tpool_execute(func, *args, **kwargs):