agent: poll long-running commands till completion

Currently for install_bootloader we use wait=True with a longer
timeout. As a more robust alternative, poll the agent until
the command completes. This avoids trying to guess how long
the command will actually take.

Change-Id: I62e9086441fa2b164aee42f7489d12aed4076f49
Story: #2006963
This commit is contained in:
Dmitry Tantsur 2020-06-07 13:05:13 +02:00
parent a2ad31ddef
commit 7828fe8b64
5 changed files with 157 additions and 56 deletions

View File

@ -664,6 +664,10 @@ class AgentConnectionFailed(IronicException):
_msg_fmt = _("Connection to agent failed: %(reason)s") _msg_fmt = _("Connection to agent failed: %(reason)s")
class AgentCommandTimeout(IronicException):
    """Signals a timeout while executing a command on a node's agent."""

    _msg_fmt = _("Timeout executing command %(command)s on node %(node)s")
class NodeProtected(HTTPForbidden): class NodeProtected(HTTPForbidden):
_msg_fmt = _("Node %(node)s is protected and cannot be undeployed, " _msg_fmt = _("Node %(node)s is protected and cannot be undeployed, "
"rebuilt or deleted") "rebuilt or deleted")

View File

@ -113,16 +113,20 @@ opts = [
cfg.IntOpt('command_timeout', cfg.IntOpt('command_timeout',
default=60, default=60,
mutable=True, mutable=True,
help=_('Timeout (in seconds) for IPA commands. ' help=_('Timeout (in seconds) for IPA commands.')),
'Please note, the bootloader installation command '
'to the agent is permitted a timeout of twice the '
'value set here as these are IO heavy operations '
'depending on the configuration of the instance.')),
cfg.IntOpt('max_command_attempts', cfg.IntOpt('max_command_attempts',
default=3, default=3,
help=_('This is the maximum number of attempts that will be ' help=_('This is the maximum number of attempts that will be '
'done for IPA commands that fails due to network ' 'done for IPA commands that fails due to network '
'problems.')), 'problems.')),
cfg.IntOpt('command_wait_attempts',
default=100,
help=_('Number of attempts to check for asynchronous commands '
'completion before timing out.')),
cfg.IntOpt('command_wait_interval',
default=6,
help=_('Number of seconds to wait for between checks for '
'asynchronous commands completion.')),
cfg.IntOpt('neutron_agent_poll_interval', cfg.IntOpt('neutron_agent_poll_interval',
default=2, default=2,
mutable=True, mutable=True,

View File

@ -56,13 +56,60 @@ class AgentClient(object):
'params': params, 'params': params,
}) })
def _raise_if_typeerror(self, result, node, method):
error = result.get('command_error')
if error and error.get('type') == 'TypeError':
LOG.error('Agent command %(method)s for node %(node)s failed. '
'Internal TypeError detected: Error %(error)s',
{'method': method, 'node': node.uuid, 'error': error})
raise exception.AgentAPIError(node=node.uuid,
status=error.get('code'),
error=result.get('faultstring'))
@METRICS.timer('AgentClient._wait_for_command')
@retrying.retry(
    retry_on_exception=(
        lambda exc: isinstance(exc, exception.AgentCommandTimeout)),
    stop_max_attempt_number=CONF.agent.command_wait_attempts,
    wait_fixed=CONF.agent.command_wait_interval * 1000)
def _wait_for_command(self, node, method):
    """Check whether a command has completed on the agent.

    A single call performs one status check; the ``retrying`` decorator
    calls it again whenever AgentCommandTimeout is raised, up to
    ``[agent]command_wait_attempts`` times.

    :param node: A Node object.
    :param method: A string represents the command executed by agent.
    :returns: The status dict of the matching command once it is no longer
              reported as RUNNING.
    :raises: AgentCommandTimeout when the command is not listed or is
             still RUNNING.
    :raises: AgentAPIError when the command failed with an internal
             TypeError.
    """
    # Only the part after the first dot is compared with 'command_name';
    # a name without a dot is used unchanged.
    _prefix, dot, short_name = method.partition('.')
    if dot:
        method = short_name

    statuses = self.get_commands_status(node)
    # Scan from the end of the list so the last matching entry is used.
    result = next((entry for entry in reversed(statuses)
                   if entry.get('command_name') == method), None)
    if result is None:
        LOG.debug('Command %(cmd)s is not in the executing commands list '
                  'for node %(node)s',
                  {'cmd': method, 'node': node.uuid})
        raise exception.AgentCommandTimeout(command=method, node=node.uuid)

    if result.get('command_status') == 'RUNNING':
        LOG.debug('Command %(cmd)s has not finished yet for node %(node)s',
                  {'cmd': method, 'node': node.uuid})
        raise exception.AgentCommandTimeout(command=method, node=node.uuid)

    LOG.debug('Command %(cmd)s has finished for node %(node)s with '
              'result %(result)s',
              {'cmd': method, 'node': node.uuid, 'result': result})
    self._raise_if_typeerror(result, node, method)
    return result
