Merge "Simplify heartbeating by removing use of select()"
This commit is contained in:
commit
faeb9441d3
@ -14,9 +14,7 @@
|
|||||||
|
|
||||||
import collections
|
import collections
|
||||||
import ipaddress
|
import ipaddress
|
||||||
import os
|
|
||||||
import random
|
import random
|
||||||
import select
|
|
||||||
import socket
|
import socket
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@ -79,13 +77,6 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
min_jitter_multiplier = 0.3
|
min_jitter_multiplier = 0.3
|
||||||
max_jitter_multiplier = 0.6
|
max_jitter_multiplier = 0.6
|
||||||
|
|
||||||
# Exponential backoff values used in case of an error. In reality we will
|
|
||||||
# only wait a portion of either of these delays based on the jitter
|
|
||||||
# multipliers.
|
|
||||||
initial_delay = 1.0
|
|
||||||
max_delay = 300.0
|
|
||||||
backoff_factor = 2.7
|
|
||||||
|
|
||||||
def __init__(self, agent):
|
def __init__(self, agent):
|
||||||
"""Initialize the heartbeat thread.
|
"""Initialize the heartbeat thread.
|
||||||
|
|
||||||
@ -94,39 +85,19 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
"""
|
"""
|
||||||
super(IronicPythonAgentHeartbeater, self).__init__()
|
super(IronicPythonAgentHeartbeater, self).__init__()
|
||||||
self.agent = agent
|
self.agent = agent
|
||||||
|
self.stop_event = threading.Event()
|
||||||
self.api = agent.api_client
|
self.api = agent.api_client
|
||||||
self.error_delay = self.initial_delay
|
self.interval = 0
|
||||||
self.reader = None
|
|
||||||
self.writer = None
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
"""Start the heartbeat thread."""
|
"""Start the heartbeat thread."""
|
||||||
# The first heartbeat happens immediately
|
# The first heartbeat happens immediately
|
||||||
LOG.info('starting heartbeater')
|
LOG.info('Starting heartbeater')
|
||||||
interval = 0
|
|
||||||
self.agent.set_agent_advertise_addr()
|
self.agent.set_agent_advertise_addr()
|
||||||
|
|
||||||
self.reader, self.writer = os.pipe()
|
while not self.stop_event.wait(self.interval):
|
||||||
p = select.poll()
|
self.do_heartbeat()
|
||||||
p.register(self.reader, select.POLLIN)
|
eventlet.sleep(0)
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
if p.poll(interval * 1000):
|
|
||||||
if os.read(self.reader, 1).decode() == 'a':
|
|
||||||
break
|
|
||||||
|
|
||||||
self.do_heartbeat()
|
|
||||||
interval_multiplier = random.uniform(
|
|
||||||
self.min_jitter_multiplier,
|
|
||||||
self.max_jitter_multiplier)
|
|
||||||
interval = self.agent.heartbeat_timeout * interval_multiplier
|
|
||||||
log_msg = 'sleeping before next heartbeat, interval: {}'
|
|
||||||
LOG.info(log_msg.format(interval))
|
|
||||||
finally:
|
|
||||||
os.close(self.reader)
|
|
||||||
os.close(self.writer)
|
|
||||||
self.reader = None
|
|
||||||
self.writer = None
|
|
||||||
|
|
||||||
def do_heartbeat(self):
|
def do_heartbeat(self):
|
||||||
"""Send a heartbeat to Ironic."""
|
"""Send a heartbeat to Ironic."""
|
||||||
@ -137,28 +108,28 @@ class IronicPythonAgentHeartbeater(threading.Thread):
|
|||||||
advertise_protocol=self.agent.advertise_protocol,
|
advertise_protocol=self.agent.advertise_protocol,
|
||||||
generated_cert=self.agent.generated_cert,
|
generated_cert=self.agent.generated_cert,
|
||||||
)
|
)
|
||||||
self.error_delay = self.initial_delay
|
|
||||||
LOG.info('heartbeat successful')
|
LOG.info('heartbeat successful')
|
||||||
except errors.HeartbeatConflictError:
|
except errors.HeartbeatConflictError:
|
||||||
LOG.warning('conflict error sending heartbeat to {}'.format(
|
LOG.warning('conflict error sending heartbeat to {}'.format(
|
||||||
self.agent.api_url))
|
self.agent.api_url))
|
||||||
self.error_delay = min(self.error_delay * self.backoff_factor,
|
|
||||||
self.max_delay)
|
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception('error sending heartbeat to {}'.format(
|
LOG.exception('error sending heartbeat to {}'.format(
|
||||||
self.agent.api_url))
|
self.agent.api_url))
|
||||||
self.error_delay = min(self.error_delay * self.backoff_factor,
|
finally:
|
||||||
self.max_delay)
|
interval_multiplier = random.uniform(self.min_jitter_multiplier,
|
||||||
|
self.max_jitter_multiplier)
|
||||||
|
self.interval = self.agent.heartbeat_timeout * interval_multiplier
|
||||||
|
log_msg = 'sleeping before next heartbeat, interval: {0}'
|
||||||
|
LOG.info(log_msg.format(self.interval))
|
||||||
|
|
||||||
def force_heartbeat(self):
|
def force_heartbeat(self):
|
||||||
os.write(self.writer, b'b')
|
self.do_heartbeat()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
"""Stop the heartbeat thread."""
|
"""Stop the heartbeat thread."""
|
||||||
if self.writer is not None:
|
LOG.info('stopping heartbeater')
|
||||||
LOG.info('stopping heartbeater')
|
self.stop_event.set()
|
||||||
os.write(self.writer, b'a')
|
return self.join()
|
||||||
return self.join()
|
|
||||||
|
|
||||||
|
|
||||||
class IronicPythonAgent(base.ExecuteCommandMixin):
|
class IronicPythonAgent(base.ExecuteCommandMixin):
|
||||||
|
@ -61,21 +61,19 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
hardware.HardwareManager)
|
hardware.HardwareManager)
|
||||||
self.heartbeater.stop_event = mock.Mock()
|
self.heartbeater.stop_event = mock.Mock()
|
||||||
|
|
||||||
@mock.patch('os.read', autospec=True)
|
|
||||||
@mock.patch('select.poll', autospec=True)
|
|
||||||
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
@mock.patch('ironic_python_agent.agent._time', autospec=True)
|
||||||
@mock.patch('random.uniform', autospec=True)
|
@mock.patch('random.uniform', autospec=True)
|
||||||
def test_heartbeat(self, mock_uniform, mock_time, mock_poll, mock_read):
|
def test_heartbeat(self, mock_uniform, mock_time):
|
||||||
time_responses = []
|
time_responses = []
|
||||||
uniform_responses = []
|
uniform_responses = []
|
||||||
heartbeat_responses = []
|
heartbeat_responses = []
|
||||||
poll_responses = []
|
wait_responses = []
|
||||||
expected_poll_calls = []
|
expected_stop_calls = []
|
||||||
|
|
||||||
# FIRST RUN:
|
# FIRST RUN:
|
||||||
# initial delay is 0
|
# initial delay is 0
|
||||||
expected_poll_calls.append(mock.call(0))
|
expected_stop_calls.append(mock.call(0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# next heartbeat due at t=100
|
# next heartbeat due at t=100
|
||||||
heartbeat_responses.append(100)
|
heartbeat_responses.append(100)
|
||||||
# random interval multiplier is 0.5
|
# random interval multiplier is 0.5
|
||||||
@ -85,8 +83,8 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
|
|
||||||
# SECOND RUN:
|
# SECOND RUN:
|
||||||
# 50 * .5 = 25
|
# 50 * .5 = 25
|
||||||
expected_poll_calls.append(mock.call(1000 * 25.0))
|
expected_stop_calls.append(mock.call(25.0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# next heartbeat due at t=180
|
# next heartbeat due at t=180
|
||||||
heartbeat_responses.append(180)
|
heartbeat_responses.append(180)
|
||||||
# random interval multiplier is 0.4
|
# random interval multiplier is 0.4
|
||||||
@ -96,36 +94,34 @@ class TestHeartbeater(ironic_agent_base.IronicAgentTest):
|
|||||||
|
|
||||||
# THIRD RUN:
|
# THIRD RUN:
|
||||||
# 50 * .4 = 20
|
# 50 * .4 = 20
|
||||||
expected_poll_calls.append(mock.call(1000 * 20.0))
|
expected_stop_calls.append(mock.call(20.0))
|
||||||
poll_responses.append(False)
|
wait_responses.append(False)
|
||||||
# this heartbeat attempt fails
|
# this heartbeat attempt fails
|
||||||
heartbeat_responses.append(Exception('uh oh!'))
|
heartbeat_responses.append(Exception('uh oh!'))
|
||||||
# we check the time to generate a fake deadline, now t=125
|
|
||||||
time_responses.append(125)
|
|
||||||
# random interval multiplier is 0.5
|
# random interval multiplier is 0.5
|
||||||
uniform_responses.append(0.5)
|
uniform_responses.append(0.5)
|
||||||
|
# we check the time to generate a fake deadline, now t=125
|
||||||
|
time_responses.append(125)
|
||||||
# time is now 125.5
|
# time is now 125.5
|
||||||
time_responses.append(125.5)
|
time_responses.append(125.5)
|
||||||
|
|
||||||
# FOURTH RUN:
|
# FOURTH RUN:
|
||||||
# 50 * .5 = 25
|
# 50 * .5 = 20
|
||||||
expected_poll_calls.append(mock.call(1000 * 25.0))
|
expected_stop_calls.append(mock.call(25.0))
|
||||||
# Stop now
|
# Stop now
|
||||||
poll_responses.append(True)
|
wait_responses.append(True)
|
||||||
mock_read.return_value = b'a'
|
|
||||||
|
|
||||||
# Hook it up and run it
|
# Hook it up and run it
|
||||||
mock_time.side_effect = time_responses
|
mock_time.side_effect = time_responses
|
||||||
mock_uniform.side_effect = uniform_responses
|
mock_uniform.side_effect = uniform_responses
|
||||||
self.mock_agent.heartbeat_timeout = 50
|
self.mock_agent.heartbeat_timeout = 50
|
||||||
self.heartbeater.api.heartbeat.side_effect = heartbeat_responses
|
self.heartbeater.api.heartbeat.side_effect = heartbeat_responses
|
||||||
mock_poll.return_value.poll.side_effect = poll_responses
|
self.heartbeater.stop_event.wait.side_effect = wait_responses
|
||||||
self.heartbeater.run()
|
self.heartbeater.run()
|
||||||
|
|
||||||
# Validate expectations
|
# Validate expectations
|
||||||
self.assertEqual(expected_poll_calls,
|
self.assertEqual(expected_stop_calls,
|
||||||
mock_poll.return_value.poll.call_args_list)
|
self.heartbeater.stop_event.wait.call_args_list)
|
||||||
self.assertEqual(2.7, self.heartbeater.error_delay)
|
|
||||||
|
|
||||||
|
|
||||||
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
@mock.patch.object(hardware, '_md_scan_and_assemble', lambda: None)
|
||||||
|
@ -0,0 +1,7 @@
|
|||||||
|
---
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
IPA heartbeat intervals now rely on accurate clock time. Any clean or
|
||||||
|
deploy steps which attempt to sync the clock may cause heartbeats to not
|
||||||
|
be emitted. IPA syncs time at startup and shutdown, so these steps should
|
||||||
|
not be required.
|
Loading…
x
Reference in New Issue
Block a user