Cleanup EC and SSYNC frag index parameters

An object node should reject a PUT with 409 when the timestamp is less
than or equal to the timestamp of an existing version of the object.

However, if the PUT is part of an SSYNC, and the fragment archive has a
different index than the one on disk we may store it.

We should store it we're the primary holder for that fragment index.

Back before the related change we used to revert fragments to handoffs
and it caused a lot of problems.  Mainly multiple frag indexes piling up
on one handoff node.  Eventually we settled on handoffs only reverting
to primaries but there was some crufty flailing left over.

When EC frag duplication (multi-region EC) came in we also added a new
complexity because a node's primary index (the index in part_nodes list)
was no longer universially equal to the EC frag index (the storage
policy backend end index).  There was a few places we assumed
node_index == frag_index, some of which caused bugs which we've fixed.

This change tries to clean all that up.

Related-Change-Id: Ie351d8342fc8e589b143f981e95ce74e70e52784

Change-Id: I3c5935e2d5f1cd140cf52df779596ebd6442686c
This commit is contained in:
Clay Gerrard 2019-01-30 14:39:12 -06:00
parent 1d4249ee9d
commit fb0e7837af
8 changed files with 195 additions and 86 deletions

View File

@ -369,14 +369,13 @@ class ObjectReconstructor(Daemon):
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
"""
part_nodes = job['policy'].object_ring.get_part_nodes(
job['partition'])
part_nodes.remove(node)
# don't try and fetch a fragment from the node we're rebuilding to
part_nodes = [n for n in job['policy'].object_ring.get_part_nodes(
job['partition']) if n['id'] != node['id']]
# the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list
fi_to_rebuild = job['policy'].get_backend_index(node['index'])
fi_to_rebuild = node['backend_index']
# KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want
@ -829,6 +828,8 @@ class ObjectReconstructor(Daemon):
syncd_with += 1
continue
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
# ssync any out-of-sync suffixes with the remote node
success, _ = ssync_sender(
self, node, job, suffixes)()
@ -850,6 +851,8 @@ class ObjectReconstructor(Daemon):
syncd_with = 0
reverted_objs = {}
for node in job['sync_to']:
node['backend_index'] = job['policy'].get_backend_index(
node['index'])
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
if success:

View File

@ -766,9 +766,8 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
raise HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests to over-write an
# existing data fragment until they offloaded the existing fragment
# SSYNC will include Frag-Index header for subrequests, in which case
# get_diskfile will ignore non-matching on-disk data files
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
next_part_power = request.headers.get('X-Backend-Next-Part-Power')
try:

View File

@ -219,7 +219,7 @@ class Receiver(object):
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
self.frag_index = self.node_index = None
self.frag_index = None
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
try:
self.frag_index = int(
@ -228,19 +228,6 @@ class Receiver(object):
raise swob.HTTPBadRequest(
'Invalid X-Backend-Ssync-Frag-Index %r' %
self.request.headers['X-Backend-Ssync-Frag-Index'])
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
try:
self.node_index = int(
self.request.headers['X-Backend-Ssync-Node-Index'])
except ValueError:
raise swob.HTTPBadRequest(
'Invalid X-Backend-Ssync-Node-Index %r' %
self.request.headers['X-Backend-Ssync-Node-Index'])
if self.node_index != self.frag_index:
# a primary node should only receive it's own fragments
raise swob.HTTPBadRequest(
'Frag-Index (%s) != Node-Index (%s)' % (
self.frag_index, self.node_index))
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if not self.diskfile_mgr.get_dev_path(self.device):
@ -476,9 +463,9 @@ class Receiver(object):
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if self.node_index is not None:
if self.frag_index is not None:
# primary node should not 409 if it has a non-primary fragment
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.frag_index
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
' '.join(replication_headers)

View File

@ -231,21 +231,14 @@ class Sender(object):
connection.putheader('Transfer-Encoding', 'chunked')
connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
# a sync job must use the node's index for the frag_index of the
# rebuilt fragments instead of the frag_index from the job which
# will be rebuilding them
frag_index = self.node.get('index', self.job.get('frag_index'))
if frag_index is None:
# replication jobs will not have a frag_index key;
# reconstructor jobs with only tombstones will have a
# frag_index key explicitly set to the value of None - in both
# cases on the wire we write the empty string which
# ssync_receiver will translate to None
frag_index = ''
connection.putheader('X-Backend-Ssync-Frag-Index', frag_index)
# a revert job to a handoff will not have a node index
connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index', ''))
# a sync job must use the node's backend_index for the frag_index
# of the rebuilt fragments instead of the frag_index from the job
# which will be rebuilding them
frag_index = self.node.get('backend_index')
if frag_index is not None:
connection.putheader('X-Backend-Ssync-Frag-Index', frag_index)
# Node-Index header is for backwards compat 2.4.0-2.20.0
connection.putheader('X-Backend-Ssync-Node-Index', frag_index)
connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):

View File

@ -3628,6 +3628,92 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
set(c['suffixes']),
) for c in ssync_calls))
def test_sync_duplicates_to_remote_region(self):
partition = 0
part_nodes = self.policy.object_ring.get_part_nodes(partition)
# in the non-duplicate case we just pick a random node
local_dev = random.choice(part_nodes[-14:])
frag_index = self.policy.get_backend_index(local_dev['index'])
sync_to = object_reconstructor._get_partners(
local_dev['index'], part_nodes)
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
str(partition))
# setup left and right hashes
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {frag_index: 'hash', None: 'hash'},
}
# left hand side is in sync
left_frag_index = self.policy.get_backend_index(sync_to[0]['index'])
left_hashes = {
'123': {left_frag_index: 'hash', None: 'hash'},
'abc': {left_frag_index: 'hash', None: 'hash'},
}
# right hand side needs sync
right_frag_index = self.policy.get_backend_index(sync_to[1]['index'])
right_hashes = {
'123': {right_frag_index: 'hash', None: 'hash'},
'abc': {right_frag_index: 'hashX', None: 'hash'},
}
job = {
'job_type': object_reconstructor.SYNC,
'frag_index': frag_index,
'suffixes': stub_hashes.keys(),
'sync_to': sync_to,
'partition': partition,
'path': part_path,
'hashes': stub_hashes,
'policy': self.policy,
'local_dev': self.local_dev,
'device': self.local_dev['device'],
}
responses = [
(200, pickle.dumps(left_hashes)),
(200, pickle.dumps(right_hashes)),
(200, pickle.dumps(right_hashes)),
]
codes, body_iter = zip(*responses)
# we're going to dip our mocks into the ssync layer a bit
ssync_resp = mock.MagicMock()
ssync_resp.status = 200
ssync_resp.readline.side_effect = [
':MISSING_CHECK: START',
':MISSING_CHECK: END',
':UPDATES: START',
':UPDATES: END',
]
ssync_headers = []
def capture_headers(name, value):
ssync_headers.append((name, value))
ssync_conn = mock.MagicMock()
ssync_conn.getresponse.return_value = ssync_resp
ssync_conn.putheader = capture_headers
with mock.patch('swift.obj.ssync_sender.SsyncBufferedHTTPConnection',
return_value=ssync_conn), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mock.patch('swift.obj.diskfile.ECDiskFileManager.yield_hashes',
return_value=iter([])), \
mocked_http_conn(*codes, body_iter=body_iter):
self.reconstructor.process_job(job)
# ... to make sure it sets up our headers correctly
self.assertEqual(ssync_headers, [
('Transfer-Encoding', 'chunked'),
('X-Backend-Storage-Policy-Index', 0),
('X-Backend-Ssync-Frag-Index', right_frag_index),
# we include this for backwards compat
('X-Backend-Ssync-Node-Index', right_frag_index),
])
def test_process_job_sync_missing_durable(self):
partition = 0
part_nodes = self.policy.object_ring.get_part_nodes(partition)
@ -4101,9 +4187,10 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
self.assertEqual(self.reconstructor.handoffs_remaining, 0)
def test_process_job_revert_cleanup_tombstone(self):
sync_to = [random.choice([n for n in self.policy.object_ring.devs
if n != self.local_dev])]
partition = 0
sync_to = [random.choice([
n for n in self.policy.object_ring.get_part_nodes(partition)
if n['id'] != self.local_dev['id']])]
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy),
@ -4205,6 +4292,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4268,6 +4356,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4304,6 +4393,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4349,6 +4439,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[-4]
node['backend_index'] = self.policy.get_backend_index(node['index'])
# make up some data (trim some amount to make it unaligned with
# segment size)
@ -4385,6 +4476,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
policy = self.policy
possible_errors = [Timeout(), Exception('kaboom!')]
@ -4414,6 +4506,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
policy = self.policy
codes = [404 for i in range(policy.object_ring.replicas - 1)]
@ -4438,6 +4531,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4488,6 +4582,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4542,6 +4637,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4603,6 +4699,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
@ -4677,7 +4774,9 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
broken_node = random.randint(0, self.policy.ec_ndata - 1)
broken_index = random.randint(0, self.policy.ec_ndata - 1)
node = part_nodes[broken_index]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4685,7 +4784,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
broken_body = ec_archive_bodies[broken_node]
broken_body = ec_archive_bodies[broken_index]
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
@ -4698,7 +4797,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, part_nodes[broken_node], self.obj_metadata)
job, node, self.obj_metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
@ -4711,7 +4810,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
debug_log_lines = self.logger.get_lines_for_level('debug')
# redundant frag found once in first ec_ndata responses
self.assertIn(
'Found existing frag #%s at' % broken_node,
'Found existing frag #%s at' % broken_index,
debug_log_lines[0])
# N.B. in the future, we could avoid those check because
@ -4722,12 +4821,12 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# liberasurecode[1].
# 1: https://github.com/openstack/liberasurecode/blob/
# master/src/erasurecode.c#L870
log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node
log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_index
self.assertIn(log_prefix, debug_log_lines[1])
self.assertFalse(debug_log_lines[2:])
got_frag_index_list = json.loads(
debug_log_lines[1][len(log_prefix):])
self.assertNotIn(broken_node, got_frag_index_list)
self.assertNotIn(broken_index, got_frag_index_list)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
@ -4736,6 +4835,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4785,6 +4885,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4854,6 +4955,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
@ -4928,6 +5030,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[index]
node['backend_index'] = self.policy.get_backend_index(node['index'])
metadata = {
'name': '/a/c/o',
'Content-Length': 0,

View File

@ -312,6 +312,7 @@ class TestBaseSsyncEC(TestBaseSsync):
self.policy = POLICIES.default
self.logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReconstructor(self.daemon_conf, self.logger)
self.rx_node['backend_index'] = 0
def _get_object_data(self, path, frag_index=None, **kwargs):
# return a frag archive for given object name and frag index.
@ -378,7 +379,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -485,7 +485,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -583,7 +582,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'frag_index': frag_index,
'sync_diskfile_builder': fake_reconstruct_fa}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -667,10 +665,11 @@ class TestSsyncEC(TestBaseSsyncEC):
def test_send_invalid_frag_index(self):
policy = POLICIES.default
job = {'frag_index': 'Not a number',
job = {'frag_index': 'No one cares',
'device': self.device,
'partition': self.partition,
'policy': policy}
self.rx_node['backend_index'] = 'Not a number'
sender = ssync_sender.Sender(
self.daemon, self.rx_node, job, ['abc'])
success, _ = sender()
@ -713,7 +712,6 @@ class TestSsyncEC(TestBaseSsyncEC):
'policy': policy,
'frag_index': frag_index}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -808,7 +806,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
os.path.basename(os.path.dirname(df._datadir)))
self.job_node = dict(self.rx_node)
self.job_node['index'] = self.rx_node_index
self.job_node['id'] = 0
self.frag_length = int(
self.tx_objs['o1'][0].get_metadata()['Content-Length'])
@ -1082,7 +1080,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create sender side diskfiles...
tx_objs = {}
@ -1136,7 +1133,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1204,7 +1200,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_meta_file_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
@ -1309,7 +1304,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1352,7 +1346,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_expired_object(self):
# verify that expired objects sync
policy = POLICIES.default
rx_node_index = 0
tx_df_mgr = self.daemon._df_router[policy]
t1 = next(self.ts_iter)
obj_name = 'o1'
@ -1370,7 +1363,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1387,7 +1379,6 @@ class TestSsyncReplication(TestBaseSsync):
def _check_no_longer_expired_object(self, obj_name, df, policy):
# verify that objects with x-delete-at metadata that are not expired
# can be sync'd
rx_node_index = 0
def do_ssync():
# create ssync sender instance...
@ -1396,7 +1387,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1480,7 +1470,6 @@ class TestSsyncReplication(TestBaseSsync):
# verify that the sender does sync a data file to a legacy receiver,
# but does not PUT meta file content to a legacy receiver
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_df_mgr = self.daemon._df_router[policy]
@ -1504,7 +1493,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1563,7 +1551,6 @@ class TestSsyncReplication(TestBaseSsync):
def test_content_type_sync(self):
policy = POLICIES.default
rx_node_index = 0
# create diskfiles...
tx_objs = {}
@ -1675,7 +1662,6 @@ class TestSsyncReplication(TestBaseSsync):
'partition': self.partition,
'policy': policy}
node = dict(self.rx_node)
node.update({'index': rx_node_index})
sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
# wrap connection from tx to rx to capture ssync messages...
sender.connect, trace = self.make_connect_wrapper(sender)
@ -1721,5 +1707,6 @@ class TestSsyncReplication(TestBaseSsync):
self.device, self.partition, suffixes, policy)
self.assertEqual(tx_hashes, rx_hashes)
if __name__ == '__main__':
unittest.main()

View File

@ -211,7 +211,6 @@ class TestReceiver(unittest.TestCase):
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertIsNone(rcvr.node_index)
@unit.patch_policies()
def test_Receiver_with_only_node_index_header(self):
@ -226,13 +225,17 @@ class TestReceiver(unittest.TestCase):
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
with self.assertRaises(HTTPException) as e:
ssync_receiver.Receiver(self.controller, req)
self.assertEqual(e.exception.status_int, 400)
# if a node index is included - it *must* be
# the same value of frag index
self.assertEqual(e.exception.body,
'Frag-Index (None) != Node-Index (7)')
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
# we used to require the reconstructor to send the frag_index twice as
# two different headers because of evolutionary reasons, now we ignore
# node_index
self.assertEqual(rcvr.frag_index, None)
@unit.patch_policies()
def test_Receiver_with_matched_indexes(self):
@ -256,7 +259,6 @@ class TestReceiver(unittest.TestCase):
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertEqual(rcvr.node_index, 7)
@unit.patch_policies()
def test_Receiver_with_invalid_indexes(self):
@ -289,8 +291,16 @@ class TestReceiver(unittest.TestCase):
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
self.assertRaises(HTTPException, ssync_receiver.Receiver,
self.controller, req)
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
# node_index if provided should always match frag_index; but if they
# differ, frag_index takes precedence
self.assertEqual(rcvr.frag_index, 7)
def test_SSYNC_replication_lock_fail(self):
def _mock(path, policy, partition):
@ -2057,7 +2067,8 @@ class TestSsyncRxServer(unittest.TestCase):
sender = ssync_sender.Sender(self.daemon, node, job, ['abc'])
# kick off the sender and let the error trigger failure
with mock.patch('swift.obj.ssync_receiver.Receiver.initialize_request')\
with mock.patch(
'swift.obj.ssync_receiver.Receiver.initialize_request') \
as mock_initialize_request:
mock_initialize_request.side_effect = \
swob.HTTPInternalServerError()

View File

@ -182,7 +182,7 @@ class TestSender(BaseTest):
def test_connect(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', index=0)
device='sda1', backend_index=0)
job = dict(partition='9', policy=POLICIES[1])
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
@ -236,8 +236,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', 9),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
@ -270,8 +268,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 0),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
@ -304,8 +300,40 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}
for method_name, expected_calls in expectations.items():
mock_method = getattr(mock_conn, method_name)
self.assertEqual(expected_calls, mock_method.mock_calls,
'connection method "%s" got %r not %r' % (
method_name, mock_method.mock_calls,
expected_calls))
def test_connect_handoff_none_frag_to_primary(self):
node = dict(replication_ip='1.2.3.4', replication_port=5678,
device='sda1', backend_index=42)
job = dict(partition='9', policy=POLICIES[1], frag_index=None)
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
mock_resp.status = 200
mock_conn.getresponse.return_value = mock_resp
self.sender.connect()
mock_conn_class.assert_called_once_with('1.2.3.4:5678')
expectations = {
'putrequest': [
mock.call('SSYNC', '/sda1/9'),
],
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', 42),
mock.call('X-Backend-Ssync-Node-Index', 42),
],
'endheaders': [mock.call()],
}
@ -339,8 +367,6 @@ class TestSender(BaseTest):
'putheader': [
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', ''),
mock.call('X-Backend-Ssync-Node-Index', ''),
],
'endheaders': [mock.call()],
}