Respond to rpc requests on stop until hash ring reset
Currently when a conductor is stopped, the rpc service stops responding to requests as soon as self.manager.del_host returns. This means that until the hash ring is reset on the whole cluster, requests can be sent to a service which is stopped. This change waits for the remaining seconds to delay stopping until CONF.hash_ring_reset_interval has elapsed. This will improve the reliability of the cluster when scaling down or rolling out updates. This delay only occurs when there is more than one online conductor, to allow fast restarts on single-node ironic installs (bifrost, metal3). Change-Id: I643eb34f9605532c5c12dd2a42f4ea67bf3e0b40
This commit is contained in:
parent
eb03345006
commit
e54ee2ba4c
@ -14,6 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import signal
|
||||
import sys
|
||||
import time
|
||||
@ -24,6 +25,7 @@ from oslo_log import log
|
||||
import oslo_messaging as messaging
|
||||
from oslo_service import service
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from ironic.common import context
|
||||
from ironic.common import rpc
|
||||
@ -93,6 +95,26 @@ class RPCService(service.Service):
|
||||
'transport': CONF.rpc_transport})
|
||||
|
||||
def stop(self):
|
||||
initial_time = timeutils.utcnow()
|
||||
extend_time = initial_time + datetime.timedelta(
|
||||
seconds=CONF.hash_ring_reset_interval)
|
||||
|
||||
try:
|
||||
self.manager.del_host(deregister=self.deregister)
|
||||
except Exception as e:
|
||||
LOG.exception('Service error occurred when cleaning up '
|
||||
'the RPC manager. Error: %s', e)
|
||||
|
||||
if self.manager.get_online_conductor_count() > 1:
|
||||
# Delay stopping the server until the hash ring has been
|
||||
# reset on the cluster
|
||||
stop_time = timeutils.utcnow()
|
||||
if stop_time < extend_time:
|
||||
stop_wait = max(0, (extend_time - stop_time).seconds)
|
||||
LOG.info('Waiting %(stop_wait)s seconds for hash ring reset.',
|
||||
{'stop_wait': stop_wait})
|
||||
time.sleep(stop_wait)
|
||||
|
||||
try:
|
||||
if self.rpcserver is not None:
|
||||
self.rpcserver.stop()
|
||||
@ -100,11 +122,6 @@ class RPCService(service.Service):
|
||||
except Exception as e:
|
||||
LOG.exception('Service error occurred when stopping the '
|
||||
'RPC server. Error: %s', e)
|
||||
try:
|
||||
self.manager.del_host(deregister=self.deregister)
|
||||
except Exception as e:
|
||||
LOG.exception('Service error occurred when cleaning up '
|
||||
'the RPC manager. Error: %s', e)
|
||||
|
||||
super(RPCService, self).stop(graceful=True)
|
||||
LOG.info('Stopped RPC server for service %(service)s on host '
|
||||
|
@ -334,6 +334,10 @@ class BaseConductorManager(object):
|
||||
|
||||
self._started = False
|
||||
|
||||
def get_online_conductor_count(self):
|
||||
"""Return a count of currently online conductors"""
|
||||
return len(self.dbapi.get_online_conductors())
|
||||
|
||||
def _register_and_validate_hardware_interfaces(self, hardware_types):
|
||||
"""Register and validate hardware interfaces for this conductor.
|
||||
|
||||
|
@ -10,24 +10,28 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
import oslo_messaging
|
||||
from oslo_service import service as base_service
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from ironic.common import context
|
||||
from ironic.common import rpc
|
||||
from ironic.common import rpc_service
|
||||
from ironic.conductor import manager
|
||||
from ironic.objects import base as objects_base
|
||||
from ironic.tests import base
|
||||
from ironic.tests.unit.db import base as db_base
|
||||
from ironic.tests.unit.db import utils as db_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
@mock.patch.object(base_service.Service, '__init__', lambda *_, **__: None)
|
||||
class TestRPCService(base.TestCase):
|
||||
class TestRPCService(db_base.DbTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRPCService, self).setUp()
|
||||
@ -35,6 +39,7 @@ class TestRPCService(base.TestCase):
|
||||
mgr_module = "ironic.conductor.manager"
|
||||
mgr_class = "ConductorManager"
|
||||
self.rpc_svc = rpc_service.RPCService(host, mgr_module, mgr_class)
|
||||
self.rpc_svc.manager.dbapi = self.dbapi
|
||||
|
||||
@mock.patch.object(manager.ConductorManager, 'prepare_host', autospec=True)
|
||||
@mock.patch.object(oslo_messaging, 'Target', autospec=True)
|
||||
@ -108,3 +113,75 @@ class TestRPCService(base.TestCase):
|
||||
self.assertFalse(self.rpc_svc._started)
|
||||
self.assertIn("boom", self.rpc_svc._failure)
|
||||
self.assertRaises(SystemExit, self.rpc_svc.wait_for_start)
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
@mock.patch.object(time, 'sleep', autospec=True)
|
||||
def test_stop_instant(self, mock_sleep, mock_utcnow):
|
||||
# del_host returns instantly
|
||||
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||
autospec=True) as mock_cond_list:
|
||||
mock_cond_list.return_value = [conductor1]
|
||||
self.rpc_svc.stop()
|
||||
|
||||
# single conductor so exit immediately without waiting
|
||||
mock_sleep.assert_not_called()
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
@mock.patch.object(time, 'sleep', autospec=True)
|
||||
def test_stop_after_full_reset_interval(self, mock_sleep, mock_utcnow):
|
||||
# del_host returns instantly
|
||||
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
|
||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||
autospec=True) as mock_cond_list:
|
||||
# multiple conductors, so wait for hash_ring_reset_interval
|
||||
mock_cond_list.return_value = [conductor1, conductor2]
|
||||
self.rpc_svc.stop()
|
||||
|
||||
# wait the total CONF.hash_ring_reset_interval 15 seconds
|
||||
mock_sleep.assert_has_calls([mock.call(15)])
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
@mock.patch.object(time, 'sleep', autospec=True)
|
||||
def test_stop_after_remaining_interval(self, mock_sleep, mock_utcnow):
|
||||
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
|
||||
|
||||
# del_host returns after 5 seconds
|
||||
mock_utcnow.side_effect = [
|
||||
datetime.datetime(2023, 2, 2, 21, 10, 0),
|
||||
datetime.datetime(2023, 2, 2, 21, 10, 5),
|
||||
]
|
||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||
autospec=True) as mock_cond_list:
|
||||
# multiple conductors, so wait for hash_ring_reset_interval
|
||||
mock_cond_list.return_value = [conductor1, conductor2]
|
||||
self.rpc_svc.stop()
|
||||
|
||||
# wait the remaining 10 seconds
|
||||
mock_sleep.assert_has_calls([mock.call(10)])
|
||||
|
||||
@mock.patch.object(timeutils, 'utcnow', autospec=True)
|
||||
@mock.patch.object(time, 'sleep', autospec=True)
|
||||
def test_stop_slow(self, mock_sleep, mock_utcnow):
|
||||
mock_utcnow.return_value = datetime.datetime(2023, 2, 2, 21, 10, 0)
|
||||
conductor1 = db_utils.get_test_conductor(hostname='fake_host')
|
||||
conductor2 = db_utils.get_test_conductor(hostname='other_fake_host')
|
||||
|
||||
# del_host returns after 16 seconds
|
||||
mock_utcnow.side_effect = [
|
||||
datetime.datetime(2023, 2, 2, 21, 10, 0),
|
||||
datetime.datetime(2023, 2, 2, 21, 10, 16),
|
||||
]
|
||||
with mock.patch.object(self.dbapi, 'get_online_conductors',
|
||||
autospec=True) as mock_cond_list:
|
||||
# multiple conductors, so wait for hash_ring_reset_interval
|
||||
mock_cond_list.return_value = [conductor1, conductor2]
|
||||
self.rpc_svc.stop()
|
||||
|
||||
# no wait required, CONF.hash_ring_reset_interval already exceeded
|
||||
mock_sleep.assert_not_called()
|
||||
|
@ -0,0 +1,13 @@
|
||||
---
|
||||
fixes:
|
||||
- |
|
||||
When a conductor service is stopped it will now continue to respond to RPC
|
||||
requests until ``[DEFAULT]hash_ring_reset_interval`` has elapsed, allowing
|
||||
a hash ring reset to complete on the cluster after conductor is
|
||||
unregistered. This will improve the reliability of the cluster when scaling
|
||||
down or rolling out updates.
|
||||
|
||||
This delay only occurs when there is more than one online conductor,
|
||||
to allow fast restarts on single-node ironic installs (bifrost,
|
||||
metal3).
|
||||
|
Loading…
Reference in New Issue
Block a user