Merge "Update handoff algorithm to use IP/port pairs"

This commit is contained in:
Jenkins 2013-12-06 22:31:19 +00:00 committed by Gerrit Code Review
commit 46e7ba0e70
2 changed files with 85 additions and 47 deletions

View File

@ -168,14 +168,18 @@ class Ring(object):
# doing it on every call to get_more_nodes(). # doing it on every call to get_more_nodes().
regions = set() regions = set()
zones = set() zones = set()
ip_ports = set()
self._num_devs = 0 self._num_devs = 0
for dev in self._devs: for dev in self._devs:
if dev: if dev:
regions.add(dev['region']) regions.add(dev['region'])
zones.add((dev['region'], dev['zone'])) zones.add((dev['region'], dev['zone']))
ip_ports.add((dev['region'], dev['zone'],
dev['ip'], dev['port']))
self._num_devs += 1 self._num_devs += 1
self._num_regions = len(regions) self._num_regions = len(regions)
self._num_zones = len(zones) self._num_zones = len(zones)
self._num_ip_ports = len(ip_ports)
def _rebuild_tier_data(self): def _rebuild_tier_data(self):
self.tier2devs = defaultdict(list) self.tier2devs = defaultdict(list)
@ -313,6 +317,8 @@ class Ring(object):
used = set(d['id'] for d in primary_nodes) used = set(d['id'] for d in primary_nodes)
same_regions = set(d['region'] for d in primary_nodes) same_regions = set(d['region'] for d in primary_nodes)
same_zones = set((d['region'], d['zone']) for d in primary_nodes) same_zones = set((d['region'], d['zone']) for d in primary_nodes)
same_ip_ports = set((d['region'], d['zone'], d['ip'], d['port'])
for d in primary_nodes)
parts = len(self._replica2part2dev_id[0]) parts = len(self._replica2part2dev_id[0])
start = struct.unpack_from( start = struct.unpack_from(
@ -333,12 +339,14 @@ class Ring(object):
dev_id = part2dev_id[handoff_part] dev_id = part2dev_id[handoff_part]
dev = self._devs[dev_id] dev = self._devs[dev_id]
region = dev['region'] region = dev['region']
zone = (region, dev['zone'])
if dev_id not in used and region not in same_regions: if dev_id not in used and region not in same_regions:
yield dev yield dev
used.add(dev_id) used.add(dev_id)
same_regions.add(region) same_regions.add(region)
same_zones.add(zone) zone = dev['zone']
ip_port = (region, zone, dev['ip'], dev['port'])
same_zones.add((region, zone))
same_ip_ports.add(ip_port)
if len(same_regions) == self._num_regions: if len(same_regions) == self._num_regions:
hit_all_regions = True hit_all_regions = True
break break
@ -360,10 +368,34 @@ class Ring(object):
yield dev yield dev
used.add(dev_id) used.add(dev_id)
same_zones.add(zone) same_zones.add(zone)
ip_port = zone + (dev['ip'], dev['port'])
same_ip_ports.add(ip_port)
if len(same_zones) == self._num_zones: if len(same_zones) == self._num_zones:
hit_all_zones = True hit_all_zones = True
break break
hit_all_ip_ports = len(same_ip_ports) == self._num_ip_ports
for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc),
start, inc)):
if hit_all_ip_ports:
# We've exhausted the pool of unused backends, so stop
# looking.
break
for part2dev_id in self._replica2part2dev_id:
if handoff_part < len(part2dev_id):
dev_id = part2dev_id[handoff_part]
dev = self._devs[dev_id]
ip_port = (dev['region'], dev['zone'],
dev['ip'], dev['port'])
if dev_id not in used and ip_port not in same_ip_ports:
yield dev
used.add(dev_id)
same_ip_ports.add(ip_port)
if len(same_ip_ports) == self._num_ip_ports:
hit_all_ip_ports = True
break
hit_all_devs = len(used) == self._num_devs hit_all_devs = len(used) == self._num_devs
for handoff_part in chain(xrange(start, parts, inc), for handoff_part in chain(xrange(start, parts, inc),
xrange(inc - ((parts - start) % inc), xrange(inc - ((parts - start) % inc),

View File

@ -44,8 +44,8 @@ class TestRingData(unittest.TestCase):
def test_attrs(self): def test_attrs(self):
r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]] r2p2d = [[0, 1, 0, 1], [0, 1, 0, 1]]
d = [{'id': 0, 'zone': 0, 'region': 0}, d = [{'id': 0, 'zone': 0, 'region': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'region': 1}] {'id': 1, 'zone': 1, 'region': 1, 'ip': '10.1.1.1', 'port': 7000}]
s = 30 s = 30
rd = ring.RingData(r2p2d, d, s) rd = ring.RingData(r2p2d, d, s)
self.assertEquals(rd._replica2part2dev_id, r2p2d) self.assertEquals(rd._replica2part2dev_id, r2p2d)
@ -55,7 +55,9 @@ class TestRingData(unittest.TestCase):
def test_can_load_pickled_ring_data(self): def test_can_load_pickled_ring_data(self):
rd = ring.RingData( rd = ring.RingData(
[[0, 1, 0, 1], [0, 1, 0, 1]], [[0, 1, 0, 1], [0, 1, 0, 1]],
[{'id': 0, 'zone': 0}, {'id': 1, 'zone': 1}], 30) [{'id': 0, 'zone': 0, 'ip': '10.1.1.0', 'port': 7000},
{'id': 1, 'zone': 1, 'ip': '10.1.1.1', 'port': 7000}],
30)
ring_fname = os.path.join(self.testdir, 'foo.ring.gz') ring_fname = os.path.join(self.testdir, 'foo.ring.gz')
for p in xrange(pickle.HIGHEST_PROTOCOL): for p in xrange(pickle.HIGHEST_PROTOCOL):
with closing(GzipFile(ring_fname, 'wb')) as f: with closing(GzipFile(ring_fname, 'wb')) as f:
@ -168,7 +170,8 @@ class TestRing(unittest.TestCase):
orig_mtime = self.ring._mtime orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 5) self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append( self.intended_devs.append(
{'id': 3, 'region': 0, 'zone': 3, 'weight': 1.0}) {'id': 3, 'region': 0, 'zone': 3, 'weight': 1.0,
'ip': '10.1.1.1', 'port': 9876})
ring.RingData( ring.RingData(
self.intended_replica2part2dev_id, self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz) self.intended_devs, self.intended_part_shift).save(self.testgz)
@ -183,7 +186,8 @@ class TestRing(unittest.TestCase):
orig_mtime = self.ring._mtime orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 6) self.assertEquals(len(self.ring.devs), 6)
self.intended_devs.append( self.intended_devs.append(
{'id': 5, 'region': 0, 'zone': 4, 'weight': 1.0}) {'id': 5, 'region': 0, 'zone': 4, 'weight': 1.0,
'ip': '10.5.5.5', 'port': 9876})
ring.RingData( ring.RingData(
self.intended_replica2part2dev_id, self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz) self.intended_devs, self.intended_part_shift).save(self.testgz)
@ -199,7 +203,8 @@ class TestRing(unittest.TestCase):
part, nodes = self.ring.get_nodes('a') part, nodes = self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 7) self.assertEquals(len(self.ring.devs), 7)
self.intended_devs.append( self.intended_devs.append(
{'id': 6, 'region': 0, 'zone': 5, 'weight': 1.0}) {'id': 6, 'region': 0, 'zone': 5, 'weight': 1.0,
'ip': '10.6.6.6', 'port': 6000})
ring.RingData( ring.RingData(
self.intended_replica2part2dev_id, self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz) self.intended_devs, self.intended_part_shift).save(self.testgz)
@ -214,7 +219,8 @@ class TestRing(unittest.TestCase):
orig_mtime = self.ring._mtime orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 8) self.assertEquals(len(self.ring.devs), 8)
self.intended_devs.append( self.intended_devs.append(
{'id': 5, 'region': 0, 'zone': 4, 'weight': 1.0}) {'id': 5, 'region': 0, 'zone': 4, 'weight': 1.0,
'ip': '10.5.5.5', 'port': 6000})
ring.RingData( ring.RingData(
self.intended_replica2part2dev_id, self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift).save(self.testgz) self.intended_devs, self.intended_part_shift).save(self.testgz)
@ -401,11 +407,11 @@ class TestRing(unittest.TestCase):
exp_devs = [48, 93, 96] exp_devs = [48, 93, 96]
exp_zones = set([5, 8, 9]) exp_zones = set([5, 8, 9])
exp_handoffs = [11, 47, 25, 76, 69, 23, 99, 59, 106, 64, 107, 43, 50, exp_handoffs = [11, 47, 25, 76, 69, 23, 99, 59, 106, 64, 43, 34, 88, 3,
34, 88, 3, 57, 30, 83, 31, 16, 27, 103, 39, 32, 60, 77, 30, 83, 16, 27, 103, 39, 60, 0, 8, 72, 56, 19, 91, 13,
24, 0, 42, 8, 100, 72, 56, 19, 71, 26, 9, 20, 35, 91, 84, 38, 66, 52, 78, 107, 50, 57, 31, 32, 77, 24, 42,
13, 84, 5, 38, 14, 94, 28, 41, 18, 66, 102, 52, 101, 100, 71, 26, 9, 20, 35, 5, 14, 94, 28, 41, 18, 102,
61, 95, 21, 81, 1, 78, 105, 58, 74, 90, 86, 46, 4, 68, 101, 61, 95, 21, 81, 1, 105, 58, 74, 90, 86, 46, 4, 68,
40, 80, 54, 75, 45, 79, 44, 49, 62, 29, 7, 15, 70, 87, 40, 80, 54, 75, 45, 79, 44, 49, 62, 29, 7, 15, 70, 87,
65, 12, 82, 17, 104, 97, 55, 22, 6, 89, 2, 67, 37, 63, 65, 12, 82, 17, 104, 97, 55, 22, 6, 89, 2, 67, 37, 63,
53, 92, 33, 85, 73, 51, 98, 36, 10] 53, 92, 33, 85, 73, 51, 98, 36, 10]
@ -512,14 +518,13 @@ class TestRing(unittest.TestCase):
# Change expectations # Change expectations
# The long string of handoff nodes for the partition were the same for # The long string of handoff nodes for the partition were the same for
# the first 20, which is pretty good. # the first 20, which is pretty good.
exp_handoffs[20:] = [16, 27, 103, 39, 32, 60, 77, 24, 108, 42, 8, 100, exp_handoffs[20:] = [60, 108, 8, 72, 56, 19, 91, 13, 84, 38, 66, 52,
72, 56, 19, 71, 26, 9, 20, 35, 91, 13, 84, 5, 38, 1, 78, 107, 50, 57, 31, 32, 77, 24, 42, 100, 71,
14, 94, 28, 41, 18, 66, 102, 52, 101, 61, 95, 21, 26, 9, 20, 35, 5, 14, 94, 28, 41, 18, 102, 101,
81, 1, 78, 105, 58, 74, 90, 86, 46, 4, 68, 40, 80, 61, 95, 21, 81, 105, 58, 74, 90, 86, 46, 4, 68,
54, 75, 45, 79, 44, 49, 62, 29, 7, 15, 70, 87, 65, 40, 80, 54, 75, 45, 79, 44, 49, 62, 29, 7, 15, 70,
12, 82, 17, 104, 97, 55, 22, 6, 89, 2, 67, 37, 63, 87, 65, 12, 82, 17, 104, 97, 55, 22, 6, 89, 2, 67,
53, 92, 33, 85, 73, 51, 98, 36, 10] 37, 63, 53, 92, 33, 85, 73, 51, 98, 36, 10]
# Just a few of the first handoffs changed # Just a few of the first handoffs changed
exp_first_handoffs[3] = 68 exp_first_handoffs[3] = 68
exp_first_handoffs[55] = 104 exp_first_handoffs[55] = 104
@ -565,15 +570,15 @@ class TestRing(unittest.TestCase):
exp_zones.add(4) exp_zones.add(4)
# Caused some major changes in the sequence of handoffs for our test # Caused some major changes in the sequence of handoffs for our test
# partition, but at least the first stayed the same. # partition, but at least the first stayed the same.
exp_handoffs[1:] = [81, 25, 69, 23, 99, 59, 76, 3, 106, 45, 64, 107, exp_handoffs[1:] = [81, 25, 69, 23, 99, 59, 76, 3, 106, 64, 43, 13, 34,
43, 13, 50, 34, 88, 57, 30, 16, 83, 31, 46, 27, 88, 30, 16, 27, 103, 39, 74, 60, 108, 8, 56, 19,
103, 39, 74, 32, 60, 77, 24, 108, 42, 63, 8, 100, 91, 52, 84, 38, 66, 1, 78, 45, 107, 50, 57, 83, 31,
72, 56, 19, 71, 7, 26, 9, 20, 35, 91, 52, 84, 5, 46, 32, 77, 24, 42, 63, 100, 72, 71, 7, 26, 9, 20,
87, 38, 14, 94, 62, 28, 41, 90, 18, 66, 82, 102, 35, 5, 87, 14, 94, 62, 28, 41, 90, 18, 82, 102, 22,
22, 101, 61, 85, 95, 21, 98, 1, 67, 78, 105, 58, 101, 61, 85, 95, 21, 98, 67, 105, 58, 86, 4, 79,
86, 4, 79, 68, 40, 80, 54, 75, 44, 49, 6, 29, 15, 68, 40, 80, 54, 75, 44, 49, 6, 29, 15, 70, 65, 12,
70, 65, 12, 17, 104, 97, 55, 89, 2, 37, 53, 92, 17, 104, 97, 55, 89, 2, 37, 53, 92, 33, 73, 51, 36,
33, 73, 51, 36, 10] 10]
# Lots of first handoffs changed, but 30 of 256 is still just 11.72%. # Lots of first handoffs changed, but 30 of 256 is still just 11.72%.
exp_first_handoffs[1] = 6 exp_first_handoffs[1] = 6
@ -639,14 +644,15 @@ class TestRing(unittest.TestCase):
exp_part2 = 136 exp_part2 = 136
exp_devs2 = [52, 76, 97] exp_devs2 = [52, 76, 97]
exp_zones2 = set([9, 5, 7]) exp_zones2 = set([9, 5, 7])
exp_handoffs2 = [2, 67, 37, 92, 33, 23, 107, 96, 63, 53, 44, 103, exp_handoffs2 = [2, 67, 37, 92, 33, 23, 107, 63, 44, 103, 108, 85,
108, 85, 73, 51, 42, 98, 35, 36, 10, 89, 80, 84, 43, 73, 10, 89, 80, 4, 17, 49, 32, 12, 41, 58, 20, 25,
4, 17, 49, 104, 32, 12, 41, 58, 31, 65, 20, 25, 61, 1, 61, 94, 47, 69, 56, 101, 28, 83, 8, 96, 53, 51, 42,
40, 9, 94, 47, 69, 56, 74, 101, 95, 45, 5, 71, 86, 78, 98, 35, 36, 84, 43, 104, 31, 65, 1, 40, 9, 74, 95,
30, 93, 48, 28, 91, 15, 88, 39, 18, 57, 83, 72, 70, 45, 5, 71, 86, 78, 30, 93, 48, 91, 15, 88, 39, 18,
27, 54, 16, 24, 21, 14, 11, 8, 77, 62, 50, 6, 105, 26, 57, 72, 70, 27, 54, 16, 24, 21, 14, 11, 77, 62, 50,
55, 29, 60, 34, 13, 87, 59, 38, 99, 75, 106, 3, 82, 6, 105, 26, 55, 29, 60, 34, 13, 87, 59, 38, 99, 75,
66, 79, 7, 46, 64, 81, 22, 68, 19, 102, 90, 100] 106, 3, 82, 66, 79, 7, 46, 64, 81, 22, 68, 19, 102,
90, 100]
part2, devs2 = r.get_nodes('a', 'c', 'o2') part2, devs2 = r.get_nodes('a', 'c', 'o2')
primary_zones2 = set([d['zone'] for d in devs2]) primary_zones2 = set([d['zone'] for d in devs2])
@ -701,14 +707,14 @@ class TestRing(unittest.TestCase):
# Here's a brittle canary-in-the-coalmine test to make sure the region # Here's a brittle canary-in-the-coalmine test to make sure the region
# handoff computation didn't change accidentally # handoff computation didn't change accidentally
exp_handoffs = [111, 112, 74, 54, 93, 31, 2, 43, 100, 22, 71, 32, 92, exp_handoffs = [111, 112, 74, 54, 93, 31, 2, 43, 100, 22, 71, 92, 35,
35, 9, 50, 41, 76, 80, 84, 88, 17, 94, 101, 1, 10, 96, 9, 50, 41, 76, 80, 84, 88, 17, 96, 6, 102, 37, 29,
44, 73, 6, 75, 102, 37, 21, 97, 29, 105, 5, 28, 47, 105, 5, 47, 20, 13, 108, 66, 81, 53, 65, 25, 58, 32,
106, 30, 16, 39, 77, 42, 72, 20, 13, 34, 99, 108, 14, 94, 101, 1, 10, 44, 73, 75, 21, 97, 28, 106, 30, 16,
66, 61, 81, 90, 4, 40, 3, 45, 62, 7, 15, 87, 12, 83, 39, 77, 42, 72, 34, 99, 14, 61, 90, 4, 40, 3, 45, 62,
89, 53, 33, 98, 49, 65, 25, 107, 56, 58, 86, 48, 57, 7, 15, 87, 12, 83, 89, 33, 98, 49, 107, 56, 86, 48,
24, 11, 23, 26, 46, 64, 69, 38, 36, 79, 63, 104, 51, 57, 24, 11, 23, 26, 46, 64, 69, 38, 36, 79, 63, 104,
70, 82, 67, 68, 8, 95, 91, 55, 59, 85] 51, 70, 82, 67, 68, 8, 95, 91, 55, 59, 85]
dev_ids = [d['id'] for d in more_devs] dev_ids = [d['id'] for d in more_devs]
self.assertEquals(len(dev_ids), len(exp_handoffs)) self.assertEquals(len(dev_ids), len(exp_handoffs))