Merge "Respect co-builder partition moves when min_part_hours is zero"
This commit is contained in:
commit
bd76a8deb2
@ -130,7 +130,7 @@ class RingBuilder(object):
|
|||||||
# within a given number of hours (24 is my usual test). Removing
|
# within a given number of hours (24 is my usual test). Removing
|
||||||
# a device overrides this behavior as it's assumed that's only
|
# a device overrides this behavior as it's assumed that's only
|
||||||
# done because of device failure.
|
# done because of device failure.
|
||||||
self._last_part_moves = None
|
self._last_part_moves = array('B', itertools.repeat(0, self.parts))
|
||||||
# _part_moved_bitmap record parts have been moved
|
# _part_moved_bitmap record parts have been moved
|
||||||
self._part_moved_bitmap = None
|
self._part_moved_bitmap = None
|
||||||
# _last_part_moves_epoch indicates the time the offsets in
|
# _last_part_moves_epoch indicates the time the offsets in
|
||||||
@ -167,7 +167,7 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def ever_rebalanced(self):
|
def ever_rebalanced(self):
|
||||||
return self._last_part_moves is not None
|
return self._replica2part2dev is not None
|
||||||
|
|
||||||
def _set_part_moved(self, part):
|
def _set_part_moved(self, part):
|
||||||
self._last_part_moves[part] = 0
|
self._last_part_moves[part] = 0
|
||||||
@ -507,7 +507,7 @@ class RingBuilder(object):
|
|||||||
|
|
||||||
if not self.ever_rebalanced:
|
if not self.ever_rebalanced:
|
||||||
self.logger.debug("New builder; performing initial balance")
|
self.logger.debug("New builder; performing initial balance")
|
||||||
self._last_part_moves = array('B', itertools.repeat(0, self.parts))
|
|
||||||
self._update_last_part_moves()
|
self._update_last_part_moves()
|
||||||
|
|
||||||
with _set_random_seed(seed):
|
with _set_random_seed(seed):
|
||||||
@ -925,7 +925,7 @@ class RingBuilder(object):
|
|||||||
"""
|
"""
|
||||||
self._part_moved_bitmap = bytearray(max(2 ** (self.part_power - 3), 1))
|
self._part_moved_bitmap = bytearray(max(2 ** (self.part_power - 3), 1))
|
||||||
elapsed_hours = int(time() - self._last_part_moves_epoch) / 3600
|
elapsed_hours = int(time() - self._last_part_moves_epoch) / 3600
|
||||||
if elapsed_hours <= 0 or not self._last_part_moves:
|
if elapsed_hours <= 0:
|
||||||
return
|
return
|
||||||
for part in range(self.parts):
|
for part in range(self.parts):
|
||||||
# The "min(self._last_part_moves[part] + elapsed_hours, 0xff)"
|
# The "min(self._last_part_moves[part] + elapsed_hours, 0xff)"
|
||||||
|
@ -639,6 +639,7 @@ class CompositeRingBuilder(object):
|
|||||||
component builder.
|
component builder.
|
||||||
"""
|
"""
|
||||||
self._load_components()
|
self._load_components()
|
||||||
|
self.update_last_part_moves()
|
||||||
component_builders = zip(self._builder_files, self._builders)
|
component_builders = zip(self._builder_files, self._builders)
|
||||||
# don't let the same builder go first each time
|
# don't let the same builder go first each time
|
||||||
shuffle(component_builders)
|
shuffle(component_builders)
|
||||||
@ -678,10 +679,10 @@ class CompositeRingBuilder(object):
|
|||||||
Updates the record of how many hours ago each partition was moved in
|
Updates the record of how many hours ago each partition was moved in
|
||||||
all component builders.
|
all component builders.
|
||||||
"""
|
"""
|
||||||
# Called by component builders. We need all component builders to be at
|
# Called at start of each composite rebalance. We need all component
|
||||||
# same last_part_moves epoch before any builder starts moving parts;
|
# builders to be at same last_part_moves epoch before any builder
|
||||||
# this will effectively be a no-op for builders that have already been
|
# starts moving parts; this will effectively be a no-op for builders
|
||||||
# updated in last hour
|
# that have already been updated in last hour
|
||||||
for b in self._builders:
|
for b in self._builders:
|
||||||
b.update_last_part_moves()
|
b.update_last_part_moves()
|
||||||
|
|
||||||
@ -723,8 +724,11 @@ class CooperativeRingBuilder(RingBuilder):
|
|||||||
super(CooperativeRingBuilder, self)._can_part_move(part))
|
super(CooperativeRingBuilder, self)._can_part_move(part))
|
||||||
|
|
||||||
def _update_last_part_moves(self):
|
def _update_last_part_moves(self):
|
||||||
# overrides superclass method to delegate to parent builder
|
# overrides superclass method - parent builder should have called
|
||||||
return self.parent_builder.update_last_part_moves()
|
# update_last_part_moves() before rebalance; calling the superclass
|
||||||
|
# method here would reset _part_moved_bitmap which is state we rely on
|
||||||
|
# when min_part_hours is zero
|
||||||
|
pass
|
||||||
|
|
||||||
def update_last_part_moves(self):
|
def update_last_part_moves(self):
|
||||||
"""
|
"""
|
||||||
|
@ -69,6 +69,7 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
self.assertEqual(rb.devs, [])
|
self.assertEqual(rb.devs, [])
|
||||||
self.assertFalse(rb.devs_changed)
|
self.assertFalse(rb.devs_changed)
|
||||||
self.assertEqual(rb.version, 0)
|
self.assertEqual(rb.version, 0)
|
||||||
|
self.assertIsNotNone(rb._last_part_moves)
|
||||||
|
|
||||||
def test_overlarge_part_powers(self):
|
def test_overlarge_part_powers(self):
|
||||||
expected_msg = 'part_power must be at most 32 (was 33)'
|
expected_msg = 'part_power must be at most 32 (was 33)'
|
||||||
@ -841,6 +842,25 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
}
|
}
|
||||||
self.assertEqual(parts_with_moved_count, expected)
|
self.assertEqual(parts_with_moved_count, expected)
|
||||||
|
|
||||||
|
def test_ever_rebalanced(self):
|
||||||
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
|
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
|
||||||
|
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 1, 'region': 0, 'zone': 1, 'weight': 1,
|
||||||
|
'ip': '127.0.0.1', 'port': 10001, 'device': 'sda1'})
|
||||||
|
rb.add_dev({'id': 2, 'region': 0, 'zone': 2, 'weight': 1,
|
||||||
|
'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'})
|
||||||
|
self.assertFalse(rb.ever_rebalanced)
|
||||||
|
builder_file = os.path.join(self.testdir, 'test.buider')
|
||||||
|
rb.save(builder_file)
|
||||||
|
rb = ring.RingBuilder.load(builder_file)
|
||||||
|
self.assertFalse(rb.ever_rebalanced)
|
||||||
|
rb.rebalance()
|
||||||
|
self.assertTrue(rb.ever_rebalanced)
|
||||||
|
rb.save(builder_file)
|
||||||
|
rb = ring.RingBuilder.load(builder_file)
|
||||||
|
self.assertTrue(rb.ever_rebalanced)
|
||||||
|
|
||||||
def test_rerebalance(self):
|
def test_rerebalance(self):
|
||||||
rb = ring.RingBuilder(8, 3, 1)
|
rb = ring.RingBuilder(8, 3, 1)
|
||||||
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
|
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
|
||||||
@ -849,13 +869,16 @@ class TestRingBuilder(unittest.TestCase):
|
|||||||
'ip': '127.0.0.1', 'port': 10001, 'device': 'sda1'})
|
'ip': '127.0.0.1', 'port': 10001, 'device': 'sda1'})
|
||||||
rb.add_dev({'id': 2, 'region': 0, 'zone': 2, 'weight': 1,
|
rb.add_dev({'id': 2, 'region': 0, 'zone': 2, 'weight': 1,
|
||||||
'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'})
|
'ip': '127.0.0.1', 'port': 10002, 'device': 'sda1'})
|
||||||
|
self.assertFalse(rb.ever_rebalanced)
|
||||||
rb.rebalance()
|
rb.rebalance()
|
||||||
|
self.assertTrue(rb.ever_rebalanced)
|
||||||
counts = self._partition_counts(rb)
|
counts = self._partition_counts(rb)
|
||||||
self.assertEqual(counts, {0: 256, 1: 256, 2: 256})
|
self.assertEqual(counts, {0: 256, 1: 256, 2: 256})
|
||||||
rb.add_dev({'id': 3, 'region': 0, 'zone': 3, 'weight': 1,
|
rb.add_dev({'id': 3, 'region': 0, 'zone': 3, 'weight': 1,
|
||||||
'ip': '127.0.0.1', 'port': 10003, 'device': 'sda1'})
|
'ip': '127.0.0.1', 'port': 10003, 'device': 'sda1'})
|
||||||
rb.pretend_min_part_hours_passed()
|
rb.pretend_min_part_hours_passed()
|
||||||
rb.rebalance()
|
rb.rebalance()
|
||||||
|
self.assertTrue(rb.ever_rebalanced)
|
||||||
counts = self._partition_counts(rb)
|
counts = self._partition_counts(rb)
|
||||||
self.assertEqual(counts, {0: 192, 1: 192, 2: 192, 3: 192})
|
self.assertEqual(counts, {0: 192, 1: 192, 2: 192, 3: 192})
|
||||||
rb.set_dev_weight(3, 100)
|
rb.set_dev_weight(3, 100)
|
||||||
|
@ -22,6 +22,7 @@ import tempfile
|
|||||||
import unittest
|
import unittest
|
||||||
import shutil
|
import shutil
|
||||||
import copy
|
import copy
|
||||||
|
import time
|
||||||
|
|
||||||
from collections import defaultdict, Counter
|
from collections import defaultdict, Counter
|
||||||
|
|
||||||
@ -965,8 +966,9 @@ class TestComposeLoadComponents(TestLoadComponents):
|
|||||||
|
|
||||||
|
|
||||||
class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
|
class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
|
||||||
def _make_coop_builder(self, region, composite_builder, rebalance=False):
|
def _make_coop_builder(self, region, composite_builder, rebalance=False,
|
||||||
rb = CooperativeRingBuilder(8, 3, 1, composite_builder)
|
min_part_hours=1):
|
||||||
|
rb = CooperativeRingBuilder(8, 3, min_part_hours, composite_builder)
|
||||||
if composite_builder._builders is None:
|
if composite_builder._builders is None:
|
||||||
composite_builder._builders = [rb]
|
composite_builder._builders = [rb]
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
@ -986,93 +988,231 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
|
|||||||
for part2dev_id in builder._replica2part2dev
|
for part2dev_id in builder._replica2part2dev
|
||||||
for dev_id in part2dev_id)
|
for dev_id in part2dev_id)
|
||||||
|
|
||||||
|
def get_moved_parts(self, after, before):
|
||||||
|
def uniqueness(dev):
|
||||||
|
return dev['ip'], dev['port'], dev['device']
|
||||||
|
moved_parts = set()
|
||||||
|
for p in range(before.parts):
|
||||||
|
if ({uniqueness(dev) for dev in before._devs_for_part(p)} !=
|
||||||
|
{uniqueness(dev) for dev in after._devs_for_part(p)}):
|
||||||
|
moved_parts.add(p)
|
||||||
|
return moved_parts
|
||||||
|
|
||||||
|
def num_parts_can_move(self, builder):
|
||||||
|
# note that can_part_move() gives consideration to the
|
||||||
|
# _part_moved_bitmap which is only reset when a rebalance starts
|
||||||
|
return len(
|
||||||
|
[p for p in range(builder.parts)
|
||||||
|
if super(CooperativeRingBuilder, builder)._can_part_move(p)])
|
||||||
|
|
||||||
@mock.patch('swift.common.ring.builder.time')
|
@mock.patch('swift.common.ring.builder.time')
|
||||||
def test_rebalance_respects_cobuilder_part_moves(self, mock_time):
|
def _check_rebalance_respects_cobuilder_part_moves(
|
||||||
def do_rebalance(builder):
|
self, min_part_hours, mock_time):
|
||||||
old_part_devs = [builder._devs_for_part(part)
|
mock_time.return_value = now = int(time.time())
|
||||||
for part in range(builder.parts)]
|
builder_files = []
|
||||||
num_moved, _, _ = builder.rebalance()
|
|
||||||
moved_parts = {
|
|
||||||
p for p in range(builder.parts)
|
|
||||||
if old_part_devs[p] != builder._devs_for_part(p)}
|
|
||||||
self.assertEqual(len(moved_parts), num_moved) # sanity check
|
|
||||||
return num_moved, moved_parts
|
|
||||||
|
|
||||||
def num_parts_can_move(builder):
|
|
||||||
# note that can_part_move() gives consideration to the
|
|
||||||
# _part_moved_bitmap which is only reset when a rebalance starts
|
|
||||||
return len(
|
|
||||||
[p for p in range(builder.parts)
|
|
||||||
if super(CooperativeRingBuilder, builder)._can_part_move(p)])
|
|
||||||
|
|
||||||
mock_time.return_value = 0
|
|
||||||
cb = CompositeRingBuilder()
|
cb = CompositeRingBuilder()
|
||||||
rb1 = self._make_coop_builder(1, cb)
|
for i in (1, 2, 3):
|
||||||
rb2 = self._make_coop_builder(2, cb)
|
b = self._make_coop_builder(i, cb, min_part_hours=min_part_hours)
|
||||||
rb3 = self._make_coop_builder(3, cb)
|
fname = os.path.join(self.tmpdir, 'builder_%s.builder' % i)
|
||||||
cb._builders = [rb1, rb2, rb3]
|
b.save(fname)
|
||||||
|
builder_files.append(fname)
|
||||||
|
builder_files, builders = cb.load_components(builder_files)
|
||||||
|
|
||||||
# all cobuilders can perform initial rebalance
|
# all cobuilders can perform initial rebalance
|
||||||
for rb in (rb1, rb2, rb3):
|
cb.rebalance()
|
||||||
rb.rebalance()
|
exp = {0: 256, 1: 256, 2: 256}
|
||||||
actual = self._partition_counts(rb)
|
self.assertEqual(exp, self._partition_counts(builders[0]))
|
||||||
exp = {0: 256, 1: 256, 2: 256}
|
self.assertEqual(exp, self._partition_counts(builders[1]))
|
||||||
self.assertEqual(exp, actual,
|
self.assertEqual(exp, self._partition_counts(builders[2]))
|
||||||
'Expected %s but got %s for region %s' %
|
exp = min_part_hours * 3600
|
||||||
(exp, actual, next(rb._iter_devs())['region']))
|
self.assertEqual(exp, builders[0].min_part_seconds_left)
|
||||||
|
self.assertEqual(exp, builders[1].min_part_seconds_left)
|
||||||
|
self.assertEqual(exp, builders[2].min_part_seconds_left)
|
||||||
|
|
||||||
# jump forwards min_part_hours, both builders can move all parts
|
# jump forwards min_part_hours
|
||||||
mock_time.return_value = 3600
|
now += min_part_hours * 3600
|
||||||
self.add_dev(rb1)
|
mock_time.return_value = now
|
||||||
# sanity checks: rb1 and rb2 are both ready for rebalance
|
old_builders = []
|
||||||
self.assertEqual(0, rb2.min_part_seconds_left)
|
for builder in builders:
|
||||||
self.assertEqual(0, rb1.min_part_seconds_left)
|
old_builder = CooperativeRingBuilder(8, 3, min_part_hours, None)
|
||||||
|
old_builder.copy_from(copy.deepcopy(builder.to_dict()))
|
||||||
|
old_builders.append(old_builder)
|
||||||
|
|
||||||
|
for builder in builders:
|
||||||
|
self.add_dev(builder)
|
||||||
|
# sanity checks: all builders are ready for rebalance
|
||||||
|
self.assertEqual(0, builders[0].min_part_seconds_left)
|
||||||
|
self.assertEqual(0, builders[1].min_part_seconds_left)
|
||||||
|
self.assertEqual(0, builders[2].min_part_seconds_left)
|
||||||
# ... but last_part_moves not yet updated to current epoch
|
# ... but last_part_moves not yet updated to current epoch
|
||||||
self.assertEqual(0, num_parts_can_move(rb1))
|
if min_part_hours > 0:
|
||||||
self.assertEqual(0, num_parts_can_move(rb2))
|
self.assertEqual(0, self.num_parts_can_move(builders[0]))
|
||||||
# rebalancing rb1 will update epoch for both builders' last_part_moves
|
self.assertEqual(0, self.num_parts_can_move(builders[1]))
|
||||||
num_moved, rb1_parts_moved = do_rebalance(rb1)
|
self.assertEqual(0, self.num_parts_can_move(builders[2]))
|
||||||
self.assertEqual(192, num_moved)
|
|
||||||
self.assertEqual(self._partition_counts(rb1),
|
with mock.patch('swift.common.ring.composite_builder.shuffle',
|
||||||
|
lambda x: x):
|
||||||
|
cb.rebalance()
|
||||||
|
|
||||||
|
rb1_parts_moved = self.get_moved_parts(builders[0], old_builders[0])
|
||||||
|
self.assertEqual(192, len(rb1_parts_moved))
|
||||||
|
self.assertEqual(self._partition_counts(builders[0]),
|
||||||
{0: 192, 1: 192, 2: 192, 3: 192})
|
{0: 192, 1: 192, 2: 192, 3: 192})
|
||||||
|
|
||||||
|
rb2_parts_moved = self.get_moved_parts(builders[1], old_builders[1])
|
||||||
|
self.assertEqual(64, len(rb2_parts_moved))
|
||||||
|
counts = self._partition_counts(builders[1])
|
||||||
|
self.assertEqual(counts[3], 64)
|
||||||
|
self.assertEqual([234, 235, 235], sorted(counts.values()[:3]))
|
||||||
|
self.assertFalse(rb2_parts_moved.intersection(rb1_parts_moved))
|
||||||
|
|
||||||
|
# rb3 can't rebalance - all parts moved while rebalancing rb1 and rb2
|
||||||
|
self.assertEqual(
|
||||||
|
0, len(self.get_moved_parts(builders[2], old_builders[2])))
|
||||||
|
|
||||||
|
# jump forwards min_part_hours, all builders can move all parts again,
|
||||||
|
# so now rb2 should be able to further rebalance
|
||||||
|
now += min_part_hours * 3600
|
||||||
|
mock_time.return_value = now
|
||||||
|
old_builders = []
|
||||||
|
for builder in builders:
|
||||||
|
old_builder = CooperativeRingBuilder(8, 3, min_part_hours, None)
|
||||||
|
old_builder.copy_from(copy.deepcopy(builder.to_dict()))
|
||||||
|
old_builders.append(old_builder)
|
||||||
|
with mock.patch('swift.common.ring.composite_builder.shuffle',
|
||||||
|
lambda x: x):
|
||||||
|
cb.rebalance()
|
||||||
|
|
||||||
|
rb2_parts_moved = self.get_moved_parts(builders[1], old_builders[1])
|
||||||
|
self.assertGreater(len(rb2_parts_moved), 64)
|
||||||
|
self.assertGreater(self._partition_counts(builders[1])[3], 64)
|
||||||
|
self.assertLess(self.num_parts_can_move(builders[2]), 256)
|
||||||
|
self.assertEqual(256, self.num_parts_can_move(builders[0]))
|
||||||
|
# and rb3 should also have been able to move some parts
|
||||||
|
rb3_parts_moved = self.get_moved_parts(builders[2], old_builders[2])
|
||||||
|
self.assertGreater(len(rb3_parts_moved), 0)
|
||||||
|
self.assertFalse(rb3_parts_moved.intersection(rb2_parts_moved))
|
||||||
|
|
||||||
|
# but cobuilders will not prevent a new rb rebalancing for first time
|
||||||
|
rb4 = self._make_coop_builder(4, cb, rebalance=False,
|
||||||
|
min_part_hours=min_part_hours)
|
||||||
|
builders.append(rb4)
|
||||||
|
builder_files = []
|
||||||
|
for i, builder in enumerate(builders):
|
||||||
|
fname = os.path.join(self.tmpdir, 'builder_%s.builder' % i)
|
||||||
|
builder.save(fname)
|
||||||
|
builder_files.append(fname)
|
||||||
|
cb = CompositeRingBuilder()
|
||||||
|
builder_files, builders = cb.load_components(builder_files)
|
||||||
|
cb.rebalance()
|
||||||
|
self.assertEqual(256, len(self.get_moved_parts(builders[3], rb4)))
|
||||||
|
|
||||||
|
def test_rebalance_respects_cobuilder_part_moves(self):
|
||||||
|
self._check_rebalance_respects_cobuilder_part_moves(1)
|
||||||
|
self._check_rebalance_respects_cobuilder_part_moves(0)
|
||||||
|
|
||||||
|
@mock.patch('swift.common.ring.builder.time')
|
||||||
|
def _check_rebalance_cobuilder_states(
|
||||||
|
self, min_part_hours, mock_time):
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def mock_rebalance():
|
||||||
|
# wrap rebalance() in order to capture builder states before and
|
||||||
|
# after each component rebalance
|
||||||
|
orig_rebalance = RingBuilder.rebalance
|
||||||
|
# a dict mapping builder -> (list of captured builder states)
|
||||||
|
captured_builder_states = defaultdict(list)
|
||||||
|
|
||||||
|
def update_states():
|
||||||
|
for b in cb._builders:
|
||||||
|
rb = CooperativeRingBuilder(8, 3, min_part_hours, None)
|
||||||
|
rb.copy_from(copy.deepcopy(b.to_dict()))
|
||||||
|
rb._part_moved_bitmap = bytearray(b._part_moved_bitmap)
|
||||||
|
captured_builder_states[b].append(rb)
|
||||||
|
|
||||||
|
def wrap_rebalance(builder_instance):
|
||||||
|
update_states()
|
||||||
|
results = orig_rebalance(builder_instance)
|
||||||
|
update_states()
|
||||||
|
return results
|
||||||
|
|
||||||
|
with mock.patch('swift.common.ring.RingBuilder.rebalance',
|
||||||
|
wrap_rebalance):
|
||||||
|
yield captured_builder_states
|
||||||
|
|
||||||
|
mock_time.return_value = now = int(time.time())
|
||||||
|
builder_files = []
|
||||||
|
cb = CompositeRingBuilder()
|
||||||
|
for i in (1, 2, 3):
|
||||||
|
b = self._make_coop_builder(i, cb, min_part_hours=min_part_hours)
|
||||||
|
fname = os.path.join(self.tmpdir, 'builder_%s.builder' % i)
|
||||||
|
b.save(fname)
|
||||||
|
builder_files.append(fname)
|
||||||
|
builder_files, builders = cb.load_components(builder_files)
|
||||||
|
|
||||||
|
# all cobuilders can perform initial rebalance
|
||||||
|
cb.rebalance()
|
||||||
|
# jump forwards min_part_hours
|
||||||
|
now += min_part_hours * 3600
|
||||||
|
mock_time.return_value = now
|
||||||
|
for builder in builders:
|
||||||
|
self.add_dev(builder)
|
||||||
|
|
||||||
|
with mock.patch('swift.common.ring.composite_builder.shuffle',
|
||||||
|
lambda x: x):
|
||||||
|
with mock_rebalance() as captured_states:
|
||||||
|
cb.rebalance()
|
||||||
|
|
||||||
|
# sanity - state captured before and after each component rebalance
|
||||||
|
self.assertEqual(len(builders), len(captured_states))
|
||||||
|
for states in captured_states.values():
|
||||||
|
self.assertEqual(2 * len(builders), len(states))
|
||||||
|
# for each component we have a list of it's builder states
|
||||||
|
rb1s = captured_states[builders[0]]
|
||||||
|
rb2s = captured_states[builders[1]]
|
||||||
|
rb3s = captured_states[builders[2]]
|
||||||
|
|
||||||
|
# rebalancing will update epoch for all builders' last_part_moves
|
||||||
|
self.assertEqual(now, rb1s[0]._last_part_moves_epoch)
|
||||||
|
self.assertEqual(now, rb2s[0]._last_part_moves_epoch)
|
||||||
|
self.assertEqual(now, rb3s[0]._last_part_moves_epoch)
|
||||||
|
# so, in state before any component rebalance, all can now move parts
|
||||||
# N.B. num_parts_can_move gathers super class's (i.e. RingBuilder)
|
# N.B. num_parts_can_move gathers super class's (i.e. RingBuilder)
|
||||||
# _can_part_move so that it doesn't refer cobuilders state.
|
# _can_part_move so that it doesn't refer to cobuilders state.
|
||||||
self.assertEqual(256, num_parts_can_move(rb2))
|
self.assertEqual(256, self.num_parts_can_move(rb1s[0]))
|
||||||
self.assertEqual(64, num_parts_can_move(rb1))
|
self.assertEqual(256, self.num_parts_can_move(rb2s[0]))
|
||||||
|
self.assertEqual(256, self.num_parts_can_move(rb3s[0]))
|
||||||
|
|
||||||
|
# after first component has been rebalanced it has moved parts
|
||||||
|
self.assertEqual(64, self.num_parts_can_move(rb1s[1]))
|
||||||
|
self.assertEqual(256, self.num_parts_can_move(rb2s[2]))
|
||||||
|
self.assertEqual(256, self.num_parts_can_move(rb3s[2]))
|
||||||
|
|
||||||
|
rb1_parts_moved = self.get_moved_parts(rb1s[1], rb1s[0])
|
||||||
|
self.assertEqual(192, len(rb1_parts_moved))
|
||||||
|
self.assertEqual(self._partition_counts(rb1s[1]),
|
||||||
|
{0: 192, 1: 192, 2: 192, 3: 192})
|
||||||
|
|
||||||
# rebalancing rb2 - rb2 in isolation could potentially move all parts
|
# rebalancing rb2 - rb2 in isolation could potentially move all parts
|
||||||
# so would move 192 parts to new device, but it is constrained by rb1
|
# so would move 192 parts to new device, but it is constrained by rb1
|
||||||
# only having 64 parts that can move
|
# only having 64 parts that can move
|
||||||
self.add_dev(rb2)
|
rb2_parts_moved = self.get_moved_parts(rb2s[3], rb2s[2])
|
||||||
num_moved, rb2_parts_moved = do_rebalance(rb2)
|
self.assertEqual(64, len(rb2_parts_moved))
|
||||||
self.assertEqual(64, num_moved)
|
counts = self._partition_counts(rb2s[3])
|
||||||
counts = self._partition_counts(rb2)
|
|
||||||
self.assertEqual(counts[3], 64)
|
self.assertEqual(counts[3], 64)
|
||||||
self.assertEqual([234, 235, 235], sorted(counts.values()[:3]))
|
self.assertEqual([234, 235, 235], sorted(counts.values()[:3]))
|
||||||
self.assertFalse(rb2_parts_moved.intersection(rb1_parts_moved))
|
self.assertFalse(rb2_parts_moved.intersection(rb1_parts_moved))
|
||||||
self.assertEqual(192, num_parts_can_move(rb2))
|
self.assertEqual(192, self.num_parts_can_move(rb2s[3]))
|
||||||
self.assertEqual(64, num_parts_can_move(rb1))
|
self.assertEqual(64, self.num_parts_can_move(rb1s[3]))
|
||||||
|
|
||||||
# rb3 can't rebalance - all parts moved while rebalancing rb1 and rb2
|
# rb3 can't rebalance - all parts moved while rebalancing rb1 and rb2
|
||||||
self.add_dev(rb3)
|
self.assertEqual(0, len(self.get_moved_parts(rb3s[5], rb3s[0])))
|
||||||
num_moved, rb3_parts_moved = do_rebalance(rb3)
|
|
||||||
self.assertEqual(0, num_moved)
|
|
||||||
|
|
||||||
# jump forwards min_part_hours, both builders can move all parts again,
|
def test_rebalance_cobuilder_states(self):
|
||||||
# so now rb2 should be able to further rebalance
|
self._check_rebalance_cobuilder_states(1)
|
||||||
mock_time.return_value = 7200
|
self._check_rebalance_cobuilder_states(0)
|
||||||
do_rebalance(rb2)
|
|
||||||
self.assertGreater(self._partition_counts(rb2)[3], 64)
|
|
||||||
self.assertLess(num_parts_can_move(rb2), 256)
|
|
||||||
self.assertEqual(256, num_parts_can_move(rb1)) # sanity check
|
|
||||||
|
|
||||||
# but cobuilders will not prevent a rb rebalancing for first time
|
def _check_rebalance_cobuilders_calls(self, min_part_hours):
|
||||||
rb4 = self._make_coop_builder(4, cb, rebalance=False)
|
|
||||||
cb._builders.append(rb4)
|
|
||||||
num_moved, _, _ = rb4.rebalance()
|
|
||||||
self.assertEqual(3 * 256, num_moved)
|
|
||||||
|
|
||||||
def test_rebalance_cobuilders(self):
|
|
||||||
# verify that co-builder methods are called during one builder's
|
# verify that co-builder methods are called during one builder's
|
||||||
# rebalance
|
# rebalance
|
||||||
@contextmanager
|
@contextmanager
|
||||||
@ -1107,26 +1247,20 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
|
|||||||
fake_can_part_move):
|
fake_can_part_move):
|
||||||
yield calls
|
yield calls
|
||||||
|
|
||||||
# single component builder in parent builder
|
|
||||||
cb = CompositeRingBuilder()
|
cb = CompositeRingBuilder()
|
||||||
rb1 = self._make_coop_builder(1, cb)
|
rb1 = self._make_coop_builder(1, cb, min_part_hours=min_part_hours)
|
||||||
with mock_update_last_part_moves() as update_calls:
|
rb2 = self._make_coop_builder(2, cb, min_part_hours=min_part_hours)
|
||||||
with mock_can_part_move() as can_part_move_calls:
|
|
||||||
rb1.rebalance()
|
|
||||||
self.assertEqual([rb1], update_calls)
|
|
||||||
self.assertEqual([rb1], can_part_move_calls.keys())
|
|
||||||
self.assertEqual(768, len(can_part_move_calls[rb1]))
|
|
||||||
|
|
||||||
# two component builders with same parent builder
|
|
||||||
cb = CompositeRingBuilder()
|
|
||||||
rb1 = self._make_coop_builder(1, cb)
|
|
||||||
rb2 = self._make_coop_builder(2, cb)
|
|
||||||
cb._builders = [rb1, rb2]
|
cb._builders = [rb1, rb2]
|
||||||
|
# composite rebalance updates last_part_moves before any component
|
||||||
|
# rebalance - after that expect no more updates
|
||||||
|
with mock_update_last_part_moves() as update_calls:
|
||||||
|
cb.update_last_part_moves()
|
||||||
|
self.assertEqual(sorted([rb1, rb2]), sorted(update_calls))
|
||||||
|
|
||||||
with mock_update_last_part_moves() as update_calls:
|
with mock_update_last_part_moves() as update_calls:
|
||||||
with mock_can_part_move() as can_part_move_calls:
|
with mock_can_part_move() as can_part_move_calls:
|
||||||
rb2.rebalance()
|
rb2.rebalance()
|
||||||
# both builders get updated
|
self.assertFalse(update_calls)
|
||||||
self.assertEqual(sorted([rb1, rb2]), sorted(update_calls))
|
|
||||||
# rb1 has never been rebalanced so no calls propagate from its
|
# rb1 has never been rebalanced so no calls propagate from its
|
||||||
# can_part_move method to its superclass _can_part_move method
|
# can_part_move method to its superclass _can_part_move method
|
||||||
self.assertEqual([rb2], can_part_move_calls.keys())
|
self.assertEqual([rb2], can_part_move_calls.keys())
|
||||||
@ -1134,14 +1268,16 @@ class TestCooperativeRingBuilder(BaseTestCompositeBuilder):
|
|||||||
with mock_update_last_part_moves() as update_calls:
|
with mock_update_last_part_moves() as update_calls:
|
||||||
with mock_can_part_move() as can_part_move_calls:
|
with mock_can_part_move() as can_part_move_calls:
|
||||||
rb1.rebalance()
|
rb1.rebalance()
|
||||||
# both builders get updated
|
self.assertFalse(update_calls)
|
||||||
self.assertEqual(sorted([rb1, rb2]), sorted(update_calls))
|
|
||||||
|
|
||||||
# rb1 is being rebalanced so gets checked, and rb2 also gets checked
|
# rb1 is being rebalanced so gets checked, and rb2 also gets checked
|
||||||
self.assertEqual(sorted([rb1, rb2]), sorted(can_part_move_calls))
|
self.assertEqual(sorted([rb1, rb2]), sorted(can_part_move_calls))
|
||||||
self.assertEqual(768, len(can_part_move_calls[rb1]))
|
self.assertEqual(768, len(can_part_move_calls[rb1]))
|
||||||
self.assertEqual(768, len(can_part_move_calls[rb2]))
|
self.assertEqual(768, len(can_part_move_calls[rb2]))
|
||||||
|
|
||||||
|
def test_rebalance_cobuilders_calls(self):
|
||||||
|
self._check_rebalance_cobuilders_calls(1)
|
||||||
|
self._check_rebalance_cobuilders_calls(0)
|
||||||
|
|
||||||
def test_save_then_load(self):
|
def test_save_then_load(self):
|
||||||
cb = CompositeRingBuilder()
|
cb = CompositeRingBuilder()
|
||||||
coop_rb = self._make_coop_builder(1, cb, rebalance=True)
|
coop_rb = self._make_coop_builder(1, cb, rebalance=True)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user