Merge "Switch nova-conductor to use ThreadPoolExecutor"

This commit is contained in:
Zuul
2025-10-02 13:40:26 +00:00
committed by Gerrit Code Review
3 changed files with 37 additions and 33 deletions

View File

@@ -19,9 +19,9 @@ import contextlib
import copy import copy
import functools import functools
import sys import sys
import threading
import typing as ty import typing as ty
import futurist
from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import exceptions as ks_exc
from oslo_config import cfg from oslo_config import cfg
from oslo_db import exception as db_exc from oslo_db import exception as db_exc
@@ -2062,7 +2062,7 @@ class ComputeTaskManager:
the host list the host list
:param image_id: The IDs of the image to cache :param image_id: The IDs of the image to cache
""" """
local_lock = threading.Lock()
# TODO(mriedem): Consider including the list of images in the # TODO(mriedem): Consider including the list of images in the
# notification payload. # notification payload.
compute_utils.notify_about_aggregate_action( compute_utils.notify_about_aggregate_action(
@@ -2072,7 +2072,7 @@ class ComputeTaskManager:
clock = timeutils.StopWatch() clock = timeutils.StopWatch()
threads = CONF.image_cache.precache_concurrency threads = CONF.image_cache.precache_concurrency
fetch_executor = futurist.GreenThreadPoolExecutor(max_workers=threads) fetch_executor = utils.create_executor(threads)
hosts_by_cell = {} hosts_by_cell = {}
cells_by_uuid = {} cells_by_uuid = {}
@@ -2099,24 +2099,24 @@ class ComputeTaskManager:
} }
def host_completed(context, host, result): def host_completed(context, host, result):
for image_id, status in result.items(): with local_lock:
cached, existing, error, unsupported = stats[image_id] for image_id, status in result.items():
if status == 'error': cached, existing, error, unsupported = stats[image_id]
failed_images[image_id] += 1 if status == 'error':
error += 1 failed_images[image_id] += 1
elif status == 'cached': error += 1
cached += 1 elif status == 'cached':
elif status == 'existing': cached += 1
existing += 1 elif status == 'existing':
elif status == 'unsupported': existing += 1
unsupported += 1 elif status == 'unsupported':
stats[image_id] = (cached, existing, error, unsupported) unsupported += 1
stats[image_id] = (cached, existing, error, unsupported)
host_stats['completed'] += 1 host_stats['completed'] += 1
compute_utils.notify_about_aggregate_cache(context, aggregate, compute_utils.notify_about_aggregate_cache(
host, result, context, aggregate, host, result,
host_stats['completed'], host_stats['completed'], host_stats['total'])
host_stats['total'])
def wrap_cache_images(ctxt, host, image_ids): def wrap_cache_images(ctxt, host, image_ids):
result = self.compute_rpcapi.cache_images( result = self.compute_rpcapi.cache_images(

View File

@@ -104,19 +104,23 @@ def destroy_default_executor():
DEFAULT_EXECUTOR = None DEFAULT_EXECUTOR = None
def create_executor(max_workers):
if concurrency_mode_threading():
executor = futurist.ThreadPoolExecutor(max_workers)
else:
executor = futurist.GreenThreadPoolExecutor(max_workers)
return executor
def _get_default_executor(): def _get_default_executor():
global DEFAULT_EXECUTOR global DEFAULT_EXECUTOR
if not DEFAULT_EXECUTOR: if not DEFAULT_EXECUTOR:
if concurrency_mode_threading(): max_workers = (
DEFAULT_EXECUTOR = futurist.ThreadPoolExecutor( CONF.default_thread_pool_size if concurrency_mode_threading()
CONF.default_thread_pool_size else CONF.default_green_pool_size
) )
else: DEFAULT_EXECUTOR = create_executor(max_workers)
DEFAULT_EXECUTOR = futurist.GreenThreadPoolExecutor(
CONF.default_green_pool_size
)
pname = multiprocessing.current_process().name pname = multiprocessing.current_process().name
executor_name = f"{pname}.default" executor_name = f"{pname}.default"
DEFAULT_EXECUTOR.name = executor_name DEFAULT_EXECUTOR.name = executor_name
@@ -1183,11 +1187,11 @@ def get_scatter_gather_executor():
global SCATTER_GATHER_EXECUTOR global SCATTER_GATHER_EXECUTOR
if not SCATTER_GATHER_EXECUTOR: if not SCATTER_GATHER_EXECUTOR:
if concurrency_mode_threading(): max_workers = (
SCATTER_GATHER_EXECUTOR = futurist.ThreadPoolExecutor( CONF.cell_worker_thread_pool_size
CONF.cell_worker_thread_pool_size) if concurrency_mode_threading() else 1000
else: )
SCATTER_GATHER_EXECUTOR = futurist.GreenThreadPoolExecutor() SCATTER_GATHER_EXECUTOR = create_executor(max_workers)
pname = multiprocessing.current_process().name pname = multiprocessing.current_process().name
executor_name = f"{pname}.cell_worker" executor_name = f"{pname}.cell_worker"

View File

@@ -60,6 +60,6 @@ cursive>=0.2.1 # Apache-2.0
retrying>=1.3.3 # Apache-2.0 retrying>=1.3.3 # Apache-2.0
os-service-types>=1.7.0 # Apache-2.0 os-service-types>=1.7.0 # Apache-2.0
python-dateutil>=2.7.0 # BSD python-dateutil>=2.7.0 # BSD
futurist>=1.8.0 # Apache-2.0 futurist>=3.2.1 # Apache-2.0
openstacksdk>=4.4.0 # Apache-2.0 openstacksdk>=4.4.0 # Apache-2.0
PyYAML>=5.1 # MIT PyYAML>=5.1 # MIT