Make the default executor configurable
This patch adds the logic to initialize our DEFAULT_EXECUTOR based on the concurrency mode. If the OS_NOVA_DISABLE_EVENTLET_PATCHING env variable is set to "true" then we use ThreadPoolExecutor so the service will use native threads. Otherwise the service will keep using the GreenThreadPoolExecutor and therefore using Eventlet. There is a new config option `[DEFAULT]default_thread_pool_size` added to define the size of the ThreadPoolExecutor and the existing `[DEFAULT]default_green_pool_size` is deprecated. The two config options are independent and each of them is only used if the service is started with the matching concurrency mode. Our test fixture that catches leaked threads are adapted to handle both threading mode. But at the moment it is only tested with the eventlet mode as no service can be switched to threading mode yet. Change-Id: I17b8065e8b14f0401297258e43dc2a98031d61f4 Signed-off-by: Balazs Gibizer <gibi@redhat.com>
This commit is contained in:
@@ -62,11 +62,27 @@ entry.
|
||||
help='Explicitly specify the temporary working directory.'),
|
||||
cfg.IntOpt(
|
||||
'default_green_pool_size',
|
||||
deprecated_for_removal=True,
|
||||
deprecated_since='32.0.0',
|
||||
deprecated_reason="""
|
||||
This option is only used if the service is running in Eventlet mode. When
|
||||
that mode is removed this config option will be removed too.
|
||||
""",
|
||||
default=1000,
|
||||
min=100,
|
||||
help='''
|
||||
The total number of coroutines that can be run via nova's default
|
||||
greenthread pool concurrently, defaults to 1000, min value is 100.
|
||||
greenthread pool concurrently, defaults to 1000, min value is 100. It is only
|
||||
used if the service is running in Eventlet mode.
|
||||
'''),
|
||||
cfg.IntOpt(
|
||||
'default_thread_pool_size',
|
||||
default=10,
|
||||
min=1,
|
||||
help='''
|
||||
The total number of threads that can be run via nova's default
|
||||
thread pool concurrently. It is only used if the service is running in
|
||||
native threading mode.
|
||||
'''),
|
||||
cfg.IntOpt(
|
||||
'cell_worker_thread_pool_size',
|
||||
|
@@ -199,7 +199,7 @@ class TestCase(base.BaseTestCase):
|
||||
self._service_fixture_count = collections.defaultdict(int)
|
||||
|
||||
self.useFixture(nova_fixtures.OpenStackSDKFixture())
|
||||
self.useFixture(nova_fixtures.IsolatedGreenPoolFixture(self.id()))
|
||||
self.useFixture(nova_fixtures.IsolatedExecutorFixture(self.id()))
|
||||
|
||||
self.useFixture(log_fixture.get_logging_handle_error_fixture())
|
||||
|
||||
|
8
nova/tests/fixtures/nova.py
vendored
8
nova/tests/fixtures/nova.py
vendored
@@ -1185,11 +1185,11 @@ class IndirectionAPIFixture(fixtures.Fixture):
|
||||
self.addCleanup(self.cleanup)
|
||||
|
||||
|
||||
class IsolatedGreenPoolFixture(fixtures.Fixture):
|
||||
"""isolate each test to a dedicated greenpool.
|
||||
class IsolatedExecutorFixture(fixtures.Fixture):
|
||||
"""isolate each test to a dedicated executor.
|
||||
|
||||
Replace the default shared greenpool with a pre test greenpool
|
||||
and wait for all greenthreads to finish in test cleanup.
|
||||
Replace the default shared executor with a per test executor
|
||||
and wait for all threads to finish in test cleanup.
|
||||
"""
|
||||
|
||||
def __init__(self, test):
|
||||
|
@@ -1500,10 +1500,21 @@ class DefaultExecutorTestCase(test.NoDBTestCase):
|
||||
|
||||
@mock.patch.object(
|
||||
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=False))
|
||||
def test_executor_type_eventlet(self):
|
||||
def test_executor_type_and_size_eventlet(self):
|
||||
self.flags(default_green_pool_size=113)
|
||||
executor = utils._get_default_executor()
|
||||
|
||||
self.assertEqual('GreenThreadPoolExecutor', type(executor).__name__)
|
||||
self.assertEqual(113, executor._max_workers)
|
||||
|
||||
@mock.patch.object(
|
||||
utils, 'concurrency_mode_threading', new=mock.Mock(return_value=True))
|
||||
def test_executor_type_and_size_threading(self):
|
||||
self.flags(default_thread_pool_size=13)
|
||||
executor = utils._get_default_executor()
|
||||
|
||||
self.assertEqual('ThreadPoolExecutor', type(executor).__name__)
|
||||
self.assertEqual(13, executor._max_workers)
|
||||
|
||||
def test_executor_destroy(self):
|
||||
executor = utils._get_default_executor()
|
||||
|
@@ -111,9 +111,14 @@ def _get_default_executor():
|
||||
global DEFAULT_EXECUTOR
|
||||
|
||||
if not DEFAULT_EXECUTOR:
|
||||
DEFAULT_EXECUTOR = futurist.GreenThreadPoolExecutor(
|
||||
CONF.default_green_pool_size
|
||||
)
|
||||
if concurrency_mode_threading():
|
||||
DEFAULT_EXECUTOR = futurist.ThreadPoolExecutor(
|
||||
CONF.default_thread_pool_size
|
||||
)
|
||||
else:
|
||||
DEFAULT_EXECUTOR = futurist.GreenThreadPoolExecutor(
|
||||
CONF.default_green_pool_size
|
||||
)
|
||||
|
||||
pname = multiprocessing.current_process().name
|
||||
executor_name = f"{pname}.default"
|
||||
|
Reference in New Issue
Block a user