Merge "Stop moving partitions unnecessarily when overload is on."

Jenkins 2015-07-10 20:12:42 +00:00 committed by Gerrit Code Review
commit d8da37fd5a
2 changed files with 402 additions and 17 deletions


@ -81,6 +81,7 @@ class RingBuilder(object):
self.devs_changed = False
self.version = 0
self.overload = 0.0
self._effective_overload = None
# _replica2part2dev maps from replica number to partition number to
# device id. So, for a three replica, 2**23 ring, it's an array of
@ -380,6 +381,11 @@ class RingBuilder(object):
if seed is not None:
random.seed(seed)
self._effective_overload = min(self.overload,
self.get_required_overload())
self.logger.debug("Using effective overload of %f",
self._effective_overload)
self._ring = None
if self._last_part_moves_epoch is None:
self.logger.debug("New builder; performing initial balance")
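The core of the change is visible in this hunk: the overload actually applied during a rebalance is capped at whatever get_required_overload() computes, so a generously configured overload stops moving partitions once the ring can disperse without it. A tiny illustration, with made-up numbers (the 1/7 is a hypothetical required overload, not from this diff):

overload = 0.5          # operator-configured overload
required = 1.0 / 7      # hypothetical get_required_overload() result
effective_overload = min(overload, required)
print(effective_overload)  # 0.142857...; the value used for this rebalance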
@ -401,7 +407,8 @@ class RingBuilder(object):
while True:
reassign_parts = self._gather_reassign_parts()
changed_parts += len(reassign_parts)
self.logger.debug("Gathered %d parts", changed_parts)
self.logger.debug("Gathered %d parts thus far (%d this pass)",
changed_parts, len(reassign_parts))
self._reassign_parts(reassign_parts)
self.logger.debug("Assigned %d parts", changed_parts)
while self._remove_devs:
@ -602,6 +609,151 @@ class RingBuilder(object):
balance = dev_balance
return balance
def get_required_overload(self):
"""
Returns the minimum overload value required to make the ring maximally
dispersed.
"""
self.logger.debug("computing required overload")
tfd, sibling_tiers = self._compute_sibling_tiers()
max_allowed_replicas = self._build_max_replicas_by_tier()
# We're computing a bunch of different things here, but iterating
# over all the devs once is more efficient than doing it a bunch of
# times.
all_tiers = set([()])
tier_weight = defaultdict(float)
total_weight = 0.0
tier2children = defaultdict(set)
for dev in self._iter_devs():
dev_weight = dev['weight']
total_weight += dev_weight
for tier in tfd[dev['id']]:
all_tiers.add(tier)
tier_weight[tier] += dev_weight
tier2children[tier[:-1]].add(tier)
tier_weight[()] = total_weight
max_required_overload = 0.0
for tier in all_tiers:
if tier not in tier2children:
continue
if tier_weight[tier] <= 0:
continue
# Example 1: Consider a 3-replica cluster with 2 regions. If one
# region has more than 2/3 the total weight, then (ignoring
# overload) some partitions will reside entirely in the big
# region.
#
# Example 2: Consider a 3-replica cluster with 3 regions. If any
# region has more than 1/3 the total weight, some partitions will
# not have replicas spread across all regions.
#
# Example 3: Consider a 3-replica cluster with 4 regions. If any
# region has more than 1/3 the total weight, some partitions will
# not have replicas spread across all regions.
#
# Example 4: Consider a 3-replica cluster with 100 regions. If
# any region has more than 1/3 the total weight, some partitions
# will not have replicas spread across all regions. The fact
# that there's 100 regions doesn't matter; if one region is big
# enough, it'll get multiple replicas of some partitions.
#
# Example 5: Consider a 5-replica cluster with 2 regions. If the
# bigger region has more than 3/5 the weight, some partitions
# will have more than 3 replicas in the big region. (Optimal
# dispersion is 3 replicas in some region and 2 in the other; 4
# and 1 is not good enough.)
#
# In general, what we do is split this tier's child tiers
# into two groups: "big" and "small". "Big" child tiers are
# ones whose weight exceeds their fraction of the replicas.
# For example, given 3 replicas and 4 zones of total weight
# 12,000, a zone with weight greater than 1/3 of 12,000 (=
# 4,000) would be considered big. "Small" child tiers are
# those which are not big.
#
# Once we've divided the child tiers into big and small, we
# figure out how many replicas should wind up on the small
# child tiers (all together), and then compute the needed
# overload factor to boost their weights so they can take
# that many replicas.
child_tiers = tier2children[tier]
tier_replicas = max_allowed_replicas[tier]
big_child_count = small_child_count = 0
big_child_weight = small_child_weight = 0.0
max_child_replicas = math.ceil(tier_replicas / len(child_tiers))
bigness_threshold = (
max_child_replicas / tier_replicas * tier_weight[tier])
for child_tier in tier2children[tier]:
child_weight = tier_weight[child_tier]
if child_weight == 0:
# If it's got 0 weight, it's not taking any
# partitions at all, so it doesn't count.
continue
if child_weight >= bigness_threshold:
big_child_count += 1
big_child_weight += child_weight
else:
small_child_count += 1
small_child_weight += child_weight
if big_child_count == 0 or small_child_count == 0:
# We only need overload if we have both big and small
# tiers. Usually, all small tiers means things can
# balance, while all big tiers means that we have
# exactly one child tier (e.g. a cluster with only one
# region).
continue
# We assume each big child tier takes the maximum possible
# number of replicas for optimal dispersion, but no more.
# That leaves the remainder for the small child tiers.
big_child_replicas = max_child_replicas * big_child_count
small_child_replicas = tier_replicas - big_child_replicas
if small_child_replicas == 0:
# If we're not putting any replicas on small child
# tiers, then there's no need for overload. This also
# avoids a division-by-zero below.
continue
# We want the overloaded small tiers to take up their fair
# share of the replicas. We can express this as follows:
#
# Let Ws be the sum of the weights of the small child tiers.
#
# Let Wb be the sum of the weights of the big child tiers.
#
# Let Rt be the number of replicas at the current tier.
#
# Let Rs be the desired number of replicas for the small
# child tiers.
#
# Let L be the overload factor, i.e. 1 + the overload (the code
# below subtracts the 1 back off).
#
# Then, we have the following:
#
# (L * Ws) / (Wb + L * Ws) = Rs / Rt
#
# Solving for L, we get:
#
# L = 1 / (Ws / Wb * (Rt / Rs - 1))
required_overload = 1.0 / (
(small_child_weight / big_child_weight)
* (tier_replicas / small_child_replicas - 1)) - 1
if required_overload > max_required_overload:
self.logger.debug("Required overload for %r is %f [NEW HIGH]",
tier, required_overload)
max_required_overload = required_overload
else:
self.logger.debug("Required overload for %r is %f",
tier, required_overload)
return max_required_overload
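To make the big/small split and the closed-form overload concrete, here is a minimal standalone sketch of the per-tier computation; the function name and flat weight-list input are illustrative only, not the builder's API. It reproduces the 1/7 figure from test_small_zone below.

import math

def required_overload_for_tier(child_weights, tier_replicas):
    # Illustrative re-derivation of the big/small logic above.
    total = float(sum(child_weights))
    max_child = math.ceil(tier_replicas / float(len(child_weights)))
    threshold = max_child / float(tier_replicas) * total
    big = [w for w in child_weights if w >= threshold]
    small = [w for w in child_weights if 0 < w < threshold]
    if not big or not small:
        return 0.0  # overload can't help (or isn't needed) here
    # Replicas left for the small tiers once each big tier has taken
    # its maximum dispersible share.
    rs = tier_replicas - max_child * len(big)
    if rs <= 0:
        return 0.0
    ws, wb = float(sum(small)), float(sum(big))
    # L = Rs * Wb / (Ws * (Rt - Rs)); the overload is L - 1.
    return (rs * wb) / (ws * (tier_replicas - rs)) - 1

# Zones weighted 8, 8, and 7 with 3 replicas, as in test_small_zone:
print(required_overload_for_tier([8, 8, 7], 3))  # 0.142857... = 1/7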
def pretend_min_part_hours_passed(self):
"""
Override min_part_hours by marking all partitions as having been moved
@ -643,6 +795,8 @@ class RingBuilder(object):
used to sort the devices according to "most wanted" during rebalancing
to best distribute partitions. A negative parts_wanted indicates the
device is "overweight" and wishes to give partitions away if possible.
Note: parts_wanted does *not* consider overload.
"""
weight_of_one_part = self.weight_of_one_part()
@ -767,29 +921,30 @@ class RingBuilder(object):
Returns a dict of (tier: available parts in other tiers) for all tiers
in the ring.
Devices that have too much partitions (negative parts_wanted) are
ignored, otherwise the sum of all parts_wanted is 0 +/- rounding
errors.
Devices that have too many partitions (negative parts_wanted plus
overload) are ignored, otherwise the sum of all returned values is 0
+/- rounding errors.
This takes overload into account.
"""
wanted_parts_for_tier = {}
for dev in self._iter_devs():
pw = (max(0, dev['parts_wanted']) +
max(int(math.ceil(
(dev['parts_wanted'] + dev['parts']) * self.overload)),
0))
extra_overload_parts = self._n_overload_parts(dev)
pw = max(dev['parts_wanted'] + extra_overload_parts, 0)
for tier in tiers_for_dev(dev):
wanted_parts_for_tier.setdefault(tier, 0)
wanted_parts_for_tier[tier] += pw
return wanted_parts_for_tier
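The rewritten expression is not just a tidy-up: for an overweight device, the old formula added the overload fudge on top of a parts_wanted already clipped at zero, overstating how many partitions the tier could still accept (and it used the raw overload rather than the effective one). A quick comparison with made-up numbers:

import math

# Hypothetical overweight device: holds 100 parts, wants only 90.
parts_wanted, parts, overload = -10, 100, 0.125
fudge = max(int(math.ceil((parts_wanted + parts) * overload)), 0)  # 12

old_pw = max(0, parts_wanted) + fudge   # 0 + 12 = 12: ten parts too generous
new_pw = max(parts_wanted + fudge, 0)   # max(2, 0) = 2
print(old_pw, new_pw)  # 12 2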
def _gather_reassign_parts(self):
def _compute_sibling_tiers(self):
"""
Returns a list of (partition, replicas) pairs to be reassigned by
gathering from removed devices, insufficiently-far-apart replicas, and
overweight drives.
Returns a 2-tuple; the first value is a dictionary mapping each
device's id to its tiers, and the second is a dictionary mapping
each tier to a list of its sibling tiers.
"""
# inline memoization of tiers_for_dev() results (profiling reveals it
# as a hot-spot).
# as a hot-spot). We also return it so callers don't have to
# rebuild it.
tfd = {}
tiers_by_len = defaultdict(set)
@ -807,6 +962,15 @@ class RingBuilder(object):
for i, tier in enumerate(tiers):
sibling_tiers[tier] = [t for t in (tiers[:i] + tiers[(i + 1):])
if t[:-1] == tier[:-1]]
return (tfd, sibling_tiers)
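For reference, a minimal sketch of the sibling relation on hand-made tier tuples; real tiers come from tiers_for_dev() and carry region/zone/ip/device levels, so these two-level tuples are just illustrative:

tiers_by_len = {
    1: {(0,), (1,)},              # region level
    2: {(0, 0), (0, 1), (1, 0)},  # zone level
}
sibling_tiers = {}
for tier_set in tiers_by_len.values():
    tiers = sorted(tier_set)
    for i, tier in enumerate(tiers):
        sibling_tiers[tier] = [t for t in (tiers[:i] + tiers[i + 1:])
                               if t[:-1] == tier[:-1]]
print(sibling_tiers[(0, 0)])  # [(0, 1)]: same region, different zone
print(sibling_tiers[(1, 0)])  # []: no other zones in region 1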
def _gather_reassign_parts(self):
"""
Returns a list of (partition, replicas) pairs to be reassigned by
gathering from removed devices, insufficiently-far-apart replicas, and
overweight drives.
"""
tfd, sibling_tiers = self._compute_sibling_tiers()
# First we gather partitions from removed devices. Since removed
# devices usually indicate device failures, we have no choice but to
@ -917,6 +1081,7 @@ class RingBuilder(object):
start += random.randint(0, self.parts / 2) # GRAH PEP8!!!
self._last_part_gather_start = start
for replica, part2dev in enumerate(self._replica2part2dev):
# If we've got a partial replica, start may be out of
# range. Scale it down so that we get a similar movement
@ -930,7 +1095,8 @@ class RingBuilder(object):
if part in removed_dev_parts or part in spread_out_parts:
continue
dev = self.devs[part2dev[part]]
if dev['parts_wanted'] < 0:
fudge = self._n_overload_parts(dev)
if dev['parts_wanted'] + fudge < 0:
self._last_part_moves[part] = 0
dev['parts_wanted'] += 1
dev['parts'] -= 1
@ -953,6 +1119,14 @@ class RingBuilder(object):
random.shuffle(reassign_parts_list)
return reassign_parts_list
def _n_overload_parts(self, dev):
"""
The number of extra partitions a device can take due to overload.
"""
return max(int(math.ceil(
(dev['parts_wanted'] + dev['parts'])
* self._effective_overload)), 0)
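Because parts_wanted is the desired partition count minus the current one, parts_wanted + parts is just the device's desired count, so this helper is effectively ceil(desired * effective_overload). A sketch of how it gates the gather loop above, with invented numbers:

import math

def n_overload_parts(parts_wanted, parts, effective_overload):
    # Standalone rendering of the helper, for illustration only.
    return max(int(math.ceil((parts_wanted + parts) * effective_overload)), 0)

# A device that wants 90 parts but holds 100 gets 12 parts of headroom
# under a 0.125 effective overload, so the gather loop leaves it alone
# instead of shedding 10 partitions:
fudge = n_overload_parts(-10, 100, 0.125)
print(fudge, -10 + fudge >= 0)  # 12 True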
def _reassign_parts(self, reassign_parts):
"""
For an existing ring data set, partitions are reassigned similarly to
@ -992,9 +1166,7 @@ class RingBuilder(object):
# with partitions to shed, which is any time a device is being
# removed, which is a pretty frequent operation.
wanted = max(dev['parts_wanted'], 0)
fudge = max(int(math.ceil(
(dev['parts_wanted'] + dev['parts']) * self.overload)),
0)
fudge = self._n_overload_parts(dev)
for tier in tiers:
fudge_available_in_tier[tier] += (wanted + fudge)
parts_available_in_tier[tier] += wanted


@ -1071,6 +1071,75 @@ class TestRingBuilder(unittest.TestCase):
self.assertEqual(part_counts[1], 256)
self.assertEqual(part_counts[2], 256)
def test_unoverload(self):
# Start off needing overload to balance, then add capacity until we
# don't need overload any more and see that things still balance.
# Overload doesn't prevent optimal balancing.
rb = ring.RingBuilder(8, 3, 1)
rb.set_overload(0.125)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 0, 'weight': 2,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.rebalance(seed=12345)
# sanity check: the overload shifts parts onto the small devices, which
# get their weight share plus the full 12.5% (192 * 1.125 = 216)
part_counts = self._partition_counts(rb)
self.assertEqual(part_counts[0], 216)
self.assertEqual(part_counts[1], 216)
self.assertEqual(part_counts[2], 336)
# Add some weight: balance improves
rb.set_dev_weight(0, 1.5)
rb.set_dev_weight(1, 1.5)
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb)
self.assertEqual(part_counts[0], 236)
self.assertEqual(part_counts[1], 236)
self.assertEqual(part_counts[2], 296)
# Even out the weights: balance becomes perfect
rb.set_dev_weight(0, 2)
rb.set_dev_weight(1, 2)
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb)
self.assertEqual(part_counts[0], 256)
self.assertEqual(part_counts[1], 256)
self.assertEqual(part_counts[2], 256)
# Add some new devices: balance stays optimal
rb.add_dev({'id': 3, 'region': 0, 'zone': 0,
'weight': 2.0 / 3,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdd'})
rb.add_dev({'id': 4, 'region': 0, 'zone': 0,
'weight': 2.0 / 3,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sde'})
rb.add_dev({'id': 5, 'region': 0, 'zone': 0,
'weight': 2.0 / 3,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdf'})
# we're moving more than 1/3 of the replicas but fewer than 2/3, so
# we have to do this twice
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
rb.pretend_min_part_hours_passed()
rb.rebalance(seed=12345)
part_counts = self._partition_counts(rb)
self.assertEqual(part_counts[0], 192)
self.assertEqual(part_counts[1], 192)
self.assertEqual(part_counts[2], 192)
self.assertEqual(part_counts[3], 64)
self.assertEqual(part_counts[4], 64)
self.assertEqual(part_counts[5], 64)
def test_overload_keeps_balanceable_things_balanced_initially(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 8,
@ -1595,5 +1664,149 @@ class TestRingBuilder(unittest.TestCase):
})
class TestGetRequiredOverload(unittest.TestCase):
def assertApproximately(self, a, b, error=1e-6):
self.assertTrue(abs(a - b) < error,
"%f and %f differ by more than %f" % (a, b, error))
def test_none_needed(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 0, 'weight': 1,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdd'})
# 4 equal-weight devs and 3 replicas: this can be balanced without
# resorting to overload at all
self.assertApproximately(rb.get_required_overload(), 0)
# 3 equal-weight devs and 3 replicas: this can also be balanced
rb.remove_dev(3)
self.assertApproximately(rb.get_required_overload(), 0)
def test_small_zone(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 4,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 4,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 1, 'weight': 4,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 1, 'weight': 4,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdd'})
rb.add_dev({'id': 4, 'region': 0, 'zone': 2, 'weight': 4,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdc'})
rb.add_dev({'id': 5, 'region': 0, 'zone': 2, 'weight': 3,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdd'})
# Zone 2 has 7/8 the capacity of each of the other two zones, so an
# overload of 1/7 will allow things to balance out.
self.assertApproximately(rb.get_required_overload(), 1.0 / 7)
def test_big_zone(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 1, 'weight': 60,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 1, 'weight': 60,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 4, 'region': 0, 'zone': 2, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 5, 'region': 0, 'zone': 2, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 6, 'region': 0, 'zone': 3, 'weight': 60,
'ip': '127.0.0.3', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 7, 'region': 0, 'zone': 3, 'weight': 60,
'ip': '127.0.0.3', 'port': 10000, 'device': 'sdb'})
# Zone 0 has weight 200, while zones 1, 2, and 3 together have only
# 360. The small zones would need to go from 360 to 400 to balance
# out zone 0, for an overload of 40/360 = 1/9.
self.assertApproximately(rb.get_required_overload(), 1.0 / 9)
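The same figure drops out of the closed-form expression derived in the builder, with Ws = 360, Wb = 200, Rt = 3, and Rs = 2 replicas left for the small zones; the remaining tests check out the same way:

print(1.0 / ((360.0 / 200) * (3 / 2.0 - 1)) - 1)  # 0.1111... = 1/9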
def test_enormous_zone(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 1000,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 1000,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 1, 'weight': 60,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 1, 'weight': 60,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 4, 'region': 0, 'zone': 2, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 5, 'region': 0, 'zone': 2, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 6, 'region': 0, 'zone': 3, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 7, 'region': 0, 'zone': 3, 'weight': 60,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
# Zone 0 has weight 2000, while zones 1, 2, and 3 together have only
# 360. The small zones would need to go from 360 to 4000 to balance
# out zone 0, for an overload of 3640/360.
self.assertApproximately(rb.get_required_overload(), 3640.0 / 360)
def test_two_big_two_small(self):
rb = ring.RingBuilder(8, 3, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 1, 'weight': 100,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 1, 'weight': 100,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 4, 'region': 0, 'zone': 2, 'weight': 45,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 5, 'region': 0, 'zone': 2, 'weight': 45,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 6, 'region': 0, 'zone': 3, 'weight': 35,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 7, 'region': 0, 'zone': 3, 'weight': 35,
'ip': '127.0.0.2', 'port': 10000, 'device': 'sdb'})
# Zones 0 and 1 each have weight 200, while zones 2 and 3 together
# have only 160. The small zones would need to go from 160 to 200 to
# balance out the big zones, for an overload of 40/160 = 1/4.
self.assertApproximately(rb.get_required_overload(), 1.0 / 4)
def test_multiple_replicas_each(self):
rb = ring.RingBuilder(8, 7, 1)
rb.add_dev({'id': 0, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 1, 'region': 0, 'zone': 0, 'weight': 100,
'ip': '127.0.0.0', 'port': 10000, 'device': 'sdb'})
rb.add_dev({'id': 2, 'region': 0, 'zone': 1, 'weight': 70,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sda'})
rb.add_dev({'id': 3, 'region': 0, 'zone': 1, 'weight': 70,
'ip': '127.0.0.1', 'port': 10000, 'device': 'sdb'})
# Zone 0 has more than 4/7 of the weight, so we'll need to bring
# zone 1 up to a total of 150 so it can take 3 replicas, so the
# overload should be 10/140.
self.assertApproximately(rb.get_required_overload(), 10.0 / 140)
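Checking that last figure against the derived formula, with Ws = 140, Wb = 200, Rt = 7, and Rs = 3:

ws, wb, rt, rs = 140.0, 200.0, 7, 3
print(1.0 / ((ws / wb) * (rt / float(rs) - 1)) - 1)  # 0.0714285... = 10/140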
if __name__ == '__main__':
unittest.main()