@METRICS.timer('AgentClient._command') @METRICS.timer('AgentClient._command')
@retrying.retry( @retrying.retry(
retry_on_exception=( retry_on_exception=(
lambda e: isinstance(e, exception.AgentConnectionFailed)), lambda e: isinstance(e, exception.AgentConnectionFailed)),
stop_max_attempt_number=CONF.agent.max_command_attempts) stop_max_attempt_number=CONF.agent.max_command_attempts)
def _command(self, node, method, params, wait=False, def _command(self, node, method, params, wait=False, poll=False):
command_timeout_factor=1):
"""Sends command to agent. """Sends command to agent.
:param node: A Node object. :param node: A Node object.
@ -72,19 +119,16 @@ class AgentClient(object):
body. body.
:param wait: True to wait for the command to finish executing, False :param wait: True to wait for the command to finish executing, False
otherwise. otherwise.
:param command_timeout_factor: An integer, default 1, by which to :param poll: Whether to poll the command until completion. Provides
multiply the [agent]command_timeout a better alternative to `wait` for long-running commands.
value. This is intended for use with
extremely long running commands to
the agent ramdisk where a general
timeout value should not be extended
in all cases.
:raises: IronicException when failed to issue the request or there was :raises: IronicException when failed to issue the request or there was
a malformed response from the agent. a malformed response from the agent.
:raises: AgentAPIError when agent failed to execute specified command. :raises: AgentAPIError when agent failed to execute specified command.
:returns: A dict containing command result from agent, see :returns: A dict containing command result from agent, see
get_commands_status for a sample. get_commands_status for a sample.
""" """
assert not (wait and poll)
url = self._get_command_url(node) url = self._get_command_url(node)
body = self._get_command_body(method, params) body = self._get_command_body(method, params)
request_params = { request_params = {
@ -99,7 +143,7 @@ class AgentClient(object):
try: try:
response = self.session.post( response = self.session.post(
url, params=request_params, data=body, url, params=request_params, data=body,
timeout=CONF.agent.command_timeout * command_timeout_factor) timeout=CONF.agent.command_timeout)
except (requests.ConnectionError, requests.Timeout) as e: except (requests.ConnectionError, requests.Timeout) as e:
msg = (_('Failed to connect to the agent running on node %(node)s ' msg = (_('Failed to connect to the agent running on node %(node)s '
'for invoking command %(method)s. Error: %(error)s') % 'for invoking command %(method)s. Error: %(error)s') %
@ -128,12 +172,6 @@ class AgentClient(object):
raise exception.IronicException(msg) raise exception.IronicException(msg)
error = result.get('command_error') error = result.get('command_error')
exc_type = None
if error:
# if an error, we should see if a type field exists. This type
# field may signal an exception that is compatability based.
exc_type = error.get('type')
LOG.debug('Agent command %(method)s for node %(node)s returned ' LOG.debug('Agent command %(method)s for node %(node)s returned '
'result %(res)s, error %(error)s, HTTP status code %(code)d', 'result %(res)s, error %(error)s, HTTP status code %(code)d',
{'node': node.uuid, 'method': method, {'node': node.uuid, 'method': method,
@ -149,14 +187,11 @@ class AgentClient(object):
raise exception.AgentAPIError(node=node.uuid, raise exception.AgentAPIError(node=node.uuid,
status=response.status_code, status=response.status_code,
error=result.get('faultstring')) error=result.get('faultstring'))
if exc_type == 'TypeError':
LOG.error('Agent command %(method)s for node %(node)s failed. ' self._raise_if_typeerror(result, node, method)
'Internal %(exc_type)s error detected: Error %(error)s',
{'method': method, 'node': node.uuid, if poll:
'exc_type': exc_type, 'error': error}) result = self._wait_for_command(node, method)
raise exception.AgentAPIError(node=node.uuid,
status=error.get('code'),
error=result.get('faultstring'))
return result return result
@ -245,7 +280,7 @@ class AgentClient(object):
return self._command(node=node, return self._command(node=node,
method='standby.prepare_image', method='standby.prepare_image',
params=params, params=params,
wait=wait) poll=wait)
@METRICS.timer('AgentClient.start_iscsi_target') @METRICS.timer('AgentClient.start_iscsi_target')
def start_iscsi_target(self, node, iqn, def start_iscsi_target(self, node, iqn,
@ -313,8 +348,7 @@ class AgentClient(object):
return self._command(node=node, return self._command(node=node,
method='image.install_bootloader', method='image.install_bootloader',
params=params, params=params,
wait=True, poll=True)
command_timeout_factor=2)
except exception.AgentAPIError: except exception.AgentAPIError:
# NOTE(arne_wiebalck): If for software RAID and 'uefi' as the boot # NOTE(arne_wiebalck): If for software RAID and 'uefi' as the boot
# mode, we find that the IPA does not yet support the additional # mode, we find that the IPA does not yet support the additional
@ -338,8 +372,7 @@ class AgentClient(object):
return self._command(node=node, return self._command(node=node,
method='image.install_bootloader', method='image.install_bootloader',
params=params, params=params,
wait=True, poll=True)
command_timeout_factor=2)
@METRICS.timer('AgentClient.get_clean_steps') @METRICS.timer('AgentClient.get_clean_steps')
def get_clean_steps(self, node, ports): def get_clean_steps(self, node, ports):

View File

@ -29,13 +29,29 @@ CONF = conf.CONF
class MockResponse(object): class MockResponse(object):
def __init__(self, text, status_code=http_client.OK): def __init__(self, data=None, status_code=http_client.OK, text=None):
assert isinstance(text, str) assert not (data and text)
self.text = text self.text = text
self.data = data
self.status_code = status_code self.status_code = status_code
def json(self): def json(self):
return json.loads(self.text) if self.text:
return json.loads(self.text)
else:
return self.data
class MockCommandStatus(MockResponse):
    """Fake response whose JSON body lists exactly one command entry."""

    def __init__(self, status, name='fake', error=None):
        # Build the single command entry first, then wrap it in the
        # 'commands' list that forms the response body.
        entry = {
            'command_name': name,
            'command_status': status,
            'command_result': 'I did something',
            'command_error': error,
        }
        super().__init__({'commands': [entry]})
class MockNode(object): class MockNode(object):
@ -87,8 +103,7 @@ class TestAgentClient(base.TestCase):
def test__command(self): def test__command(self):
response_data = {'status': 'ok'} response_data = {'status': 'ok'}
response_text = json.dumps(response_data) self.client.session.post.return_value = MockResponse(response_data)
self.client.session.post.return_value = MockResponse(response_text)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
@ -106,7 +121,8 @@ class TestAgentClient(base.TestCase):
def test__command_fail_json(self): def test__command_fail_json(self):
response_text = 'this be not json matey!' response_text = 'this be not json matey!'
self.client.session.post.return_value = MockResponse(response_text) self.client.session.post.return_value = MockResponse(
text=response_text)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
@ -159,7 +175,7 @@ class TestAgentClient(base.TestCase):
'error': error}, str(e)) 'error': error}, str(e))
def test__command_error_code(self): def test__command_error_code(self):
response_text = '{"faultstring": "you dun goofd"}' response_text = {"faultstring": "you dun goofd"}
self.client.session.post.return_value = MockResponse( self.client.session.post.return_value = MockResponse(
response_text, status_code=http_client.BAD_REQUEST) response_text, status_code=http_client.BAD_REQUEST)
method = 'standby.run_image' method = 'standby.run_image'
@ -179,10 +195,9 @@ class TestAgentClient(base.TestCase):
timeout=60) timeout=60)
def test__command_error_code_okay_error_typeerror_embedded(self): def test__command_error_code_okay_error_typeerror_embedded(self):
response_text = ('{"faultstring": "you dun goofd", ' response_data = {"faultstring": "you dun goofd",
'"command_error": {"type": "TypeError"}}') "command_error": {"type": "TypeError"}}
self.client.session.post.return_value = MockResponse( self.client.session.post.return_value = MockResponse(response_data)
response_text)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
@ -199,6 +214,36 @@ class TestAgentClient(base.TestCase):
params={'wait': 'false'}, params={'wait': 'false'},
timeout=60) timeout=60)
@mock.patch('time.sleep', lambda seconds: None)
def test__command_poll(self):
    """_command(poll=True) returns the final status after a RUNNING one."""
    method = 'standby.run_image'
    params = {'image_info': {'image_id': 'test_image'}}
    url = self.client._get_command_url(self.node)
    body = self.client._get_command_body(method, params)

    # The POST starts the command; the first GET reports it as still
    # running and the second one as finished.
    self.client.session.post.return_value = MockResponse({'status': 'ok'})
    self.client.session.get.side_effect = [
        MockCommandStatus('RUNNING', name='run_image'),
        MockCommandStatus('SUCCEEDED', name='run_image'),
    ]

    response = self.client._command(self.node, method, params, poll=True)

    self.assertEqual(
        {'command_error': None,
         'command_name': 'run_image',
         'command_result': 'I did something',
         'command_status': 'SUCCEEDED'},
        response)
    self.client.session.post.assert_called_once_with(
        url,
        data=body,
        params={'wait': 'false'},
        timeout=60)
    self.client.session.get.assert_called_with(url, timeout=60)
def test_get_commands_status(self): def test_get_commands_status(self):
with mock.patch.object(self.client.session, 'get', with mock.patch.object(self.client.session, 'get',
autospec=True) as mock_get: autospec=True) as mock_get:
@ -234,7 +279,7 @@ class TestAgentClient(base.TestCase):
wait=False) wait=False)
self.client._command.assert_called_once_with( self.client._command.assert_called_once_with(
node=self.node, method='standby.prepare_image', node=self.node, method='standby.prepare_image',
params=params, wait=False) params=params, poll=False)
def test_prepare_image_with_configdrive(self): def test_prepare_image_with_configdrive(self):
self.client._command = mock.MagicMock(spec_set=[]) self.client._command = mock.MagicMock(spec_set=[])
@ -251,7 +296,19 @@ class TestAgentClient(base.TestCase):
wait=False) wait=False)
self.client._command.assert_called_once_with( self.client._command.assert_called_once_with(
node=self.node, method='standby.prepare_image', node=self.node, method='standby.prepare_image',
params=params, wait=False) params=params, poll=False)
def test_prepare_image_with_wait(self):
    """prepare_image(wait=True) must call _command with poll=True."""
    image_info = {'image_id': 'image'}
    self.client._command = mock.MagicMock(spec_set=[])

    self.client.prepare_image(self.node, image_info, wait=True)

    self.client._command.assert_called_once_with(
        node=self.node,
        method='standby.prepare_image',
        params={'image_info': image_info},
        poll=True)
