Add a helper for node-based periodics
We have a very common pattern of periodic tasks that use iter_nodes to fetch some nodes, check them, create a task and conductor some operation. This change introduces a helper decorator for that and migrates the drivers to it. I'm intentionally leaving unit tests intact to demonstrate that the new decorator works exactly the same way (modulo cosmetic changes) as the previous hand-written code. Change-Id: Ifed4a457275d9451cc412dc80f3c09df72f50492 Story: #2009203 Task: #43522
This commit is contained in:
parent
7f9badb543
commit
cf1b42ea3d
@ -42,7 +42,7 @@ Drivers may run their own periodic tasks, i.e. actions run repeatedly after
|
||||
a certain amount of time. Such a task is created by using the periodic_
|
||||
decorator on an interface method. For example
|
||||
|
||||
::
|
||||
.. code-block:: python
|
||||
|
||||
from futurist import periodics
|
||||
|
||||
@ -55,6 +55,11 @@ decorator on an interface method. For example
|
||||
Here the ``spacing`` argument is a period in seconds for a given periodic task.
|
||||
For example 'spacing=5' means every 5 seconds.
|
||||
|
||||
Starting with the Yoga cycle, there is also a new decorator
|
||||
:py:func:`ironic.conductor.periodics.node_periodic` to create periodic tasks
|
||||
that handle nodes. See :ref:`deploy steps documentation <deploy-steps-polling>`
|
||||
for an example.
|
||||
|
||||
Driver-Specific Steps
|
||||
---------------------
|
||||
|
||||
|
@ -188,6 +188,95 @@ following pattern:
|
||||
|
||||
return deploy_utils.reboot_to_finish_step(task)
|
||||
|
||||
.. _deploy-steps-polling:
|
||||
|
||||
Polling for completion
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Finally, you may want to poll the BMC until the operation is complete. Often
|
||||
enough, this also involves a reboot. In this case you can use the
|
||||
:py:func:`ironic.conductor.periodics.node_periodic` decorator to create a
|
||||
periodic task that operates on relevant nodes:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from ironic.common import states
|
||||
from ironic.common import utils
|
||||
from ironic.conductor import periodics
|
||||
from ironic.drivers import base
|
||||
from ironic.drivers.modules import deploy_utils
|
||||
|
||||
_STATUS_CHECK_INTERVAL = ... # better use a configuration option
|
||||
|
||||
class MyManagement(base.ManagementInterface):
|
||||
...
|
||||
|
||||
@base.clean_step(priority=0)
|
||||
def my_action(self, task):
|
||||
...
|
||||
|
||||
reboot_required = ... # your step may or may not need rebooting
|
||||
|
||||
# Make this node as running my_action. Often enough you will store
|
||||
# some useful data rather than a boolean flag.
|
||||
utils.set_node_nested_field(task.node, 'driver_internal_info',
|
||||
'in_my_action', True)
|
||||
|
||||
# Tell ironic that...
|
||||
deploy_utils.set_async_step_flags(
|
||||
node,
|
||||
# ... we're waiting for IPA to come back after reboot
|
||||
reboot=reboot_required,
|
||||
# ... the current step shouldn't be entered again
|
||||
skip_current_step=True,
|
||||
# ... we'll be polling until the step is done
|
||||
polling=True)
|
||||
|
||||
if reboot_required:
|
||||
return deploy_utils.reboot_to_finish_step(task)
|
||||
|
||||
@periodics.node_periodic(
|
||||
purpose='checking my action status',
|
||||
spacing=_STATUS_CHECK_INTERVAL,
|
||||
filters={
|
||||
# Skip nodes that already have a lock
|
||||
'reserved': False,
|
||||
# Only consider nodes that are waiting for cleaning or failed
|
||||
# on timeout.
|
||||
'provision_state_in': [states.CLEANWAIT, states.CLEANFAIL],
|
||||
},
|
||||
# Load driver_internal_info from the database on listing
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
# Only consider nodes with in_my_action
|
||||
predicate=lambda n: n.driver_internal_info.get('in_my_action'),
|
||||
)
|
||||
def check_my_action(self, task, manager, context):
|
||||
# Double-check that the node is managed by this interface
|
||||
if not isinstance(task.driver.management, MyManagement):
|
||||
return
|
||||
|
||||
if not needs_actions(): # insert your checks here
|
||||
return
|
||||
|
||||
task.upgrade_lock()
|
||||
|
||||
... # do any required updates
|
||||
|
||||
# Drop the flag so that this node is no longer considered
|
||||
utils.pop_node_nested_field(task.node, 'driver_internal_info',
|
||||
'in_my_action')
|
||||
|
||||
Note that creating a ``task`` involves an additional database query, so you
|
||||
want to avoid creating them for too many nodes in your periodic tasks. Instead:
|
||||
|
||||
* Try to use precise ``filters`` to filter out nodes on the database level.
|
||||
Using ``reserved`` and ``provision_state``/``provision_state_in`` are
|
||||
recommended in most cases. See
|
||||
:py:meth:`ironic.db.api.Connection.get_nodeinfo_list` for a list of possible
|
||||
filters.
|
||||
* Use ``predicate`` to filter on complex fields such as
|
||||
``driver_internal_info``. Predicates are checked before tasks are created.
|
||||
|
||||
Implementing RAID
|
||||
-----------------
|
||||
|
||||
|
@ -45,7 +45,6 @@ import datetime
|
||||
import queue
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from futurist import waiters
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log
|
||||
@ -66,6 +65,7 @@ from ironic.conductor import base_manager
|
||||
from ironic.conductor import cleaning
|
||||
from ironic.conductor import deployments
|
||||
from ironic.conductor import notification_utils as notify_utils
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import steps as conductor_steps
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils
|
||||
@ -1497,10 +1497,15 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
eventlet.sleep(0)
|
||||
|
||||
@METRICS.timer('ConductorManager._power_failure_recovery')
|
||||
@periodics.periodic(spacing=CONF.conductor.power_failure_recovery_interval,
|
||||
enabled=bool(
|
||||
CONF.conductor.power_failure_recovery_interval))
|
||||
def _power_failure_recovery(self, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='power failure recovery',
|
||||
spacing=CONF.conductor.power_failure_recovery_interval,
|
||||
# NOTE(kaifeng) To avoid conflicts with periodic task of the
|
||||
# regular power state checking, maintenance is still a required
|
||||
# condition.
|
||||
filters={'maintenance': True, 'fault': faults.POWER_FAILURE},
|
||||
)
|
||||
def _power_failure_recovery(self, task, context):
|
||||
"""Periodic task to check power states for nodes in maintenance.
|
||||
|
||||
Attempt to grab a lock and sync only if the following
|
||||
@ -1511,19 +1516,6 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
3) Node is not reserved.
|
||||
4) Node is not in the ENROLL state.
|
||||
"""
|
||||
def should_sync_power_state_for_recovery(task):
|
||||
"""Check if ironic should sync power state for recovery."""
|
||||
|
||||
# NOTE(dtantsur): it's also pointless (and dangerous) to
|
||||
# sync power state when a power action is in progress
|
||||
if (task.node.provision_state == states.ENROLL
|
||||
or not task.node.maintenance
|
||||
or task.node.fault != faults.POWER_FAILURE
|
||||
or task.node.target_power_state
|
||||
or task.node.reservation):
|
||||
return False
|
||||
return True
|
||||
|
||||
def handle_recovery(task, actual_power_state):
|
||||
"""Handle recovery when power sync is succeeded."""
|
||||
task.upgrade_lock()
|
||||
@ -1546,19 +1538,15 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
notify_utils.emit_power_state_corrected_notification(
|
||||
task, old_power_state)
|
||||
|
||||
# NOTE(kaifeng) To avoid conflicts with periodic task of the
|
||||
# regular power state checking, maintenance is still a required
|
||||
# condition.
|
||||
filters = {'maintenance': True,
|
||||
'fault': faults.POWER_FAILURE}
|
||||
node_iter = self.iter_nodes(fields=['id'], filters=filters)
|
||||
for (node_uuid, driver, conductor_group, node_id) in node_iter:
|
||||
try:
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose='power failure recovery',
|
||||
shared=True) as task:
|
||||
if not should_sync_power_state_for_recovery(task):
|
||||
continue
|
||||
# NOTE(dtantsur): it's also pointless (and dangerous) to
|
||||
# sync power state when a power action is in progress
|
||||
if (task.node.provision_state == states.ENROLL
|
||||
or not task.node.maintenance
|
||||
or task.node.fault != faults.POWER_FAILURE
|
||||
or task.node.target_power_state
|
||||
or task.node.reservation):
|
||||
return
|
||||
|
||||
try:
|
||||
# Validate driver info in case of parameter changed
|
||||
# in maintenance.
|
||||
@ -1577,17 +1565,6 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
{'node': task.node.uuid, 'err': e})
|
||||
else:
|
||||
handle_recovery(task, power_state)
|
||||
except exception.NodeNotFound:
|
||||
LOG.info("During power_failure_recovery, node %(node)s was "
|
||||
"not found and presumed deleted by another process.",
|
||||
{'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info("During power_failure_recovery, node %(node)s was "
|
||||
"already locked by another process. Skip.",
|
||||
{'node': node_uuid})
|
||||
finally:
|
||||
# Yield on every iteration
|
||||
eventlet.sleep(0)
|
||||
|
||||
@METRICS.timer('ConductorManager._check_deploy_timeouts')
|
||||
@periodics.periodic(
|
||||
@ -1869,9 +1846,17 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
)
|
||||
|
||||
@METRICS.timer('ConductorManager._sync_local_state')
|
||||
@periodics.periodic(spacing=CONF.conductor.sync_local_state_interval,
|
||||
enabled=CONF.conductor.sync_local_state_interval > 0)
|
||||
def _sync_local_state(self, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='node take over',
|
||||
spacing=CONF.conductor.sync_local_state_interval,
|
||||
filters={'reserved': False, 'maintenance': False,
|
||||
'provision_state': states.ACTIVE},
|
||||
predicate_extra_fields=['conductor_affinity'],
|
||||
predicate=lambda n, m: n.conductor_affinity != m.conductor.id,
|
||||
limit=lambda: CONF.conductor.periodic_max_workers,
|
||||
shared_task=False,
|
||||
)
|
||||
def _sync_local_state(self, task, context):
|
||||
"""Perform any actions necessary to sync local state.
|
||||
|
||||
This is called periodically to refresh the conductor's copy of the
|
||||
@ -1880,40 +1865,20 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
The ensuing actions could include preparing a PXE environment,
|
||||
updating the DHCP server, and so on.
|
||||
"""
|
||||
filters = {'reserved': False,
|
||||
'maintenance': False,
|
||||
'provision_state': states.ACTIVE}
|
||||
node_iter = self.iter_nodes(fields=['id', 'conductor_affinity'],
|
||||
filters=filters)
|
||||
|
||||
workers_count = 0
|
||||
for (node_uuid, driver, conductor_group, node_id,
|
||||
conductor_affinity) in node_iter:
|
||||
if conductor_affinity == self.conductor.id:
|
||||
continue
|
||||
|
||||
# Node is mapped here, but not updated by this conductor last
|
||||
try:
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose='node take over') as task:
|
||||
# NOTE(tenbrae): now that we have the lock, check again to
|
||||
# avoid racing with deletes and other state changes
|
||||
node = task.node
|
||||
if (node.maintenance
|
||||
or node.conductor_affinity == self.conductor.id
|
||||
or node.provision_state != states.ACTIVE):
|
||||
continue
|
||||
|
||||
task.spawn_after(self._spawn_worker,
|
||||
self._do_takeover, task)
|
||||
return False
|
||||
|
||||
try:
|
||||
task.spawn_after(self._spawn_worker, self._do_takeover, task)
|
||||
except exception.NoFreeConductorWorker:
|
||||
break
|
||||
except (exception.NodeLocked, exception.NodeNotFound):
|
||||
continue
|
||||
workers_count += 1
|
||||
if workers_count == CONF.conductor.periodic_max_workers:
|
||||
break
|
||||
raise periodics.Stop()
|
||||
else:
|
||||
return True
|
||||
|
||||
@METRICS.timer('ConductorManager.validate_driver_interfaces')
|
||||
@messaging.expected_exceptions(exception.NodeLocked)
|
||||
|
151
ironic/conductor/periodics.py
Normal file
151
ironic/conductor/periodics.py
Normal file
@ -0,0 +1,151 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Conductor periodics."""
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
from oslo_log import log
|
||||
|
||||
from ironic.common import exception
|
||||
from ironic.conductor import base_manager
|
||||
from ironic.conductor import task_manager
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
def periodic(spacing, enabled=True, **kwargs):
|
||||
"""A decorator to define a periodic task.
|
||||
|
||||
:param spacing: how often (in seconds) to run the periodic task.
|
||||
:param enabled: whether the task is enabled; defaults to ``spacing > 0``.
|
||||
"""
|
||||
return periodics.periodic(spacing=spacing,
|
||||
enabled=enabled and spacing > 0,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class Stop(Exception):
|
||||
"""A signal to stop the current iteration of a periodic task."""
|
||||
|
||||
|
||||
def node_periodic(purpose, spacing, enabled=True, filters=None,
|
||||
predicate=None, predicate_extra_fields=(), limit=None,
|
||||
shared_task=True):
|
||||
"""A decorator to define a periodic task to act on nodes.
|
||||
|
||||
Defines a periodic task that fetches the list of nodes mapped to the
|
||||
current conductor which satisfy the provided filters.
|
||||
|
||||
The decorated function must be a method on either the conductor manager
|
||||
or a hardware interface. The signature is:
|
||||
|
||||
* for conductor manager: ``(self, task, context)``
|
||||
* for hardware interfaces: ``(self, task, manager, context)``.
|
||||
|
||||
``NodeNotFound`` and ``NodeLocked`` exceptions are ignored. Raise ``Stop``
|
||||
to abort the current iteration of the task and reschedule it.
|
||||
|
||||
:param purpose: a human-readable description of the activity, e.g.
|
||||
"verifying that the cat is purring".
|
||||
:param spacing: how often (in seconds) to run the periodic task.
|
||||
:param enabled: whether the task is enabled; defaults to ``spacing > 0``.
|
||||
:param filters: database-level filters for the nodes.
|
||||
:param predicate: a callable to run on the fetched nodes *before* creating
|
||||
a task for them. The only parameter will be a named tuple with fields
|
||||
``uuid``, ``driver``, ``conductor_group`` plus everything from
|
||||
``predicate_extra_fields``. If the callable accepts a 2nd parameter,
|
||||
it will be the conductor manager instance.
|
||||
:param predicate_extra_fields: extra fields to fetch on the initial
|
||||
request and pass into the ``predicate``. Must not contain ``uuid``,
|
||||
``driver`` and ``conductor_group`` since they are always included.
|
||||
:param limit: how many nodes to process before stopping the current
|
||||
iteration. If ``predicate`` returns ``False``, the node is not counted.
|
||||
If the decorated function returns ``False``, the node is not counted
|
||||
either. Can be a callable, in which case it will be called on each
|
||||
iteration to determine the limit.
|
||||
:param shared_task: if ``True``, the task will have a shared lock. It is
|
||||
recommended to start with a shared lock and upgrade it only if needed.
|
||||
"""
|
||||
node_type = collections.namedtuple(
|
||||
'Node',
|
||||
['uuid', 'driver', 'conductor_group'] + list(predicate_extra_fields)
|
||||
)
|
||||
|
||||
# Accepting a conductor manager is a bit of an edge case, doing a bit of
|
||||
# a signature magic to avoid passing it everywhere.
|
||||
accepts_manager = (predicate is not None
|
||||
and len(inspect.signature(predicate).parameters) > 1)
|
||||
|
||||
def decorator(func):
|
||||
@periodic(spacing=spacing, enabled=enabled)
|
||||
@functools.wraps(func)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
# Make it work with both drivers and the conductor manager
|
||||
if isinstance(self, base_manager.BaseConductorManager):
|
||||
manager = self
|
||||
context = args[0]
|
||||
else:
|
||||
manager = args[0]
|
||||
context = args[1]
|
||||
|
||||
if callable(limit):
|
||||
local_limit = limit()
|
||||
else:
|
||||
local_limit = limit
|
||||
assert local_limit is None or local_limit > 0
|
||||
|
||||
nodes = manager.iter_nodes(filters=filters,
|
||||
fields=predicate_extra_fields)
|
||||
for (node_uuid, *other) in nodes:
|
||||
if predicate is not None:
|
||||
node = node_type(node_uuid, *other)
|
||||
if accepts_manager:
|
||||
result = predicate(node, manager)
|
||||
else:
|
||||
result = predicate(node)
|
||||
if not result:
|
||||
continue
|
||||
|
||||
try:
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=purpose,
|
||||
shared=shared_task) as task:
|
||||
result = func(self, task, *args, **kwargs)
|
||||
except exception.NodeNotFound:
|
||||
LOG.info("During %(action)s, node %(node)s was not found "
|
||||
"and presumed deleted by another process.",
|
||||
{'node': node_uuid, 'action': purpose})
|
||||
except exception.NodeLocked:
|
||||
LOG.info("During %(action)s, node %(node)s was already "
|
||||
"locked by another process. Skip.",
|
||||
{'node': node_uuid, 'action': purpose})
|
||||
except Stop:
|
||||
break
|
||||
finally:
|
||||
# Yield on every iteration
|
||||
eventlet.sleep(0)
|
||||
|
||||
if (local_limit is not None
|
||||
and (result is None or result)):
|
||||
local_limit -= 1
|
||||
if not local_limit:
|
||||
return
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
@ -15,7 +15,6 @@
|
||||
DRAC BIOS configuration specific methods
|
||||
"""
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
@ -23,7 +22,7 @@ from oslo_utils import timeutils
|
||||
|
||||
from ironic.common import exception
|
||||
from ironic.common.i18n import _
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
from ironic.drivers import base
|
||||
@ -151,9 +150,16 @@ class DracWSManBIOS(base.BIOSInterface):
|
||||
# spacing since BIOS jobs could be comparatively shorter in time than
|
||||
# RAID ones currently using the raid spacing to avoid errors
|
||||
# spacing parameter for periodic method
|
||||
@periodics.periodic(
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval)
|
||||
def _query_bios_config_job_status(self, manager, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async bios configuration jobs',
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval,
|
||||
filters={'reserved': False, 'maintenance': False},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: (
|
||||
n.driver_internal_info.get('bios_config_job_ids')
|
||||
or n.driver_internal_info.get('factory_reset_time_before_reboot')),
|
||||
)
|
||||
def _query_bios_config_job_status(self, task, manager, context):
|
||||
"""Periodic task to check the progress of running BIOS config jobs.
|
||||
|
||||
:param manager: an instance of Ironic Conductor Manager with
|
||||
@ -161,48 +167,18 @@ class DracWSManBIOS(base.BIOSInterface):
|
||||
:param context: context of the request, needed when acquiring
|
||||
a lock on a node. For access control.
|
||||
"""
|
||||
|
||||
filters = {'reserved': False, 'maintenance': False}
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
# NOTE(TheJulia) Evaluate if work is actually required before
|
||||
# creating a task for every node in the deployment which does
|
||||
# not have a lock and is not in maintenance mode.
|
||||
if (not driver_internal_info.get("bios_config_job_ids")
|
||||
and not driver_internal_info.get(
|
||||
"factory_reset_time_before_reboot")):
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async bios configuration jobs'
|
||||
# Performing read-only/non-destructive work with shared lock
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
# skip a node not being managed by idrac driver
|
||||
if not isinstance(task.driver.bios, DracWSManBIOS):
|
||||
continue
|
||||
return
|
||||
|
||||
# check bios_config_job_id exist & checks job is completed
|
||||
if driver_internal_info.get("bios_config_job_ids"):
|
||||
if task.node.driver_internal_info.get("bios_config_job_ids"):
|
||||
self._check_node_bios_jobs(task)
|
||||
|
||||
if driver_internal_info.get(
|
||||
if task.node.driver_internal_info.get(
|
||||
"factory_reset_time_before_reboot"):
|
||||
self._check_last_system_inventory_changed(task)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info("During query_bios_config_job_status, node "
|
||||
"%(node)s was not found and presumed deleted by "
|
||||
"another process.", {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info("During query_bios_config_job_status, node "
|
||||
"%(node)s was already locked by another process. "
|
||||
"Skip.", {'node': node_uuid})
|
||||
|
||||
def _check_last_system_inventory_changed(self, task):
|
||||
"""Check the progress of last system inventory time of a node.
|
||||
|
||||
|
@ -23,7 +23,6 @@ DRAC management interface
|
||||
import json
|
||||
import time
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
import jsonschema
|
||||
from jsonschema import exceptions as json_schema_exc
|
||||
@ -34,6 +33,7 @@ from ironic.common import boot_devices
|
||||
from ironic.common import exception
|
||||
from ironic.common.i18n import _
|
||||
from ironic.common import molds
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
@ -485,46 +485,23 @@ class DracRedfishManagement(redfish_management.RedfishManagement):
|
||||
# Export executed as part of Import async periodic task status check
|
||||
|
||||
@METRICS.timer('DracRedfishManagement._query_import_configuration_status')
|
||||
@periodics.periodic(
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async import configuration task',
|
||||
spacing=CONF.drac.query_import_config_job_status_interval,
|
||||
enabled=CONF.drac.query_import_config_job_status_interval > 0)
|
||||
def _query_import_configuration_status(self, manager, context):
|
||||
filters={'reserved': False, 'maintenance': False},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: (
|
||||
n.driver_internal_info.get('import_task_monitor_url')
|
||||
),
|
||||
)
|
||||
def _query_import_configuration_status(self, task, manager, context):
|
||||
"""Period job to check import configuration task."""
|
||||
if not isinstance(task.driver.management, DracRedfishManagement):
|
||||
return
|
||||
|
||||
filters = {'reserved': False, 'maintenance': False}
|
||||
fields = ['driver_internal_info']
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
|
||||
task_monitor_url = driver_internal_info.get(
|
||||
'import_task_monitor_url')
|
||||
# NOTE(TheJulia): Evaluate if a task montitor URL exists
|
||||
# based upon our inital DB query before pulling a task for
|
||||
# every node in the deployment which reduces the overall
|
||||
# number of DB queries triggering in the background where
|
||||
# no work is required.
|
||||
if not task_monitor_url:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async import configuration task'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.management,
|
||||
DracRedfishManagement):
|
||||
continue
|
||||
self._check_import_configuration_task(
|
||||
task, task_monitor_url)
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_import_configuration_status, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_import_configuration_status, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
task, task.node.driver_internal_info.get(
|
||||
'import_task_monitor_url'))
|
||||
|
||||
def _check_import_configuration_task(self, task, task_monitor_url):
|
||||
"""Checks progress of running import configuration task"""
|
||||
|
@ -18,7 +18,6 @@ DRAC RAID specific methods
|
||||
from collections import defaultdict
|
||||
import math
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
@ -28,7 +27,7 @@ import tenacity
|
||||
from ironic.common import exception
|
||||
from ironic.common.i18n import _
|
||||
from ironic.common import raid as raid_common
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
from ironic.drivers import base
|
||||
@ -1487,38 +1486,22 @@ class DracRedfishRAID(redfish_raid.RedfishRAID):
|
||||
return False
|
||||
|
||||
@METRICS.timer('DracRedfishRAID._query_raid_tasks_status')
|
||||
@periodics.periodic(
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval)
|
||||
def _query_raid_tasks_status(self, manager, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async RAID tasks',
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval,
|
||||
filters={'reserved': False, 'maintenance': False},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: (
|
||||
n.driver_internal_info.get('raid_task_monitor_uris')
|
||||
),
|
||||
)
|
||||
def _query_raid_tasks_status(self, task, manager, context):
|
||||
"""Periodic task to check the progress of running RAID tasks"""
|
||||
if not isinstance(task.driver.raid, DracRedfishRAID):
|
||||
return
|
||||
|
||||
filters = {'reserved': False, 'maintenance': False}
|
||||
fields = ['driver_internal_info']
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
task_monitor_uris = driver_internal_info.get(
|
||||
'raid_task_monitor_uris')
|
||||
if not task_monitor_uris:
|
||||
continue
|
||||
try:
|
||||
lock_purpose = 'checking async RAID tasks'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.raid,
|
||||
DracRedfishRAID):
|
||||
continue
|
||||
self._check_raid_tasks_status(
|
||||
task, task_monitor_uris)
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_raid_tasks_status, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_raid_tasks_status, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
task, task.node.driver_internal_info.get('raid_task_monitor_uris'))
|
||||
|
||||
def _check_raid_tasks_status(self, task, task_mon_uris):
|
||||
"""Checks RAID tasks for completion
|
||||
@ -1763,44 +1746,22 @@ class DracWSManRAID(base.RAIDInterface):
|
||||
return {'logical_disks': logical_disks}
|
||||
|
||||
@METRICS.timer('DracRAID._query_raid_config_job_status')
|
||||
@periodics.periodic(
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval)
|
||||
def _query_raid_config_job_status(self, manager, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async raid configuration jobs',
|
||||
spacing=CONF.drac.query_raid_config_job_status_interval,
|
||||
filters={'reserved': False, 'maintenance': False},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: (
|
||||
n.driver_internal_info.get('raid_config_job_ids')
|
||||
),
|
||||
)
|
||||
def _query_raid_config_job_status(self, task, manager, context):
|
||||
"""Periodic task to check the progress of running RAID config jobs."""
|
||||
|
||||
filters = {'reserved': False, 'maintenance': False}
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
|
||||
job_ids = driver_internal_info.get('raid_config_job_ids')
|
||||
# NOTE(TheJulia): Evaluate if there is work to be done
|
||||
# based upon the original DB query's results so we don't
|
||||
# proceed creating tasks for every node in the deployment.
|
||||
if not job_ids:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async raid configuration jobs'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.raid, DracWSManRAID):
|
||||
continue
|
||||
return
|
||||
|
||||
self._check_node_raid_jobs(task)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info("During query_raid_config_job_status, node "
|
||||
"%(node)s was not found and presumed deleted by "
|
||||
"another process.", {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info("During query_raid_config_job_status, node "
|
||||
"%(node)s was already locked by another process. "
|
||||
"Skip.", {'node': node_uuid})
|
||||
|
||||
@METRICS.timer('DracRAID._check_node_raid_jobs')
|
||||
def _check_node_raid_jobs(self, task):
|
||||
"""Check the progress of running RAID config jobs of a node."""
|
||||
|
@ -20,7 +20,6 @@ import shlex
|
||||
from urllib import parse as urlparse
|
||||
|
||||
import eventlet
|
||||
from futurist import periodics
|
||||
import openstack
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -29,6 +28,7 @@ from ironic.common.i18n import _
|
||||
from ironic.common import keystone
|
||||
from ironic.common import states
|
||||
from ironic.common import utils
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as cond_utils
|
||||
from ironic.conf import CONF
|
||||
@ -292,21 +292,14 @@ class Inspector(base.InspectInterface):
|
||||
'ironic-inspector', {'uuid': node_uuid})
|
||||
_get_client(task.context).abort_introspection(node_uuid)
|
||||
|
||||
@periodics.periodic(spacing=CONF.inspector.status_check_period)
|
||||
def _periodic_check_result(self, manager, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='checking hardware inspection status',
|
||||
spacing=CONF.inspector.status_check_period,
|
||||
filters={'provision_state': states.INSPECTWAIT},
|
||||
)
|
||||
def _periodic_check_result(self, task, manager, context):
|
||||
"""Periodic task checking results of inspection."""
|
||||
filters = {'provision_state': states.INSPECTWAIT}
|
||||
node_iter = manager.iter_nodes(filters=filters)
|
||||
|
||||
for node_uuid, driver, conductor_group in node_iter:
|
||||
try:
|
||||
lock_purpose = 'checking hardware inspection status'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
shared=True,
|
||||
purpose=lock_purpose) as task:
|
||||
_check_status(task)
|
||||
except (exception.NodeLocked, exception.NodeNotFound):
|
||||
continue
|
||||
|
||||
|
||||
def _start_inspection(node_uuid, context):
|
||||
|
@ -15,7 +15,6 @@
|
||||
"""
|
||||
Irmc RAID specific methods
|
||||
"""
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
@ -23,7 +22,7 @@ from oslo_utils import importutils
|
||||
from ironic.common import exception
|
||||
from ironic.common import raid as raid_common
|
||||
from ironic.common import states
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic import conf
|
||||
from ironic.drivers import base
|
||||
@ -430,63 +429,55 @@ class IRMCRAID(base.RAIDInterface):
|
||||
{'node_id': node_uuid, 'cfg': node.raid_config})
|
||||
|
||||
@METRICS.timer('IRMCRAID._query_raid_config_fgi_status')
|
||||
@periodics.periodic(
|
||||
spacing=CONF.irmc.query_raid_config_fgi_status_interval)
|
||||
def _query_raid_config_fgi_status(self, manager, context):
|
||||
"""Periodic tasks to check the progress of running RAID config."""
|
||||
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async RAID configuration tasks',
|
||||
spacing=CONF.irmc.query_raid_config_fgi_status_interval,
|
||||
filters={'reserved': False, 'provision_state': states.CLEANWAIT,
|
||||
'maintenance': False}
|
||||
fields = ['raid_config']
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group, raid_config) in node_list:
|
||||
try:
|
||||
# NOTE(TheJulia): Evaluate based upon presence of raid
|
||||
# configuration before triggering a task, as opposed to after
|
||||
# so we don't create excess node task objects with related
|
||||
# DB queries.
|
||||
if not raid_config or raid_config.get('fgi_status'):
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async RAID configuration tasks'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
'maintenance': False},
|
||||
predicate_extra_fields=['raid_config'],
|
||||
predicate=lambda n: (
|
||||
n.raid_config and not n.raid_config.get('fgi_status')
|
||||
),
|
||||
)
|
||||
def _query_raid_config_fgi_status(self, task, manager, context):
|
||||
"""Periodic tasks to check the progress of running RAID config."""
|
||||
node = task.node
|
||||
node_uuid = task.node.uuid
|
||||
if not isinstance(task.driver.raid, IRMCRAID):
|
||||
continue
|
||||
return
|
||||
if task.node.target_raid_config is None:
|
||||
continue
|
||||
return
|
||||
task.upgrade_lock()
|
||||
if node.provision_state != states.CLEANWAIT:
|
||||
continue
|
||||
return
|
||||
# Avoid hitting clean_callback_timeout expiration
|
||||
node.touch_provisioning()
|
||||
|
||||
raid_config = node.raid_config
|
||||
|
||||
try:
|
||||
report = irmc_common.get_irmc_report(node)
|
||||
except client.scci.SCCIInvalidInputError:
|
||||
raid_config.update({'fgi_status': RAID_FAILED})
|
||||
raid_common.update_raid_info(node, raid_config)
|
||||
self._set_clean_failed(task, RAID_FAILED)
|
||||
continue
|
||||
return
|
||||
except client.scci.SCCIClientError:
|
||||
raid_config.update({'fgi_status': RAID_FAILED})
|
||||
raid_common.update_raid_info(node, raid_config)
|
||||
self._set_clean_failed(task, RAID_FAILED)
|
||||
continue
|
||||
return
|
||||
|
||||
fgi_status_dict = _get_fgi_status(report, node_uuid)
|
||||
# Note(trungnv): Allow to check until RAID mechanism to be
|
||||
# completed with RAID information in report.
|
||||
if fgi_status_dict == 'completing':
|
||||
continue
|
||||
return
|
||||
if not fgi_status_dict:
|
||||
raid_config.update({'fgi_status': RAID_FAILED})
|
||||
raid_common.update_raid_info(node, raid_config)
|
||||
self._set_clean_failed(task, fgi_status_dict)
|
||||
continue
|
||||
return
|
||||
if all(fgi_status == 'Idle' for fgi_status in
|
||||
fgi_status_dict.values()):
|
||||
raid_config.update({'fgi_status': RAID_COMPLETED})
|
||||
@ -496,15 +487,6 @@ class IRMCRAID(base.RAIDInterface):
|
||||
{'node': node_uuid, 'fgi': RAID_COMPLETED})
|
||||
self._resume_cleaning(task)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During query_raid_config_job_status, node '
|
||||
'%(node)s was not found raid_config and presumed '
|
||||
'deleted by another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During query_raid_config_job_status, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
|
||||
def _set_clean_failed(self, task, fgi_status_dict):
|
||||
LOG.error('RAID configuration task failed for node %(node)s. '
|
||||
'with FGI status is: %(fgi)s. ',
|
||||
|
@ -13,7 +13,6 @@
|
||||
Base PXE Interface Methods
|
||||
"""
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
@ -24,7 +23,7 @@ from ironic.common import exception
|
||||
from ironic.common.i18n import _
|
||||
from ironic.common import pxe_utils
|
||||
from ironic.common import states
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.drivers.modules import boot_mode_utils
|
||||
from ironic.drivers.modules import deploy_utils
|
||||
@ -452,29 +451,23 @@ class PXEBaseMixin(object):
|
||||
states.RESCUEWAIT}
|
||||
|
||||
@METRICS.timer('PXEBaseMixin._check_boot_timeouts')
|
||||
@periodics.periodic(spacing=CONF.pxe.boot_retry_check_interval,
|
||||
enabled=bool(CONF.pxe.boot_retry_timeout))
|
||||
def _check_boot_timeouts(self, manager, context):
|
||||
@periodics.node_periodic(
|
||||
purpose='checking PXE boot status',
|
||||
spacing=CONF.pxe.boot_retry_check_interval,
|
||||
enabled=bool(CONF.pxe.boot_retry_timeout),
|
||||
filters={'provision_state_in': _RETRY_ALLOWED_STATES,
|
||||
'reserved': False,
|
||||
'maintenance': False,
|
||||
'provisioned_before': CONF.pxe.boot_retry_timeout},
|
||||
)
|
||||
def _check_boot_timeouts(self, task, manager, context):
|
||||
"""Periodically checks whether boot has timed out and retry it.
|
||||
|
||||
:param task: a task instance.
|
||||
:param manager: conductor manager.
|
||||
:param context: request context.
|
||||
"""
|
||||
filters = {'provision_state_in': self._RETRY_ALLOWED_STATES,
|
||||
'reserved': False,
|
||||
'maintenance': False,
|
||||
'provisioned_before': CONF.pxe.boot_retry_timeout}
|
||||
node_iter = manager.iter_nodes(filters=filters)
|
||||
|
||||
for node_uuid, driver, conductor_group in node_iter:
|
||||
try:
|
||||
lock_purpose = 'checking PXE boot status'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
shared=True,
|
||||
purpose=lock_purpose) as task:
|
||||
self._check_boot_status(task)
|
||||
except (exception.NodeLocked, exception.NodeNotFound):
|
||||
continue
|
||||
|
||||
def _check_boot_status(self, task):
|
||||
if not isinstance(task.driver.boot, PXEBaseMixin):
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
import collections
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log
|
||||
from oslo_utils import importutils
|
||||
@ -29,6 +28,7 @@ from ironic.common.i18n import _
|
||||
from ironic.common import indicator_states
|
||||
from ironic.common import states
|
||||
from ironic.common import utils
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
@ -853,37 +853,18 @@ class RedfishManagement(base.ManagementInterface):
|
||||
node.save()
|
||||
|
||||
@METRICS.timer('RedfishManagement._query_firmware_update_failed')
|
||||
@periodics.periodic(
|
||||
@periodics.node_periodic(
|
||||
purpose='checking if async firmware update failed',
|
||||
spacing=CONF.redfish.firmware_update_fail_interval,
|
||||
enabled=CONF.redfish.firmware_update_fail_interval > 0)
|
||||
def _query_firmware_update_failed(self, manager, context):
|
||||
"""Periodic job to check for failed firmware updates."""
|
||||
|
||||
filters={'reserved': False, 'provision_state': states.CLEANFAIL,
|
||||
'maintenance': True}
|
||||
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
firmware_updates = driver_internal_info.get(
|
||||
'firmware_updates')
|
||||
# NOTE(TheJulia): If we don't have a entry upfront, we can
|
||||
# safely skip past the node as we know work here is not
|
||||
# required, otherwise minimizing the number of potential
|
||||
# nodes to visit.
|
||||
if not firmware_updates:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async firmware update failed.'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.management,
|
||||
RedfishManagement):
|
||||
continue
|
||||
'maintenance': True},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: n.driver_internal_info.get('firmware_updates'),
|
||||
)
|
||||
def _query_firmware_update_failed(self, task, manager, context):
|
||||
"""Periodic job to check for failed firmware updates."""
|
||||
if not isinstance(task.driver.management, RedfishManagement):
|
||||
return
|
||||
|
||||
node = task.node
|
||||
|
||||
@ -898,56 +879,21 @@ class RedfishManagement(base.ManagementInterface):
|
||||
task.upgrade_lock()
|
||||
self._clear_firmware_updates(node)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_firmware_update_failed, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_firmware_update_failed, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
|
||||
@METRICS.timer('RedfishManagement._query_firmware_update_status')
|
||||
@periodics.periodic(
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async firmware update tasks',
|
||||
spacing=CONF.redfish.firmware_update_status_interval,
|
||||
enabled=CONF.redfish.firmware_update_status_interval > 0)
|
||||
def _query_firmware_update_status(self, manager, context):
|
||||
filters={'reserved': False, 'provision_state': states.CLEANWAIT},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: n.driver_internal_info.get('firmware_updates'),
|
||||
)
|
||||
def _query_firmware_update_status(self, task, manager, context):
|
||||
"""Periodic job to check firmware update tasks."""
|
||||
|
||||
filters = {'reserved': False, 'provision_state': states.CLEANWAIT}
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
firmware_updates = driver_internal_info.get(
|
||||
'firmware_updates')
|
||||
# NOTE(TheJulia): Check and skip upfront before creating a
|
||||
# task so we don't generate additional tasks and db queries
|
||||
# for every node in CLEANWAIT which is not locked.
|
||||
if not firmware_updates:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async firmware update tasks.'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.management,
|
||||
RedfishManagement):
|
||||
continue
|
||||
if not isinstance(task.driver.management, RedfishManagement):
|
||||
return
|
||||
|
||||
self._check_node_firmware_update(task)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_firmware_update_status, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_firmware_update_status, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
|
||||
@METRICS.timer('RedfishManagement._check_node_firmware_update')
|
||||
def _check_node_firmware_update(self, task):
|
||||
"""Check the progress of running firmware update on a node."""
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
import math
|
||||
|
||||
from futurist import periodics
|
||||
from ironic_lib import metrics_utils
|
||||
from oslo_log import log
|
||||
from oslo_utils import importutils
|
||||
@ -25,7 +24,7 @@ from ironic.common import exception
|
||||
from ironic.common.i18n import _
|
||||
from ironic.common import raid
|
||||
from ironic.common import states
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
from ironic.drivers import base
|
||||
@ -1014,36 +1013,18 @@ class RedfishRAID(base.RAIDInterface):
|
||||
node.save()
|
||||
|
||||
@METRICS.timer('RedfishRAID._query_raid_config_failed')
|
||||
@periodics.periodic(
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async RAID config failed',
|
||||
spacing=CONF.redfish.raid_config_fail_interval,
|
||||
enabled=CONF.redfish.raid_config_fail_interval > 0)
|
||||
def _query_raid_config_failed(self, manager, context):
|
||||
"""Periodic job to check for failed RAID configuration."""
|
||||
|
||||
filters={'reserved': False, 'provision_state': states.CLEANFAIL,
|
||||
'maintenance': True}
|
||||
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
raid_configs = driver_internal_info.get(
|
||||
'raid_configs')
|
||||
# NOTE(TheJulia): Evaluate the presence of raid configuration
|
||||
# activity before pulling the task, so we don't needlessly
|
||||
# create database queries with tasks which would be skipped
|
||||
# anyhow.
|
||||
if not raid_configs:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async RAID config failed.'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
'maintenance': True},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: n.driver_internal_info.get('raid_configs'),
|
||||
)
|
||||
def _query_raid_config_failed(self, task, manager, context):
|
||||
"""Periodic job to check for failed RAID configuration."""
|
||||
if not isinstance(task.driver.raid, RedfishRAID):
|
||||
continue
|
||||
return
|
||||
|
||||
node = task.node
|
||||
|
||||
@ -1058,55 +1039,21 @@ class RedfishRAID(base.RAIDInterface):
|
||||
task.upgrade_lock()
|
||||
self._clear_raid_configs(node)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_raid_config_failed, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_raid_config_failed, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
|
||||
@METRICS.timer('RedfishRAID._query_raid_config_status')
|
||||
@periodics.periodic(
|
||||
@periodics.node_periodic(
|
||||
purpose='checking async RAID config tasks',
|
||||
spacing=CONF.redfish.raid_config_status_interval,
|
||||
enabled=CONF.redfish.raid_config_status_interval > 0)
|
||||
def _query_raid_config_status(self, manager, context):
|
||||
filters={'reserved': False, 'provision_state': states.CLEANWAIT},
|
||||
predicate_extra_fields=['driver_internal_info'],
|
||||
predicate=lambda n: n.driver_internal_info.get('raid_configs'),
|
||||
)
|
||||
def _query_raid_config_status(self, task, manager, context):
|
||||
"""Periodic job to check RAID config tasks."""
|
||||
|
||||
filters = {'reserved': False, 'provision_state': states.CLEANWAIT}
|
||||
fields = ['driver_internal_info']
|
||||
|
||||
node_list = manager.iter_nodes(fields=fields, filters=filters)
|
||||
for (node_uuid, driver, conductor_group,
|
||||
driver_internal_info) in node_list:
|
||||
try:
|
||||
raid_configs = driver_internal_info.get(
|
||||
'raid_configs')
|
||||
# NOTE(TheJulia): Skip to next record if we do not
|
||||
# have raid configuraiton tasks, so we don't pull tasks
|
||||
# for every unrelated node in CLEANWAIT.
|
||||
if not raid_configs:
|
||||
continue
|
||||
|
||||
lock_purpose = 'checking async RAID config tasks.'
|
||||
with task_manager.acquire(context, node_uuid,
|
||||
purpose=lock_purpose,
|
||||
shared=True) as task:
|
||||
if not isinstance(task.driver.raid, RedfishRAID):
|
||||
continue
|
||||
return
|
||||
|
||||
self._check_node_raid_config(task)
|
||||
|
||||
except exception.NodeNotFound:
|
||||
LOG.info('During _query_raid_config_status, node '
|
||||
'%(node)s was not found and presumed deleted by '
|
||||
'another process.', {'node': node_uuid})
|
||||
except exception.NodeLocked:
|
||||
LOG.info('During _query_raid_config_status, node '
|
||||
'%(node)s was already locked by another process. '
|
||||
'Skip.', {'node': node_uuid})
|
||||
|
||||
def _get_error_messages(self, response):
|
||||
try:
|
||||
body = response.json()
|
||||
|
@ -5567,7 +5567,7 @@ class ManagerPowerRecoveryTestCase(mgr_utils.CommonMixIn,
|
||||
self.task.driver = self.driver
|
||||
self.filters = {'maintenance': True,
|
||||
'fault': 'power failure'}
|
||||
self.columns = ['uuid', 'driver', 'conductor_group', 'id']
|
||||
self.columns = ['uuid', 'driver', 'conductor_group']
|
||||
|
||||
def test_node_not_mapped(self, get_nodeinfo_mock,
|
||||
mapped_mock, acquire_mock):
|
||||
@ -6152,7 +6152,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
self.filters = {'reserved': False,
|
||||
'maintenance': False,
|
||||
'provision_state': states.ACTIVE}
|
||||
self.columns = ['uuid', 'driver', 'conductor_group', 'id',
|
||||
self.columns = ['uuid', 'driver', 'conductor_group',
|
||||
'conductor_affinity']
|
||||
|
||||
def _assert_get_nodeinfo_args(self, get_nodeinfo_mock):
|
||||
@ -6200,7 +6200,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
self.service, self.node.uuid, self.node.driver,
|
||||
self.node.conductor_group)
|
||||
acquire_mock.assert_called_once_with(self.context, self.node.uuid,
|
||||
purpose=mock.ANY)
|
||||
purpose=mock.ANY, shared=False)
|
||||
# assert spawn_after has been called
|
||||
self.task.spawn_after.assert_called_once_with(
|
||||
self.service._spawn_worker,
|
||||
@ -6234,7 +6234,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
# assert acquire() gets called 2 times only instead of 3. When
|
||||
# NoFreeConductorWorker is raised the loop should be broken
|
||||
expected = [mock.call(self.context, self.node.uuid,
|
||||
purpose=mock.ANY)] * 2
|
||||
purpose=mock.ANY, shared=False)] * 2
|
||||
self.assertEqual(expected, acquire_mock.call_args_list)
|
||||
|
||||
# assert spawn_after has been called twice
|
||||
@ -6264,7 +6264,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
|
||||
# assert acquire() gets called 3 times
|
||||
expected = [mock.call(self.context, self.node.uuid,
|
||||
purpose=mock.ANY)] * 3
|
||||
purpose=mock.ANY, shared=False)] * 3
|
||||
self.assertEqual(expected, acquire_mock.call_args_list)
|
||||
|
||||
# assert spawn_after has been called only 2 times
|
||||
@ -6296,7 +6296,7 @@ class ManagerSyncLocalStateTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
|
||||
# assert acquire() gets called only once because of the worker limit
|
||||
acquire_mock.assert_called_once_with(self.context, self.node.uuid,
|
||||
purpose=mock.ANY)
|
||||
purpose=mock.ANY, shared=False)
|
||||
|
||||
# assert spawn_after has been called
|
||||
self.task.spawn_after.assert_called_once_with(
|
||||
|
135
ironic/tests/unit/conductor/test_periodics.py
Normal file
135
ironic/tests/unit/conductor/test_periodics.py
Normal file
@ -0,0 +1,135 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from ironic.common import context as ironic_context
|
||||
from ironic.conductor import base_manager
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.tests.unit.db import base as db_base
|
||||
from ironic.tests.unit.objects import utils as obj_utils
|
||||
|
||||
|
||||
_FILTERS = {'maintenance': False}
|
||||
|
||||
|
||||
class PeriodicTestService(base_manager.BaseConductorManager):
|
||||
|
||||
def __init__(self, test):
|
||||
self.test = test
|
||||
self.nodes = []
|
||||
|
||||
@periodics.node_periodic(purpose="herding cats", spacing=42)
|
||||
def simple(self, task, context):
|
||||
self.test.assertIsInstance(context, ironic_context.RequestContext)
|
||||
self.test.assertTrue(task.shared)
|
||||
self.nodes.append(task.node.uuid)
|
||||
|
||||
@periodics.node_periodic(purpose="herding cats", spacing=42,
|
||||
shared_task=False, filters=_FILTERS)
|
||||
def exclusive(self, task, context):
|
||||
self.test.assertIsInstance(context, ironic_context.RequestContext)
|
||||
self.test.assertFalse(task.shared)
|
||||
self.nodes.append(task.node.uuid)
|
||||
|
||||
@periodics.node_periodic(purpose="never running", spacing=42,
|
||||
predicate=lambda n: n.cat != 'meow',
|
||||
predicate_extra_fields=['cat'])
|
||||
def never_run(self, task, context):
|
||||
self.test.fail(f"Was not supposed to run, ran with {task.node}")
|
||||
|
||||
@periodics.node_periodic(purpose="herding cats", spacing=42, limit=3)
|
||||
def limit(self, task, context):
|
||||
self.test.assertIsInstance(context, ironic_context.RequestContext)
|
||||
self.test.assertTrue(task.shared)
|
||||
self.nodes.append(task.node.uuid)
|
||||
if task.node.uuid == 'stop':
|
||||
raise periodics.Stop()
|
||||
|
||||
|
||||
@mock.patch.object(PeriodicTestService, 'iter_nodes', autospec=True)
|
||||
class NodePeriodicTestCase(db_base.DbTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.service = PeriodicTestService(self)
|
||||
self.ctx = ironic_context.get_admin_context()
|
||||
self.uuid = uuidutils.generate_uuid()
|
||||
self.node = obj_utils.create_test_node(self.context, uuid=self.uuid)
|
||||
|
||||
def test_simple(self, mock_iter_nodes):
|
||||
mock_iter_nodes.return_value = iter([
|
||||
(uuidutils.generate_uuid(), 'driver1', ''),
|
||||
(self.uuid, 'driver2', 'group'),
|
||||
])
|
||||
|
||||
self.service.simple(self.ctx)
|
||||
|
||||
mock_iter_nodes.assert_called_once_with(self.service,
|
||||
filters=None, fields=())
|
||||
self.assertEqual([self.uuid], self.service.nodes)
|
||||
|
||||
def test_exclusive(self, mock_iter_nodes):
|
||||
mock_iter_nodes.return_value = iter([
|
||||
(uuidutils.generate_uuid(), 'driver1', ''),
|
||||
(self.uuid, 'driver2', 'group'),
|
||||
])
|
||||
|
||||
self.service.exclusive(self.ctx)
|
||||
|
||||
mock_iter_nodes.assert_called_once_with(self.service,
|
||||
filters=_FILTERS,
|
||||
fields=())
|
||||
self.assertEqual([self.uuid], self.service.nodes)
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test_never_run(self, mock_acquire, mock_iter_nodes):
|
||||
mock_iter_nodes.return_value = iter([
|
||||
(self.uuid, 'driver2', 'group', 'meow'),
|
||||
])
|
||||
|
||||
self.service.never_run(self.ctx)
|
||||
|
||||
mock_iter_nodes.assert_called_once_with(self.service,
|
||||
filters=None,
|
||||
fields=['cat'])
|
||||
self.assertEqual([], self.service.nodes)
|
||||
mock_acquire.assert_not_called()
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test_limit(self, mock_acquire, mock_iter_nodes):
|
||||
mock_iter_nodes.return_value = iter([
|
||||
(self.uuid, 'driver1', ''),
|
||||
] * 10)
|
||||
mock_acquire.return_value.__enter__.return_value.node.uuid = self.uuid
|
||||
|
||||
self.service.limit(self.ctx)
|
||||
|
||||
mock_iter_nodes.assert_called_once_with(self.service,
|
||||
filters=None, fields=())
|
||||
self.assertEqual([self.uuid] * 3, self.service.nodes)
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test_stop(self, mock_acquire, mock_iter_nodes):
|
||||
mock_iter_nodes.return_value = iter([
|
||||
(self.uuid, 'driver1', ''),
|
||||
] * 10)
|
||||
mock_acquire.return_value.__enter__.return_value.node.uuid = 'stop'
|
||||
|
||||
self.service.limit(self.ctx)
|
||||
|
||||
mock_iter_nodes.assert_called_once_with(self.service,
|
||||
filters=None, fields=())
|
||||
self.assertEqual(['stop'], self.service.nodes)
|
@ -28,6 +28,7 @@ from oslo_utils import importutils
|
||||
import ironic.common.boot_devices
|
||||
from ironic.common import exception
|
||||
from ironic.common import molds
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.drivers.modules import deploy_utils
|
||||
@ -1021,7 +1022,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest):
|
||||
|
||||
self.management._check_import_configuration_task.assert_not_called()
|
||||
|
||||
@mock.patch.object(drac_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_import_configuration_status_node_notfound(
|
||||
self, mock_acquire, mock_log):
|
||||
@ -1044,7 +1045,7 @@ class DracRedfishManagementTestCase(test_utils.BaseDracTest):
|
||||
self.management._check_import_configuration_task.assert_not_called()
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
@mock.patch.object(drac_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_import_configuration_status_node_locked(
|
||||
self, mock_acquire, mock_log):
|
||||
|
@ -25,6 +25,7 @@ import tenacity
|
||||
|
||||
from ironic.common import exception
|
||||
from ironic.common import states
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.conf import CONF
|
||||
@ -2592,7 +2593,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest):
|
||||
|
||||
self.raid._check_raid_tasks_status.assert_not_called()
|
||||
|
||||
@mock.patch.object(drac_raid.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_raid_tasks_status_node_notfound(
|
||||
self, mock_acquire, mock_log):
|
||||
@ -2610,7 +2611,7 @@ class DracRedfishRAIDTestCase(test_utils.BaseDracTest):
|
||||
self.raid._check_raid_tasks_status.assert_not_called()
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
@mock.patch.object(drac_raid.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_raid_tasks_status_node_locked(
|
||||
self, mock_acquire, mock_log):
|
||||
|
@ -49,6 +49,8 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest):
|
||||
{
|
||||
'key': 'value'
|
||||
}]}
|
||||
self.node.raid_config = self.raid_config
|
||||
self.node.target_raid_config = self.target_raid_config
|
||||
|
||||
@mock.patch.object(irmc_common, 'get_irmc_report', autospec=True)
|
||||
def test__query_raid_config_fgi_status_without_node(
|
||||
@ -286,6 +288,7 @@ class iRMCPeriodicTaskTestCase(test_common.BaseIRMCTest):
|
||||
mock_manager = mock.Mock()
|
||||
raid_config = self.raid_config
|
||||
raid_config_2 = self.raid_config.copy()
|
||||
self.node_2.raid_config = raid_config_2
|
||||
fgi_status_dict = {}
|
||||
fgi_mock.side_effect = [{}, {'0': 'Idle', '1': 'Idle'}]
|
||||
node_list = [(self.node_2.uuid, 'fake-hardware', '', raid_config_2),
|
||||
|
@ -25,6 +25,7 @@ from ironic.common import components
|
||||
from ironic.common import exception
|
||||
from ironic.common import indicator_states
|
||||
from ironic.common import states
|
||||
from ironic.conductor import periodics
|
||||
from ironic.conductor import task_manager
|
||||
from ironic.conductor import utils as manager_utils
|
||||
from ironic.drivers.modules import deploy_utils
|
||||
@ -905,7 +906,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
|
||||
|
||||
management._clear_firmware_updates.assert_not_called()
|
||||
|
||||
@mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_firmware_update_failed_node_notfound(self, mock_acquire,
|
||||
mock_log):
|
||||
@ -928,7 +929,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
|
||||
management._clear_firmware_updates.assert_not_called()
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
@mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_firmware_update_failed_node_locked(
|
||||
self, mock_acquire, mock_log):
|
||||
@ -1017,7 +1018,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
|
||||
|
||||
management._check_node_firmware_update.assert_not_called()
|
||||
|
||||
@mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_firmware_update_status_node_notfound(self, mock_acquire,
|
||||
mock_log):
|
||||
@ -1040,7 +1041,7 @@ class RedfishManagementTestCase(db_base.DbTestCase):
|
||||
management._check_node_firmware_update.assert_not_called()
|
||||
self.assertTrue(mock_log.called)
|
||||
|
||||
@mock.patch.object(redfish_mgmt.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(periodics.LOG, 'info', autospec=True)
|
||||
@mock.patch.object(task_manager, 'acquire', autospec=True)
|
||||
def test__query_firmware_update_status_node_locked(
|
||||
self, mock_acquire, mock_log):
|
||||
|
Loading…
Reference in New Issue
Block a user