proxy: remove client_chunk_size and skip_bytes from GetOrHeadHandler

The client_chunk_size attribute was introduced into GetOrHeadHandler
for EC support [1]. It was only ever not None for an
ECObjectController. The ECObjectController stopped using
GetOrHeadHandler for Object GET when the ECFragGetter class was
introduced [2], but the EC specific code was not expunged from
GetOrHeadHandler. In [3] the ECFragGetter client_chunk_size was renamed
to fragment_size to better reflect what it represented.

The skip_bytes attribute was similarly introduced for EC support. It
is only ever non-zero if client_chunk_size is an int. For EC,
skip_bytes is used to undo the effect of expanding the backend
range(s) to fetch whole fragments: the range(s) of decoded bytes
returned to the client may need to be narrower than the backend
ranges. There is no equivalent requirement for replicated GETs.

The elimination of client_chunk_size and skip_bytes simplifies the
yielding of chunks from the GetOrHeadHandler response iter.

Related-Change:
[1] I9c13c03616489f8eab7dcd7c5f21237ed4cb6fd2
[2] I0dc5644a84ededee753e449e053e6b1786fdcf32
[3] Ie1efaab3bd0510275d534b5c023cb73c98bec90d

Change-Id: I31ed36d32682469e3c5ca8bf9a2b383568d63c72
This commit is contained in:
Alistair Coles 2023-06-23 13:21:02 +01:00 committed by Clay Gerrard
parent 1b7cf29476
commit 369a72c4cf
5 changed files with 144 additions and 217 deletions

View File

@ -1127,18 +1127,44 @@ class GetterBase(object):
else:
self.backend_headers.pop('Range')
def learn_size_from_content_range(self, start, end, length):
    """
    Rewrite the first byterange of our backend Range header using the
    bounds reported by the response's Content-Range header.

    For a fully-specified request range (e.g. "bytes=123-456") this is
    a no-op; for a half-specified range (e.g. "bytes=123-" or
    "bytes=-456") it pins the header to a semantically-equivalent,
    fully-specified form so a retried request can resume from a known
    boundary instead of somewhere mid-stream.
    """
    if length == 0:
        # a zero-length response carries no usable Content-Range info
        return

    new_ranges = [(start, end)]
    if 'Range' in self.backend_headers:
        try:
            # keep any additional byteranges beyond the first one
            new_ranges += Range(self.backend_headers['Range']).ranges[1:]
        except ValueError:
            # existing header was unparseable; just use the learned range
            pass

    def _bound(value):
        return '' if value is None else '%s' % value

    self.backend_headers['Range'] = 'bytes=' + ','.join(
        '%s-%s' % (_bound(s), _bound(e)) for s, e in new_ranges)
