From cf1b42ea3d35d51d327e3aff0a05a9d402af0e15 Mon Sep 17 00:00:00 2001 From: Dmitry Tantsur Date: Tue, 5 Oct 2021 14:35:22 +0200 Subject: [PATCH] Add a helper for node-based periodics We have a very common pattern of periodic tasks that use iter_nodes to fetch some nodes, check them, create a task and conductor some operation. This change introduces a helper decorator for that and migrates the drivers to it. I'm intentionally leaving unit tests intact to demonstrate that the new decorator works exactly the same way (modulo cosmetic changes) as the previous hand-written code. Change-Id: Ifed4a457275d9451cc412dc80f3c09df72f50492 Story: #2009203 Task: #43522 --- doc/source/contributor/architecture.rst | 7 +- doc/source/contributor/deploy-steps.rst | 89 ++++++++++ ironic/conductor/manager.py | 157 +++++++----------- ironic/conductor/periodics.py | 151 +++++++++++++++++ ironic/drivers/modules/drac/bios.py | 64 +++---- ironic/drivers/modules/drac/management.py | 53 ++---- ironic/drivers/modules/drac/raid.py | 95 ++++------- ironic/drivers/modules/inspector.py | 23 +-- ironic/drivers/modules/irmc/raid.py | 126 ++++++-------- ironic/drivers/modules/pxe_base.py | 33 ++-- ironic/drivers/modules/redfish/management.py | 116 ++++--------- ironic/drivers/modules/redfish/raid.py | 115 ++++--------- ironic/tests/unit/conductor/test_manager.py | 12 +- ironic/tests/unit/conductor/test_periodics.py | 135 +++++++++++++++ .../drivers/modules/drac/test_management.py | 5 +- .../unit/drivers/modules/drac/test_raid.py | 5 +- .../modules/irmc/test_periodic_task.py | 3 + .../modules/redfish/test_management.py | 9 +- 18 files changed, 662 insertions(+), 536 deletions(-) create mode 100644 ironic/conductor/periodics.py create mode 100644 ironic/tests/unit/conductor/test_periodics.py diff --git a/doc/source/contributor/architecture.rst b/doc/source/contributor/architecture.rst index e246fd28b1..84e58ee1a0 100644 --- a/doc/source/contributor/architecture.rst +++ b/doc/source/contributor/architecture.rst @@ -42,7 +42,7 @@ Drivers may run their own periodic tasks, i.e. actions run repeatedly after a certain amount of time. Such a task is created by using the periodic_ decorator on an interface method. For example -:: +.. code-block:: python from futurist import periodics @@ -55,6 +55,11 @@ decorator on an interface method. For example Here the ``spacing`` argument is a period in seconds for a given periodic task. For example 'spacing=5' means every 5 seconds. +Starting with the Yoga cycle, there is also a new decorator +:py:func:`ironic.conductor.periodics.node_periodic` to create periodic tasks +that handle nodes. See :ref:`deploy steps documentation ` +for an example. + Driver-Specific Steps --------------------- diff --git a/doc/source/contributor/deploy-steps.rst b/doc/source/contributor/deploy-steps.rst index e6407d41e3..a6cd6809d2 100644 --- a/doc/source/contributor/deploy-steps.rst +++ b/doc/source/contributor/deploy-steps.rst @@ -188,6 +188,95 @@ following pattern: return deploy_utils.reboot_to_finish_step(task) +.. _deploy-steps-polling: + +Polling for completion +~~~~~~~~~~~~~~~~~~~~~~~ + +Finally, you may want to poll the BMC until the operation is complete. Often +enough, this also involves a reboot. In this case you can use the +:py:func:`ironic.conductor.periodics.node_periodic` decorator to create a +periodic task that operates on relevant nodes: + +.. code-block:: python + + from ironic.common import states + from ironic.common import utils + from ironic.conductor import periodics + from ironic.drivers import base + from ironic.drivers.modules import deploy_utils + + _STATUS_CHECK_INTERVAL = ... # better use a configuration option + + class MyManagement(base.ManagementInterface): + ... + + @base.clean_step(priority=0) + def my_action(self, task): + ... + + reboot_required = ... # your step may or may not need rebooting + + # Make this node as running my_action. Often enough you will store + # some useful data rather than a boolean flag. + utils.set_node_nested_field(task.node, 'driver_internal_info', + 'in_my_action', True) + + # Tell ironic that... + deploy_utils.set_async_step_flags( + node, + # ... we're waiting for IPA to come back after reboot + reboot=reboot_required, + # ... the current step shouldn't be entered again + skip_current_step=True, + # ... we'll be polling until the step is done + polling=True) + + if reboot_required: + return deploy_utils.reboot_to_finish_step(task) + + @periodics.node_periodic( + purpose='checking my action status', + spacing=_STATUS_CHECK_INTERVAL, + filters={ + # Skip nodes that already have a lock + 'reserved': False, + # Only consider nodes that are waiting for cleaning or failed + # on timeout. + 'provision_state_in': [states.CLEANWAIT, states.CLEANFAIL], + }, + # Load driver_internal_info from the database on listing + predicate_extra_fields=['driver_internal_info'], + # Only consider nodes with in_my_action + predicate=lambda n: n.driver_internal_info.get('in_my_action'), + ) + def check_my_action(self, task, manager, context): + # Double-check that the node is managed by this interface + if not isinstance(task.driver.management, MyManagement): + return + + if not needs_actions(): # insert your checks here + return + + task.upgrade_lock() + + ... # do any required updates + + # Drop the flag so that this node is no longer considered + utils.pop_node_nested_field(task.node, 'driver_internal_info', + 'in_my_action') + +Note that creating a ``task`` involves an additional database query, so you +want to avoid creating them for too many nodes in your periodic tasks. Instead: + +* Try to use precise ``filters`` to filter out nodes on the database level. + Using ``reserved`` and ``provision_state``/``provision_state_in`` are + recommended in most cases. See + :py:meth:`ironic.db.api.Connection.get_nodeinfo_list` for a list of possible + filters. +* Use ``predicate`` to filter on complex fields such as + ``driver_internal_info``. Predicates are checked before tasks are created. + Implementing RAID ----------------- diff --git a/ironic/conductor/manager.py b/ironic/conductor/manager.py index ed54aa67b1..4c49bc7893 100644 --- a/ironic/conductor/manager.py +++ b/ironic/conductor/manager.py @@ -45,7 +45,6 @@ import datetime import queue import eventlet -from futurist import periodics from futurist import waiters from ironic_lib import metrics_utils from oslo_log import log @@ -66,6 +65,7 @@ from ironic.conductor import base_manager from ironic.conductor import cleaning from ironic.conductor import deployments from ironic.conductor import notification_utils as notify_utils +from ironic.conductor import periodics from ironic.conductor import steps as conductor_steps from ironic.conductor import task_manager from ironic.conductor import utils @@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager): eventlet.sleep(0) @METRICS.timer('ConductorManager._power_failure_recovery') - @periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval, - enabled=bool( - CONF.conductor.power_failure_recovery_interval)) - def _power_failure_recovery(self, context): + @periodics.node_periodic( + purpose='power failure recovery', + spacing=CONF.conductor.power_failure_recovery_interval, + # NOTE(kaifeng) To avoid conflicts with periodic task of the + # regular power state checking, maintenance is still a required + # condition. + filters={'maintenance': True, 'fault': faults.POWER_FAILURE}, + ) + def _power_failure_recovery(self, task, context): """Periodic task to check power states for nodes in maintenance. Attempt to grab a lock and sync only if the following @@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager): 3) Node is not reserved. 4) Node is not in the ENROLL state. """ - def should_sync_power_state_for_recovery(task): - """Check if ironic should sync power state for recovery.""" - - # NOTE(dtantsur): it's also pointless (and dangerous) to - # sync power state when a power action is in progress - if (task.node.provision_state == states.ENROLL - or not task.node.maintenance - or task.node.fault != faults.POWER_FAILURE - or task.node.target_power_state - or task.node.reservation): - return False - return True - def handle_recovery(task, actual_power_state): """Handle recovery when power sync is succeeded.""" task.upgrade_lock() @@ -1546,48 +1538,33 @@ class ConductorManager(base_manager.BaseConductorManager): notify_utils.emit_power_state_corrected_notification( task, old_power_state) - # NOTE(kaifeng) To avoid conflicts with periodic task of the - # regular power state checking, maintenance is still a required - # condition. - filters = {'maintenance': True, - 'fault': faults.POWER_FAILURE} - node_iter = self.iter_nodes(fields=['id'], filters=filters) - for (node_uuid, driver, conductor_group, node_id) in node_iter: - try: - with task_manager.acquire(context, node_uuid, - purpose='power failure recovery', - shared=True) as task: - if not should_sync_power_state_for_recovery(task): - continue - try: - # Validate driver info in case of parameter changed - # in maintenance. - task.driver.power.validate(task) - # The driver may raise an exception, or may return - # ERROR. Handle both the same way. - power_state = task.driver.power.get_power_state(task) - if power_state == states.ERROR: - raise exception.PowerStateFailure( - _("Power driver returned ERROR state " - "while trying to get power state.")) - except Exception as e: - LOG.debug("During power_failure_recovery, could " - "not get power state for node %(node)s, " - "Error: %(err)s.", - {'node': task.node.uuid, 'err': e}) - else: - handle_recovery(task, power_state) - except exception.NodeNotFound: - LOG.info("During power_failure_recovery, node %(node)s was " - "not found and presumed deleted by another process.", - {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During power_failure_recovery, node %(node)s was " - "already locked by another process. Skip.", - {'node': node_uuid}) - finally: - # Yield on every iteration - eventlet.sleep(0) + # NOTE(dtantsur): it's also pointless (and dangerous) to + # sync power state when a power action is in progress + if (task.node.provision_state == states.ENROLL + or not task.node.maintenance + or task.node.fault != faults.POWER_FAILURE + or task.node.target_power_state + or task.node.reservation): + return + + try: + # Validate driver info in case of parameter changed + # in maintenance. + task.driver.power.validate(task) + # The driver may raise an exception, or may return + # ERROR. Handle both the same way. + power_state = task.driver.power.get_power_state(task) + if power_state == states.ERROR: + raise exception.PowerStateFailure( + _("Power driver returned ERROR state " + "while trying to get power state.")) + except Exception as e: + LOG.debug("During power_failure_recovery, could " + "not get power state for node %(node)s, " + "Error: %(err)s.", + {'node': task.node.uuid, 'err': e}) + else: + handle_recovery(task, power_state) @METRICS.timer('ConductorManager._check_deploy_timeouts') @periodics.periodic( @@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager): ) @METRICS.timer('ConductorManager._sync_local_state') - @periodics.periodic(spacing=CONF.conductor.sync_local_state_interval, - enabled=CONF.conductor.sync_local_state_interval > 0) - def _sync_local_state(self, context): + @periodics.node_periodic( + purpose='node take over', + spacing=CONF.conductor.sync_local_state_interval, + filters={'reserved': False, 'maintenance': False, + 'provision_state': states.ACTIVE}, + predicate_extra_fields=['conductor_affinity'], + predicate=lambda n, m: n.conductor_affinity != m.conductor.id, + limit=lambda: CONF.conductor.periodic_max_workers, + shared_task=False, + ) + def _sync_local_state(self, task, context): """Perform any actions necessary to sync local state. This is called periodically to refresh the conductor's copy of the @@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager): The ensuing actions could include preparing a PXE environment, updating the DHCP server, and so on. """ - filters = {'reserved': False, - 'maintenance': False, - 'provision_state': states.ACTIVE} - node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'], - filters=filters) + # NOTE(tenbrae): now that we have the lock, check again to + # avoid racing with deletes and other state changes + node = task.node + if (node.maintenance + or node.conductor_affinity == self.conductor.id + or node.provision_state != states.ACTIVE): + return False - workers_count = 0 - for (node_uuid, driver, conductor_group, node_id, - conductor_affinity) in node_iter: - if conductor_affinity == self.conductor.id: - continue - - # Node is mapped here, but not updated by this conductor last - try: - with task_manager.acquire(context, node_uuid, - purpose='node take over') as task: - # NOTE(tenbrae): now that we have the lock, check again to - # avoid racing with deletes and other state changes - node = task.node - if (node.maintenance - or node.conductor_affinity == self.conductor.id - or node.provision_state != states.ACTIVE): - continue - - task.spawn_after(self._spawn_worker, - self._do_takeover, task) - - except exception.NoFreeConductorWorker: - break - except (exception.NodeLocked, exception.NodeNotFound): - continue - workers_count += 1 - if workers_count == CONF.conductor.periodic_max_workers: - break + try: + task.spawn_after(self._spawn_worker, self._do_takeover, task) + except exception.NoFreeConductorWorker: + raise periodics.Stop() + else: + return True @METRICS.timer('ConductorManager.validate_driver_interfaces') @messaging.expected_exceptions(exception.NodeLocked) diff --git a/ironic/conductor/periodics.py b/ironic/conductor/periodics.py new file mode 100644 index 0000000000..ead5cbf08a --- /dev/null +++ b/ironic/conductor/periodics.py @@ -0,0 +1,151 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Conductor periodics.""" + +import collections +import functools +import inspect + +import eventlet +from futurist import periodics +from oslo_log import log + +from ironic.common import exception +from ironic.conductor import base_manager +from ironic.conductor import task_manager + + +LOG = log.getLogger(__name__) + + +def periodic(spacing, enabled=True, **kwargs): + """A decorator to define a periodic task. + + :param spacing: how often (in seconds) to run the periodic task. + :param enabled: whether the task is enabled; defaults to ``spacing > 0``. + """ + return periodics.periodic(spacing=spacing, + enabled=enabled and spacing > 0, + **kwargs) + + +class Stop(Exception): + """A signal to stop the current iteration of a periodic task.""" + + +def node_periodic(purpose, spacing, enabled=True, filters=None, + predicate=None, predicate_extra_fields=(), limit=None, + shared_task=True): + """A decorator to define a periodic task to act on nodes. + + Defines a periodic task that fetches the list of nodes mapped to the + current conductor which satisfy the provided filters. + + The decorated function must be a method on either the conductor manager + or a hardware interface. The signature is: + + * for conductor manager: ``(self, task, context)`` + * for hardware interfaces: ``(self, task, manager, context)``. + + ``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop`` + to abort the current iteration of the task and reschedule it. + + :param purpose: a human-readable description of the activity, e.g. + "verifying that the cat is purring". + :param spacing: how often (in seconds) to run the periodic task. + :param enabled: whether the task is enabled; defaults to ``spacing > 0``. + :param filters: database-level filters for the nodes. + :param predicate: a callable to run on the fetched nodes *before* creating + a task for them. The only parameter will be a named tuple with fields + ``uuid``, ``driver``, ``conductor_group`` plus everything from + ``predicate_extra_fields``. If the callable accepts a 2nd parameter, + it will be the conductor manager instance. + :param predicate_extra_fields: extra fields to fetch on the initial + request and pass into the ``predicate``. Must not contain ``uuid``, + ``driver`` and ``conductor_group`` since they are always included. + :param limit: how many nodes to process before stopping the current + iteration. If ``predicate`` returns ``False``, the node is not counted. + If the decorated function returns ``False``, the node is not counted + either. Can be a callable, in which case it will be called on each + iteration to determine the limit. + :param shared_task: if ``True``, the task will have a shared lock. It is + recommended to start with a shared lock and upgrade it only if needed. + """ + node_type = collections.namedtuple( + 'Node', + ['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields) + ) + + # Accepting a conductor manager is a bit of an edge case, doing a bit of + # a signature magic to avoid passing it everywhere. + accepts_manager = (predicate is not None + and len(inspect.signature(predicate).parameters) > 1) + + def decorator(func): + @periodic(spacing=spacing, enabled=enabled) + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + # Make it work with both drivers and the conductor manager + if isinstance(self, base_manager.BaseConductorManager): + manager = self + context = args[0] + else: + manager = args[0] + context = args[1] + + if callable(limit): + local_limit = limit() + else: + local_limit = limit + assert local_limit is None or local_limit > 0 + + nodes = manager.iter_nodes(filters=filters, + fields=predicate_extra_fields) + for (node_uuid, *other) in nodes: + if predicate is not None: + node = node_type(node_uuid, *other) + if accepts_manager: + result = predicate(node, manager) + else: + result = predicate(node) + if not result: + continue + + try: + with task_manager.acquire(context, node_uuid, + purpose=purpose, + shared=shared_task) as task: + result = func(self, task, *args, **kwargs) + except exception.NodeNotFound: + LOG.info("During %(action)s, node %(node)s was not found " + "and presumed deleted by another process.", + {'node': node_uuid, 'action': purpose}) + except exception.NodeLocked: + LOG.info("During %(action)s, node %(node)s was already " + "locked by another process. Skip.", + {'node': node_uuid, 'action': purpose}) + except Stop: + break + finally: + # Yield on every iteration + eventlet.sleep(0) + + if (local_limit is not None + and (result is None or result)): + local_limit -= 1 + if not local_limit: + return + + return wrapper + + return decorator diff --git a/ironic/drivers/modules/drac/bios.py b/ironic/drivers/modules/drac/bios.py index e40089a4ff..795e4f1508 100644 --- a/ironic/drivers/modules/drac/bios.py +++ b/ironic/drivers/modules/drac/bios.py @@ -15,7 +15,6 @@ DRAC BIOS configuration specific methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -23,7 +22,7 @@ from oslo_utils import timeutils from ironic.common import exception from ironic.common.i18n import _ -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -151,9 +150,16 @@ class DracWSManBIOS(base.BIOSInterface): # spacing since BIOS jobs could be comparatively shorter in time than # RAID ones currently using the raid spacing to avoid errors # spacing parameter for periodic method - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_bios_config_job_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async bios configuration jobs', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('bios_config_job_ids') + or n.driver_internal_info.get('factory_reset_time_before_reboot')), + ) + def _query_bios_config_job_status(self, task, manager, context): """Periodic task to check the progress of running BIOS config jobs. :param manager: an instance of Ironic Conductor Manager with @@ -161,47 +167,17 @@ class DracWSManBIOS(base.BIOSInterface): :param context: context of the request, needed when acquiring a lock on a node. For access control. """ + # skip a node not being managed by idrac driver + if not isinstance(task.driver.bios, DracWSManBIOS): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] + # check bios_config_job_id exist & checks job is completed + if task.node.driver_internal_info.get("bios_config_job_ids"): + self._check_node_bios_jobs(task) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - # NOTE(TheJulia) Evaluate if work is actually required before - # creating a task for every node in the deployment which does - # not have a lock and is not in maintenance mode. - if (not driver_internal_info.get("bios_config_job_ids") - and not driver_internal_info.get( - "factory_reset_time_before_reboot")): - continue - - lock_purpose = 'checking async bios configuration jobs' - # Performing read-only/non-destructive work with shared lock - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - # skip a node not being managed by idrac driver - if not isinstance(task.driver.bios, DracWSManBIOS): - continue - - # check bios_config_job_id exist & checks job is completed - if driver_internal_info.get("bios_config_job_ids"): - self._check_node_bios_jobs(task) - - if driver_internal_info.get( - "factory_reset_time_before_reboot"): - self._check_last_system_inventory_changed(task) - - except exception.NodeNotFound: - LOG.info("During query_bios_config_job_status, node " - "%(node)s was not found and presumed deleted by " - "another process.", {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During query_bios_config_job_status, node " - "%(node)s was already locked by another process. " - "Skip.", {'node': node_uuid}) + if task.node.driver_internal_info.get( + "factory_reset_time_before_reboot"): + self._check_last_system_inventory_changed(task) def _check_last_system_inventory_changed(self, task): """Check the progress of last system inventory time of a node. diff --git a/ironic/drivers/modules/drac/management.py b/ironic/drivers/modules/drac/management.py index dd614b42a9..9b2b534311 100644 --- a/ironic/drivers/modules/drac/management.py +++ b/ironic/drivers/modules/drac/management.py @@ -23,7 +23,6 @@ DRAC management interface import json import time -from futurist import periodics from ironic_lib import metrics_utils import jsonschema from jsonschema import exceptions as json_schema_exc @@ -34,6 +33,7 @@ from ironic.common import boot_devices from ironic.common import exception from ironic.common.i18n import _ from ironic.common import molds +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -485,46 +485,23 @@ class DracRedfishManagement(redfish_management.RedfishManagement): # Export executed as part of Import async periodic task status check @METRICS.timer('DracRedfishManagement._query_import_configuration_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async import configuration task', spacing=CONF.drac.query_import_config_job_status_interval, - enabled=CONF.drac.query_import_config_job_status_interval > 0) - def _query_import_configuration_status(self, manager, context): + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('import_task_monitor_url') + ), + ) + def _query_import_configuration_status(self, task, manager, context): """Period job to check import configuration task.""" + if not isinstance(task.driver.management, DracRedfishManagement): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - - task_monitor_url = driver_internal_info.get( - 'import_task_monitor_url') - # NOTE(TheJulia): Evaluate if a task montitor URL exists - # based upon our inital DB query before pulling a task for - # every node in the deployment which reduces the overall - # number of DB queries triggering in the background where - # no work is required. - if not task_monitor_url: - continue - - lock_purpose = 'checking async import configuration task' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - DracRedfishManagement): - continue - self._check_import_configuration_task( - task, task_monitor_url) - except exception.NodeNotFound: - LOG.info('During _query_import_configuration_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_import_configuration_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_import_configuration_task( + task, task.node.driver_internal_info.get( + 'import_task_monitor_url')) def _check_import_configuration_task(self, task, task_monitor_url): """Checks progress of running import configuration task""" diff --git a/ironic/drivers/modules/drac/raid.py b/ironic/drivers/modules/drac/raid.py index 1bdd36d85d..726f57d3af 100644 --- a/ironic/drivers/modules/drac/raid.py +++ b/ironic/drivers/modules/drac/raid.py @@ -18,7 +18,6 @@ DRAC RAID specific methods from collections import defaultdict import math -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -28,7 +27,7 @@ import tenacity from ironic.common import exception from ironic.common.i18n import _ from ironic.common import raid as raid_common -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -1487,38 +1486,22 @@ class DracRedfishRAID(redfish_raid.RedfishRAID): return False @METRICS.timer('DracRedfishRAID._query_raid_tasks_status') - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_raid_tasks_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async RAID tasks', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('raid_task_monitor_uris') + ), + ) + def _query_raid_tasks_status(self, task, manager, context): """Periodic task to check the progress of running RAID tasks""" + if not isinstance(task.driver.raid, DracRedfishRAID): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - task_monitor_uris = driver_internal_info.get( - 'raid_task_monitor_uris') - if not task_monitor_uris: - continue - try: - lock_purpose = 'checking async RAID tasks' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, - DracRedfishRAID): - continue - self._check_raid_tasks_status( - task, task_monitor_uris) - except exception.NodeNotFound: - LOG.info('During _query_raid_tasks_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_tasks_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_raid_tasks_status( + task, task.node.driver_internal_info.get('raid_task_monitor_uris')) def _check_raid_tasks_status(self, task, task_mon_uris): """Checks RAID tasks for completion @@ -1763,43 +1746,21 @@ class DracWSManRAID(base.RAIDInterface): return {'logical_disks': logical_disks} @METRICS.timer('DracRAID._query_raid_config_job_status') - @periodics.periodic( - spacing=CONF.drac.query_raid_config_job_status_interval) - def _query_raid_config_job_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async raid configuration jobs', + spacing=CONF.drac.query_raid_config_job_status_interval, + filters={'reserved': False, 'maintenance': False}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: ( + n.driver_internal_info.get('raid_config_job_ids') + ), + ) + def _query_raid_config_job_status(self, task, manager, context): """Periodic task to check the progress of running RAID config jobs.""" + if not isinstance(task.driver.raid, DracWSManRAID): + return - filters = {'reserved': False, 'maintenance': False} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - - job_ids = driver_internal_info.get('raid_config_job_ids') - # NOTE(TheJulia): Evaluate if there is work to be done - # based upon the original DB query's results so we don't - # proceed creating tasks for every node in the deployment. - if not job_ids: - continue - - lock_purpose = 'checking async raid configuration jobs' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, DracWSManRAID): - continue - - self._check_node_raid_jobs(task) - - except exception.NodeNotFound: - LOG.info("During query_raid_config_job_status, node " - "%(node)s was not found and presumed deleted by " - "another process.", {'node': node_uuid}) - except exception.NodeLocked: - LOG.info("During query_raid_config_job_status, node " - "%(node)s was already locked by another process. " - "Skip.", {'node': node_uuid}) + self._check_node_raid_jobs(task) @METRICS.timer('DracRAID._check_node_raid_jobs') def _check_node_raid_jobs(self, task): diff --git a/ironic/drivers/modules/inspector.py b/ironic/drivers/modules/inspector.py index b344abb74c..1b866d0d5c 100644 --- a/ironic/drivers/modules/inspector.py +++ b/ironic/drivers/modules/inspector.py @@ -20,7 +20,6 @@ import shlex from urllib import parse as urlparse import eventlet -from futurist import periodics import openstack from oslo_log import log as logging @@ -29,6 +28,7 @@ from ironic.common.i18n import _ from ironic.common import keystone from ironic.common import states from ironic.common import utils +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as cond_utils from ironic.conf import CONF @@ -292,21 +292,14 @@ class Inspector(base.InspectInterface): 'ironic-inspector', {'uuid': node_uuid}) _get_client(task.context).abort_introspection(node_uuid) - @periodics.periodic(spacing=CONF.inspector.status_check_period) - def _periodic_check_result(self, manager, context): + @periodics.node_periodic( + purpose='checking hardware inspection status', + spacing=CONF.inspector.status_check_period, + filters={'provision_state': states.INSPECTWAIT}, + ) + def _periodic_check_result(self, task, manager, context): """Periodic task checking results of inspection.""" - filters = {'provision_state': states.INSPECTWAIT} - node_iter = manager.iter_nodes(filters=filters) - - for node_uuid, driver, conductor_group in node_iter: - try: - lock_purpose = 'checking hardware inspection status' - with task_manager.acquire(context, node_uuid, - shared=True, - purpose=lock_purpose) as task: - _check_status(task) - except (exception.NodeLocked, exception.NodeNotFound): - continue + _check_status(task) def _start_inspection(node_uuid, context): diff --git a/ironic/drivers/modules/irmc/raid.py b/ironic/drivers/modules/irmc/raid.py index 8f1bd172af..3368e887d3 100644 --- a/ironic/drivers/modules/irmc/raid.py +++ b/ironic/drivers/modules/irmc/raid.py @@ -15,7 +15,6 @@ """ Irmc RAID specific methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log as logging from oslo_utils import importutils @@ -23,7 +22,7 @@ from oslo_utils import importutils from ironic.common import exception from ironic.common import raid as raid_common from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic import conf from ironic.drivers import base @@ -430,80 +429,63 @@ class IRMCRAID(base.RAIDInterface): {'node_id': node_uuid, 'cfg': node.raid_config}) @METRICS.timer('IRMCRAID._query_raid_config_fgi_status') - @periodics.periodic( - spacing=CONF.irmc.query_raid_config_fgi_status_interval) - def _query_raid_config_fgi_status(self, manager, context): + @periodics.node_periodic( + purpose='checking async RAID configuration tasks', + spacing=CONF.irmc.query_raid_config_fgi_status_interval, + filters={'reserved': False, 'provision_state': states.CLEANWAIT, + 'maintenance': False}, + predicate_extra_fields=['raid_config'], + predicate=lambda n: ( + n.raid_config and not n.raid_config.get('fgi_status') + ), + ) + def _query_raid_config_fgi_status(self, task, manager, context): """Periodic tasks to check the progress of running RAID config.""" + node = task.node + node_uuid = task.node.uuid + if not isinstance(task.driver.raid, IRMCRAID): + return + if task.node.target_raid_config is None: + return + task.upgrade_lock() + if node.provision_state != states.CLEANWAIT: + return + # Avoid hitting clean_callback_timeout expiration + node.touch_provisioning() - filters = {'reserved': False, 'provision_state': states.CLEANWAIT, - 'maintenance': False} - fields = ['raid_config'] - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, raid_config) in node_list: - try: - # NOTE(TheJulia): Evaluate based upon presence of raid - # configuration before triggering a task, as opposed to after - # so we don't create excess node task objects with related - # DB queries. - if not raid_config or raid_config.get('fgi_status'): - continue + raid_config = node.raid_config - lock_purpose = 'checking async RAID configuration tasks' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - node = task.node - node_uuid = task.node.uuid - if not isinstance(task.driver.raid, IRMCRAID): - continue - if task.node.target_raid_config is None: - continue - task.upgrade_lock() - if node.provision_state != states.CLEANWAIT: - continue - # Avoid hitting clean_callback_timeout expiration - node.touch_provisioning() + try: + report = irmc_common.get_irmc_report(node) + except client.scci.SCCIInvalidInputError: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, RAID_FAILED) + return + except client.scci.SCCIClientError: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, RAID_FAILED) + return - try: - report = irmc_common.get_irmc_report(node) - except client.scci.SCCIInvalidInputError: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, RAID_FAILED) - continue - except client.scci.SCCIClientError: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, RAID_FAILED) - continue - - fgi_status_dict = _get_fgi_status(report, node_uuid) - # Note(trungnv): Allow to check until RAID mechanism to be - # completed with RAID information in report. - if fgi_status_dict == 'completing': - continue - if not fgi_status_dict: - raid_config.update({'fgi_status': RAID_FAILED}) - raid_common.update_raid_info(node, raid_config) - self._set_clean_failed(task, fgi_status_dict) - continue - if all(fgi_status == 'Idle' for fgi_status in - fgi_status_dict.values()): - raid_config.update({'fgi_status': RAID_COMPLETED}) - raid_common.update_raid_info(node, raid_config) - LOG.info('RAID configuration has completed on ' - 'node %(node)s with fgi_status is %(fgi)s', - {'node': node_uuid, 'fgi': RAID_COMPLETED}) - self._resume_cleaning(task) - - except exception.NodeNotFound: - LOG.info('During query_raid_config_job_status, node ' - '%(node)s was not found raid_config and presumed ' - 'deleted by another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During query_raid_config_job_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + fgi_status_dict = _get_fgi_status(report, node_uuid) + # Note(trungnv): Allow to check until RAID mechanism to be + # completed with RAID information in report. + if fgi_status_dict == 'completing': + return + if not fgi_status_dict: + raid_config.update({'fgi_status': RAID_FAILED}) + raid_common.update_raid_info(node, raid_config) + self._set_clean_failed(task, fgi_status_dict) + return + if all(fgi_status == 'Idle' for fgi_status in + fgi_status_dict.values()): + raid_config.update({'fgi_status': RAID_COMPLETED}) + raid_common.update_raid_info(node, raid_config) + LOG.info('RAID configuration has completed on ' + 'node %(node)s with fgi_status is %(fgi)s', + {'node': node_uuid, 'fgi': RAID_COMPLETED}) + self._resume_cleaning(task) def _set_clean_failed(self, task, fgi_status_dict): LOG.error('RAID configuration task failed for node %(node)s. ' diff --git a/ironic/drivers/modules/pxe_base.py b/ironic/drivers/modules/pxe_base.py index 5fff4ae51b..ab5b0d5357 100644 --- a/ironic/drivers/modules/pxe_base.py +++ b/ironic/drivers/modules/pxe_base.py @@ -13,7 +13,6 @@ Base PXE Interface Methods """ -from futurist import periodics from ironic_lib import metrics_utils from oslo_config import cfg from oslo_log import log as logging @@ -24,7 +23,7 @@ from ironic.common import exception from ironic.common.i18n import _ from ironic.common import pxe_utils from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.drivers.modules import boot_mode_utils from ironic.drivers.modules import deploy_utils @@ -452,29 +451,23 @@ class PXEBaseMixin(object): states.RESCUEWAIT} @METRICS.timer('PXEBaseMixin._check_boot_timeouts') - @periodics.periodic(spacing=CONF.pxe.boot_retry_check_interval, - enabled=bool(CONF.pxe.boot_retry_timeout)) - def _check_boot_timeouts(self, manager, context): + @periodics.node_periodic( + purpose='checking PXE boot status', + spacing=CONF.pxe.boot_retry_check_interval, + enabled=bool(CONF.pxe.boot_retry_timeout), + filters={'provision_state_in': _RETRY_ALLOWED_STATES, + 'reserved': False, + 'maintenance': False, + 'provisioned_before': CONF.pxe.boot_retry_timeout}, + ) + def _check_boot_timeouts(self, task, manager, context): """Periodically checks whether boot has timed out and retry it. + :param task: a task instance. :param manager: conductor manager. :param context: request context. """ - filters = {'provision_state_in': self._RETRY_ALLOWED_STATES, - 'reserved': False, - 'maintenance': False, - 'provisioned_before': CONF.pxe.boot_retry_timeout} - node_iter = manager.iter_nodes(filters=filters) - - for node_uuid, driver, conductor_group in node_iter: - try: - lock_purpose = 'checking PXE boot status' - with task_manager.acquire(context, node_uuid, - shared=True, - purpose=lock_purpose) as task: - self._check_boot_status(task) - except (exception.NodeLocked, exception.NodeNotFound): - continue + self._check_boot_status(task) def _check_boot_status(self, task): if not isinstance(task.driver.boot, PXEBaseMixin): diff --git a/ironic/drivers/modules/redfish/management.py b/ironic/drivers/modules/redfish/management.py index 9a68d99754..ab1a105efb 100644 --- a/ironic/drivers/modules/redfish/management.py +++ b/ironic/drivers/modules/redfish/management.py @@ -15,7 +15,6 @@ import collections -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log from oslo_utils import importutils @@ -29,6 +28,7 @@ from ironic.common.i18n import _ from ironic.common import indicator_states from ironic.common import states from ironic.common import utils +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -853,100 +853,46 @@ class RedfishManagement(base.ManagementInterface): node.save() @METRICS.timer('RedfishManagement._query_firmware_update_failed') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking if async firmware update failed', spacing=CONF.redfish.firmware_update_fail_interval, - enabled=CONF.redfish.firmware_update_fail_interval > 0) - def _query_firmware_update_failed(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANFAIL, + 'maintenance': True}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('firmware_updates'), + ) + def _query_firmware_update_failed(self, task, manager, context): """Periodic job to check for failed firmware updates.""" + if not isinstance(task.driver.management, RedfishManagement): + return - filters = {'reserved': False, 'provision_state': states.CLEANFAIL, - 'maintenance': True} + node = task.node - fields = ['driver_internal_info'] + # A firmware update failed. Discard any remaining firmware + # updates so when the user takes the node out of + # maintenance mode, pending firmware updates do not + # automatically continue. + LOG.warning('Firmware update failed for node %(node)s. ' + 'Discarding remaining firmware updates.', + {'node': node.uuid}) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - firmware_updates = driver_internal_info.get( - 'firmware_updates') - # NOTE(TheJulia): If we don't have a entry upfront, we can - # safely skip past the node as we know work here is not - # required, otherwise minimizing the number of potential - # nodes to visit. - if not firmware_updates: - continue - - lock_purpose = 'checking async firmware update failed.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - RedfishManagement): - continue - - node = task.node - - # A firmware update failed. Discard any remaining firmware - # updates so when the user takes the node out of - # maintenance mode, pending firmware updates do not - # automatically continue. - LOG.warning('Firmware update failed for node %(node)s. ' - 'Discarding remaining firmware updates.', - {'node': node.uuid}) - - task.upgrade_lock() - self._clear_firmware_updates(node) - - except exception.NodeNotFound: - LOG.info('During _query_firmware_update_failed, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_firmware_update_failed, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + task.upgrade_lock() + self._clear_firmware_updates(node) @METRICS.timer('RedfishManagement._query_firmware_update_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async firmware update tasks', spacing=CONF.redfish.firmware_update_status_interval, - enabled=CONF.redfish.firmware_update_status_interval > 0) - def _query_firmware_update_status(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANWAIT}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('firmware_updates'), + ) + def _query_firmware_update_status(self, task, manager, context): """Periodic job to check firmware update tasks.""" + if not isinstance(task.driver.management, RedfishManagement): + return - filters = {'reserved': False, 'provision_state': states.CLEANWAIT} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - firmware_updates = driver_internal_info.get( - 'firmware_updates') - # NOTE(TheJulia): Check and skip upfront before creating a - # task so we don't generate additional tasks and db queries - # for every node in CLEANWAIT which is not locked. - if not firmware_updates: - continue - - lock_purpose = 'checking async firmware update tasks.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.management, - RedfishManagement): - continue - - self._check_node_firmware_update(task) - - except exception.NodeNotFound: - LOG.info('During _query_firmware_update_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_firmware_update_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_node_firmware_update(task) @METRICS.timer('RedfishManagement._check_node_firmware_update') def _check_node_firmware_update(self, task): diff --git a/ironic/drivers/modules/redfish/raid.py b/ironic/drivers/modules/redfish/raid.py index c01d08a9cf..95052bb467 100644 --- a/ironic/drivers/modules/redfish/raid.py +++ b/ironic/drivers/modules/redfish/raid.py @@ -15,7 +15,6 @@ import math -from futurist import periodics from ironic_lib import metrics_utils from oslo_log import log from oslo_utils import importutils @@ -25,7 +24,7 @@ from ironic.common import exception from ironic.common.i18n import _ from ironic.common import raid from ironic.common import states -from ironic.conductor import task_manager +from ironic.conductor import periodics from ironic.conductor import utils as manager_utils from ironic.conf import CONF from ironic.drivers import base @@ -1014,98 +1013,46 @@ class RedfishRAID(base.RAIDInterface): node.save() @METRICS.timer('RedfishRAID._query_raid_config_failed') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async RAID config failed', spacing=CONF.redfish.raid_config_fail_interval, - enabled=CONF.redfish.raid_config_fail_interval > 0) - def _query_raid_config_failed(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANFAIL, + 'maintenance': True}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('raid_configs'), + ) + def _query_raid_config_failed(self, task, manager, context): """Periodic job to check for failed RAID configuration.""" + if not isinstance(task.driver.raid, RedfishRAID): + return - filters = {'reserved': False, 'provision_state': states.CLEANFAIL, - 'maintenance': True} + node = task.node - fields = ['driver_internal_info'] + # A RAID config failed. Discard any remaining RAID + # configs so when the user takes the node out of + # maintenance mode, pending RAID configs do not + # automatically continue. + LOG.warning('RAID configuration failed for node %(node)s. ' + 'Discarding remaining RAID configurations.', + {'node': node.uuid}) - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - raid_configs = driver_internal_info.get( - 'raid_configs') - # NOTE(TheJulia): Evaluate the presence of raid configuration - # activity before pulling the task, so we don't needlessly - # create database queries with tasks which would be skipped - # anyhow. - if not raid_configs: - continue - - lock_purpose = 'checking async RAID config failed.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, RedfishRAID): - continue - - node = task.node - - # A RAID config failed. Discard any remaining RAID - # configs so when the user takes the node out of - # maintenance mode, pending RAID configs do not - # automatically continue. - LOG.warning('RAID configuration failed for node %(node)s. ' - 'Discarding remaining RAID configurations.', - {'node': node.uuid}) - - task.upgrade_lock() - self._clear_raid_configs(node) - - except exception.NodeNotFound: - LOG.info('During _query_raid_config_failed, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_config_failed, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + task.upgrade_lock() + self._clear_raid_configs(node) @METRICS.timer('RedfishRAID._query_raid_config_status') - @periodics.periodic( + @periodics.node_periodic( + purpose='checking async RAID config tasks', spacing=CONF.redfish.raid_config_status_interval, - enabled=CONF.redfish.raid_config_status_interval > 0) - def _query_raid_config_status(self, manager, context): + filters={'reserved': False, 'provision_state': states.CLEANWAIT}, + predicate_extra_fields=['driver_internal_info'], + predicate=lambda n: n.driver_internal_info.get('raid_configs'), + ) + def _query_raid_config_status(self, task, manager, context): """Periodic job to check RAID config tasks.""" + if not isinstance(task.driver.raid, RedfishRAID): + return - filters = {'reserved': False, 'provision_state': states.CLEANWAIT} - fields = ['driver_internal_info'] - - node_list = manager.iter_nodes(fields=fields, filters=filters) - for (node_uuid, driver, conductor_group, - driver_internal_info) in node_list: - try: - raid_configs = driver_internal_info.get( - 'raid_configs') - # NOTE(TheJulia): Skip to next record if we do not - # have raid configuraiton tasks, so we don't pull tasks - # for every unrelated node in CLEANWAIT. - if not raid_configs: - continue - - lock_purpose = 'checking async RAID config tasks.' - with task_manager.acquire(context, node_uuid, - purpose=lock_purpose, - shared=True) as task: - if not isinstance(task.driver.raid, RedfishRAID): - continue - - self._check_node_raid_config(task) - - except exception.NodeNotFound: - LOG.info('During _query_raid_config_status, node ' - '%(node)s was not found and presumed deleted by ' - 'another process.', {'node': node_uuid}) - except exception.NodeLocked: - LOG.info('During _query_raid_config_status, node ' - '%(node)s was already locked by another process. ' - 'Skip.', {'node': node_uuid}) + self._check_node_raid_config(task) def _get_error_messages(self, response): try: diff --git a/ironic/tests/unit/conductor/test_manager.py b/ironic/tests/unit/conductor/test_manager.py index e93b3cb787..a00bb97f83 100644 --- a/ironic/tests/unit/conductor/test_manager.py +++ b/ironic/tests/unit/conductor/test_manager.py @@ -5567,7 +5567,7 @@ class ManagerPowerRecoveryTestCase(mgr_utils.CommonMixIn, self.task.driver = self.driver self.filters = {'maintenance': True, 'fault': 'power failure'} - self.columns = ['uuid', 'driver', 'conductor_group', 'id'] + self.columns = ['uuid', 'driver', 'conductor_group'] def test_node_not_mapped(self, get_nodeinfo_mock, mapped_mock, acquire_mock): @@ -6152,7 +6152,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): self.filters = {'reserved': False, 'maintenance': False, 'provision_state': states.ACTIVE} - self.columns = ['uuid', 'driver', 'conductor_group', 'id', + self.columns = ['uuid', 'driver', 'conductor_group', 'conductor_affinity'] def _assert_get_nodeinfo_args(self, get_nodeinfo_mock): @@ -6200,7 +6200,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): self.service, self.node.uuid, self.node.driver, self.node.conductor_group) acquire_mock.assert_called_once_with(self.context, self.node.uuid, - purpose=mock.ANY) + purpose=mock.ANY, shared=False) # assert spawn_after has been called self.task.spawn_after.assert_called_once_with( self.service._spawn_worker, @@ -6234,7 +6234,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called 2 times only instead of 3. When # NoFreeConductorWorker is raised the loop should be broken expected = [mock.call(self.context, self.node.uuid, - purpose=mock.ANY)] * 2 + purpose=mock.ANY, shared=False)] * 2 self.assertEqual(expected, acquire_mock.call_args_list) # assert spawn_after has been called twice @@ -6264,7 +6264,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called 3 times expected = [mock.call(self.context, self.node.uuid, - purpose=mock.ANY)] * 3 + purpose=mock.ANY, shared=False)] * 3 self.assertEqual(expected, acquire_mock.call_args_list) # assert spawn_after has been called only 2 times @@ -6296,7 +6296,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase): # assert acquire() gets called only once because of the worker limit acquire_mock.assert_called_once_with(self.context, self.node.uuid, - purpose=mock.ANY) + purpose=mock.ANY, shared=False) # assert spawn_after has been called self.task.spawn_after.assert_called_once_with( diff --git a/ironic/tests/unit/conductor/test_periodics.py b/ironic/tests/unit/conductor/test_periodics.py new file mode 100644 index 0000000000..85868163a0 --- /dev/null +++ b/ironic/tests/unit/conductor/test_periodics.py @@ -0,0 +1,135 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import mock + +from oslo_utils import uuidutils + +from ironic.common import context as ironic_context +from ironic.conductor import base_manager +from ironic.conductor import periodics +from ironic.conductor import task_manager +from ironic.tests.unit.db import base as db_base +from ironic.tests.unit.objects import utils as obj_utils + + +_FILTERS = {'maintenance': False} + + +class PeriodicTestService(base_manager.BaseConductorManager): + + def __init__(self, test): + self.test = test + self.nodes = [] + + @periodics.node_periodic(purpose="herding cats", spacing=42) + def simple(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertTrue(task.shared) + self.nodes.append(task.node.uuid) + + @periodics.node_periodic(purpose="herding cats", spacing=42, + shared_task=False, filters=_FILTERS) + def exclusive(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertFalse(task.shared) + self.nodes.append(task.node.uuid) + + @periodics.node_periodic(purpose="never running", spacing=42, + predicate=lambda n: n.cat != 'meow', + predicate_extra_fields=['cat']) + def never_run(self, task, context): + self.test.fail(f"Was not supposed to run, ran with {task.node}") + + @periodics.node_periodic(purpose="herding cats", spacing=42, limit=3) + def limit(self, task, context): + self.test.assertIsInstance(context, ironic_context.RequestContext) + self.test.assertTrue(task.shared) + self.nodes.append(task.node.uuid) + if task.node.uuid == 'stop': + raise periodics.Stop() + + +@mock.patch.object(PeriodicTestService, 'iter_nodes', autospec=True) +class NodePeriodicTestCase(db_base.DbTestCase): + + def setUp(self): + super().setUp() + self.service = PeriodicTestService(self) + self.ctx = ironic_context.get_admin_context() + self.uuid = uuidutils.generate_uuid() + self.node = obj_utils.create_test_node(self.context, uuid=self.uuid) + + def test_simple(self, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (uuidutils.generate_uuid(), 'driver1', ''), + (self.uuid, 'driver2', 'group'), + ]) + + self.service.simple(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual([self.uuid], self.service.nodes) + + def test_exclusive(self, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (uuidutils.generate_uuid(), 'driver1', ''), + (self.uuid, 'driver2', 'group'), + ]) + + self.service.exclusive(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=_FILTERS, + fields=()) + self.assertEqual([self.uuid], self.service.nodes) + + @mock.patch.object(task_manager, 'acquire', autospec=True) + def test_never_run(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver2', 'group', 'meow'), + ]) + + self.service.never_run(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, + fields=['cat']) + self.assertEqual([], self.service.nodes) + mock_acquire.assert_not_called() + + @mock.patch.object(task_manager, 'acquire', autospec=True) + def test_limit(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver1', ''), + ] * 10) + mock_acquire.return_value.__enter__.return_value.node.uuid = self.uuid + + self.service.limit(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual([self.uuid] * 3, self.service.nodes) + + @mock.patch.object(task_manager, 'acquire', autospec=True) + def test_stop(self, mock_acquire, mock_iter_nodes): + mock_iter_nodes.return_value = iter([ + (self.uuid, 'driver1', ''), + ] * 10) + mock_acquire.return_value.__enter__.return_value.node.uuid = 'stop' + + self.service.limit(self.ctx) + + mock_iter_nodes.assert_called_once_with(self.service, + filters=None, fields=()) + self.assertEqual(['stop'], self.service.nodes) diff --git a/ironic/tests/unit/drivers/modules/drac/test_management.py b/ironic/tests/unit/drivers/modules/drac/test_management.py index f3d23d9a83..9d5182e899 100644 --- a/ironic/tests/unit/drivers/modules/drac/test_management.py +++ b/ironic/tests/unit/drivers/modules/drac/test_management.py @@ -28,6 +28,7 @@ from oslo_utils import importutils import ironic.common.boot_devices from ironic.common import exception from ironic.common import molds +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.drivers.modules import deploy_utils @@ -1021,7 +1022,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest): self.management._check_import_configuration_task.assert_not_called() - @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_import_configuration_status_node_notfound( self, mock_acquire, mock_log): @@ -1044,7 +1045,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest): self.management._check_import_configuration_task.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(drac_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_import_configuration_status_node_locked( self, mock_acquire, mock_log): diff --git a/ironic/tests/unit/drivers/modules/drac/test_raid.py b/ironic/tests/unit/drivers/modules/drac/test_raid.py index 1a5928e431..01a5ca9d15 100644 --- a/ironic/tests/unit/drivers/modules/drac/test_raid.py +++ b/ironic/tests/unit/drivers/modules/drac/test_raid.py @@ -25,6 +25,7 @@ import tenacity from ironic.common import exception from ironic.common import states +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.conf import CONF @@ -2592,7 +2593,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest): self.raid._check_raid_tasks_status.assert_not_called() - @mock.patch.object(drac_raid.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_raid_tasks_status_node_notfound( self, mock_acquire, mock_log): @@ -2610,7 +2611,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest): self.raid._check_raid_tasks_status.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(drac_raid.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_raid_tasks_status_node_locked( self, mock_acquire, mock_log): diff --git a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py index 865f589626..57ba8263fd 100644 --- a/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py +++ b/ironic/tests/unit/drivers/modules/irmc/test_periodic_task.py @@ -49,6 +49,8 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest): { 'key': 'value' }]} + self.node.raid_config = self.raid_config + self.node.target_raid_config = self.target_raid_config @mock.patch.object(irmc_common, 'get_irmc_report', autospec=True) def test__query_raid_config_fgi_status_without_node( @@ -286,6 +288,7 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest): mock_manager = mock.Mock() raid_config = self.raid_config raid_config_2 = self.raid_config.copy() + self.node_2.raid_config = raid_config_2 fgi_status_dict = {} fgi_mock.side_effect = [{}, {'0': 'Idle', '1': 'Idle'}] node_list = [(self.node_2.uuid, 'fake-hardware', '', raid_config_2), diff --git a/ironic/tests/unit/drivers/modules/redfish/test_management.py b/ironic/tests/unit/drivers/modules/redfish/test_management.py index 99da1265b4..d5f23b93f0 100644 --- a/ironic/tests/unit/drivers/modules/redfish/test_management.py +++ b/ironic/tests/unit/drivers/modules/redfish/test_management.py @@ -25,6 +25,7 @@ from ironic.common import components from ironic.common import exception from ironic.common import indicator_states from ironic.common import states +from ironic.conductor import periodics from ironic.conductor import task_manager from ironic.conductor import utils as manager_utils from ironic.drivers.modules import deploy_utils @@ -905,7 +906,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._clear_firmware_updates.assert_not_called() - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_failed_node_notfound(self, mock_acquire, mock_log): @@ -928,7 +929,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._clear_firmware_updates.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_failed_node_locked( self, mock_acquire, mock_log): @@ -1017,7 +1018,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._check_node_firmware_update.assert_not_called() - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_status_node_notfound(self, mock_acquire, mock_log): @@ -1040,7 +1041,7 @@ class RedfishManagementTestCase(db_base.DbTestCase): management._check_node_firmware_update.assert_not_called() self.assertTrue(mock_log.called) - @mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True) + @mock.patch.object(periodics.LOG, 'info', autospec=True) @mock.patch.object(task_manager, 'acquire', autospec=True) def test__query_firmware_update_status_node_locked( self, mock_acquire, mock_log):