Refactor AgentBase.heartbeat and process_next_step

These methods are reaching the complexity limit, so split them.
Also update messages to be more human-friendly.

The complexity limit is returned to 18, as it was before hacking 3.0.

Change-Id: I918af9508f3c99321625f6df96b1bf12ead26243
Story: #2006963
Dmitry Tantsur 2020-04-07 16:59:12 +02:00
parent 45d9390187
commit d150aa287f
3 changed files with 158 additions and 132 deletions
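
The shape of the refactor: heartbeat() stays a thin dispatcher over the node's provision state, and the per-state logic moves into _heartbeat_deploy_wait, _heartbeat_clean_wait and _heartbeat_rescue_wait, each owning its own try/except and error handling. A minimal, self-contained sketch of that pattern follows; the class and state constants here are toy stand-ins for illustration, not the ironic code (the real helpers are in the diff below).

# Illustrative sketch only: toy stand-ins showing the dispatch pattern,
# not the ironic HeartbeatMixin implementation.
DEPLOYWAIT, CLEANWAIT, RESCUEWAIT = 'deploy wait', 'clean wait', 'rescue wait'


class ToyHeartbeatMixin:
    def heartbeat(self, task):
        node = task.node
        # Keep the dispatcher small: one branch per wait state, with the
        # actual work (and its error handling) in a dedicated helper.
        if node.provision_state == DEPLOYWAIT:
            self._heartbeat_deploy_wait(task)
        elif node.provision_state == CLEANWAIT:
            self._heartbeat_clean_wait(task)
        elif node.provision_state == RESCUEWAIT:
            self._heartbeat_rescue_wait(task)

    def _heartbeat_deploy_wait(self, task):
        # Continue or finish the deployment; failures are handled here.
        pass

    def _heartbeat_clean_wait(self, task):
        # Start or continue cleaning; failures are handled here.
        pass

    def _heartbeat_rescue_wait(self, task):
        # Finalize rescue; failures are handled here.
        pass

Each helper stays under the flake8 complexity limit on its own, which is what allows the project's max-complexity setting to drop from 20 back to 18 at the end of this change.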


@@ -459,6 +459,85 @@ class HeartbeatMixin(object):
'maintenance mode; not taking any action.',
{'node': node.uuid})
def _heartbeat_deploy_wait(self, task):
msg = _('Failed checking if deploy is done')
node = task.node
try:
# NOTE(mgoddard): Only handle heartbeats during DEPLOYWAIT if we
# are currently in the core deploy.deploy step. Other deploy steps
# may cause the agent to boot, but we should not trigger deployment
# at that point if the driver is polling for completion of a step.
if self.in_core_deploy_step(task):
if not self.deploy_has_started(task):
msg = _('Node failed to deploy')
self.continue_deploy(task)
elif self.deploy_is_done(task):
msg = _('Node failed to move to active state')
self.reboot_to_instance(task)
else:
node.touch_provisioning()
else:
# The exceptions from RPC are not possible as we are using cast
# here
# Check if the driver is polling for completion of a step,
# via the 'deployment_polling' flag.
polling = node.driver_internal_info.get(
'deployment_polling', False)
if not polling:
manager_utils.notify_conductor_resume_deploy(task)
node.touch_provisioning()
except Exception as e:
last_error = _('%(msg)s. Error: %(exc)s') % {'msg': msg, 'exc': e}
LOG.exception('Asynchronous exception for node %(node)s: %(err)s',
{'node': task.node.uuid, 'err': last_error})
# Do not call the error handler if the node is already in DEPLOYFAIL
if node.provision_state in (states.DEPLOYING, states.DEPLOYWAIT):
deploy_utils.set_failed_state(
task, last_error, collect_logs=bool(self._client))
def _heartbeat_clean_wait(self, task):
node = task.node
msg = _('Failed checking if cleaning is done')
try:
node.touch_provisioning()
if not node.clean_step:
LOG.debug('Node %s just booted to start cleaning.',
node.uuid)
msg = _('Node failed to start the first cleaning step')
# First, cache the clean steps
self.refresh_clean_steps(task)
# Then set/verify node clean steps and start cleaning
conductor_steps.set_node_cleaning_steps(task)
# The exceptions from RPC are not possible as we are using cast
# here
manager_utils.notify_conductor_resume_clean(task)
else:
msg = _('Node failed to check cleaning progress')
# Check if the driver is polling for completion of a step,
# via the 'cleaning_polling' flag.
polling = node.driver_internal_info.get(
'cleaning_polling', False)
if not polling:
self.continue_cleaning(task)
except Exception as e:
last_error = _('%(msg)s. Error: %(exc)s') % {'msg': msg, 'exc': e}
LOG.exception('Asynchronous exception for node %(node)s: %(err)s',
{'node': task.node.uuid, 'err': last_error})
if node.provision_state in (states.CLEANING, states.CLEANWAIT):
manager_utils.cleaning_error_handler(task, last_error)
def _heartbeat_rescue_wait(self, task):
msg = _('Node failed to perform rescue operation')
try:
self._finalize_rescue(task)
except Exception as e:
last_error = _('%(msg)s. Error: %(exc)s') % {'msg': msg, 'exc': e}
LOG.exception('Asynchronous exception for node %(node)s: %(err)s',
{'node': task.node.uuid, 'err': last_error})
if task.node.provision_state in (states.RESCUING,
states.RESCUEWAIT):
manager_utils.rescuing_error_handler(task, last_error)
@METRICS.timer('HeartbeatMixin.heartbeat')
def heartbeat(self, task, callback_url, agent_version):
"""Process a heartbeat.
@@ -508,71 +587,12 @@ class HeartbeatMixin(object):
if node.maintenance:
return self._heartbeat_in_maintenance(task)
# Async call backs don't set error state on their own
# TODO(jimrollenhagen) improve error messages here
msg = _('Failed checking if deploy is done.')
try:
# NOTE(mgoddard): Only handle heartbeats during DEPLOYWAIT if we
# are currently in the core deploy.deploy step. Other deploy steps
# may cause the agent to boot, but we should not trigger deployment
# at that point if the driver is polling for completion of a step.
if node.provision_state == states.DEPLOYWAIT:
if self.in_core_deploy_step(task):
if not self.deploy_has_started(task):
msg = _('Node failed to deploy.')
self.continue_deploy(task)
elif self.deploy_is_done(task):
msg = _('Node failed to move to active state.')
self.reboot_to_instance(task)
else:
node.touch_provisioning()
else:
# The exceptions from RPC are not possible as we using cast
# here
# Check if the driver is polling for completion of a step,
# via the 'deployment_polling' flag.
polling = node.driver_internal_info.get(
'deployment_polling', False)
if not polling:
manager_utils.notify_conductor_resume_deploy(task)
node.touch_provisioning()
self._heartbeat_deploy_wait(task)
elif node.provision_state == states.CLEANWAIT:
node.touch_provisioning()
if not node.clean_step:
LOG.debug('Node %s just booted to start cleaning.',
node.uuid)
msg = _('Node failed to start the first cleaning step.')
# First, cache the clean steps
self.refresh_clean_steps(task)
# Then set/verify node clean steps and start cleaning
conductor_steps.set_node_cleaning_steps(task)
# The exceptions from RPC are not possible as we using cast
# here
manager_utils.notify_conductor_resume_clean(task)
else:
msg = _('Node failed to check cleaning progress.')
# Check if the driver is polling for completion of a step,
# via the 'cleaning_polling' flag.
polling = node.driver_internal_info.get(
'cleaning_polling', False)
if not polling:
self.continue_cleaning(task)
elif (node.provision_state == states.RESCUEWAIT):
msg = _('Node failed to perform rescue operation.')
self._finalize_rescue(task)
except Exception as e:
err_info = {'msg': msg, 'e': e}
last_error = _('Asynchronous exception: %(msg)s '
'Exception: %(e)s for node') % err_info
errmsg = last_error + ' %(node)s'
LOG.exception(errmsg, {'node': node.uuid})
if node.provision_state in (states.CLEANING, states.CLEANWAIT):
manager_utils.cleaning_error_handler(task, last_error)
elif node.provision_state in (states.DEPLOYING, states.DEPLOYWAIT):
deploy_utils.set_failed_state(
task, last_error, collect_logs=bool(self._client))
elif node.provision_state in (states.RESCUING, states.RESCUEWAIT):
manager_utils.rescuing_error_handler(task, last_error)
self._heartbeat_clean_wait(task)
elif node.provision_state == states.RESCUEWAIT:
self._heartbeat_rescue_wait(task)
def _finalize_rescue(self, task):
"""Call ramdisk to prepare rescue mode and verify result.
@@ -704,72 +724,12 @@ class AgentDeployMixin(HeartbeatMixin):
"""
return execute_step(task, step, 'clean')
@METRICS.timer('AgentDeployMixin.process_next_step')
def process_next_step(self, task, step_type, **kwargs):
"""Start the next clean/deploy step if the previous one is complete.
In order to avoid errors and make agent upgrades painless, the agent
compares the version of all hardware managers at the start of the
process (the agent's get_clean|deploy_steps() call) and before
executing each step. If the version has changed between steps,
the agent is unable to tell if an ordering change will cause an issue
so it returns CLEAN_VERSION_MISMATCH. For automated cleaning, we
restart the entire cleaning cycle. For manual cleaning or deploy,
we don't.
Additionally, if a step includes the reboot_requested property
set to True, this method will coordinate the reboot once the step is
completed.
"""
assert step_type in ('clean', 'deploy')
def _process_version_mismatch(self, task, step_type):
node = task.node
# For manual clean, the target provision state is MANAGEABLE, whereas
# for automated cleaning, it is (the default) AVAILABLE.
manual_clean = node.target_provision_state == states.MANAGEABLE
agent_commands = self._client.get_commands_status(task.node)
if not agent_commands:
field = ('cleaning_reboot' if step_type == 'clean'
else 'deployment_reboot')
if task.node.driver_internal_info.get(field):
# Node finished a cleaning step that requested a reboot, and
# this is the first heartbeat after booting. Continue cleaning.
info = task.node.driver_internal_info
info.pop(field, None)
task.node.driver_internal_info = info
task.node.save()
manager_utils.notify_conductor_resume_operation(task,
step_type)
return
else:
# Agent has no commands whatsoever
return
current_step = (node.clean_step if step_type == 'clean'
else node.deploy_step)
command = _get_completed_command(task, agent_commands, step_type)
LOG.debug('%(type)s command status for node %(node)s on step %(step)s:'
' %(command)s', {'node': node.uuid,
'step': current_step,
'command': command,
'type': step_type})
if not command:
# Agent command in progress
return
if command.get('command_status') == 'FAILED':
msg = (_('Agent returned error for %(type)s step %(step)s on node '
'%(node)s : %(err)s.') %
{'node': node.uuid,
'err': command.get('command_error'),
'step': current_step,
'type': step_type})
LOG.error(msg)
return manager_utils.cleaning_error_handler(task, msg)
elif command.get('command_status') in ('CLEAN_VERSION_MISMATCH',
'DEPLOY_VERSION_MISMATCH'):
# Cache the new clean steps (and 'hardware_manager_version')
try:
self.refresh_steps(task, step_type)
@@ -825,6 +785,70 @@ class AgentDeployMixin(HeartbeatMixin):
manager_utils.notify_conductor_resume_operation(task, step_type)
@METRICS.timer('AgentDeployMixin.process_next_step')
def process_next_step(self, task, step_type, **kwargs):
"""Start the next clean/deploy step if the previous one is complete.
In order to avoid errors and make agent upgrades painless, the agent
compares the version of all hardware managers at the start of the
process (the agent's get_clean|deploy_steps() call) and before
executing each step. If the version has changed between steps,
the agent is unable to tell if an ordering change will cause an issue
so it returns CLEAN_VERSION_MISMATCH. For automated cleaning, we
restart the entire cleaning cycle. For manual cleaning or deploy,
we don't.
Additionally, if a step includes the reboot_requested property
set to True, this method will coordinate the reboot once the step is
completed.
"""
assert step_type in ('clean', 'deploy')
node = task.node
agent_commands = self._client.get_commands_status(task.node)
if not agent_commands:
field = ('cleaning_reboot' if step_type == 'clean'
else 'deployment_reboot')
if task.node.driver_internal_info.get(field):
# Node finished a cleaning step that requested a reboot, and
# this is the first heartbeat after booting. Continue cleaning.
info = task.node.driver_internal_info
info.pop(field, None)
task.node.driver_internal_info = info
task.node.save()
manager_utils.notify_conductor_resume_operation(task,
step_type)
return
else:
# Agent has no commands whatsoever
return
current_step = (node.clean_step if step_type == 'clean'
else node.deploy_step)
command = _get_completed_command(task, agent_commands, step_type)
LOG.debug('%(type)s command status for node %(node)s on step %(step)s:'
' %(command)s', {'node': node.uuid,
'step': current_step,
'command': command,
'type': step_type})
if not command:
# Agent command in progress
return
if command.get('command_status') == 'FAILED':
msg = (_('Agent returned error for %(type)s step %(step)s on node '
'%(node)s : %(err)s.') %
{'node': node.uuid,
'err': command.get('command_error'),
'step': current_step,
'type': step_type})
LOG.error(msg)
return manager_utils.cleaning_error_handler(task, msg)
elif command.get('command_status') in ('CLEAN_VERSION_MISMATCH',
'DEPLOY_VERSION_MISMATCH'):
self._process_version_mismatch(task, step_type)
elif command.get('command_status') == 'SUCCEEDED':
step_hook = _get_post_step_hook(node, step_type)
if step_hook is not None:
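
For the CLEAN_VERSION_MISMATCH / DEPLOY_VERSION_MISMATCH branch above: as the docstring explains, a hardware manager version change between steps restarts automated cleaning from the beginning, while manual cleaning and deployment only refresh the cached steps and resume. The full _process_version_mismatch body is truncated in this diff, so the following is only a rough sketch of that policy; the function name and return values are hypothetical, not the real implementation.

# Rough sketch of the version-mismatch policy described in the docstring;
# the name and return values here are hypothetical.
def on_version_mismatch(step_type, manual_clean):
    if step_type == 'clean' and not manual_clean:
        # Automated cleaning: step ordering may have changed, so restart
        # the whole cleaning cycle from the first step.
        return 'restart-cleaning'
    # Manual cleaning or deployment: refresh the cached steps and resume
    # from the current step.
    return 'refresh-and-resume'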


@@ -384,9 +384,10 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
failed_mock.assert_called_once_with(
task, mock.ANY, collect_logs=True)
log_mock.assert_called_once_with(
'Asynchronous exception: Failed checking if deploy is done. '
'Exception: LlamaException for node %(node)s',
{'node': task.node.uuid})
'Asynchronous exception for node %(node)s: %(err)s',
{'err': 'Failed checking if deploy is done. '
'Error: LlamaException',
'node': task.node.uuid})
@mock.patch.object(agent_base.HeartbeatMixin,
'in_core_deploy_step', autospec=True)
@@ -420,9 +421,10 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
# deploy_utils.set_failed_state anymore
self.assertFalse(failed_mock.called)
log_mock.assert_called_once_with(
'Asynchronous exception: Failed checking if deploy is done. '
'Exception: LlamaException for node %(node)s',
{'node': task.node.uuid})
'Asynchronous exception for node %(node)s: %(err)s',
{'err': 'Failed checking if deploy is done. '
'Error: LlamaException',
'node': task.node.uuid})
@mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
@mock.patch.object(agent_base.HeartbeatMixin,
@@ -574,8 +576,8 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
mock_finalize.assert_called_once_with(mock.ANY, task)
mock_rescue_err_handler.assert_called_once_with(
task, 'Asynchronous exception: Node failed to perform '
'rescue operation. Exception: some failure for node')
task, 'Node failed to perform '
'rescue operation. Error: some failure')
@mock.patch.object(agent_base.HeartbeatMixin,
'in_core_deploy_step', autospec=True)


@@ -116,7 +116,7 @@ filename = *.py,app.wsgi
exclude = .venv,.git,.tox,dist,doc,*lib/python*,*egg,build
import-order-style = pep8
application-import-names = ironic
max-complexity=20
max-complexity=18
# [H106] Don't put vim configuration in source files.
# [H203] Use assertIs(Not)None to check for None.
# [H204] Use assert(Not)Equal to check for equality.