class GetOrHeadHandler(GetterBase):
def __init__(self, app, req, server_type, node_iter, partition, path,
backend_headers, concurrency=1, policy=None,
client_chunk_size=None, newest=None, logger=None):
newest=None, logger=None):
super(GetOrHeadHandler, self).__init__(
app=app, req=req, node_iter=node_iter,
partition=partition, policy=policy, path=path,
backend_headers=backend_headers, logger=logger)
self.server_type = server_type
self.client_chunk_size = client_chunk_size
self.skip_bytes = 0
self.used_nodes = []
self.used_source_etag = ''
self.concurrency = concurrency
@ -1167,40 +1193,6 @@ class GetOrHeadHandler(GetterBase):
# populated from response headers
self.start_byte = self.end_byte = self.length = None
def learn_size_from_content_range(self, start, end, length):
"""
If client_chunk_size is set, makes sure we yield things starting on
chunk boundaries based on the Content-Range header in the response.
Sets our Range header's first byterange to the value learned from
the Content-Range header in the response; if we were given a
fully-specified range (e.g. "bytes=123-456"), this is a no-op.
If we were given a half-specified range (e.g. "bytes=123-" or
"bytes=-456"), then this changes the Range header to a
semantically-equivalent one *and* it lets us resume on a proper
boundary instead of just in the middle of a piece somewhere.
"""
if length == 0:
return
if self.client_chunk_size:
self.skip_bytes = bytes_to_skip(self.client_chunk_size, start)
if 'Range' in self.backend_headers:
try:
req_range = Range(self.backend_headers['Range'])
new_ranges = [(start, end)] + req_range.ranges[1:]
except ValueError:
new_ranges = [(start, end)]
else:
new_ranges = [(start, end)]
self.backend_headers['Range'] = (
"bytes=" + (",".join("%s-%s" % (s if s is not None else '',
e if e is not None else '')
for s, e in new_ranges)))
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
@ -1237,16 +1229,12 @@ class GetOrHeadHandler(GetterBase):
raise StopIteration()
def iter_bytes_from_response_part(self, part_file, nbytes):
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with WatchdogTimeout(self.app.watchdog, self.node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
if nbytes is not None:
nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError):
@ -1259,7 +1247,6 @@ class GetOrHeadHandler(GetterBase):
six.reraise(exc_type, exc_value, exc_traceback)
except RangeAlreadyComplete:
break
buf = b''
if self._replace_source_and_node(
'Trying to read object during GET (retrying)'):
try:
@ -1274,43 +1261,14 @@ class GetOrHeadHandler(GetterBase):
else:
six.reraise(exc_type, exc_value, exc_traceback)
else:
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
self.bytes_used_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
self.bytes_used_from_backend += len(buf)
buf = b''
if not chunk:
if buf:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
break
if self.client_chunk_size is not None:
while len(buf) >= self.client_chunk_size:
client_chunk = buf[:self.client_chunk_size]
buf = buf[self.client_chunk_size:]
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(chunk)
yield chunk
def _get_response_parts_iter(self, req):
try:
@ -1325,14 +1283,12 @@ class GetOrHeadHandler(GetterBase):
while True:
start_byte, end_byte, length, headers, part = \
self.get_next_doc_part()
# note: learn_size_from_content_range() sets
# self.skip_bytes
self.learn_size_from_content_range(
start_byte, end_byte, length)
self.bytes_used_from_backend = 0
# not length; that refers to the whole object, so is the
# wrong value to use for GET-range responses
byte_count = ((end_byte - start_byte + 1) - self.skip_bytes
byte_count = ((end_byte - start_byte + 1)
if (end_byte is not None
and start_byte is not None)
else None)
@ -2164,7 +2120,7 @@ class Controller(object):
return False
def GETorHEAD_base(self, req, server_type, node_iter, partition, path,
concurrency=1, policy=None, client_chunk_size=None):
concurrency=1, policy=None):
"""
Base handler for HTTP GET or HEAD requests.
@ -2175,7 +2131,6 @@ class Controller(object):
:param path: path for the request
:param concurrency: number of requests to run concurrently
:param policy: the policy instance, or None if Account or Container
:param client_chunk_size: chunk size for response body iterator
:returns: swob.Response object
"""
backend_headers = self.generate_request_headers(
@ -2184,7 +2139,6 @@ class Controller(object):
handler = GetOrHeadHandler(self.app, req, self.server_type, node_iter,
partition, path, backend_headers,
concurrency, policy=policy,
client_chunk_size=client_chunk_size,
logger=self.logger)
res = handler.get_working_response(req)

View File

@ -2503,39 +2503,7 @@ class ECFragGetter(GetterBase):
self.fragment_size = policy.fragment_size
self.skip_bytes = 0
self.logger_thread_locals = logger_thread_locals
def learn_size_from_content_range(self, start, end, length):
    """
    Make sure we yield things starting on fragment boundaries based on
    the Content-Range header in the response.

    Rewrites the first byterange of our backend Range header to the
    bounds learned from Content-Range: a no-op for a fully-specified
    request range, but for a half-specified one (e.g. "bytes=123-" or
    "bytes=-456") it pins the header to a semantically-equivalent,
    fully-specified form so a resumed request restarts on a proper
    boundary instead of just in the middle of a piece somewhere.
    """
    if length == 0:
        # nothing to learn from an empty response
        return

    # Whole fragments are fetched from the backend, so record how many
    # leading bytes of the first fragment precede the requested start.
    self.skip_bytes = bytes_to_skip(self.fragment_size, start)

    learned = [(start, end)]
    try:
        # preserve any byteranges after the first in the existing header
        learned += Range(self.backend_headers['Range']).ranges[1:]
    except KeyError:
        # no Range header yet; just use the learned range
        pass
    except ValueError:
        # existing header was unparseable; just use the learned range
        pass

    self.backend_headers['Range'] = 'bytes=' + ','.join(
        '%s-%s' % ('' if s is None else s, '' if e is None else e)
        for s, e in learned)
self.status = self.reason = self.body = self.source_headers = None
def response_parts_iter(self, req):
try:
@ -2651,8 +2619,10 @@ class ECFragGetter(GetterBase):
# sure why the req.environ update is always needed
req.environ['swift.non_client_disconnect'] = True
break
# note: learn_size_from_content_range() sets
# self.skip_bytes
# skip_bytes compensates for the backend request range
# expansion done in _convert_range
self.skip_bytes = bytes_to_skip(
self.fragment_size, start_byte)
self.learn_size_from_content_range(
start_byte, end_byte, length)
self.bytes_used_from_backend = 0

View File

@ -49,7 +49,7 @@ import six.moves.cPickle as pickle
from six.moves import range
from six.moves.http_client import HTTPException
from swift.common import storage_policy, swob, utils
from swift.common import storage_policy, swob, utils, exceptions
from swift.common.memcached import MemcacheConnectionError
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
VALID_EC_TYPES)
@ -1452,3 +1452,36 @@ class ConfigAssertMixin(object):
app = app._pipeline_final_app
found_value = getattr(app, option_name)
self.assertEqual(found_value, option_value)
class FakeSource(object):
    """
    Stand-in for a backend GET response connection.

    Serves pre-canned body pieces from *chunks*; a ``None`` entry makes
    ``read()`` raise ``ChunkReadTimeout`` so tests can simulate a
    backend that dies mid-stream. Once chunks are exhausted, *body* is
    returned for every subsequent read.
    """

    def __init__(self, chunks, headers=None, body=b''):
        self.chunks = list(chunks)
        self.headers = headers or {}
        self.status = 200
        self.swift_conn = None
        self.body = body

    def read(self, _read_size):
        # note: _read_size is ignored; chunks are served whole
        if not self.chunks:
            return self.body
        chunk = self.chunks.pop(0)
        if chunk is None:
            # a None chunk simulates a backend read timing out
            raise exceptions.ChunkReadTimeout()
        return chunk

    def getheader(self, header):
        # content-length for the whole object is generated dynamically
        # by summing non-None chunks
        if header.lower() == "content-length":
            if self.chunks:
                remaining = [c for c in self.chunks if c is not None]
                return str(sum(len(c) for c in remaining))
            return len(self.read(-1))
        return self.headers.get(header.lower())

    def getheaders(self):
        result = [('content-length', self.getheader('content-length'))]
        result.extend((k, v) for k, v in self.headers.items())
        return result

