Merge "Prioritize sloppy nodes for power sync"
This commit is contained in:
commit
e34941c327
@ -1665,20 +1665,28 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
def _sync_power_states(self, context):
|
||||
"""Periodic task to sync power states for the nodes."""
|
||||
filters = {'maintenance': False}
|
||||
nodes = queue.Queue()
|
||||
for node_info in self.iter_nodes(fields=['id'], filters=filters):
|
||||
nodes.put(node_info)
|
||||
|
||||
# NOTE(etingof): prioritize non-responding nodes to fail them fast
|
||||
nodes = sorted(
|
||||
self.iter_nodes(fields=['id'], filters=filters),
|
||||
key=lambda n: -self.power_state_sync_count.get(n[0], 0)
|
||||
)
|
||||
|
||||
nodes_queue = queue.Queue()
|
||||
|
||||
for node_info in nodes:
|
||||
nodes_queue.put(node_info)
|
||||
|
||||
number_of_workers = min(CONF.conductor.sync_power_state_workers,
|
||||
CONF.conductor.periodic_max_workers,
|
||||
nodes.qsize())
|
||||
nodes_queue.qsize())
|
||||
futures = []
|
||||
|
||||
for worker_number in range(max(0, number_of_workers - 1)):
|
||||
try:
|
||||
futures.append(
|
||||
self._spawn_worker(self._sync_power_state_nodes_task,
|
||||
context, nodes))
|
||||
context, nodes_queue))
|
||||
except exception.NoFreeConductorWorker:
|
||||
LOG.warning("There are no more conductor workers for "
|
||||
"power sync task. %(workers)d workers have "
|
||||
@ -1687,7 +1695,7 @@ class ConductorManager(base_manager.BaseConductorManager):
|
||||
break
|
||||
|
||||
try:
|
||||
self._sync_power_state_nodes_task(context, nodes)
|
||||
self._sync_power_state_nodes_task(context, nodes_queue)
|
||||
|
||||
finally:
|
||||
waiters.wait_for_all(futures)
|
||||
|
@ -7160,7 +7160,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 9)):
|
||||
new=mock.MagicMock(return_value=[[0]] * 9)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
@ -7174,7 +7174,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 6)):
|
||||
new=mock.MagicMock(return_value=[[0]] * 6)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
@ -7188,7 +7188,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
CONF.set_override('sync_power_state_workers', 8, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None])):
|
||||
new=mock.MagicMock(return_value=[[0]])):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
@ -7202,7 +7202,7 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
CONF.set_override('sync_power_state_workers', 1, group='conductor')
|
||||
|
||||
with mock.patch.object(self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[None] * 9)):
|
||||
new=mock.MagicMock(return_value=[[0]] * 9)):
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
@ -7210,6 +7210,26 @@ class ParallelPowerSyncTestCase(mgr_utils.CommonMixIn, db_base.DbTestCase):
|
||||
self.assertEqual(1, sync_mock.call_count)
|
||||
self.assertEqual(1, waiter_mock.call_count)
|
||||
|
||||
@mock.patch.object(queue, 'Queue', autospec=True)
|
||||
def test__sync_power_states_node_prioritization(
|
||||
self, queue_mock, sync_mock, spawn_mock, waiter_mock):
|
||||
|
||||
CONF.set_override('sync_power_state_workers', 1, group='conductor')
|
||||
|
||||
with mock.patch.object(
|
||||
self.service, 'iter_nodes',
|
||||
new=mock.MagicMock(return_value=[[0], [1], [2]])
|
||||
), mock.patch.dict(
|
||||
self.service.power_state_sync_count,
|
||||
{0: 1, 1: 0, 2: 2}, clear=True):
|
||||
|
||||
queue_mock.return_value.qsize.return_value = 0
|
||||
|
||||
self.service._sync_power_states(self.context)
|
||||
|
||||
expected_calls = [mock.call([2]), mock.call([0]), mock.call([1])]
|
||||
queue_mock.return_value.put.assert_has_calls(expected_calls)
|
||||
|
||||
|
||||
@mock.patch.object(task_manager, 'acquire')
|
||||
@mock.patch.object(manager.ConductorManager, '_mapped_to_this_conductor')
|
||||
|
Loading…
x
Reference in New Issue
Block a user