diff --git a/nova/context.py b/nova/context.py index 6257d5e1b348..ebb5d2f2bed4 100644 --- a/nova/context.py +++ b/nova/context.py @@ -430,9 +430,9 @@ def scatter_gather_cells(context, cell_mappings, timeout, fn, *args, **kwargs): for cell_mapping in cell_mappings: with target_cell(context, cell_mapping) as cctxt: - future = executor.submit( - utils.pass_context_wrapper(gather_result), - cell_mapping.uuid, fn, cctxt, *args, **kwargs) + future = utils.spawn_on( + executor, + gather_result, cell_mapping.uuid, fn, cctxt, *args, **kwargs) tasks[cell_mapping.uuid] = future futurist.waiters.wait_for_all(tasks.values(), timeout) diff --git a/nova/tests/fixtures/nova.py b/nova/tests/fixtures/nova.py index 0bb74b192dab..9e598fe8237c 100644 --- a/nova/tests/fixtures/nova.py +++ b/nova/tests/fixtures/nova.py @@ -1294,37 +1294,24 @@ class IsolatedGreenPoolFixture(fixtures.Fixture): ) -class _FakeFuture(object): - def __init__(self, func, *args, **kwargs): - try: - self._result = func(*args, **kwargs) - self.raised = False - except Exception as e: - self.raised = True - self._result = e - - def cancel(self, *args, **kwargs): - # This method doesn't make sense for a synchronous call, it's just - # defined to satisfy the interface. - pass - - def add_done_callback(self, func): - func(self) - - def result(self): - if self.raised: - raise self._result - - return self._result - - class SpawnIsSynchronousFixture(fixtures.Fixture): """Patch and restore the spawn_* utility methods to be synchronous""" def setUp(self): super(SpawnIsSynchronousFixture, self).setUp() - self.useFixture(fixtures.MonkeyPatch( - 'nova.utils.spawn', _FakeFuture)) + executor = futurist.SynchronousExecutor() + self.addCleanup(executor.shutdown) + + def spawn(*args, **kwargs): + return executor.submit(*args, **kwargs) + + # Just ignore the first arg that is the original executor instance + # and use our test internal synchronous executor. + def spawn_on(_, *args, **kwargs): + return executor.submit(*args, **kwargs) + + self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn', spawn)) + self.useFixture(fixtures.MonkeyPatch('nova.utils.spawn_on', spawn_on)) class BannedDBSchemaOperations(fixtures.Fixture): diff --git a/nova/tests/unit/test_utils.py b/nova/tests/unit/test_utils.py index 9889c8cca714..98555f680d61 100644 --- a/nova/tests/unit/test_utils.py +++ b/nova/tests/unit/test_utils.py @@ -17,6 +17,7 @@ import hashlib import os import os.path import tempfile +import threading from unittest import mock import fixtures @@ -1511,3 +1512,52 @@ class DefaultExecutorTestCase(test.NoDBTestCase): utils.destroy_default_green_pool() self.assertIsNone(utils.DEFAULT_GREEN_POOL) self.assertFalse(executor.alive) + + +class SpawnOnTestCase(test.NoDBTestCase): + def test_spawn_on_submits_work(self): + executor = utils.get_scatter_gather_executor() + task = mock.MagicMock() + + future = utils.spawn_on(executor, task, 13, foo='bar') + future.result() + + task.assert_called_once_with(13, foo='bar') + + @mock.patch.object( + utils, 'concurrency_mode_threading', new=mock.Mock(return_value=True)) + @mock.patch.object(utils.LOG, 'warning') + def test_spawn_on_warns_on_full_executor(self, mock_warning): + # Ensure we have executor for a single task only at a time + self.flags(cell_worker_thread_pool_size=1) + executor = utils.get_scatter_gather_executor() + + work = threading.Event() + started = threading.Event() + + # let the blocked tasks finish after the test case so that the leaked + # thread check is not triggered during cleanup + self.addCleanup(work.set) + + def task(): + started.set() + work.wait() + + # Start two tasks that will wait, the first will execute the second + # will wait in the queue + utils.spawn_on(executor, task) + utils.spawn_on(executor, task) + # wait for the first task to consume the single executor thread + started.wait() + # start one more task to trigger the fullness check. + utils.spawn_on(executor, task) + + # We expect that spawn_on will warn due to the second task being is + # waiting in the queue, and no idle worker thread exists. + mock_warning.assert_called_once_with( + 'The %s pool does not have free threads so the task %s will be ' + 'queued. If this happens repeatedly then the size of the pool is ' + 'too small for the load or there are stuck threads filling the ' + 'pool.', + 'nova.tests.unit.test_utils.SpawnOnTestCase.' + 'test_spawn_on_warns_on_full_executor.cell_worker', task) diff --git a/nova/utils.py b/nova/utils.py index 0ef4e4bb2379..1227146ac412 100644 --- a/nova/utils.py +++ b/nova/utils.py @@ -675,7 +675,7 @@ def _serialize_profile_info(): return trace_info -def pass_context_wrapper(func): +def _pass_context_wrapper(func): """Generalised passthrough method It will grab the context from the threadlocal store and add it to the store on the new thread. This allows for continuity in logging the @@ -705,7 +705,7 @@ def pass_context(runner, func, *args, **kwargs): runner function """ - return runner(pass_context_wrapper(func), *args, **kwargs) + return runner(_pass_context_wrapper(func), *args, **kwargs) def spawn(func, *args, **kwargs) -> futurist.Future: @@ -719,8 +719,36 @@ def spawn(func, *args, **kwargs) -> futurist.Future: context when using this method to spawn a new thread. """ - return pass_context( - _get_default_green_pool().submit, func, *args, **kwargs) + return spawn_on(_get_default_green_pool(), func, *args, **kwargs) + + +def _executor_is_full(executor): + if concurrency_mode_threading(): + # TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor + # so that we can avoid accessing the internals of the executor + with executor._shutdown_lock: + idle_workers = len([w for w in executor._workers if w.idle]) > 0 + queued_tasks = executor._work_queue.qsize() > 0 + return queued_tasks and not idle_workers + + return False + + +def spawn_on(executor, func, *args, **kwargs) -> futurist.Future: + """Passthrough method to run func on a thread in a given executor. + + It will also grab the context from the threadlocal store and add it to + the store on the new thread. This allows for continuity in logging the + context when using this method to spawn a new thread. + """ + + if _executor_is_full(executor): + LOG.warning( + "The %s pool does not have free threads so the task %s will be " + "queued. If this happens repeatedly then the size of the pool is " + "too small for the load or there are stuck threads filling the " + "pool.", executor.name, func) + return pass_context(executor.submit, func, *args, **kwargs) def tpool_execute(func, *args, **kwargs):