def test_start_iscsi_target(self): def test_start_iscsi_target(self):
self.client._command = mock.MagicMock(spec_set=[]) self.client._command = mock.MagicMock(spec_set=[])
@ -305,9 +362,8 @@ class TestAgentClient(base.TestCase):
self.node, root_uuid, efi_system_part_uuid=efi_system_part_uuid, self.node, root_uuid, efi_system_part_uuid=efi_system_part_uuid,
prep_boot_part_uuid=prep_boot_part_uuid, target_boot_mode='hello') prep_boot_part_uuid=prep_boot_part_uuid, target_boot_mode='hello')
self.client._command.assert_called_once_with( self.client._command.assert_called_once_with(
command_timeout_factor=2, node=self.node, node=self.node, method='image.install_bootloader', params=params,
method='image.install_bootloader', params=params, poll=True)
wait=True)
def test_install_bootloader(self): def test_install_bootloader(self):
self._test_install_bootloader(root_uuid='fake-root-uuid', self._test_install_bootloader(root_uuid='fake-root-uuid',
@ -415,8 +471,7 @@ class TestAgentClient(base.TestCase):
def test__command_agent_client(self): def test__command_agent_client(self):
response_data = {'status': 'ok'} response_data = {'status': 'ok'}
response_text = json.dumps(response_data) self.client.session.post.return_value = MockResponse(response_data)
self.client.session.post.return_value = MockResponse(response_text)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
@ -472,13 +527,12 @@ class TestAgentClientAttempts(base.TestCase):
mock_sleep.return_value = None mock_sleep.return_value = None
error = 'Connection Timeout' error = 'Connection Timeout'
response_data = {'status': 'ok'} response_data = {'status': 'ok'}
response_text = json.dumps(response_data)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
self.client.session.post.side_effect = [requests.Timeout(error), self.client.session.post.side_effect = [requests.Timeout(error),
requests.Timeout(error), requests.Timeout(error),
MockResponse(response_text)] MockResponse(response_data)]
response = self.client._command(self.node, method, params) response = self.client._command(self.node, method, params)
self.assertEqual(3, self.client.session.post.call_count) self.assertEqual(3, self.client.session.post.call_count)
@ -494,12 +548,11 @@ class TestAgentClientAttempts(base.TestCase):
mock_sleep.return_value = None mock_sleep.return_value = None
error = 'Connection Timeout' error = 'Connection Timeout'
response_data = {'status': 'ok'} response_data = {'status': 'ok'}
response_text = json.dumps(response_data)
method = 'standby.run_image' method = 'standby.run_image'
image_info = {'image_id': 'test_image'} image_info = {'image_id': 'test_image'}
params = {'image_info': image_info} params = {'image_info': image_info}
self.client.session.post.side_effect = [requests.Timeout(error), self.client.session.post.side_effect = [requests.Timeout(error),
MockResponse(response_text), MockResponse(response_data),
requests.Timeout(error)] requests.Timeout(error)]
response = self.client._command(self.node, method, params) response = self.client._command(self.node, method, params)

View File

@ -0,0 +1,7 @@
---
fixes:
- |
Instead of increasing the timeout when running long synchronous tasks on
ironic-python-agent, ironic now runs them asynchronously and polls
the agent until completion. It is no longer necessary to account for
long-running tasks when setting ``[agent]command_timeout``.