View File

@ -40,7 +40,7 @@ from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
from test.debug_logger import debug_logger
from test.unit import (
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies,
make_timestamp_iter, mocked_http_conn, patch_policies)
make_timestamp_iter, mocked_http_conn, patch_policies, FakeSource)
from swift.common.request_helpers import (
get_sys_meta_prefix, get_object_transient_sysmeta
)
@ -183,39 +183,6 @@ class FakeCache(FakeMemcache):
return self.stub or self.store.get(key)
class FakeSource(object):
    """
    Fake backend response for GET tests: yields canned *chunks* from
    ``read()`` (a ``None`` entry raises ``ChunkReadTimeout`` to mimic a
    stalled backend), then falls back to *body* once they run out.
    """

    def __init__(self, chunks, headers=None, body=b''):
        self.chunks = list(chunks)
        self.headers = headers or {}
        self.status = 200
        self.swift_conn = None
        self.body = body

    def read(self, _read_size):
        # _read_size is ignored; each call hands out one whole chunk
        try:
            chunk = self.chunks.pop(0)
        except IndexError:
            return self.body
        if chunk is None:
            # simulate the backend timing out mid-stream
            raise exceptions.ChunkReadTimeout()
        return chunk

    def getheader(self, header):
        name = header.lower()
        if name != "content-length":
            return self.headers.get(name)
        # content-length for the whole object is generated dynamically
        # by summing non-None chunks
        if self.chunks:
            return str(sum(len(c) for c in self.chunks if c is not None))
        return len(self.read(-1))

    def getheaders(self):
        headers = [('content-length', self.getheader('content-length'))]
        for k, v in self.headers.items():
            headers.append((k, v))
        return headers
