Implement drain shutdown support
Sending signal ``SIGUSR2`` to a conductor process will now trigger a drain shutdown. This is similar to a ``SIGTERM`` graceful shutdown but the timeout is determined by ``[DEFAULT]drain_shutdown_timeout`` which defaults to ``1800`` seconds. This is enough time for running tasks on existing reserved nodes to either complete or reach their own failure timeout. During the drain period the conductor needs to be removed from the hash ring to prevent new tasks from starting. Other conductors also need to not fail reserved nodes on the draining conductor which would appear to be orphaned. This is achieved by running the conductor keepalive heartbeat for this period, but setting the ``online`` state to ``False``. When this feature was proposed, SIGINT was suggested as the signal to use to trigger a drain shutdown. However this is already used by oslo_service fast exit[1] so using this for drain would be a change in existing behaviour. [1] https://opendev.org/openstack/oslo.service/src/branch/master/oslo_service/service.py#L340 Change-Id: I777898f5a14844c9ac9967168f33d55c4f97dfb9
This commit is contained in:
parent
ff4e836c55
commit
81acd5df24
@ -172,7 +172,7 @@ Graceful conductor service shutdown
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The ironic-conductor service is a Python process listening for messages on a
|
||||
message queue. When the operator sends the SIGTERM signal to the process, the
|
||||
message queue. When the operator sends the ``SIGTERM`` signal to the process, the
|
||||
service stops consuming messages from the queue, so that no additional work is
|
||||
picked up. It completes any outstanding work and then terminates. During this
|
||||
process, messages can be left on the queue and will be processed after the
|
||||
@ -183,11 +183,25 @@ older code, and start up a service using newer code with minimal impact.
|
||||
This was tested with RabbitMQ messaging backend and may vary with other
|
||||
backends.
|
||||
|
||||
Nodes that are being acted upon by an ironic-conductor process, which are
|
||||
not in a stable state, may encounter failures. Node failures that occur
|
||||
during an upgrade are likely due to timeouts, resulting from delays
|
||||
involving messages being processed and acted upon by a conductor
|
||||
during long running, multi-step processes such as deployment or cleaning.
|
||||
Nodes that are being acted upon by an ironic-conductor process, which are not in
|
||||
a stable state, will be put into a failed state when
|
||||
``[DEFAULT]graceful_shutdown_timeout`` is reached. Node failures that occur
|
||||
during an upgrade are likely due to timeouts, resulting from delays involving
|
||||
messages being processed and acted upon by a conductor during long running,
|
||||
multi-step processes such as deployment or cleaning.
|
||||
|
||||
Drain conductor service shutdown
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
A drain shutdown is similar to graceful shutdown, differing in the following ways:
|
||||
|
||||
* Triggered by sending signal ``SIGUSR2`` to the process instead of ``SIGTERM``
|
||||
* The timeout for process termination is determined by
|
||||
``[DEFAULT]drain_shutdown_timeout`` instead of ``[DEFAULT]graceful_shutdown_timeout``
|
||||
|
||||
``[DEFAULT]drain_shutdown_timeout`` is set long enough so that any node in a not
|
||||
stable state will have time to reach a stable state (complete or failed) before
|
||||
the ironic-conductor process terminates.
|
||||
|
||||
API load balancer draining
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -48,6 +48,7 @@ class RPCService(service.Service):
|
||||
self.deregister = True
|
||||
self._failure = None
|
||||
self._started = False
|
||||
self.draining = False
|
||||
|
||||
def wait_for_start(self):
|
||||
while not self._started and not self._failure:
|
||||
@ -130,31 +131,54 @@ class RPCService(service.Service):
|
||||
{'service': self.topic, 'host': self.host})
|
||||
|
||||
# Wait for reservation locks held by this conductor.
|
||||
# The conductor process will end when:
|
||||
# The conductor process will end when one of the following occurs:
|
||||
# - All reservations for this conductor are released
|
||||
# - CONF.graceful_shutdown_timeout has elapsed
|
||||
# - shutdown_timeout has elapsed
|
||||
# - The process manager (systemd, kubernetes) sends SIGKILL after the
|
||||
# configured graceful period
|
||||
graceful_time = initial_time + datetime.timedelta(
|
||||
seconds=CONF.graceful_shutdown_timeout)
|
||||
# configured timeout period
|
||||
while (self.manager.has_reserved()
|
||||
and graceful_time > timeutils.utcnow()):
|
||||
and not self._shutdown_timeout_reached(initial_time)):
|
||||
LOG.info('Waiting for reserved nodes to clear on host %(host)s',
|
||||
{'host': self.host})
|
||||
time.sleep(1)
|
||||
|
||||
# Stop the keepalive heartbeat greenthread sending touch(online=False)
|
||||
self.manager.keepalive_halt()
|
||||
|
||||
rpc.set_global_manager(None)
|
||||
|
||||
def _handle_signal(self, signo, frame):
|
||||
def _shutdown_timeout_reached(self, initial_time):
|
||||
if self.draining:
|
||||
shutdown_timeout = CONF.drain_shutdown_timeout
|
||||
else:
|
||||
shutdown_timeout = CONF.graceful_shutdown_timeout
|
||||
if shutdown_timeout == 0:
|
||||
# No timeout, run until no nodes are reserved
|
||||
return False
|
||||
shutdown_time = initial_time + datetime.timedelta(
|
||||
seconds=shutdown_timeout)
|
||||
return shutdown_time < timeutils.utcnow()
|
||||
|
||||
def _handle_no_deregister(self, signo, frame):
|
||||
LOG.info('Got signal SIGUSR1. Not deregistering on next shutdown '
|
||||
'of service %(service)s on host %(host)s.',
|
||||
{'service': self.topic, 'host': self.host})
|
||||
self.deregister = False
|
||||
|
||||
def handle_signal(self):
|
||||
"""Add a signal handler for SIGUSR1.
|
||||
def _handle_drain(self, signo, frame):
|
||||
LOG.info('Got signal SIGUSR2. Starting drain shutdown'
|
||||
'of service %(service)s on host %(host)s.',
|
||||
{'service': self.topic, 'host': self.host})
|
||||
self.draining = True
|
||||
self.stop()
|
||||
|
||||
The handler ensures that the manager is not deregistered when it is
|
||||
shutdown.
|
||||
def handle_signal(self):
|
||||
"""Add a signal handler for SIGUSR1, SIGUSR2.
|
||||
|
||||
The SIGUSR1 handler ensures that the manager is not deregistered when
|
||||
it is shutdown.
|
||||
|
||||
The SIGUSR2 handler starts a drain shutdown.
|
||||
"""
|
||||
signal.signal(signal.SIGUSR1, self._handle_signal)
|
||||
signal.signal(signal.SIGUSR1, self._handle_no_deregister)
|
||||
signal.signal(signal.SIGUSR2, self._handle_drain)
|
||||
|
@ -322,6 +322,8 @@ class BaseConductorManager(object):
|
||||
self._periodic_task_callables = periodic_task_callables
|
||||
|
||||
def keepalive_halt(self):
|
||||
if not hasattr(self, '_keepalive_evt'):
|
||||
return
|
||||
self._keepalive_evt.set()
|
||||
|
||||
def del_host(self, deregister=True, clear_node_reservations=True):
|
||||
@ -329,8 +331,10 @@ class BaseConductorManager(object):
|
||||
# conductor (e.g. when rpc server is unreachable).
|
||||
if not hasattr(self, 'conductor'):
|
||||
return
|
||||
|
||||
# the keepalive heartbeat greenthread will continue to run, but will
|
||||
# now be setting online=False
|
||||
self._shutdown = True
|
||||
self.keepalive_halt()
|
||||
|
||||
if clear_node_reservations:
|
||||
# clear all locks held by this conductor before deregistering
|
||||
|
@ -397,6 +397,14 @@ service_opts = [
|
||||
help=_('Number of retries to hold onto the worker before '
|
||||
'failing or returning the thread to the pool if '
|
||||
'the conductor can automatically retry.')),
|
||||
cfg.IntOpt('drain_shutdown_timeout',
|
||||
mutable=True,
|
||||
default=1800,
|
||||
help=_('Timeout (seconds) after which a server will exit '
|
||||
'from a drain shutdown. Drain shutdowns are '
|
||||
'triggered by sending the signal SIGUSR2. '
|
||||
'Zero value means shutdown will never be triggered by '
|
||||
'a timeout.')),
|
||||
]
|
||||
|
||||
utils_opts = [
|
||||
|
@ -227,3 +227,54 @@ class TestRPCService(db_base.DbTestCase):
|
||||
# returns False
|
||||
mock_sleep.assert_has_calls(
|
||||
[mock.call(15), mock.call(1), mock.call(1)])
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
@mock.patch.object(time, 'sleep', autospec=True)
|
||||
def test_drain_has_reserved(self, mock_sleep, mock_utcnow):
|
||||
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
|
||||
|
||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||
autospec=True) as mock_cond_list:
|
||||
# multiple conductors, so wait for hash_ring_reset_interval
|
||||
mock_cond_list.return_value = [conductor1, conductor2]
|
||||
with mock.patch.object(self.dbapi, 'get_nodeinfo_list',
|
||||
autospec=True) as mock_nodeinfo_list:
|
||||
# 3 calls to manager has_reserved until all reservation locks
|
||||
# are released
|
||||
mock_nodeinfo_list.side_effect = [['a', 'b'], ['a'], []]
|
||||
self.rpc_svc._handle_drain(None, None)
|
||||
self.assertEqual(3, mock_nodeinfo_list.call_count)
|
||||
|
||||
# wait the remaining 15 seconds, then wait until has_reserved
|
||||
# returns False
|
||||
mock_sleep.assert_has_calls(
|
||||
[mock.call(15), mock.call(1), mock.call(1)])
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
def test_shutdown_timeout_reached(self, mock_utcnow):
|
||||
|
||||
initial_time = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
before_graceful = initial_time + datetime.timedelta(seconds=30)
|
||||
after_graceful = initial_time + datetime.timedelta(seconds=90)
|
||||
before_drain = initial_time + datetime.timedelta(seconds=1700)
|
||||
after_drain = initial_time + datetime.timedelta(seconds=1900)
|
||||
|
||||
mock_utcnow.return_value = before_graceful
|
||||
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
||||
mock_utcnow.return_value = after_graceful
|
||||
self.assertTrue(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
||||
self.rpc_svc.draining = True
|
||||
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
||||
mock_utcnow.return_value = before_drain
|
||||
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
||||
mock_utcnow.return_value = after_drain
|
||||
self.assertTrue(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
||||
CONF.set_override('drain_shutdown_timeout', 0)
|
||||
self.assertFalse(self.rpc_svc._shutdown_timeout_reached(initial_time))
|
||||
|
14
releasenotes/notes/drain-5eafd17e0868e21a.yaml
Normal file
14
releasenotes/notes/drain-5eafd17e0868e21a.yaml
Normal file
@ -0,0 +1,14 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Sending signal ``SIGUSR2`` to a conductor process will now trigger a drain
|
||||
shutdown. This is similar to a ``SIGTERM`` graceful shutdown but the timeout
|
||||
is determined by ``[DEFAULT]drain_shutdown_timeout`` which defaults to
|
||||
``1800`` seconds. This is enough time for running tasks on existing reserved
|
||||
nodes to either complete or reach their own failure timeout.
|
||||
|
||||
During the drain period the conductor will be removed from the hash ring to
|
||||
prevent new tasks from starting. Other conductors will no longer fail
|
||||
reserved nodes on the draining conductor, which previously appeared to be
|
||||
orphaned. This is achieved by running the conductor keepalive heartbeat for
|
||||
this period, but setting the ``online`` state to ``False``.
|
Loading…
Reference in New Issue
Block a user