Merge "Optimize node locking on heartbeat"
This commit is contained in:
commit
87fe7cce4e
@ -258,6 +258,11 @@ class HeartbeatMixin(object):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def heartbeat_allowed_states(self):
|
||||||
|
"""Define node states where heartbeating is allowed"""
|
||||||
|
return (states.DEPLOYWAIT, states.CLEANWAIT)
|
||||||
|
|
||||||
@METRICS.timer('HeartbeatMixin.heartbeat')
|
@METRICS.timer('HeartbeatMixin.heartbeat')
|
||||||
def heartbeat(self, task, callback_url):
|
def heartbeat(self, task, callback_url):
|
||||||
"""Process a heartbeat.
|
"""Process a heartbeat.
|
||||||
@ -265,8 +270,14 @@ class HeartbeatMixin(object):
|
|||||||
:param task: task to work with.
|
:param task: task to work with.
|
||||||
:param callback_url: agent HTTP API URL.
|
:param callback_url: agent HTTP API URL.
|
||||||
"""
|
"""
|
||||||
# TODO(dtantsur): upgrade lock only if we actually take action other
|
# NOTE(pas-ha) immediately skip the rest if nothing to do
|
||||||
# than updating the last timestamp.
|
if task.node.provision_state not in self.heartbeat_allowed_states:
|
||||||
|
LOG.debug('Heartbeat from node %(node)s in unsupported '
|
||||||
|
'provision state %(state), not taking any action.',
|
||||||
|
{'node': task.node.uuid,
|
||||||
|
'state': task.node.provision_state})
|
||||||
|
return
|
||||||
|
|
||||||
task.upgrade_lock()
|
task.upgrade_lock()
|
||||||
|
|
||||||
node = task.node
|
node = task.node
|
||||||
@ -274,13 +285,6 @@ class HeartbeatMixin(object):
|
|||||||
|
|
||||||
driver_internal_info = node.driver_internal_info
|
driver_internal_info = node.driver_internal_info
|
||||||
driver_internal_info['agent_url'] = callback_url
|
driver_internal_info['agent_url'] = callback_url
|
||||||
|
|
||||||
# TODO(rloo): 'agent_last_heartbeat' was deprecated since it wasn't
|
|
||||||
# being used so remove that entry if it exists.
|
|
||||||
# Hopefully all nodes will have been updated by Pike, so
|
|
||||||
# we can delete this code then.
|
|
||||||
driver_internal_info.pop('agent_last_heartbeat', None)
|
|
||||||
|
|
||||||
node.driver_internal_info = driver_internal_info
|
node.driver_internal_info = driver_internal_info
|
||||||
node.save()
|
node.save()
|
||||||
|
|
||||||
|
@ -552,12 +552,6 @@ class TestAgentDeploy(db_base.DbTestCase):
|
|||||||
tear_down_cleaning_mock.assert_called_once_with(
|
tear_down_cleaning_mock.assert_called_once_with(
|
||||||
task, manage_boot=False)
|
task, manage_boot=False)
|
||||||
|
|
||||||
def test_heartbeat(self):
|
|
||||||
with task_manager.acquire(self.context, self.node.uuid,
|
|
||||||
shared=True) as task:
|
|
||||||
self.driver.heartbeat(task, 'url')
|
|
||||||
self.assertFalse(task.shared)
|
|
||||||
|
|
||||||
def _test_continue_deploy(self, additional_driver_info=None,
|
def _test_continue_deploy(self, additional_driver_info=None,
|
||||||
additional_expected_image_info=None):
|
additional_expected_image_info=None):
|
||||||
self.node.provision_state = states.DEPLOYWAIT
|
self.node.provision_state = states.DEPLOYWAIT
|
||||||
|
@ -64,6 +64,53 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
|
|||||||
super(HeartbeatMixinTest, self).setUp()
|
super(HeartbeatMixinTest, self).setUp()
|
||||||
self.deploy = agent_base_vendor.HeartbeatMixin()
|
self.deploy = agent_base_vendor.HeartbeatMixin()
|
||||||
|
|
||||||
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin, 'continue_deploy',
|
||||||
|
autospec=True)
|
||||||
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
||||||
|
'reboot_to_instance', autospec=True)
|
||||||
|
@mock.patch.object(agent_base_vendor, '_notify_conductor_resume_clean',
|
||||||
|
autospec=True)
|
||||||
|
def test_heartbeat_in_maintenance(self, ncrc_mock, rti_mock, cd_mock):
|
||||||
|
# NOTE(pas-ha) checking only for states that are not noop
|
||||||
|
for state in (states.DEPLOYWAIT, states.CLEANWAIT):
|
||||||
|
for m in (ncrc_mock, rti_mock, cd_mock):
|
||||||
|
m.reset_mock()
|
||||||
|
self.node.provision_state = state
|
||||||
|
self.node.maintenance = True
|
||||||
|
self.node.save()
|
||||||
|
agent_url = 'url-%s' % state
|
||||||
|
with task_manager.acquire(self.context, self.node.uuid,
|
||||||
|
shared=True) as task:
|
||||||
|
self.deploy.heartbeat(task, agent_url)
|
||||||
|
self.assertFalse(task.shared)
|
||||||
|
self.assertEqual(
|
||||||
|
agent_url,
|
||||||
|
task.node.driver_internal_info['agent_url'])
|
||||||
|
self.assertEqual(0, ncrc_mock.call_count)
|
||||||
|
self.assertEqual(0, rti_mock.call_count)
|
||||||
|
self.assertEqual(0, cd_mock.call_count)
|
||||||
|
|
||||||
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin, 'continue_deploy',
|
||||||
|
autospec=True)
|
||||||
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
||||||
|
'reboot_to_instance', autospec=True)
|
||||||
|
@mock.patch.object(agent_base_vendor, '_notify_conductor_resume_clean',
|
||||||
|
autospec=True)
|
||||||
|
def test_heartbeat_noops_in_wrong_state(self, ncrc_mock, rti_mock,
|
||||||
|
cd_mock):
|
||||||
|
allowed = {states.DEPLOYWAIT, states.CLEANWAIT}
|
||||||
|
for state in set(states.machine.states) - allowed:
|
||||||
|
for m in (ncrc_mock, rti_mock, cd_mock):
|
||||||
|
m.reset_mock()
|
||||||
|
with task_manager.acquire(self.context, self.node.uuid,
|
||||||
|
shared=True) as task:
|
||||||
|
self.node.provision_state = state
|
||||||
|
self.deploy.heartbeat(task, 'url')
|
||||||
|
self.assertTrue(task.shared)
|
||||||
|
self.assertEqual(0, ncrc_mock.call_count)
|
||||||
|
self.assertEqual(0, rti_mock.call_count)
|
||||||
|
self.assertEqual(0, cd_mock.call_count)
|
||||||
|
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
||||||
'deploy_has_started', autospec=True)
|
'deploy_has_started', autospec=True)
|
||||||
@mock.patch.object(deploy_utils, 'set_failed_state', autospec=True)
|
@mock.patch.object(deploy_utils, 'set_failed_state', autospec=True)
|
||||||
@ -215,57 +262,11 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
|
|||||||
mock_continue.assert_called_once_with(mock.ANY, task)
|
mock_continue.assert_called_once_with(mock.ANY, task)
|
||||||
mock_handler.assert_called_once_with(task, mock.ANY)
|
mock_handler.assert_called_once_with(task, mock.ANY)
|
||||||
|
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin, 'continue_deploy',
|
|
||||||
autospec=True)
|
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
|
||||||
'reboot_to_instance', autospec=True)
|
|
||||||
@mock.patch.object(agent_base_vendor, '_notify_conductor_resume_clean',
|
|
||||||
autospec=True)
|
|
||||||
def test_heartbeat_no_agent_last_heartbeat(self, ncrc_mock, rti_mock,
|
|
||||||
cd_mock):
|
|
||||||
"""node.driver_internal_info doesn't have 'agent_last_heartbeat'."""
|
|
||||||
node = self.node
|
|
||||||
node.maintenance = True
|
|
||||||
node.provision_state = states.AVAILABLE
|
|
||||||
driver_internal_info = {'agent_last_heartbeat': 'time'}
|
|
||||||
node.driver_internal_info = driver_internal_info
|
|
||||||
node.save()
|
|
||||||
with task_manager.acquire(
|
|
||||||
self.context, node['uuid'], shared=False) as task:
|
|
||||||
self.deploy.heartbeat(task, 'http://127.0.0.1:8080')
|
|
||||||
|
|
||||||
self.assertEqual(0, ncrc_mock.call_count)
|
|
||||||
self.assertEqual(0, rti_mock.call_count)
|
|
||||||
self.assertEqual(0, cd_mock.call_count)
|
|
||||||
node.refresh()
|
|
||||||
self.assertNotIn('agent_last_heartbeat', node.driver_internal_info)
|
|
||||||
|
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin, 'continue_deploy',
|
|
||||||
autospec=True)
|
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
|
||||||
'reboot_to_instance', autospec=True)
|
|
||||||
@mock.patch.object(agent_base_vendor, '_notify_conductor_resume_clean',
|
|
||||||
autospec=True)
|
|
||||||
def test_heartbeat_noops_maintenance_mode(self, ncrc_mock, rti_mock,
|
|
||||||
cd_mock):
|
|
||||||
"""Ensures that heartbeat() no-ops for a maintenance node."""
|
|
||||||
self.node.maintenance = True
|
|
||||||
for state in (states.AVAILABLE, states.DEPLOYWAIT, states.DEPLOYING,
|
|
||||||
states.CLEANING):
|
|
||||||
self.node.provision_state = state
|
|
||||||
self.node.save()
|
|
||||||
with task_manager.acquire(
|
|
||||||
self.context, self.node['uuid'], shared=False) as task:
|
|
||||||
self.deploy.heartbeat(task, 'http://127.0.0.1:8080')
|
|
||||||
|
|
||||||
self.assertEqual(0, ncrc_mock.call_count)
|
|
||||||
self.assertEqual(0, rti_mock.call_count)
|
|
||||||
self.assertEqual(0, cd_mock.call_count)
|
|
||||||
|
|
||||||
@mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
|
@mock.patch.object(objects.node.Node, 'touch_provisioning', autospec=True)
|
||||||
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
@mock.patch.object(agent_base_vendor.HeartbeatMixin,
|
||||||
'deploy_has_started', autospec=True)
|
'deploy_has_started', autospec=True)
|
||||||
def test_heartbeat_touch_provisioning(self, mock_deploy_started,
|
def test_heartbeat_touch_provisioning_and_url_save(self,
|
||||||
|
mock_deploy_started,
|
||||||
mock_touch):
|
mock_touch):
|
||||||
mock_deploy_started.return_value = True
|
mock_deploy_started.return_value = True
|
||||||
|
|
||||||
@ -274,7 +275,8 @@ class HeartbeatMixinTest(AgentDeployMixinBaseTest):
|
|||||||
with task_manager.acquire(
|
with task_manager.acquire(
|
||||||
self.context, self.node.uuid, shared=False) as task:
|
self.context, self.node.uuid, shared=False) as task:
|
||||||
self.deploy.heartbeat(task, 'http://127.0.0.1:8080')
|
self.deploy.heartbeat(task, 'http://127.0.0.1:8080')
|
||||||
|
self.assertEqual('http://127.0.0.1:8080',
|
||||||
|
task.node.driver_internal_info['agent_url'])
|
||||||
mock_touch.assert_called_once_with(mock.ANY)
|
mock_touch.assert_called_once_with(mock.ANY)
|
||||||
|
|
||||||
|
|
||||||
|
@ -786,12 +786,6 @@ class ISCSIDeployTestCase(db_base.DbTestCase):
|
|||||||
agent_execute_clean_step_mock.assert_called_once_with(
|
agent_execute_clean_step_mock.assert_called_once_with(
|
||||||
task, {'some-step': 'step-info'})
|
task, {'some-step': 'step-info'})
|
||||||
|
|
||||||
def test_heartbeat(self):
|
|
||||||
with task_manager.acquire(self.context, self.node.uuid,
|
|
||||||
shared=True) as task:
|
|
||||||
self.driver.deploy.heartbeat(task, 'url')
|
|
||||||
self.assertFalse(task.shared)
|
|
||||||
|
|
||||||
@mock.patch.object(agent_base_vendor.AgentDeployMixin,
|
@mock.patch.object(agent_base_vendor.AgentDeployMixin,
|
||||||
'reboot_and_finish_deploy', autospec=True)
|
'reboot_and_finish_deploy', autospec=True)
|
||||||
@mock.patch.object(iscsi_deploy, 'do_agent_iscsi_deploy', autospec=True)
|
@mock.patch.object(iscsi_deploy, 'do_agent_iscsi_deploy', autospec=True)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user