class BaseTest(unittest.TestCase):
def setUp(self):
@ -1304,70 +1271,6 @@ class TestFuncs(BaseTest):
self.assertEqual(v, dst_headers[k])
self.assertEqual('', dst_headers['Referer'])
def test_client_chunk_size(self):
# A GetOrHeadHandler created with client_chunk_size=8 should coalesce
# the backend's unevenly-sized reads (30 bytes total) into uniform
# 8-byte chunks for the client, with a short final chunk.
source = FakeSource((
b'abcd', b'1234', b'abc', b'd1', b'234abcd1234abcd1', b'2'))
req = Request.blank('/v1/a/c/o')
handler = GetOrHeadHandler(
self.app, req, None, Namespace(num_primary_nodes=3), None, None,
{}, client_chunk_size=8)
# stub out node/source selection so the handler reads from our fake
with mock.patch.object(handler, '_get_source_and_node',
return_value=(source, {})):
resp = handler.get_working_response(req)
client_chunks = list(resp.app_iter)
self.assertEqual(client_chunks, [
b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12'])
def test_client_chunk_size_resuming(self):
# A None entry in a FakeSource makes read() raise ChunkReadTimeout,
# forcing the handler to resume the GET from another source. Client
# chunks must still come out aligned to client_chunk_size with no
# bytes repeated or lost across the resumes.
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
source1 = FakeSource([b'abcd', b'1234', None,
b'efgh', b'5678', b'lots', b'more', b'data'])
# incomplete reads of client_chunk_size will be re-fetched
source2 = FakeSource([b'efgh', b'5678', b'lots', None])
source3 = FakeSource([b'lots', b'more', b'data'])
req = Request.blank('/v1/a/c/o')
handler = GetOrHeadHandler(
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
None, {}, client_chunk_size=8)
range_headers = []
sources = [(source1, node), (source2, node), (source3, node)]
# record the Range header the handler would send for each (re)fetch
def mock_get_source_and_node():
range_headers.append(handler.backend_headers.get('Range'))
return sources.pop(0)
with mock.patch.object(handler, '_get_source_and_node',
mock_get_source_and_node):
resp = handler.get_working_response(req)
client_chunks = list(resp.app_iter)
# each resume should request only the not-yet-consumed byte range
self.assertEqual(range_headers, [None, 'bytes=8-27', 'bytes=16-27'])
self.assertEqual(client_chunks, [
b'abcd1234', b'efgh5678', b'lotsmore', b'data'])
def test_client_chunk_size_resuming_chunked(self):
# Resume after a mid-stream ChunkReadTimeout (the None chunk) on a
# chunked-transfer response with no content-length: the handler must
# still yield whole client_chunk_size pieces to the client.
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
headers = {'transfer-encoding': 'chunked',
'content-type': 'text/plain'}
source1 = FakeSource([b'abcd', b'1234', b'abc', None], headers=headers)
source2 = FakeSource([b'efgh5678'], headers=headers)
sources = [(source1, node), (source2, node)]
req = Request.blank('/v1/a/c/o')
handler = GetOrHeadHandler(
self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
None, {}, client_chunk_size=8)
# hand out the sources in order, simulating failover to a new node
def mock_get_source_and_node():
return sources.pop(0)
with mock.patch.object(handler, '_get_source_and_node',
mock_get_source_and_node):
resp = handler.get_working_response(req)
client_chunks = list(resp.app_iter)
self.assertEqual(client_chunks, [b'abcd1234', b'efgh5678'])
def test_disconnected_logging(self):
self.app.logger = mock.Mock()
req = Request.blank('/v1/a/c/o')

View File

@ -54,7 +54,7 @@ from test.unit import (
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub,
fake_ec_node_response, StubResponse, mocked_http_conn,
quiet_eventlet_exceptions)
quiet_eventlet_exceptions, FakeSource)
from test.unit.proxy.test_server import node_error_count
@ -6788,6 +6788,73 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
it = self.getter.iter_bytes_from_response_part(part, nbytes=None)
self.assertEqual([c.encode() for c in 'something'], [ch for ch in it])
def test_fragment_size(self):
# ECFragGetter with fragment_size=8 should re-chunk the backend's
# unevenly-sized reads into whole 8-byte fragments, with a short
# final piece.
source = FakeSource((
b'abcd', b'1234', b'abc', b'd1', b'234abcd1234abcd1', b'2'))
req = Request.blank('/v1/a/c/o')
# single-source generator: no resume is exercised in this test
def mock_source_and_node_gen():
yield source, {}
self.getter.fragment_size = 8
with mock.patch.object(self.getter, '_source_and_node_gen',
mock_source_and_node_gen):
it = self.getter.response_parts_iter(req)
fragments = list(next(it)['part_iter'])
self.assertEqual(fragments, [
b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12'])
def test_fragment_size_resuming(self):
# A None chunk makes FakeSource.read() raise ChunkReadTimeout; the
# getter should resume from the next source and still emit aligned
# fragment_size pieces with no bytes repeated or lost.
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
source1 = FakeSource([b'abcd', b'1234', None,
b'efgh', b'5678', b'lots', b'more', b'data'])
# incomplete reads of fragment_size will be re-fetched
source2 = FakeSource([b'efgh', b'5678', b'lots', None])
source3 = FakeSource([b'lots', b'more', b'data'])
req = Request.blank('/v1/a/c/o')
range_headers = []
sources = [(source1, node), (source2, node), (source3, node)]
# record the Range header in play each time a new source is used
def mock_source_and_node_gen():
for source in sources:
range_headers.append(self.getter.backend_headers.get('Range'))
yield source
self.getter.fragment_size = 8
with mock.patch.object(self.getter, '_source_and_node_gen',
mock_source_and_node_gen):
it = self.getter.response_parts_iter(req)
fragments = list(next(it)['part_iter'])
self.assertEqual(fragments, [
b'abcd1234', b'efgh5678', b'lotsmore', b'data'])
# each resume should request only the not-yet-consumed byte range
self.assertEqual(range_headers, [None, 'bytes=8-27', 'bytes=16-27'])
def test_fragment_size_resuming_chunked(self):
# Resume after a mid-stream ChunkReadTimeout (the None chunk) on a
# chunked-transfer response with no content-length: whole
# fragment_size pieces must still be yielded, and the resumed
# request's Range header starts at the next needed byte.
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
headers = {'transfer-encoding': 'chunked',
'content-type': 'text/plain'}
source1 = FakeSource([b'abcd', b'1234', b'abc', None], headers=headers)
source2 = FakeSource([b'efgh5678'], headers=headers)
range_headers = []
sources = [(source1, node), (source2, node)]
req = Request.blank('/v1/a/c/o')
# record the Range header in play each time a new source is used
def mock_source_and_node_gen():
for source in sources:
range_headers.append(self.getter.backend_headers.get('Range'))
yield source
self.getter.fragment_size = 8
with mock.patch.object(self.getter, '_source_and_node_gen',
mock_source_and_node_gen):
it = self.getter.response_parts_iter(req)
fragments = list(next(it)['part_iter'])
self.assertEqual(fragments, [b'abcd1234', b'efgh5678'])
self.assertEqual(range_headers, [None, 'bytes=8-'])
# Standard entry point so this test module can be run directly.
if __name__ == '__main__':
unittest.main()