Merge "Fix duplicated subcloud deletion request in orchestrator"

This commit is contained in:
Zuul
2025-06-16 20:47:05 +00:00
committed by Gerrit Code Review
4 changed files with 119 additions and 32 deletions

View File

@@ -932,6 +932,17 @@ def strategy_step_get_all_to_process(
)
def strategy_step_get_all_to_delete(
context, delete_start_at, last_update_threshold, max_parallel_subclouds
):
"""Retrieve all strategy steps that needs to be deleted in orchestration"""
return IMPL.Connection(context).strategy_step_get_all_to_delete(
delete_start_at=delete_start_at,
last_update_threshold=last_update_threshold,
max_parallel_subclouds=max_parallel_subclouds,
)
def strategy_step_count_all_states(context):
"""Retrieve the count of steps in each possible state"""
return IMPL.Connection(context).strategy_step_count_all_states()

View File

@@ -1835,6 +1835,41 @@ class Connection(object):
)
).all()
@require_context()
def strategy_step_get_all_to_delete(
self, delete_start_at, last_update_threshold, max_parallel_subclouds
):
# When the strategy is in deleting state, the steps that are in the database
# are in either complete or failed state.
with read_session() as session:
# Acquire all steps up to max_parallel_subclouds
subquery = (
model_query(self.context, models.StrategyStep.id, session=session)
.filter_by(deleted=0)
.order_by(models.StrategyStep.id)
.limit(max_parallel_subclouds)
)
# For the strategy deletion, it is necessary to validate both the
# delete_start_at and last_update_threshold since there are no specific
# states to identify when a step is being deleted
return (
model_query(self.context, models.StrategyStep, session=session)
.filter(models.StrategyStep.id.in_(subquery))
.filter(
or_(
# All steps sent for processing have their updated_at field
# reset, in which case they will only be retrieved if they
# are not updated for longer than the last_update_threshold.
# Otherwise, the delete_start_at is used to retrieve the
# steps that were not processed yet.
models.StrategyStep.updated_at < last_update_threshold,
models.StrategyStep.updated_at < delete_start_at,
)
)
.all()
)
@require_context()
def strategy_step_count_all_states(self):
with read_session() as session:

View File

