diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 6d03334e76..aa0cefbcbc 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1127,18 +1127,44 @@ class GetterBase(object): else: self.backend_headers.pop('Range') + def learn_size_from_content_range(self, start, end, length): + """ + Sets our Range header's first byterange to the value learned from + the Content-Range header in the response; if we were given a + fully-specified range (e.g. "bytes=123-456"), this is a no-op. + + If we were given a half-specified range (e.g. "bytes=123-" or + "bytes=-456"), then this changes the Range header to a + semantically-equivalent one *and* it lets us resume on a proper + boundary instead of just in the middle of a piece somewhere. + """ + if length == 0: + return + + if 'Range' in self.backend_headers: + try: + req_range = Range(self.backend_headers['Range']) + new_ranges = [(start, end)] + req_range.ranges[1:] + except ValueError: + new_ranges = [(start, end)] + else: + new_ranges = [(start, end)] + + self.backend_headers['Range'] = ( + "bytes=" + (",".join("%s-%s" % (s if s is not None else '', + e if e is not None else '') + for s, e in new_ranges))) + class GetOrHeadHandler(GetterBase): def __init__(self, app, req, server_type, node_iter, partition, path, backend_headers, concurrency=1, policy=None, - client_chunk_size=None, newest=None, logger=None): + newest=None, logger=None): super(GetOrHeadHandler, self).__init__( app=app, req=req, node_iter=node_iter, partition=partition, policy=policy, path=path, backend_headers=backend_headers, logger=logger) self.server_type = server_type - self.client_chunk_size = client_chunk_size - self.skip_bytes = 0 self.used_nodes = [] self.used_source_etag = '' self.concurrency = concurrency @@ -1167,40 +1193,6 @@ class GetOrHeadHandler(GetterBase): # populated from response headers self.start_byte = self.end_byte = self.length = None - def learn_size_from_content_range(self, start, end, length): - """ - If client_chunk_size is set, makes sure we yield things starting on - chunk boundaries based on the Content-Range header in the response. - - Sets our Range header's first byterange to the value learned from - the Content-Range header in the response; if we were given a - fully-specified range (e.g. "bytes=123-456"), this is a no-op. - - If we were given a half-specified range (e.g. "bytes=123-" or - "bytes=-456"), then this changes the Range header to a - semantically-equivalent one *and* it lets us resume on a proper - boundary instead of just in the middle of a piece somewhere. - """ - if length == 0: - return - - if self.client_chunk_size: - self.skip_bytes = bytes_to_skip(self.client_chunk_size, start) - - if 'Range' in self.backend_headers: - try: - req_range = Range(self.backend_headers['Range']) - new_ranges = [(start, end)] + req_range.ranges[1:] - except ValueError: - new_ranges = [(start, end)] - else: - new_ranges = [(start, end)] - - self.backend_headers['Range'] = ( - "bytes=" + (",".join("%s-%s" % (s if s is not None else '', - e if e is not None else '') - for s, e in new_ranges))) - def is_good_source(self, src): """ Indicates whether or not the request made to the backend found @@ -1237,16 +1229,12 @@ class GetOrHeadHandler(GetterBase): raise StopIteration() def iter_bytes_from_response_part(self, part_file, nbytes): - buf = b'' part_file = ByteCountEnforcer(part_file, nbytes) while True: try: with WatchdogTimeout(self.app.watchdog, self.node_timeout, ChunkReadTimeout): chunk = part_file.read(self.app.object_chunk_size) - # NB: this append must be *inside* the context - # manager for test.unit.SlowBody to do its thing - buf += chunk if nbytes is not None: nbytes -= len(chunk) except (ChunkReadTimeout, ShortReadError): @@ -1259,7 +1247,6 @@ class GetOrHeadHandler(GetterBase): six.reraise(exc_type, exc_value, exc_traceback) except RangeAlreadyComplete: break - buf = b'' if self._replace_source_and_node( 'Trying to read object during GET (retrying)'): try: @@ -1274,43 +1261,14 @@ class GetOrHeadHandler(GetterBase): else: six.reraise(exc_type, exc_value, exc_traceback) else: - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - self.bytes_used_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - self.bytes_used_from_backend += len(buf) - buf = b'' - if not chunk: - if buf: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' break - if self.client_chunk_size is not None: - while len(buf) >= self.client_chunk_size: - client_chunk = buf[:self.client_chunk_size] - buf = buf[self.client_chunk_size:] - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += \ - len(client_chunk) - yield client_chunk - else: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(chunk) + yield chunk def _get_response_parts_iter(self, req): try: @@ -1325,14 +1283,12 @@ class GetOrHeadHandler(GetterBase): while True: start_byte, end_byte, length, headers, part = \ self.get_next_doc_part() - # note: learn_size_from_content_range() sets - # self.skip_bytes self.learn_size_from_content_range( start_byte, end_byte, length) self.bytes_used_from_backend = 0 # not length; that refers to the whole object, so is the # wrong value to use for GET-range responses - byte_count = ((end_byte - start_byte + 1) - self.skip_bytes + byte_count = ((end_byte - start_byte + 1) if (end_byte is not None and start_byte is not None) else None) @@ -2164,7 +2120,7 @@ class Controller(object): return False def GETorHEAD_base(self, req, server_type, node_iter, partition, path, - concurrency=1, policy=None, client_chunk_size=None): + concurrency=1, policy=None): """ Base handler for HTTP GET or HEAD requests. @@ -2175,7 +2131,6 @@ class Controller(object): :param path: path for the request :param concurrency: number of requests to run concurrently :param policy: the policy instance, or None if Account or Container - :param client_chunk_size: chunk size for response body iterator :returns: swob.Response object """ backend_headers = self.generate_request_headers( @@ -2184,7 +2139,6 @@ class Controller(object): handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter, partition, path, backend_headers, concurrency, policy=policy, - client_chunk_size=client_chunk_size, logger=self.logger) res = handler.get_working_response(req) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 43dffdfbab..6b830575de 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2503,39 +2503,7 @@ class ECFragGetter(GetterBase): self.fragment_size = policy.fragment_size self.skip_bytes = 0 self.logger_thread_locals = logger_thread_locals - - def learn_size_from_content_range(self, start, end, length): - """ - Make sure we yield things starting on fragment boundaries based on the - Content-Range header in the response. - - Sets our Range header's first byterange to the value learned from - the Content-Range header in the response; if we were given a - fully-specified range (e.g. "bytes=123-456"), this is a no-op. - - If we were given a half-specified range (e.g. "bytes=123-" or - "bytes=-456"), then this changes the Range header to a - semantically-equivalent one *and* it lets us resume on a proper - boundary instead of just in the middle of a piece somewhere. - """ - if length == 0: - return - - self.skip_bytes = bytes_to_skip(self.fragment_size, start) - - if 'Range' in self.backend_headers: - try: - req_range = Range(self.backend_headers['Range']) - new_ranges = [(start, end)] + req_range.ranges[1:] - except ValueError: - new_ranges = [(start, end)] - else: - new_ranges = [(start, end)] - - self.backend_headers['Range'] = ( - "bytes=" + (",".join("%s-%s" % (s if s is not None else '', - e if e is not None else '') - for s, e in new_ranges))) + self.status = self.reason = self.body = self.source_headers = None def response_parts_iter(self, req): try: @@ -2651,8 +2619,10 @@ class ECFragGetter(GetterBase): # sure why the req.environ update is always needed req.environ['swift.non_client_disconnect'] = True break - # note: learn_size_from_content_range() sets - # self.skip_bytes + # skip_bytes compensates for the backend request range + # expansion done in _convert_range + self.skip_bytes = bytes_to_skip( + self.fragment_size, start_byte) self.learn_size_from_content_range( start_byte, end_byte, length) self.bytes_used_from_backend = 0 diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 0d0206f08e..6636c669bc 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -49,7 +49,7 @@ import six.moves.cPickle as pickle from six.moves import range from six.moves.http_client import HTTPException -from swift.common import storage_policy, swob, utils +from swift.common import storage_policy, swob, utils, exceptions from swift.common.memcached import MemcacheConnectionError from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, VALID_EC_TYPES) @@ -1452,3 +1452,36 @@ class ConfigAssertMixin(object): app = app._pipeline_final_app found_value = getattr(app, option_name) self.assertEqual(found_value, option_value) + + +class FakeSource(object): + def __init__(self, chunks, headers=None, body=b''): + self.chunks = list(chunks) + self.headers = headers or {} + self.status = 200 + self.swift_conn = None + self.body = body + + def read(self, _read_size): + if self.chunks: + chunk = self.chunks.pop(0) + if chunk is None: + raise exceptions.ChunkReadTimeout() + else: + return chunk + else: + return self.body + + def getheader(self, header): + # content-length for the whole object is generated dynamically + # by summing non-None chunks + if header.lower() == "content-length": + if self.chunks: + return str(sum(len(c) for c in self.chunks + if c is not None)) + return len(self.read(-1)) + return self.headers.get(header.lower()) + + def getheaders(self): + return [('content-length', self.getheader('content-length'))] + \ + [(k, v) for k, v in self.headers.items()] diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index db9cc2abc7..44d1db8f1a 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -40,7 +40,7 @@ from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection from test.debug_logger import debug_logger from test.unit import ( fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, - make_timestamp_iter, mocked_http_conn, patch_policies) + make_timestamp_iter, mocked_http_conn, patch_policies, FakeSource) from swift.common.request_helpers import ( get_sys_meta_prefix, get_object_transient_sysmeta ) @@ -183,39 +183,6 @@ class FakeCache(FakeMemcache): return self.stub or self.store.get(key) -class FakeSource(object): - def __init__(self, chunks, headers=None, body=b''): - self.chunks = list(chunks) - self.headers = headers or {} - self.status = 200 - self.swift_conn = None - self.body = body - - def read(self, _read_size): - if self.chunks: - chunk = self.chunks.pop(0) - if chunk is None: - raise exceptions.ChunkReadTimeout() - else: - return chunk - else: - return self.body - - def getheader(self, header): - # content-length for the whole object is generated dynamically - # by summing non-None chunks - if header.lower() == "content-length": - if self.chunks: - return str(sum(len(c) for c in self.chunks - if c is not None)) - return len(self.read(-1)) - return self.headers.get(header.lower()) - - def getheaders(self): - return [('content-length', self.getheader('content-length'))] + \ - [(k, v) for k, v in self.headers.items()] - - class BaseTest(unittest.TestCase): def setUp(self): @@ -1304,70 +1271,6 @@ class TestFuncs(BaseTest): self.assertEqual(v, dst_headers[k]) self.assertEqual('', dst_headers['Referer']) - def test_client_chunk_size(self): - source = FakeSource(( - b'abcd', b'1234', b'abc', b'd1', b'234abcd1234abcd1', b'2')) - req = Request.blank('/v1/a/c/o') - handler = GetOrHeadHandler( - self.app, req, None, Namespace(num_primary_nodes=3), None, None, - {}, client_chunk_size=8) - - with mock.patch.object(handler, '_get_source_and_node', - return_value=(source, {})): - resp = handler.get_working_response(req) - client_chunks = list(resp.app_iter) - self.assertEqual(client_chunks, [ - b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12']) - - def test_client_chunk_size_resuming(self): - node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} - - source1 = FakeSource([b'abcd', b'1234', None, - b'efgh', b'5678', b'lots', b'more', b'data']) - # incomplete reads of client_chunk_size will be re-fetched - source2 = FakeSource([b'efgh', b'5678', b'lots', None]) - source3 = FakeSource([b'lots', b'more', b'data']) - req = Request.blank('/v1/a/c/o') - handler = GetOrHeadHandler( - self.app, req, 'Object', Namespace(num_primary_nodes=1), None, - None, {}, client_chunk_size=8) - - range_headers = [] - sources = [(source1, node), (source2, node), (source3, node)] - - def mock_get_source_and_node(): - range_headers.append(handler.backend_headers.get('Range')) - return sources.pop(0) - - with mock.patch.object(handler, '_get_source_and_node', - mock_get_source_and_node): - resp = handler.get_working_response(req) - client_chunks = list(resp.app_iter) - self.assertEqual(range_headers, [None, 'bytes=8-27', 'bytes=16-27']) - self.assertEqual(client_chunks, [ - b'abcd1234', b'efgh5678', b'lotsmore', b'data']) - - def test_client_chunk_size_resuming_chunked(self): - node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} - headers = {'transfer-encoding': 'chunked', - 'content-type': 'text/plain'} - source1 = FakeSource([b'abcd', b'1234', b'abc', None], headers=headers) - source2 = FakeSource([b'efgh5678'], headers=headers) - sources = [(source1, node), (source2, node)] - req = Request.blank('/v1/a/c/o') - handler = GetOrHeadHandler( - self.app, req, 'Object', Namespace(num_primary_nodes=1), None, - None, {}, client_chunk_size=8) - - def mock_get_source_and_node(): - return sources.pop(0) - - with mock.patch.object(handler, '_get_source_and_node', - mock_get_source_and_node): - resp = handler.get_working_response(req) - client_chunks = list(resp.app_iter) - self.assertEqual(client_chunks, [b'abcd1234', b'efgh5678']) - def test_disconnected_logging(self): self.app.logger = mock.Mock() req = Request.blank('/v1/a/c/o') diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 0c2eb50d4b..2630d48e89 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -54,7 +54,7 @@ from test.unit import ( FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus, DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub, fake_ec_node_response, StubResponse, mocked_http_conn, - quiet_eventlet_exceptions) + quiet_eventlet_exceptions, FakeSource) from test.unit.proxy.test_server import node_error_count @@ -6788,6 +6788,73 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase): it = self.getter.iter_bytes_from_response_part(part, nbytes=None) self.assertEqual([c.encode() for c in 'something'], [ch for ch in it]) + def test_fragment_size(self): + source = FakeSource(( + b'abcd', b'1234', b'abc', b'd1', b'234abcd1234abcd1', b'2')) + req = Request.blank('/v1/a/c/o') + + def mock_source_and_node_gen(): + yield source, {} + + self.getter.fragment_size = 8 + with mock.patch.object(self.getter, '_source_and_node_gen', + mock_source_and_node_gen): + it = self.getter.response_parts_iter(req) + fragments = list(next(it)['part_iter']) + + self.assertEqual(fragments, [ + b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12']) + + def test_fragment_size_resuming(self): + node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} + + source1 = FakeSource([b'abcd', b'1234', None, + b'efgh', b'5678', b'lots', b'more', b'data']) + # incomplete reads of fragment_size will be re-fetched + source2 = FakeSource([b'efgh', b'5678', b'lots', None]) + source3 = FakeSource([b'lots', b'more', b'data']) + req = Request.blank('/v1/a/c/o') + range_headers = [] + sources = [(source1, node), (source2, node), (source3, node)] + + def mock_source_and_node_gen(): + for source in sources: + range_headers.append(self.getter.backend_headers.get('Range')) + yield source + + self.getter.fragment_size = 8 + with mock.patch.object(self.getter, '_source_and_node_gen', + mock_source_and_node_gen): + it = self.getter.response_parts_iter(req) + fragments = list(next(it)['part_iter']) + + self.assertEqual(fragments, [ + b'abcd1234', b'efgh5678', b'lotsmore', b'data']) + self.assertEqual(range_headers, [None, 'bytes=8-27', 'bytes=16-27']) + + def test_fragment_size_resuming_chunked(self): + node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} + headers = {'transfer-encoding': 'chunked', + 'content-type': 'text/plain'} + source1 = FakeSource([b'abcd', b'1234', b'abc', None], headers=headers) + source2 = FakeSource([b'efgh5678'], headers=headers) + range_headers = [] + sources = [(source1, node), (source2, node)] + req = Request.blank('/v1/a/c/o') + + def mock_source_and_node_gen(): + for source in sources: + range_headers.append(self.getter.backend_headers.get('Range')) + yield source + + self.getter.fragment_size = 8 + with mock.patch.object(self.getter, '_source_and_node_gen', + mock_source_and_node_gen): + it = self.getter.response_parts_iter(req) + fragments = list(next(it)['part_iter']) + self.assertEqual(fragments, [b'abcd1234', b'efgh5678']) + self.assertEqual(range_headers, [None, 'bytes=8-']) + if __name__ == '__main__': unittest.main()