diff --git a/test/unit/proxy/test_mem_server.py b/test/unit/proxy/test_mem_server.py index 2221ee926e..336b98b766 100644 --- a/test/unit/proxy/test_mem_server.py +++ b/test/unit/proxy/test_mem_server.py @@ -33,7 +33,8 @@ class TestProxyServer(test_server.TestProxyServer): pass -class TestObjectController(test_server.TestObjectController): +class TestReplicatedObjectController( + test_server.TestReplicatedObjectController): def test_PUT_no_etag_fallocate(self): # mem server doesn't call fallocate(), believe it or not pass @@ -42,6 +43,8 @@ class TestObjectController(test_server.TestObjectController): def test_policy_IO(self): pass + +class TestECObjectController(test_server.TestECObjectController): def test_PUT_ec(self): pass diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 6136ee96f7..8839d7e92b 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -1143,7 +1143,63 @@ class TestProxyServerLoading(unittest.TestCase): self.assertTrue(policy.object_ring) -class ECTestMixin(object): +class BaseTestObjectController(object): + """ + A root of TestObjController that implements helper methods for child + TestObjControllers. + """ + def setUp(self): + self.app = proxy_server.Application( + None, FakeMemcache(), + logger=debug_logger('proxy-ut'), + account_ring=FakeRing(), + container_ring=FakeRing()) + # clear proxy logger result for each test + _test_servers[0].logger._clear() + + def tearDown(self): + self.app.account_ring.set_replicas(3) + self.app.container_ring.set_replicas(3) + for policy in POLICIES: + policy.object_ring = FakeRing(base_port=3000) + + def assert_status_map(self, method, statuses, expected, raise_exc=False): + with save_globals(): + kwargs = {} + if raise_exc: + kwargs['raise_exc'] = raise_exc + + set_http_connect(*statuses, **kwargs) + self.app.memcache.store = {} + req = Request.blank('/v1/a/c/o', + headers={'Content-Length': '0', + 'Content-Type': 'text/plain'}) + self.app.update_request(req) + try: + res = method(req) + except HTTPException as res: + pass + self.assertEqual(res.status_int, expected) + + # repeat test + set_http_connect(*statuses, **kwargs) + self.app.memcache.store = {} + req = Request.blank('/v1/a/c/o', + headers={'Content-Length': '0', + 'Content-Type': 'text/plain'}) + self.app.update_request(req) + try: + res = method(req) + except HTTPException as res: + pass + self.assertEqual(res.status_int, expected) + + def _sleep_enough(self, condition): + for sleeptime in (0.1, 1.0): + sleep(sleeptime) + if condition(): + break + def put_container(self, policy_name, container_name): # Note: only works if called with unpatched policies prolis = _test_sockets[0] @@ -1161,494 +1217,6 @@ class ECTestMixin(object): exp = 'HTTP/1.1 2' self.assertEqual(headers[:len(exp)], exp) - @unpatch_policies - def test_PUT_ec(self): - policy = POLICIES[self.ec_policy_index] - self.put_container(policy.name, policy.name) - - obj = 'abCD' * 10 # small, so we don't get multiple EC stripes - prolis = _test_sockets[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/o1 HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: "%s"\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (policy.name, md5(obj).hexdigest(), - len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - ecd = policy.pyeclib_driver - expected_pieces = set(ecd.encode(obj)) - - # go to disk to make sure it's there and all erasure-coded - partition, nodes = policy.object_ring.get_nodes('a', policy.name, 'o1') - conf = {'devices': _testdir, 'mount_check': 'false'} - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[policy] - - got_pieces = set() - got_indices = set() - got_durable = [] - for node_index, node in enumerate(nodes): - df = df_mgr.get_diskfile(node['device'], partition, - 'a', policy.name, 'o1', - policy=policy) - with df.open(): - meta = df.get_metadata() - contents = ''.join(df.reader()) - got_pieces.add(contents) - - lmeta = dict((k.lower(), v) for k, v in meta.items()) - got_indices.add( - lmeta['x-object-sysmeta-ec-frag-index']) - - self.assertEqual( - lmeta['x-object-sysmeta-ec-etag'], - md5(obj).hexdigest()) - self.assertEqual( - lmeta['x-object-sysmeta-ec-content-length'], - str(len(obj))) - self.assertEqual( - lmeta['x-object-sysmeta-ec-segment-size'], - '4096') - self.assertEqual( - lmeta['x-object-sysmeta-ec-scheme'], - '%s 2+1' % DEFAULT_TEST_EC_TYPE) - self.assertEqual( - lmeta['etag'], - md5(contents).hexdigest()) - - # check presence for a durable data file for the timestamp - durable_file = ( - utils.Timestamp(df.timestamp).internal + - '#%s' % lmeta['x-object-sysmeta-ec-frag-index'] + - '#d.data') - durable_file = os.path.join( - _testdir, node['device'], storage_directory( - diskfile.get_data_dir(policy), - partition, hash_path('a', policy.name, 'o1')), - durable_file) - if os.path.isfile(durable_file): - got_durable.append(True) - - self.assertEqual(expected_pieces, got_pieces) - self.assertEqual(set(('0', '1', '2')), got_indices) - - # verify at least 2 puts made it all the way to the end of 2nd - # phase, ie at least 2 durable statuses were written - num_durable_puts = sum(d is True for d in got_durable) - self.assertGreaterEqual(num_durable_puts, 2) - - @unpatch_policies - def test_PUT_ec_multiple_segments(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - pyeclib_header_size = len(ec_policy.pyeclib_driver.encode("")[0]) - segment_size = ec_policy.ec_segment_size - - # Big enough to have multiple segments. Also a multiple of the - # segment size to get coverage of that path too. - obj = 'ABC' * segment_size - - prolis = _test_sockets[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/o2 HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - # it's a 2+1 erasure code, so each fragment archive should be half - # the length of the object, plus three inline pyeclib metadata - # things (one per segment) - expected_length = (len(obj) / 2 + pyeclib_header_size * 3) - - partition, nodes = ec_policy.object_ring.get_nodes( - 'a', ec_policy.name, 'o2') - - conf = {'devices': _testdir, 'mount_check': 'false'} - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] - - got_durable = [] - fragment_archives = [] - for node in nodes: - df = df_mgr.get_diskfile( - node['device'], partition, 'a', - ec_policy.name, 'o2', policy=ec_policy) - with df.open(): - meta = df.get_metadata() - contents = ''.join(df.reader()) - fragment_archives.append(contents) - self.assertEqual(len(contents), expected_length) - - durable_file = ( - utils.Timestamp(df.timestamp).internal + - '#%s' % meta['X-Object-Sysmeta-Ec-Frag-Index'] + - '#d.data') - durable_file = os.path.join( - _testdir, node['device'], storage_directory( - diskfile.get_data_dir(ec_policy), - partition, hash_path('a', ec_policy.name, 'o2')), - durable_file) - if os.path.isfile(durable_file): - got_durable.append(True) - - # Verify that we can decode each individual fragment and that they - # are all the correct size - fragment_size = ec_policy.fragment_size - nfragments = int( - math.ceil(float(len(fragment_archives[0])) / fragment_size)) - - for fragment_index in range(nfragments): - fragment_start = fragment_index * fragment_size - fragment_end = (fragment_index + 1) * fragment_size - - try: - frags = [fa[fragment_start:fragment_end] - for fa in fragment_archives] - seg = ec_policy.pyeclib_driver.decode(frags) - except ECDriverError: - self.fail("Failed to decode fragments %d; this probably " - "means the fragments are not the sizes they " - "should be" % fragment_index) - - segment_start = fragment_index * segment_size - segment_end = (fragment_index + 1) * segment_size - - self.assertEqual(seg, obj[segment_start:segment_end]) - - # verify at least 2 puts made it all the way to the end of 2nd - # phase, ie at least 2 .durable statuses were written - num_durable_puts = sum(d is True for d in got_durable) - self.assertGreaterEqual(num_durable_puts, 2) - - @unpatch_policies - def test_PUT_ec_object_etag_mismatch(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - obj = '90:6A:02:60:B1:08-96da3e706025537fc42464916427727e' - prolis = _test_sockets[0] - prosrv = _test_servers[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/o3 HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: %s\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, - md5('something else').hexdigest(), - len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 422' - self.assertEqual(headers[:len(exp)], exp) - - # nothing should have made it to disk on the object servers - partition, nodes = prosrv.get_object_ring( - int(ec_policy)).get_nodes('a', ec_policy.name, 'o3') - conf = {'devices': _testdir, 'mount_check': 'false'} - - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] - - for node in nodes: - df = df_mgr.get_diskfile(node['device'], partition, - 'a', ec_policy.name, 'o3', - policy=ec_policy) - self.assertRaises(DiskFileNotExist, df.open) - - @unpatch_policies - def test_PUT_ec_fragment_archive_etag_mismatch(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - # Cause a hash mismatch by feeding one particular MD5 hasher some - # extra data. The goal here is to get exactly more than one of the - # hashers in an object server. - count = (ec_policy.object_ring.replica_count - ec_policy.ec_ndata) - countdown = [count] - - def busted_md5_constructor(initial_str=""): - hasher = md5(initial_str) - if countdown[0] > 0: - hasher.update('wrong') - countdown[0] -= 1 - return hasher - - obj = 'uvarovite-esurience-cerated-symphysic' - prolis = _test_sockets[0] - prosrv = _test_servers[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - with mock.patch('swift.obj.server.md5', busted_md5_constructor): - fd = sock.makefile() - fd.write('PUT /v1/a/%s/pimento HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: %s\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), - len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 503' # no quorum - self.assertEqual(headers[:len(exp)], exp) - - # replica count - 1 of the fragment archives should have - # landed on disk - partition, nodes = prosrv.get_object_ring( - int(ec_policy)).get_nodes('a', ec_policy.name, 'pimento') - conf = {'devices': _testdir, 'mount_check': 'false'} - - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] - - found = 0 - for node in nodes: - df = df_mgr.get_diskfile(node['device'], partition, - 'a', ec_policy.name, 'pimento', - policy=ec_policy) - try: - # diskfile open won't succeed because no durable was written, - # so look under the hood for data files. - files = os.listdir(df._datadir) - if len(files) > 0: - # Although the third fragment archive hasn't landed on - # disk, the directory df._datadir is pre-maturely created - # and is empty when we use O_TMPFILE + linkat() - num_data_files = \ - len([f for f in files if f.endswith('.data')]) - self.assertEqual(1, num_data_files) - found += 1 - except OSError: - pass - self.assertEqual(found, ec_policy.ec_ndata) - - @unpatch_policies - def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container("ec", "ec-con") - - def busted_md5_constructor(initial_str=""): - hasher = md5(initial_str) - hasher.update('wrong') - return hasher - - obj = 'uvarovite-esurience-cerated-symphysic' - prolis = _test_sockets[0] - prosrv = _test_servers[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - - call_count = [0] - - def mock_committer(self): - call_count[0] += 1 - - commit_confirmation = \ - 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' - - with mock.patch('swift.obj.server.md5', busted_md5_constructor), \ - mock.patch(commit_confirmation, mock_committer): - fd = sock.makefile() - fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: %s\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 503' # no quorum - self.assertEqual(headers[:len(exp)], exp) - # Don't send commit to object-server if quorum responses consist of 4xx - self.assertEqual(0, call_count[0]) - - # no fragment archives should have landed on disk - partition, nodes = prosrv.get_object_ring(3).get_nodes( - 'a', 'ec-con', 'quorum') - conf = {'devices': _testdir, 'mount_check': 'false'} - - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] - - for node in nodes: - df = df_mgr.get_diskfile(node['device'], partition, - 'a', 'ec-con', 'quorum', - policy=POLICIES[3]) - if os.path.exists(df._datadir): - self.assertFalse(os.listdir(df._datadir)) # should be empty - - @unpatch_policies - def test_PUT_ec_fragment_quorum_bad_request(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container("ec", "ec-con") - - obj = 'uvarovite-esurience-cerated-symphysic' - prolis = _test_sockets[0] - prosrv = _test_servers[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - - call_count = [0] - - def mock_committer(self): - call_count[0] += 1 - - read_footer = \ - 'swift.obj.server.ObjectController._read_metadata_footer' - commit_confirmation = \ - 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' - - with mock.patch(read_footer) as read_footer_call, \ - mock.patch(commit_confirmation, mock_committer): - # Emulate missing footer MIME doc in all object-servers - read_footer_call.side_effect = HTTPBadRequest( - body="couldn't find footer MIME doc") - - fd = sock.makefile() - fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: %s\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - - # Don't show a result of the bad conversation between proxy-server - # and object-server - exp = 'HTTP/1.1 503' - self.assertEqual(headers[:len(exp)], exp) - # Don't send commit to object-server if quorum responses consist of 4xx - self.assertEqual(0, call_count[0]) - - # no fragment archives should have landed on disk - partition, nodes = prosrv.get_object_ring(3).get_nodes( - 'a', 'ec-con', 'quorum') - conf = {'devices': _testdir, 'mount_check': 'false'} - - df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] - - for node in nodes: - df = df_mgr.get_diskfile(node['device'], partition, - 'a', 'ec-con', 'quorum', - policy=POLICIES[3]) - if os.path.exists(df._datadir): - self.assertFalse(os.listdir(df._datadir)) # should be empty - - @unpatch_policies - def test_PUT_ec_if_none_match(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - obj = 'ananepionic-lepidophyllous-ropewalker-neglectful' - prolis = _test_sockets[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Etag: "%s"\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), - len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'If-None-Match: *\r\n' - 'Etag: "%s"\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), - len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 412' - self.assertEqual(headers[:len(exp)], exp) - - @unpatch_policies - def test_GET_ec(self): - prolis = _test_sockets[0] - prosrv = _test_servers[0] - - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - obj = '0123456' * 11 * 17 - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/go-get-it HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'X-Object-Meta-Color: chartreuse\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('GET /v1/a/%s/go-get-it HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 200' - self.assertEqual(headers[:len(exp)], exp) - - headers = parse_headers_string(headers) - self.assertEqual(str(len(obj)), headers['Content-Length']) - self.assertEqual(md5(obj).hexdigest(), headers['Etag']) - self.assertEqual('chartreuse', headers['X-Object-Meta-Color']) - - gotten_obj = '' - while True: - buf = fd.read(64) - if not buf: - break - gotten_obj += buf - self.assertEqual(gotten_obj, obj) - error_lines = prosrv.logger.get_lines_for_level('error') - warn_lines = prosrv.logger.get_lines_for_level('warning') - self.assertEqual(len(error_lines), 0) # sanity - self.assertEqual(len(warn_lines), 0) # sanity - def _test_conditional_GET(self, policy): container_name = uuid.uuid4().hex object_path = '/v1/a/%s/conditionals' % container_name @@ -1738,403 +1306,14 @@ class ECTestMixin(object): self.assertEqual(len(error_lines), 0) # sanity self.assertEqual(len(warn_lines), 0) # sanity - @unpatch_policies - def test_conditional_GET_ec(self): - policy = POLICIES[self.ec_policy_index] - self.assertEqual('erasure_coding', policy.policy_type) # sanity - self._test_conditional_GET(policy) - - @unpatch_policies - def test_GET_ec_big(self): - prolis = _test_sockets[0] - prosrv = _test_servers[0] - - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - # our EC segment size is 4 KiB, so this is multiple (3) segments; - # we'll verify that with a sanity check - obj = 'a moose once bit my sister' * 400 - self.assertGreater( - len(obj), ec_policy.ec_segment_size * 2, - "object is too small for proper testing") - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/big-obj-get HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('GET /v1/a/%s/big-obj-get HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 200' - self.assertEqual(headers[:len(exp)], exp) - - headers = parse_headers_string(headers) - self.assertEqual(str(len(obj)), headers['Content-Length']) - self.assertEqual(md5(obj).hexdigest(), headers['Etag']) - - gotten_obj = '' - while True: - buf = fd.read(64) - if not buf: - break - gotten_obj += buf - # This may look like a redundant test, but when things fail, this - # has a useful failure message while the subsequent one spews piles - # of garbage and demolishes your terminal's scrollback buffer. - self.assertEqual(len(gotten_obj), len(obj)) - self.assertEqual(gotten_obj, obj) - error_lines = prosrv.logger.get_lines_for_level('error') - warn_lines = prosrv.logger.get_lines_for_level('warning') - self.assertEqual(len(error_lines), 0) # sanity - self.assertEqual(len(warn_lines), 0) # sanity - - @unpatch_policies - def test_GET_ec_failure_handling(self): - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - obj = 'look at this object; it is simply amazing ' * 500 - prolis = _test_sockets[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/crash-test-dummy HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - def explodey_iter(inner_iter): - yield next(inner_iter) - raise Exception("doom ba doom") - - def explodey_doc_parts_iter(inner_iter_iter): - try: - for item in inner_iter_iter: - item = item.copy() # paranoia about mutable data - item['part_iter'] = explodey_iter(item['part_iter']) - yield item - except GeneratorExit: - inner_iter_iter.close() - raise - - real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter - - def explodey_ec_app_iter(path, policy, iterators, *a, **kw): - # Each thing in `iterators` here is a document-parts iterator, - # and we want to fail after getting a little into each part. - # - # That way, we ensure we've started streaming the response to - # the client when things go wrong. - return real_ec_app_iter( - path, policy, - [explodey_doc_parts_iter(i) for i in iterators], - *a, **kw) - - with mock.patch("swift.proxy.controllers.obj.ECAppIter", - explodey_ec_app_iter): - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('GET /v1/a/%s/crash-test-dummy HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 200' - self.assertEqual(headers[:len(exp)], exp) - - headers = parse_headers_string(headers) - self.assertEqual(str(len(obj)), headers['Content-Length']) - self.assertEqual(md5(obj).hexdigest(), headers['Etag']) - - gotten_obj = '' - try: - # don't hang the test run when this fails - with Timeout(300): - while True: - buf = fd.read(64) - if not buf: - break - gotten_obj += buf - except Timeout: - self.fail("GET hung when connection failed") - - # Ensure we failed partway through, otherwise the mocks could - # get out of date without anyone noticing - self.assertTrue(0 < len(gotten_obj) < len(obj)) - - @unpatch_policies - def test_HEAD_ec(self): - prolis = _test_sockets[0] - prosrv = _test_servers[0] - - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - obj = '0123456' * 11 * 17 - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('PUT /v1/a/%s/go-head-it HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'Content-Length: %d\r\n' - 'X-Storage-Token: t\r\n' - 'X-Object-Meta-Color: chartreuse\r\n' - 'Content-Type: application/octet-stream\r\n' - '\r\n%s' % (ec_policy.name, len(obj), obj)) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 201' - self.assertEqual(headers[:len(exp)], exp) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('HEAD /v1/a/%s/go-head-it HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 200' - self.assertEqual(headers[:len(exp)], exp) - - headers = parse_headers_string(headers) - self.assertEqual(str(len(obj)), headers['Content-Length']) - self.assertEqual(md5(obj).hexdigest(), headers['Etag']) - self.assertEqual('chartreuse', headers['X-Object-Meta-Color']) - - error_lines = prosrv.logger.get_lines_for_level('error') - warn_lines = prosrv.logger.get_lines_for_level('warning') - self.assertEqual(len(error_lines), 0) # sanity - self.assertEqual(len(warn_lines), 0) # sanity - - @unpatch_policies - def test_GET_ec_404(self): - prolis = _test_sockets[0] - prosrv = _test_servers[0] - - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('GET /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 404' - self.assertEqual(headers[:len(exp)], exp) - - error_lines = prosrv.logger.get_lines_for_level('error') - warn_lines = prosrv.logger.get_lines_for_level('warning') - self.assertEqual(len(error_lines), 0) # sanity - self.assertEqual(len(warn_lines), 0) # sanity - - @unpatch_policies - def test_HEAD_ec_404(self): - prolis = _test_sockets[0] - prosrv = _test_servers[0] - - ec_policy = POLICIES[self.ec_policy_index] - self.put_container(ec_policy.name, ec_policy.name) - - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write('HEAD /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n' - 'Host: localhost\r\n' - 'Connection: close\r\n' - 'X-Storage-Token: t\r\n' - '\r\n' % ec_policy.name) - fd.flush() - headers = readuntil2crlfs(fd) - exp = 'HTTP/1.1 404' - self.assertEqual(headers[:len(exp)], exp) - - error_lines = prosrv.logger.get_lines_for_level('error') - warn_lines = prosrv.logger.get_lines_for_level('warning') - self.assertEqual(len(error_lines), 0) # sanity - self.assertEqual(len(warn_lines), 0) # sanity - - @unpatch_policies - def test_reload_ring_ec(self): - policy = POLICIES[self.ec_policy_index] - self.put_container("ec", "ec-con") - - orig_rtime = policy.object_ring._rtime - orig_replica_count = policy.object_ring.replica_count - # save original file as back up - copyfile(policy.object_ring.serialized_path, - policy.object_ring.serialized_path + '.bak') - - try: - # overwrite with 2 replica, 2 devices ring - obj_devs = [] - obj_devs.append( - {'port': _test_sockets[-3].getsockname()[1], - 'device': 'sdg1'}) - obj_devs.append( - {'port': _test_sockets[-2].getsockname()[1], - 'device': 'sdh1'}) - write_fake_ring(policy.object_ring.serialized_path, - *obj_devs) - - def get_ring_reloaded_response(method): - # force to reload at the request - policy.object_ring._rtime = 0 - - trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method, - 'Host: localhost\r\n', - 'Connection: close\r\n', - 'X-Storage-Token: t\r\n'] - - if method == 'PUT': - # small, so we don't get multiple EC stripes - obj = 'abCD' * 10 - - extra_trans_data = [ - 'Etag: "%s"\r\n' % md5(obj).hexdigest(), - 'Content-Length: %d\r\n' % len(obj), - 'Content-Type: application/octet-stream\r\n', - '\r\n%s' % obj - ] - trans_data.extend(extra_trans_data) - else: - trans_data.append('\r\n') - - prolis = _test_sockets[0] - sock = connect_tcp(('localhost', prolis.getsockname()[1])) - fd = sock.makefile() - fd.write(''.join(trans_data)) - fd.flush() - headers = readuntil2crlfs(fd) - - # use older ring with rollbacking - return headers - - for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'): - headers = get_ring_reloaded_response(method) - exp = 'HTTP/1.1 20' - self.assertEqual(headers[:len(exp)], exp) - - # proxy didn't load newest ring, use older one - self.assertEqual(orig_replica_count, - policy.object_ring.replica_count) - - if method == 'POST': - # Take care fast post here! - orig_post_as_copy = getattr( - _test_servers[0], 'object_post_as_copy', None) - try: - _test_servers[0].object_post_as_copy = False - with mock.patch.object( - _test_servers[0], - 'object_post_as_copy', False): - headers = get_ring_reloaded_response(method) - finally: - if orig_post_as_copy is None: - del _test_servers[0].object_post_as_copy - else: - _test_servers[0].object_post_as_copy = \ - orig_post_as_copy - - exp = 'HTTP/1.1 20' - self.assertEqual(headers[:len(exp)], exp) - # sanity - self.assertEqual(orig_replica_count, - policy.object_ring.replica_count) - - finally: - policy.object_ring._rtime = orig_rtime - os.rename(policy.object_ring.serialized_path + '.bak', - policy.object_ring.serialized_path) - @patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing(base_port=3000))]) -class TestObjectController(ECTestMixin, unittest.TestCase): - ec_policy_index = 3 - - def setUp(self): - self.app = proxy_server.Application( - None, FakeMemcache(), - logger=debug_logger('proxy-ut'), - account_ring=FakeRing(), - container_ring=FakeRing()) - # clear proxy logger result for each test - _test_servers[0].logger._clear() - - def tearDown(self): - self.app.account_ring.set_replicas(3) - self.app.container_ring.set_replicas(3) - for policy in POLICIES: - policy.object_ring = FakeRing(base_port=3000) - - def assert_status_map(self, method, statuses, expected, raise_exc=False): - with save_globals(): - kwargs = {} - if raise_exc: - kwargs['raise_exc'] = raise_exc - - set_http_connect(*statuses, **kwargs) - self.app.memcache.store = {} - req = Request.blank('/v1/a/c/o', - headers={'Content-Length': '0', - 'Content-Type': 'text/plain'}) - self.app.update_request(req) - try: - res = method(req) - except HTTPException as res: - pass - self.assertEqual(res.status_int, expected) - - # repeat test - set_http_connect(*statuses, **kwargs) - self.app.memcache.store = {} - req = Request.blank('/v1/a/c/o', - headers={'Content-Length': '0', - 'Content-Type': 'text/plain'}) - self.app.update_request(req) - try: - res = method(req) - except HTTPException as res: - pass - self.assertEqual(res.status_int, expected) - - def _sleep_enough(self, condition): - for sleeptime in (0.1, 1.0): - sleep(sleeptime) - if condition(): - break - +class TestReplicatedObjectController( + BaseTestObjectController, unittest.TestCase): + """ + Test suite for replication policy + """ @unpatch_policies def test_policy_IO(self): def check_file(policy, cont, devs, check_val): @@ -5632,17 +4811,847 @@ class TestObjectController(ECTestMixin, unittest.TestCase): ]) -class TestObjectControllerECDuplication(ECTestMixin, unittest.TestCase): - ec_policy_index = 4 +class BaseTestECObjectController(BaseTestObjectController): - def setUp(self): - self.app = proxy_server.Application( - None, FakeMemcache(), - logger=debug_logger('proxy-ut'), - account_ring=FakeRing(), - container_ring=FakeRing()) - # clear proxy logger result for each test - _test_servers[0].logger._clear() + @unpatch_policies + def test_PUT_ec(self): + policy = POLICIES[self.ec_policy_index] + self.put_container(policy.name, policy.name) + + obj = 'abCD' * 10 # small, so we don't get multiple EC stripes + prolis = _test_sockets[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/o1 HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: "%s"\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (policy.name, md5(obj).hexdigest(), + len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + ecd = policy.pyeclib_driver + expected_pieces = set(ecd.encode(obj)) + + # go to disk to make sure it's there and all erasure-coded + partition, nodes = policy.object_ring.get_nodes('a', policy.name, 'o1') + conf = {'devices': _testdir, 'mount_check': 'false'} + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[policy] + + got_pieces = set() + got_indices = set() + got_durable = [] + for node_index, node in enumerate(nodes): + df = df_mgr.get_diskfile(node['device'], partition, + 'a', policy.name, 'o1', + policy=policy) + with df.open(): + meta = df.get_metadata() + contents = ''.join(df.reader()) + got_pieces.add(contents) + + lmeta = dict((k.lower(), v) for k, v in meta.items()) + got_indices.add( + lmeta['x-object-sysmeta-ec-frag-index']) + + self.assertEqual( + lmeta['x-object-sysmeta-ec-etag'], + md5(obj).hexdigest()) + self.assertEqual( + lmeta['x-object-sysmeta-ec-content-length'], + str(len(obj))) + self.assertEqual( + lmeta['x-object-sysmeta-ec-segment-size'], + '4096') + self.assertEqual( + lmeta['x-object-sysmeta-ec-scheme'], + '%s 2+1' % DEFAULT_TEST_EC_TYPE) + self.assertEqual( + lmeta['etag'], + md5(contents).hexdigest()) + + # check presence for a durable data file for the timestamp + durable_file = ( + utils.Timestamp(df.timestamp).internal + + '#%s' % lmeta['x-object-sysmeta-ec-frag-index'] + + '#d.data') + durable_file = os.path.join( + _testdir, node['device'], storage_directory( + diskfile.get_data_dir(policy), + partition, hash_path('a', policy.name, 'o1')), + durable_file) + if os.path.isfile(durable_file): + got_durable.append(True) + + self.assertEqual(expected_pieces, got_pieces) + self.assertEqual(set(('0', '1', '2')), got_indices) + + # verify at least 2 puts made it all the way to the end of 2nd + # phase, ie at least 2 durable statuses were written + num_durable_puts = sum(d is True for d in got_durable) + self.assertGreaterEqual(num_durable_puts, 2) + + @unpatch_policies + def test_PUT_ec_multiple_segments(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + pyeclib_header_size = len(ec_policy.pyeclib_driver.encode("")[0]) + segment_size = ec_policy.ec_segment_size + + # Big enough to have multiple segments. Also a multiple of the + # segment size to get coverage of that path too. + obj = 'ABC' * segment_size + + prolis = _test_sockets[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/o2 HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + # it's a 2+1 erasure code, so each fragment archive should be half + # the length of the object, plus three inline pyeclib metadata + # things (one per segment) + expected_length = (len(obj) / 2 + pyeclib_header_size * 3) + + partition, nodes = ec_policy.object_ring.get_nodes( + 'a', ec_policy.name, 'o2') + + conf = {'devices': _testdir, 'mount_check': 'false'} + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] + + got_durable = [] + fragment_archives = [] + for node in nodes: + df = df_mgr.get_diskfile( + node['device'], partition, 'a', + ec_policy.name, 'o2', policy=ec_policy) + with df.open(): + meta = df.get_metadata() + contents = ''.join(df.reader()) + fragment_archives.append(contents) + self.assertEqual(len(contents), expected_length) + + durable_file = ( + utils.Timestamp(df.timestamp).internal + + '#%s' % meta['X-Object-Sysmeta-Ec-Frag-Index'] + + '#d.data') + durable_file = os.path.join( + _testdir, node['device'], storage_directory( + diskfile.get_data_dir(ec_policy), + partition, hash_path('a', ec_policy.name, 'o2')), + durable_file) + if os.path.isfile(durable_file): + got_durable.append(True) + + # Verify that we can decode each individual fragment and that they + # are all the correct size + fragment_size = ec_policy.fragment_size + nfragments = int( + math.ceil(float(len(fragment_archives[0])) / fragment_size)) + + for fragment_index in range(nfragments): + fragment_start = fragment_index * fragment_size + fragment_end = (fragment_index + 1) * fragment_size + + try: + frags = [fa[fragment_start:fragment_end] + for fa in fragment_archives] + seg = ec_policy.pyeclib_driver.decode(frags) + except ECDriverError: + self.fail("Failed to decode fragments %d; this probably " + "means the fragments are not the sizes they " + "should be" % fragment_index) + + segment_start = fragment_index * segment_size + segment_end = (fragment_index + 1) * segment_size + + self.assertEqual(seg, obj[segment_start:segment_end]) + + # verify at least 2 puts made it all the way to the end of 2nd + # phase, ie at least 2 .durable statuses were written + num_durable_puts = sum(d is True for d in got_durable) + self.assertGreaterEqual(num_durable_puts, 2) + + @unpatch_policies + def test_PUT_ec_object_etag_mismatch(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + obj = '90:6A:02:60:B1:08-96da3e706025537fc42464916427727e' + prolis = _test_sockets[0] + prosrv = _test_servers[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/o3 HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: %s\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, + md5('something else').hexdigest(), + len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 422' + self.assertEqual(headers[:len(exp)], exp) + + # nothing should have made it to disk on the object servers + partition, nodes = prosrv.get_object_ring( + int(ec_policy)).get_nodes('a', ec_policy.name, 'o3') + conf = {'devices': _testdir, 'mount_check': 'false'} + + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] + + for node in nodes: + df = df_mgr.get_diskfile(node['device'], partition, + 'a', ec_policy.name, 'o3', + policy=ec_policy) + self.assertRaises(DiskFileNotExist, df.open) + + @unpatch_policies + def test_PUT_ec_fragment_archive_etag_mismatch(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + # Cause a hash mismatch by feeding one particular MD5 hasher some + # extra data. The goal here is to get exactly more than one of the + # hashers in an object server. + count = (ec_policy.object_ring.replica_count - ec_policy.ec_ndata) + countdown = [count] + + def busted_md5_constructor(initial_str=""): + hasher = md5(initial_str) + if countdown[0] > 0: + hasher.update('wrong') + countdown[0] -= 1 + return hasher + + obj = 'uvarovite-esurience-cerated-symphysic' + prolis = _test_sockets[0] + prosrv = _test_servers[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + with mock.patch('swift.obj.server.md5', busted_md5_constructor): + fd = sock.makefile() + fd.write('PUT /v1/a/%s/pimento HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: %s\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), + len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 503' # no quorum + self.assertEqual(headers[:len(exp)], exp) + + # replica count - 1 of the fragment archives should have + # landed on disk + partition, nodes = prosrv.get_object_ring( + int(ec_policy)).get_nodes('a', ec_policy.name, 'pimento') + conf = {'devices': _testdir, 'mount_check': 'false'} + + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] + + found = 0 + for node in nodes: + df = df_mgr.get_diskfile(node['device'], partition, + 'a', ec_policy.name, 'pimento', + policy=ec_policy) + try: + # diskfile open won't succeed because no durable was written, + # so look under the hood for data files. + files = os.listdir(df._datadir) + if len(files) > 0: + # Although the third fragment archive hasn't landed on + # disk, the directory df._datadir is pre-maturely created + # and is empty when we use O_TMPFILE + linkat() + num_data_files = \ + len([f for f in files if f.endswith('.data')]) + self.assertEqual(1, num_data_files) + found += 1 + except OSError: + pass + self.assertEqual(found, ec_policy.ec_ndata) + + @unpatch_policies + def test_PUT_ec_fragment_quorum_archive_etag_mismatch(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container("ec", "ec-con") + + def busted_md5_constructor(initial_str=""): + hasher = md5(initial_str) + hasher.update('wrong') + return hasher + + obj = 'uvarovite-esurience-cerated-symphysic' + prolis = _test_sockets[0] + prosrv = _test_servers[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + + call_count = [0] + + def mock_committer(self): + call_count[0] += 1 + + commit_confirmation = \ + 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' + + with mock.patch('swift.obj.server.md5', busted_md5_constructor), \ + mock.patch(commit_confirmation, mock_committer): + fd = sock.makefile() + fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: %s\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 503' # no quorum + self.assertEqual(headers[:len(exp)], exp) + # Don't send commit to object-server if quorum responses consist of 4xx + self.assertEqual(0, call_count[0]) + + # no fragment archives should have landed on disk + partition, nodes = prosrv.get_object_ring(3).get_nodes( + 'a', 'ec-con', 'quorum') + conf = {'devices': _testdir, 'mount_check': 'false'} + + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] + + for node in nodes: + df = df_mgr.get_diskfile(node['device'], partition, + 'a', 'ec-con', 'quorum', + policy=ec_policy) + if os.path.exists(df._datadir): + self.assertFalse(os.listdir(df._datadir)) # should be empty + + @unpatch_policies + def test_PUT_ec_fragment_quorum_bad_request(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container("ec", "ec-con") + + obj = 'uvarovite-esurience-cerated-symphysic' + prolis = _test_sockets[0] + prosrv = _test_servers[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + + call_count = [0] + + def mock_committer(self): + call_count[0] += 1 + + read_footer = \ + 'swift.obj.server.ObjectController._read_metadata_footer' + commit_confirmation = \ + 'swift.proxy.controllers.obj.MIMEPutter.send_commit_confirmation' + + with mock.patch(read_footer) as read_footer_call, \ + mock.patch(commit_confirmation, mock_committer): + # Emulate missing footer MIME doc in all object-servers + read_footer_call.side_effect = HTTPBadRequest( + body="couldn't find footer MIME doc") + + fd = sock.makefile() + fd.write('PUT /v1/a/ec-con/quorum HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: %s\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (md5(obj).hexdigest(), len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + + # Don't show a result of the bad conversation between proxy-server + # and object-server + exp = 'HTTP/1.1 503' + self.assertEqual(headers[:len(exp)], exp) + # Don't send commit to object-server if quorum responses consist of 4xx + self.assertEqual(0, call_count[0]) + + # no fragment archives should have landed on disk + partition, nodes = prosrv.get_object_ring(3).get_nodes( + 'a', 'ec-con', 'quorum') + conf = {'devices': _testdir, 'mount_check': 'false'} + + df_mgr = diskfile.DiskFileRouter(conf, FakeLogger())[ec_policy] + + for node in nodes: + df = df_mgr.get_diskfile(node['device'], partition, + 'a', 'ec-con', 'quorum', + policy=ec_policy) + if os.path.exists(df._datadir): + self.assertFalse(os.listdir(df._datadir)) # should be empty + + @unpatch_policies + def test_PUT_ec_if_none_match(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + obj = 'ananepionic-lepidophyllous-ropewalker-neglectful' + prolis = _test_sockets[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Etag: "%s"\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), + len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/inm HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'If-None-Match: *\r\n' + 'Etag: "%s"\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, md5(obj).hexdigest(), + len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 412' + self.assertEqual(headers[:len(exp)], exp) + + @unpatch_policies + def test_GET_ec(self): + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + obj = '0123456' * 11 * 17 + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/go-get-it HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'X-Object-Meta-Color: chartreuse\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/%s/go-get-it HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEqual(headers[:len(exp)], exp) + + headers = parse_headers_string(headers) + self.assertEqual(str(len(obj)), headers['Content-Length']) + self.assertEqual(md5(obj).hexdigest(), headers['Etag']) + self.assertEqual('chartreuse', headers['X-Object-Meta-Color']) + + gotten_obj = '' + while True: + buf = fd.read(64) + if not buf: + break + gotten_obj += buf + self.assertEqual(gotten_obj, obj) + error_lines = prosrv.logger.get_lines_for_level('error') + warn_lines = prosrv.logger.get_lines_for_level('warning') + self.assertEqual(len(error_lines), 0) # sanity + self.assertEqual(len(warn_lines), 0) # sanity + + @unpatch_policies + def test_conditional_GET_ec(self): + policy = POLICIES[self.ec_policy_index] + self.assertEqual('erasure_coding', policy.policy_type) # sanity + self._test_conditional_GET(policy) + + @unpatch_policies + def test_GET_ec_big(self): + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + # our EC segment size is 4 KiB, so this is multiple (3) segments; + # we'll verify that with a sanity check + obj = 'a moose once bit my sister' * 400 + self.assertGreater( + len(obj), ec_policy.ec_segment_size * 2, + "object is too small for proper testing") + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/big-obj-get HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/%s/big-obj-get HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEqual(headers[:len(exp)], exp) + + headers = parse_headers_string(headers) + self.assertEqual(str(len(obj)), headers['Content-Length']) + self.assertEqual(md5(obj).hexdigest(), headers['Etag']) + + gotten_obj = '' + while True: + buf = fd.read(64) + if not buf: + break + gotten_obj += buf + # This may look like a redundant test, but when things fail, this + # has a useful failure message while the subsequent one spews piles + # of garbage and demolishes your terminal's scrollback buffer. + self.assertEqual(len(gotten_obj), len(obj)) + self.assertEqual(gotten_obj, obj) + error_lines = prosrv.logger.get_lines_for_level('error') + warn_lines = prosrv.logger.get_lines_for_level('warning') + self.assertEqual(len(error_lines), 0) # sanity + self.assertEqual(len(warn_lines), 0) # sanity + + @unpatch_policies + def test_GET_ec_failure_handling(self): + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + obj = 'look at this object; it is simply amazing ' * 500 + prolis = _test_sockets[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/crash-test-dummy HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + def explodey_iter(inner_iter): + yield next(inner_iter) + raise Exception("doom ba doom") + + def explodey_doc_parts_iter(inner_iter_iter): + try: + for item in inner_iter_iter: + item = item.copy() # paranoia about mutable data + item['part_iter'] = explodey_iter(item['part_iter']) + yield item + except GeneratorExit: + inner_iter_iter.close() + raise + + real_ec_app_iter = swift.proxy.controllers.obj.ECAppIter + + def explodey_ec_app_iter(path, policy, iterators, *a, **kw): + # Each thing in `iterators` here is a document-parts iterator, + # and we want to fail after getting a little into each part. + # + # That way, we ensure we've started streaming the response to + # the client when things go wrong. + return real_ec_app_iter( + path, policy, + [explodey_doc_parts_iter(i) for i in iterators], + *a, **kw) + + with mock.patch("swift.proxy.controllers.obj.ECAppIter", + explodey_ec_app_iter): + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/%s/crash-test-dummy HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEqual(headers[:len(exp)], exp) + + headers = parse_headers_string(headers) + self.assertEqual(str(len(obj)), headers['Content-Length']) + self.assertEqual(md5(obj).hexdigest(), headers['Etag']) + + gotten_obj = '' + try: + # don't hang the test run when this fails + with Timeout(300): + while True: + buf = fd.read(64) + if not buf: + break + gotten_obj += buf + except Timeout: + self.fail("GET hung when connection failed") + + # Ensure we failed partway through, otherwise the mocks could + # get out of date without anyone noticing + self.assertTrue(0 < len(gotten_obj) < len(obj)) + + @unpatch_policies + def test_HEAD_ec(self): + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + obj = '0123456' * 11 * 17 + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('PUT /v1/a/%s/go-head-it HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'Content-Length: %d\r\n' + 'X-Storage-Token: t\r\n' + 'X-Object-Meta-Color: chartreuse\r\n' + 'Content-Type: application/octet-stream\r\n' + '\r\n%s' % (ec_policy.name, len(obj), obj)) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 201' + self.assertEqual(headers[:len(exp)], exp) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('HEAD /v1/a/%s/go-head-it HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEqual(headers[:len(exp)], exp) + + headers = parse_headers_string(headers) + self.assertEqual(str(len(obj)), headers['Content-Length']) + self.assertEqual(md5(obj).hexdigest(), headers['Etag']) + self.assertEqual('chartreuse', headers['X-Object-Meta-Color']) + + error_lines = prosrv.logger.get_lines_for_level('error') + warn_lines = prosrv.logger.get_lines_for_level('warning') + self.assertEqual(len(error_lines), 0) # sanity + self.assertEqual(len(warn_lines), 0) # sanity + + @unpatch_policies + def test_GET_ec_404(self): + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('GET /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 404' + self.assertEqual(headers[:len(exp)], exp) + + error_lines = prosrv.logger.get_lines_for_level('error') + warn_lines = prosrv.logger.get_lines_for_level('warning') + self.assertEqual(len(error_lines), 0) # sanity + self.assertEqual(len(warn_lines), 0) # sanity + + @unpatch_policies + def test_HEAD_ec_404(self): + prolis = _test_sockets[0] + prosrv = _test_servers[0] + + ec_policy = POLICIES[self.ec_policy_index] + self.put_container(ec_policy.name, ec_policy.name) + + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write('HEAD /v1/a/%s/yes-we-have-no-bananas HTTP/1.1\r\n' + 'Host: localhost\r\n' + 'Connection: close\r\n' + 'X-Storage-Token: t\r\n' + '\r\n' % ec_policy.name) + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 404' + self.assertEqual(headers[:len(exp)], exp) + + error_lines = prosrv.logger.get_lines_for_level('error') + warn_lines = prosrv.logger.get_lines_for_level('warning') + self.assertEqual(len(error_lines), 0) # sanity + self.assertEqual(len(warn_lines), 0) # sanity + + @unpatch_policies + def test_reload_ring_ec(self): + policy = POLICIES[self.ec_policy_index] + self.put_container("ec", "ec-con") + + orig_rtime = policy.object_ring._rtime + orig_replica_count = policy.object_ring.replica_count + # save original file as back up + copyfile(policy.object_ring.serialized_path, + policy.object_ring.serialized_path + '.bak') + + try: + # overwrite with 2 replica, 2 devices ring + obj_devs = [] + obj_devs.append( + {'port': _test_sockets[-3].getsockname()[1], + 'device': 'sdg1'}) + obj_devs.append( + {'port': _test_sockets[-2].getsockname()[1], + 'device': 'sdh1'}) + write_fake_ring(policy.object_ring.serialized_path, + *obj_devs) + + def get_ring_reloaded_response(method): + # force to reload at the request + policy.object_ring._rtime = 0 + + trans_data = ['%s /v1/a/ec-con/o2 HTTP/1.1\r\n' % method, + 'Host: localhost\r\n', + 'Connection: close\r\n', + 'X-Storage-Token: t\r\n'] + + if method == 'PUT': + # small, so we don't get multiple EC stripes + obj = 'abCD' * 10 + + extra_trans_data = [ + 'Etag: "%s"\r\n' % md5(obj).hexdigest(), + 'Content-Length: %d\r\n' % len(obj), + 'Content-Type: application/octet-stream\r\n', + '\r\n%s' % obj + ] + trans_data.extend(extra_trans_data) + else: + trans_data.append('\r\n') + + prolis = _test_sockets[0] + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write(''.join(trans_data)) + fd.flush() + headers = readuntil2crlfs(fd) + + # use older ring with rollbacking + return headers + + for method in ('PUT', 'HEAD', 'GET', 'POST', 'DELETE'): + headers = get_ring_reloaded_response(method) + exp = 'HTTP/1.1 20' + self.assertEqual(headers[:len(exp)], exp) + + # proxy didn't load newest ring, use older one + self.assertEqual(orig_replica_count, + policy.object_ring.replica_count) + + if method == 'POST': + # Take care fast post here! + orig_post_as_copy = getattr( + _test_servers[0], 'object_post_as_copy', None) + try: + _test_servers[0].object_post_as_copy = False + with mock.patch.object( + _test_servers[0], + 'object_post_as_copy', False): + headers = get_ring_reloaded_response(method) + finally: + if orig_post_as_copy is None: + del _test_servers[0].object_post_as_copy + else: + _test_servers[0].object_post_as_copy = \ + orig_post_as_copy + + exp = 'HTTP/1.1 20' + self.assertEqual(headers[:len(exp)], exp) + # sanity + self.assertEqual(orig_replica_count, + policy.object_ring.replica_count) + + finally: + policy.object_ring._rtime = orig_rtime + os.rename(policy.object_ring.serialized_path + '.bak', + policy.object_ring.serialized_path) + + +@patch_policies([StoragePolicy(0, 'zero', True, + object_ring=FakeRing(base_port=3000))]) +class TestECObjectController(BaseTestECObjectController, unittest.TestCase): + ec_policy_index = 3 + + +@patch_policies([StoragePolicy(0, 'zero', True, + object_ring=FakeRing(base_port=3000))]) +class TestECDuplicationObjectController( + BaseTestECObjectController, unittest.TestCase): + ec_policy_index = 4 class TestECMismatchedFA(unittest.TestCase):