@@ -37,6 +37,10 @@ from dcmanager.common import scheduler
from dcmanager.common import utils
from dcmanager.db import api as db_api
from dcmanager.orchestrator import rpcapi as orchestrator_rpc_api
from dcmanager.orchestrator.orchestrator_worker import (
DEFAULT_SLEEP_TIME_IN_SECONDS,
DELETE_COUNTER,
)
from dcmanager.orchestrator.validators.firmware_validator import (
FirmwareStrategyValidator,
)
@@ -99,6 +103,9 @@ class OrchestratorManager(manager.Manager):
}
self.thread_group_manager = scheduler.ThreadGroupManager(thread_pool_size=1)
# Stores the time in which the strategy deletion started
self.delete_start_at = None
# When starting the manager service, it is necessary to confirm if there
# are any strategies in a state different from initial, because that means
# the service was unexpectedly restarted and the periodic strategy monitoring
@@ -116,6 +123,11 @@ class OrchestratorManager(manager.Manager):
f"({strategy.type}) An active strategy was found, restarting "
"its monitoring"
)
# Set the delete start time when the strategy is deleting
if strategy.state == consts.SW_UPDATE_STATE_DELETING:
self.delete_start_at = timeutils.utcnow()
# The steps will only start processing after the orchestration interval
# This is done to avoid sending the steps to the workers in cases
# where only the manager service was restarted
@@ -183,26 +195,26 @@ class OrchestratorManager(manager.Manager):
steps_to_orchestrate.append(step.id)
if len(steps_to_orchestrate) == chunksize:
LOG.info(
f"({strategy_type}) Sending {len(steps_to_orchestrate)} steps "
"to orchestrate"
)
self.orchestrator_worker_rpc_client.orchestrate(
self.context, steps_to_orchestrate, strategy_type
)
LOG.info(
f"({strategy_type}) Sent {len(steps_to_orchestrate)} steps "
"to orchestrate"
)
if update:
steps_to_update.extend(steps_to_orchestrate)
steps_to_orchestrate = []
if steps_to_orchestrate:
LOG.info(
f"({strategy_type}) Sending final {len(steps_to_orchestrate)} steps "
"to orchestrate"
)
self.orchestrator_worker_rpc_client.orchestrate(
self.context, steps_to_orchestrate, strategy_type
)
LOG.info(
f"({strategy_type}) Sent final {len(steps_to_orchestrate)} steps "
"to orchestrate"
)
if update:
steps_to_update.extend(steps_to_orchestrate)
@@ -329,19 +341,40 @@ class OrchestratorManager(manager.Manager):
self.sleep_time = ORCHESTRATION_STRATEGY_MONITORING_INTERVAL
elif strategy.state == consts.SW_UPDATE_STATE_DELETING:
if total_steps != 0:
# If there are steps that were not deleted yet, send them to the
# workers for deletion
if strategy.state == consts.SW_UPDATE_STATE_DELETING:
steps = db_api.strategy_step_get_all(
self.context, limit=strategy.max_parallel_subclouds
# In the worker process, the deletion step has a wait of up to 180
# seconds, which is greater than the orchestration interval. Because
# of that, the threshold needs to be higher to ensure a step that is
# still being process is not identified as idle.
last_update_threshold = timeutils.utcnow() - datetime.timedelta(
seconds=(DEFAULT_SLEEP_TIME_IN_SECONDS * (DELETE_COUNTER + 1))
)
# If there are steps that were not deleted yet, verify if there is
# any that needs to be sent to the workers.
steps = db_api.strategy_step_get_all_to_delete(
self.context,
self.delete_start_at,
last_update_threshold,
strategy.max_parallel_subclouds,
)
if steps:
LOG.info(
f"({strategy_type}) {len(steps)} pending steps were found, "
"start processing"
)
self._create_and_send_step_batches(strategy_type, steps, True)
else:
# If all steps were deleted, delete the strategy
with self.strategy_lock:
db_api.sw_update_strategy_destroy(self.context, strategy_type)
self._monitor_strategy = False
self.sleep_time = ORCHESTRATION_STRATEGY_MONITORING_INTERVAL
return
# If all steps were deleted, delete the strategy
with self.strategy_lock:
db_api.sw_update_strategy_destroy(self.context, strategy_type)
LOG.info(f"({strategy_type}) Subcloud strategy deleted")
self._monitor_strategy = False
self.delete_start_at = None
self.sleep_time = ORCHESTRATION_STRATEGY_MONITORING_INTERVAL
def stop(self):
self.thread_group_manager.stop()
@@ -675,8 +708,11 @@ class OrchestratorManager(manager.Manager):
LOG.info(f"({sw_update_strategy.type}) Subcloud orchestration deleted")
return strategy_dict
# Set the start time for delete
self.delete_start_at = timeutils.utcnow()
# Reduce the sleep time since the deletion is faster than apply
self.sleep_time = self.sleep_time / 3
self.sleep_time = self.sleep_time / 6
# Send steps to be processed and start monitoring
self._create_and_send_step_batches(sw_update_strategy.type, steps, True)

View File

@@ -42,6 +42,7 @@ from dcmanager.orchestrator.strategies.software import SoftwareStrategy
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
DEFAULT_SLEEP_TIME_IN_SECONDS = 10
DELETE_COUNTER = 18
MANAGER_SLEEP_TIME_IN_SECONDS = 30
@@ -174,7 +175,8 @@ class OrchestratorWorker(object):
if self.strategy_type is None:
LOG.info(f"({strategy_type}) Orchestration starting with steps: {steps_id}")
# If the strategy does not exist, set the steps to process directly
self.steps_to_process = set(steps_id)
with self.steps_lock:
self.steps_received = set(steps_id)
self.strategy_type = strategy_type
self.thread_group_manager.start(self.orchestration_thread)
self._last_update = timeutils.utcnow()
@@ -217,7 +219,7 @@ class OrchestratorWorker(object):
try:
LOG.debug(
f"({self.strategy_type}) Orchestration is running for"
f"({self.strategy_type}) Orchestration is running for "
f"{len(self.steps_to_process)}"
)
@@ -266,10 +268,13 @@ class OrchestratorWorker(object):
# The strategy_type needs to be reset so that a new orchestration request
# is identified in orchestrate(), starting the orchestration thread again
self.strategy_type = None
self.steps_to_process.clear()
self.steps_received.clear()
with self.steps_lock:
self.strategy_type = None
self.steps_to_process.clear()
self.steps_received.clear()
self._last_update = None
self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS
def _adjust_sleep_time(self, number_of_subclouds, strategy_type):
prev_sleep_time = self._sleep_time
@@ -712,21 +717,21 @@ class OrchestratorWorker(object):
# Wait for 180 seconds so that last 100 workers can complete their execution
counter = 0
while len(self.subcloud_workers) > 0:
time.sleep(10)
time.sleep(DEFAULT_SLEEP_TIME_IN_SECONDS)
counter = counter + 1
if counter > 18:
if counter > DELETE_COUNTER:
break
# Remove the strategy from the database if all workers have completed their
# execution
try:
db_api.strategy_step_destroy_all(self.context, steps_id)
# Because the execution is synchronous in this case, the steps_to_process
# is not updated as the loop did not finish yet.
self.steps_to_process.clear()
except Exception as e:
LOG.exception(f"({strategy.type}) exception during delete")
raise e
finally:
# The orchestration is complete, halt the processing
self._processing = False
self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS
LOG.info(f"({strategy.type}) Finished deleting strategy")
LOG.info(f"({strategy.type}) Finished deleting strategy steps")