Merge "Use remote frag index to calculate suffix diff"

This commit is contained in:
Zuul 2019-01-16 01:42:16 +00:00 committed by Gerrit Code Review
commit 9aa4cafa0e
2 changed files with 85 additions and 50 deletions

View File

@ -718,7 +718,8 @@ class ObjectReconstructor(Daemon):
suffixes = self.get_suffix_delta(job['hashes'], suffixes = self.get_suffix_delta(job['hashes'],
job['frag_index'], job['frag_index'],
remote_suffixes, remote_suffixes,
node['index']) job['policy'].get_backend_index(
node['index']))
# now recalculate local hashes for suffixes that don't # now recalculate local hashes for suffixes that don't
# match so we're comparing the latest # match so we're comparing the latest
local_suff = self._get_hashes(job['local_dev']['device'], local_suff = self._get_hashes(job['local_dev']['device'],
@ -728,7 +729,8 @@ class ObjectReconstructor(Daemon):
suffixes = self.get_suffix_delta(local_suff, suffixes = self.get_suffix_delta(local_suff,
job['frag_index'], job['frag_index'],
remote_suffixes, remote_suffixes,
node['index']) job['policy'].get_backend_index(
node['index']))
self.suffix_count += len(suffixes) self.suffix_count += len(suffixes)
return suffixes return suffixes

View File

@ -3436,6 +3436,39 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
r['headers']['X-Backend-Storage-Policy-Index'] r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests]) for r in request_log.requests])
def test_get_suffixes_in_sync(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
local_hashes = {
'123': {job['frag_index']: 'hash', None: 'hash'},
'abc': {job['frag_index']: 'hash', None: 'hash'},
}
remote_index = (
job['frag_index'] - 1) % self.policy.ec_n_unique_fragments
remote_hashes = {
'123': {remote_index: 'hash', None: 'hash'},
'abc': {remote_index: 'hash', None: 'hash'},
}
remote_response = pickle.dumps(remote_hashes)
with mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, local_hashes)), \
mocked_http_conn(200, body=remote_response) as request_log:
suffixes = self.reconstructor._get_suffixes_to_sync(job, node)
self.assertEqual([node['replication_ip']],
[r['ip'] for r in request_log.requests])
self.assertEqual(suffixes, [])
def test_get_suffix_delta(self): def test_get_suffix_delta(self):
# different # different
local_suff = {'123': {None: 'abc', 0: 'def'}} local_suff = {'123': {None: 'abc', 0: 'def'}}
@ -3474,22 +3507,23 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(suffs, ['123']) self.assertEqual(suffs, ['123'])
def test_process_job_primary_in_sync(self): def test_process_job_primary_in_sync(self):
replicas = self.policy.object_ring.replicas partition = 0
frag_index = random.randint( part_nodes = self.policy.object_ring.get_part_nodes(partition)
0, self.policy.ec_n_unique_fragments - 1) local_dev = random.choice(part_nodes)
sync_to = [n for n in self.policy.object_ring.devs frag_index = self.policy.get_backend_index(local_dev['index'])
if n != self.local_dev][:2] sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
# setup left and right hashes # setup left and right hashes
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
} }
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas left_index = self.policy.get_backend_index(sync_to[0]['index'])
left_hashes = { left_hashes = {
'123': {left_index: 'hash', None: 'hash'}, '123': {left_index: 'hash', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'}, 'abc': {left_index: 'hash', None: 'hash'},
} }
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas right_index = self.policy.get_backend_index(sync_to[1]['index'])
right_hashes = { right_hashes = {
'123': {right_index: 'hash', None: 'hash'}, '123': {right_index: 'hash', None: 'hash'},
'abc': {right_index: 'hash', None: 'hash'}, 'abc': {right_index: 'hash', None: 'hash'},
@ -3523,8 +3557,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.reconstructor.process_job(job) self.reconstructor.process_job(job)
expected_suffix_calls = set([ expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'), (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
('10.0.0.2', '/sdc/0'), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
]) ])
self.assertEqual(expected_suffix_calls, self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path']) set((r['ip'], r['path'])
@ -3533,19 +3567,18 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertFalse(ssync_calls) self.assertFalse(ssync_calls)
def test_process_job_primary_not_in_sync(self): def test_process_job_primary_not_in_sync(self):
replicas = self.policy.object_ring.replicas partition = 0
frag_index = random.randint( part_nodes = self.policy.object_ring.get_part_nodes(partition)
0, self.policy.ec_n_unique_fragments - 1) local_dev = random.choice(part_nodes)
sync_to = [n for n in self.policy.object_ring.devs frag_index = self.policy.get_backend_index(local_dev['index'])
if n != self.local_dev][:2] sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
# setup left and right hashes # setup left and right hashes
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
} }
sync_to[0]['index'] = (frag_index - 1) % replicas
left_hashes = {} left_hashes = {}
sync_to[1]['index'] = (frag_index + 1) % replicas
right_hashes = {} right_hashes = {}
partition = 0 partition = 0
@ -3576,18 +3609,18 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.reconstructor.process_job(job) self.reconstructor.process_job(job)
expected_suffix_calls = set([ expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'), (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
('10.0.0.1', '/sdb/0/123-abc'), (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
('10.0.0.2', '/sdc/0'), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
('10.0.0.2', '/sdc/0/123-abc'), (sync_to[1]['ip'], '/%s/0/123-abc' % sync_to[1]['device']),
]) ])
self.assertEqual(expected_suffix_calls, self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path']) set((r['ip'], r['path'])
for r in request_log.requests)) for r in request_log.requests))
expected_ssync_calls = sorted([ expected_ssync_calls = sorted([
('10.0.0.1', 0, set(['123', 'abc'])), (sync_to[0]['ip'], 0, set(['123', 'abc'])),
('10.0.0.2', 0, set(['123', 'abc'])), (sync_to[1]['ip'], 0, set(['123', 'abc'])),
]) ])
self.assertEqual(expected_ssync_calls, sorted(( self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'], c['node']['ip'],
@ -3596,30 +3629,30 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
) for c in ssync_calls)) ) for c in ssync_calls))
def test_process_job_sync_missing_durable(self): def test_process_job_sync_missing_durable(self):
replicas = self.policy.object_ring.replicas partition = 0
frag_index = random.randint( part_nodes = self.policy.object_ring.get_part_nodes(partition)
0, self.policy.ec_n_unique_fragments - 1) local_dev = random.choice(part_nodes)
sync_to = [n for n in self.policy.object_ring.devs frag_index = self.policy.get_backend_index(local_dev['index'])
if n != self.local_dev][:2] sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
# setup left and right hashes # setup left and right hashes
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
} }
# left hand side is in sync # left hand side is in sync
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas left_index = self.policy.get_backend_index(sync_to[0]['index'])
left_hashes = { left_hashes = {
'123': {left_index: 'hash', None: 'hash'}, '123': {left_index: 'hash', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'}, 'abc': {left_index: 'hash', None: 'hash'},
} }
# right hand side has fragment, but no durable (None key is whack) # right hand side has fragment, but no durable (None key is whack)
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas right_index = self.policy.get_backend_index(sync_to[1]['index'])
right_hashes = { right_hashes = {
'123': {right_index: 'hash', None: 'hash'}, '123': {right_index: 'hash', None: 'hash'},
'abc': {right_index: 'hash', None: 'different-because-durable'}, 'abc': {right_index: 'hash', None: 'different-because-durable'},
} }
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'], part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), diskfile.get_data_dir(self.policy),
str(partition)) str(partition))
@ -3647,16 +3680,16 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.reconstructor.process_job(job) self.reconstructor.process_job(job)
expected_suffix_calls = set([ expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'), (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
('10.0.0.2', '/sdc/0'), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
('10.0.0.2', '/sdc/0/abc'), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
]) ])
self.assertEqual(expected_suffix_calls, self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path']) set((r['ip'], r['path'])
for r in request_log.requests)) for r in request_log.requests))
expected_ssync_calls = sorted([ expected_ssync_calls = sorted([
('10.0.0.2', 0, ['abc']), (sync_to[1]['ip'], 0, ['abc']),
]) ])
self.assertEqual(expected_ssync_calls, sorted(( self.assertEqual(expected_ssync_calls, sorted((
c['node']['ip'], c['node']['ip'],
@ -3665,26 +3698,26 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
) for c in ssync_calls)) ) for c in ssync_calls))
def test_process_job_primary_some_in_sync(self): def test_process_job_primary_some_in_sync(self):
replicas = self.policy.object_ring.replicas partition = 0
frag_index = random.randint( part_nodes = self.policy.object_ring.get_part_nodes(partition)
0, self.policy.ec_n_unique_fragments - 1) local_dev = random.choice(part_nodes)
sync_to = [n for n in self.policy.object_ring.devs frag_index = self.policy.get_backend_index(local_dev['index'])
if n != self.local_dev][:2] sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
# setup left and right hashes # setup left and right hashes
stub_hashes = { stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'}, '123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'}, 'abc': {frag_index: 'hash', None: 'hash'},
} }
left_index = sync_to[0]['index'] = (frag_index - 1) % replicas left_index = self.policy.get_backend_index(sync_to[0]['index'])
left_hashes = { left_hashes = {
'123': {left_index: 'hashX', None: 'hash'}, '123': {left_index: 'hashX', None: 'hash'},
'abc': {left_index: 'hash', None: 'hash'}, 'abc': {left_index: 'hash', None: 'hash'},
} }
right_index = sync_to[1]['index'] = (frag_index + 1) % replicas right_index = self.policy.get_backend_index(sync_to[1]['index'])
right_hashes = { right_hashes = {
'123': {right_index: 'hash', None: 'hash'}, '123': {right_index: 'hash', None: 'hash'},
} }
partition = 0
part_path = os.path.join(self.devices, self.local_dev['device'], part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), diskfile.get_data_dir(self.policy),
str(partition)) str(partition))
@ -3713,10 +3746,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.reconstructor.process_job(job) self.reconstructor.process_job(job)
expected_suffix_calls = set([ expected_suffix_calls = set([
('10.0.0.1', '/sdb/0'), (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
('10.0.0.1', '/sdb/0/123'), (sync_to[0]['ip'], '/%s/0/123' % sync_to[0]['device']),
('10.0.0.2', '/sdc/0'), (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
('10.0.0.2', '/sdc/0/abc'), (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
]) ])
self.assertEqual(expected_suffix_calls, self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path']) set((r['ip'], r['path'])
@ -3726,8 +3759,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
dict(collections.Counter( dict(collections.Counter(
(c['node']['index'], tuple(c['suffixes'])) (c['node']['index'], tuple(c['suffixes']))
for c in ssync_calls)), for c in ssync_calls)),
{(left_index, ('123', )): 1, {(sync_to[0]['index'], ('123', )): 1,
(right_index, ('abc', )): 1}) (sync_to[1]['index'], ('abc', )): 1})
def test_process_job_primary_down(self): def test_process_job_primary_down(self):
partition = 0 partition = 0