Fixups for EC frag duplication tests

Follow up for related change:
- fix typos
- use common helper methods
- refactor some tests to reduce duplicate code

Related-Change: Idd155401982a2c48110c30b480966a863f6bd305

Change-Id: I2f91a2f31e4c1b11f3d685fa8166c1a25eb87429
This commit is contained in:
Alistair Coles 2017-02-25 20:28:13 -08:00
parent 40ba7f6172
commit e4972f5ac7
5 changed files with 98 additions and 122 deletions

View File

@ -1147,7 +1147,8 @@ def encode_frag_archive_bodies(policy, body):
# encode the buffers into fragment payloads
fragment_payloads = []
for chunk in chunks:
fragments = policy.pyeclib_driver.encode(chunk)
fragments = policy.pyeclib_driver.encode(chunk) \
* policy.ec_duplication_factor
if not fragments:
break
fragment_payloads.append(fragments)

View File

@ -2584,6 +2584,8 @@ cluster_dfw1 = http://dfw1.host/v1/
1: 1,
'2': 2,
'1024': 1024,
'0': ValueError,
'-1': ValueError,
'0x01': ValueError,
'asdf': ValueError,
None: ValueError,

View File

@ -42,7 +42,7 @@ from swift.obj.reconstructor import REVERT
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
FabricatedRing, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE)
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies)
from test.unit.obj.common import write_diskfile
@ -65,25 +65,6 @@ def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs):
yield fake_ssync
def make_ec_archive_bodies(policy, test_body):
segment_size = policy.ec_segment_size
# split up the body into buffers
chunks = [test_body[x:x + segment_size]
for x in range(0, len(test_body), segment_size)]
# encode the buffers into fragment payloads
fragment_payloads = []
for chunk in chunks:
fragments = \
policy.pyeclib_driver.encode(chunk) * policy.ec_duplication_factor
if not fragments:
break
fragment_payloads.append(fragments)
# join up the fragment payloads per node
ec_archive_bodies = [''.join(frags) for frags in zip(*fragment_payloads)]
return ec_archive_bodies
def _create_test_rings(path):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
@ -2636,7 +2617,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
responses = list()
@ -2702,7 +2683,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(4)
@ -2744,7 +2725,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(4)
@ -2774,7 +2755,7 @@ class TestObjectReconstructor(unittest.TestCase):
df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata))
fixed_body = ''.join(df.reader())
# ... this bad request should be treated like any other failure
# ... this bad response should be ignored like any other failure
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
@ -2797,7 +2778,7 @@ class TestObjectReconstructor(unittest.TestCase):
# segment size)
test_data = ('rebuild' * self.policy.ec_segment_size)[:-454]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
# the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4)
@ -2843,9 +2824,11 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# # of replicas failed and one more error log to report no enough
# # of replicas failed and one more error log to report not enough
# responses to reconstruct.
self.assertEqual(policy.object_ring.replicas, len(error_lines))
for line in error_lines[:-1]:
self.assertIn("Trying to GET", line)
self.assertIn(
'Unable to get enough responses (%s error responses)'
% (policy.object_ring.replicas - 1),
@ -2874,7 +2857,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report no enough responses
# only 1 log to report not enough responses
self.assertEqual(1, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
@ -2900,11 +2883,11 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
# bad response
broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
ts = make_timestamp_iter()
bad_headers = get_header_frag_index(self, broken_body)
bad_headers.update({
'X-Object-Sysmeta-Ec-Etag': 'some garbage',
@ -2920,8 +2903,8 @@ class TestObjectReconstructor(unittest.TestCase):
'X-Backend-Timestamp': t1})
responses.append((200, body, headers))
# mixed together
error_index = random.randint(0, self.policy.ec_ndata)
# include the one older frag with different etag in first responses
error_index = random.randint(0, self.policy.ec_ndata - 1)
error_headers = get_header_frag_index(self,
(responses[error_index])[1])
error_headers.update(bad_headers)
@ -2956,10 +2939,10 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
ts = make_timestamp_iter()
# good responses
responses = list()
@ -3016,7 +2999,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
@ -3037,12 +3020,12 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# one newer timestamp but same etag won't spoil the bunch
# N.B. (FIXIME). we choose the first response as garbage, the
# a response at same timestamp but different etag won't spoil the bunch
# N.B. (FIXME). if we choose the first response as garbage, the
# reconstruction fails because all other *correct* frags will be
# assumed as garbage. To avoid the freaky failing set randint
# as [1, self.policy.ec_ndata - 1] to make the first response
# being the correct fragment to reconstruct
# always have the correct etag to reconstruct
new_index = random.randint(1, self.policy.ec_ndata - 1)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage'})
@ -3057,7 +3040,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error and warning
# expect an error log but no warnings
error_log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
self.assertIn(
@ -3082,11 +3065,11 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
ts = make_timestamp_iter()
# create 3 different ec bodies
for i in range(3):
body = test_data[i:]
archive_bodies = make_ec_archive_bodies(self.policy, body)
archive_bodies = encode_frag_archive_bodies(self.policy, body)
# pop the index to the destination node
archive_bodies.pop(1)
ec_archive_dict[
@ -3118,7 +3101,7 @@ class TestObjectReconstructor(unittest.TestCase):
job, node, metadata)
error_lines = self.logger.get_lines_for_level('error')
# only 1 log to report no enough responses
# 1 error log per etag to report not enough responses
self.assertEqual(3, len(error_lines))
for error_line in error_lines:
for expected_etag, ts in ec_archive_dict:
@ -3155,7 +3138,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
@ -3190,7 +3173,7 @@ class TestObjectReconstructor(unittest.TestCase):
# ... and then, it should be skipped in the responses
# N.B. in the future, we could avoid those check because
# definately sending the copy rather than reconstruct will
# definitely sending the copy rather than reconstruct will
# save resources. But one more reason, we're avoiding to
# use the dest index fragment even if it goes to reconstruct
# function is that it will cause a bunch of warning log from
@ -3219,7 +3202,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
# add some duplicates
@ -3274,7 +3257,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
@ -3300,7 +3283,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errorg
# no errors
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but warning for the missing header
warning_log_lines = self.logger.get_lines_for_level('warning')
@ -3341,7 +3324,7 @@ class TestObjectReconstructor(unittest.TestCase):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1)
@ -3368,7 +3351,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no errorg
# no errors
self.assertFalse(self.logger.get_lines_for_level('error'))
# ...but warning for the invalid header
warning_log_lines = self.logger.get_lines_for_level('warning')
@ -3409,7 +3392,7 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
etag = md5(test_data).hexdigest()
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
ec_archive_bodies = encode_frag_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(index)
@ -3428,11 +3411,11 @@ class TestObjectReconstructorECDuplicationFactor(TestObjectReconstructor):
called_headers.append(headers)
return orig_func(self, node, part, path, headers, policy)
# need to m + 1 node failures to reach 2nd set of duplicated fragments
# need parity + 1 node failures to reach duplicated fragments
failed_start_at = (
self.policy.ec_n_unique_fragments - self.policy.ec_nparity - 1)
# set Timeout for node #10, #11, #12, #13, #14
# set Timeout for node #9, #10, #11, #12, #13
for i in range(self.policy.ec_nparity + 1):
responses[failed_start_at + i] = (Timeout(), '', '')

View File

@ -2205,7 +2205,7 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
_part, primary_nodes = self.obj_ring.get_nodes('a', 'c', 'o')
node_key = lambda n: (n['ip'], n['port'])
backend_index = lambda index: self.policy.get_backend_index(index)
backend_index = self.policy.get_backend_index
ts = self._ts_iter.next()
response_map = {
@ -3654,41 +3654,24 @@ class TestECObjController(BaseObjectControllerMixin, unittest.TestCase):
class TestECFunctions(unittest.TestCase):
def test_chunk_transformer(self):
def do_test(dup):
segment_size = 1024
orig_chunk = 'a' * segment_size
policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=10),
ec_segment_size=segment_size)
expected = policy.pyeclib_driver.encode(orig_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform.send(None)
backend_chunks = transform.send(orig_chunk)
self.assertNotEqual(None, backend_chunks) # sanity
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected, backend_chunks)
def test_chunk_transformer_duplication_factor(self):
segment_size = 1024
orig_chunk = 'a' * segment_size
policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=20),
object_ring=FakeRing(replicas=10 * dup),
ec_segment_size=segment_size,
ec_duplication_factor=2)
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(orig_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
transform.send(None)
backend_chunks = transform.send(orig_chunk)
self.assertNotEqual(None, backend_chunks) # sanity
self.assertIsNotNone(backend_chunks) # sanity
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * 2, backend_chunks)
self.assertEqual(expected * dup, backend_chunks)
# flush out last chunk buffer
backend_chunks = transform.send('')
@ -3696,14 +3679,19 @@ class TestECFunctions(unittest.TestCase):
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual([''] * policy.object_ring.replica_count,
backend_chunks)
do_test(1)
do_test(2)
do_test(3)
def test_chunk_transformer_duplication_factor_non_aligned_last_chunk(self):
def test_chunk_transformer_non_aligned_last_chunk(self):
last_chunk = 'a' * 128
def do_test(dup):
policy = ECStoragePolicy(0, 'ec8-2', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=8, ec_nparity=2,
object_ring=FakeRing(replicas=20),
object_ring=FakeRing(replicas=10 * dup),
ec_segment_size=1024,
ec_duplication_factor=2)
ec_duplication_factor=dup)
expected = policy.pyeclib_driver.encode(last_chunk)
transform = obj.chunk_transformer(
policy, policy.object_ring.replica_count)
@ -3715,7 +3703,9 @@ class TestECFunctions(unittest.TestCase):
self.assertEqual(
len(backend_chunks), policy.object_ring.replica_count)
self.assertEqual(expected * 2, backend_chunks)
self.assertEqual(expected * dup, backend_chunks)
do_test(1)
do_test(2)
@patch_policies([ECStoragePolicy(0, name='ec', is_default=True,
@ -3767,7 +3757,7 @@ class TestECDuplicationObjController(
index = conn.resp.headers['X-Object-Sysmeta-Ec-Frag-Index']
collected_responses[etag].add(index)
# the backend requests should be >= num_data_fragmetns
# the backend requests should be >= num_data_fragments
self.assertGreaterEqual(len(log), self.policy.ec_ndata)
# but <= # of replicas
self.assertLessEqual(len(log), self.replicas())
@ -3820,9 +3810,9 @@ class TestECDuplicationObjController(
{'obj': obj, 'frag': 5},
{'obj': obj, 'frag': 6},
{'obj': obj, 'frag': 6},
{'obj': obj, 'frag': 7},
{'obj': obj, 'frag': 7},
# second half of # of replicas are 7, 8, 9, 10, 11, 12, 13
{'obj': obj, 'frag': 7},
{'obj': obj, 'frag': 7},
{'obj': obj, 'frag': 8},
{'obj': obj, 'frag': 8},
{'obj': obj, 'frag': 9},
@ -3865,7 +3855,7 @@ class TestECDuplicationObjController(
{'obj': obj2, 'frag': 8},
]
# ... and the rests are 404s which is limited by request_count
# (2 * replicas in default) rather than max_extra_count limitation
# (2 * replicas in default) rather than max_extra_requests limitation
# because the retries will be in ResumingGetter if the responses
# are 404s
node_frags += [[]] * (self.replicas() * 2 - len(node_frags))
@ -3917,7 +3907,7 @@ class TestECDuplicationObjController(
]
# ... and the rests are 404s which is limited by request_count
# (2 * replicas in default) rather than max_extra_count limitation
# (2 * replicas in default) rather than max_extra_requests limitation
# because the retries will be in ResumingGetter if the responses
# are 404s
node_frags += [[]] * (self.replicas() * 2 - len(node_frags))
@ -3991,7 +3981,7 @@ class TestECDuplicationObjController(
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
self.assertLessEqual(len(frags), self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
@ -4108,7 +4098,7 @@ class TestECDuplicationObjController(
# ... regardless we should never need to fetch more than ec_ndata
# frags for any given etag
for etag, frags in collected_responses.items():
self.assertTrue(len(frags) <= self.policy.ec_ndata,
self.assertLessEqual(len(frags), self.policy.ec_ndata,
'collected %s frags for etag %s' % (
len(frags), etag))
@ -4332,7 +4322,7 @@ class TestECDuplicationObjController(
unique, self.policy.get_backend_index(duplicated)) # sanity
putters.pop(duplicated)
# pop one more frag ment too to make one missing hole
# pop one more fragment too to make one missing hole
putters.pop(one_more_missing)
# then determine chunk, we have 26 putters here and unique frag

View File

@ -5665,7 +5665,7 @@ class TestECMismatchedFA(unittest.TestCase):
environ={"REQUEST_METHOD": "PUT"},
headers={"X-Storage-Policy": "ec-dup", "X-Auth-Token": "t"})
resp = ensure_container.get_response(prosrv)
self.assertTrue(resp.status_int in (201, 202))
self.assertIn(resp.status_int, (201, 202))
obj1 = "first version..."
put_req1 = Request.blank(