Merge "Refactor for extract closure to method"
commit 3e08fcd7b1
@@ -1034,7 +1034,13 @@ class GetOrHeadHandler(object):
         self.concurrency = concurrency
         self.policy = policy
+        self.node = None
+        self.source = None
+        self.source_parts_iter = None
         self.latest_404_timestamp = Timestamp(0)
+        if self.server_type == 'Object':
+            self.node_timeout = self.app.recoverable_node_timeout
+        else:
+            self.node_timeout = self.app.node_timeout
         policy_options = self.app.get_policy_options(self.policy)
         self.rebalance_missing_suppression_count = min(
             policy_options.rebalance_missing_suppression_count,
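The new instance attributes above are what make the extraction in the next hunk possible: state that the old closures shared through their enclosing scope now lives on the handler itself. A minimal standalone sketch of that pattern (a hypothetical Downloader class, not Swift code):

import io

class Downloader(object):          # hypothetical stand-in for the handler
    def __init__(self, connect):
        self.connect = connect     # callable returning a fresh source
        self.source = None         # shared mutable state lives on self
        self.bytes_read = 0

    def reconnect(self):
        # any method can retarget the shared state; no closure tricks needed
        self.source = self.connect()

    def read_chunk(self, size=8):
        if self.source is None:
            self.reconnect()
        chunk = self.source.read(size)
        self.bytes_read += len(chunk)
        return chunk

d = Downloader(lambda: io.BytesIO(b'abcd1234' * 3))
assert d.read_chunk() == b'abcd1234'
assert d.bytes_read == 8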
@@ -1177,156 +1183,148 @@ class GetOrHeadHandler(object):
             return True
         return is_success(src.status) or is_redirection(src.status)

-    def _get_response_parts_iter(self, req, node, source):
-        # Someday we can replace this [mess] with python 3's "nonlocal"
-        source = [source]
-        node = [node]
+    def get_next_doc_part(self):
+        while True:
+            try:
+                # This call to next() performs IO when we have a
+                # multipart/byteranges response; it reads the MIME
+                # boundary and part headers.
+                #
+                # If we don't have a multipart/byteranges response,
+                # but just a 200 or a single-range 206, then this
+                # performs no IO, and either just returns source or
+                # raises StopIteration.
+                with WatchdogTimeout(self.app.watchdog, self.node_timeout,
+                                     ChunkReadTimeout):
+                    # if StopIteration is raised, it escapes and is
+                    # handled elsewhere
+                    start_byte, end_byte, length, headers, part = next(
+                        self.source_parts_iter)
+                return (start_byte, end_byte, length, headers, part)
+            except ChunkReadTimeout:
+                new_source, new_node = self._get_source_and_node()
+                if new_source:
+                    self.app.error_occurred(
+                        self.node, 'Trying to read object during '
+                        'GET (retrying)')
+                    # Close-out the connection as best as possible.
+                    if getattr(self.source, 'swift_conn', None):
+                        close_swift_conn(self.source)
+                    self.source = new_source
+                    self.node = new_node
+                    # This is safe; it sets up a generator but does
+                    # not call next() on it, so no IO is performed.
+                    self.source_parts_iter = http_response_to_document_iters(
+                        new_source,
+                        read_chunk_size=self.app.object_chunk_size)
+                else:
+                    raise StopIteration()
+
+    def iter_bytes_from_response_part(self, part_file, nbytes):
+        buf = b''
+        part_file = ByteCountEnforcer(part_file, nbytes)
+        while True:
+            try:
+                with WatchdogTimeout(self.app.watchdog, self.node_timeout,
+                                     ChunkReadTimeout):
+                    chunk = part_file.read(self.app.object_chunk_size)
+                    # NB: this append must be *inside* the context
+                    # manager for test.unit.SlowBody to do its thing
+                    buf += chunk
+                    if nbytes is not None:
+                        nbytes -= len(chunk)
+            except (ChunkReadTimeout, ShortReadError):
+                exc_type, exc_value, exc_traceback = exc_info()
+                if self.newest or self.server_type != 'Object':
+                    raise
+                try:
+                    self.fast_forward(self.bytes_used_from_backend)
+                except (HTTPException, ValueError):
+                    six.reraise(exc_type, exc_value, exc_traceback)
+                except RangeAlreadyComplete:
+                    break
+                buf = b''
+                new_source, new_node = self._get_source_and_node()
+                if new_source:
+                    self.app.error_occurred(
+                        self.node, 'Trying to read object during '
+                        'GET (retrying)')
+                    # Close-out the connection as best as possible.
+                    if getattr(self.source, 'swift_conn', None):
+                        close_swift_conn(self.source)
+                    self.source = new_source
+                    self.node = new_node
+                    # This is safe; it just sets up a generator but
+                    # does not call next() on it, so no IO is
+                    # performed.
+                    self.source_parts_iter = \
+                        http_response_to_document_iters(
+                            new_source,
+                            read_chunk_size=self.app.object_chunk_size)
+
+                    try:
+                        _junk, _junk, _junk, _junk, part_file = \
+                            self.get_next_doc_part()
+                    except StopIteration:
+                        # Tried to find a new node from which to
+                        # finish the GET, but failed. There's
+                        # nothing more we can do here.
+                        six.reraise(exc_type, exc_value, exc_traceback)
+                    part_file = ByteCountEnforcer(part_file, nbytes)
+                else:
+                    six.reraise(exc_type, exc_value, exc_traceback)
+            else:
+                if buf and self.skip_bytes:
+                    if self.skip_bytes < len(buf):
+                        buf = buf[self.skip_bytes:]
+                        self.bytes_used_from_backend += self.skip_bytes
+                        self.skip_bytes = 0
+                    else:
+                        self.skip_bytes -= len(buf)
+                        self.bytes_used_from_backend += len(buf)
+                        buf = b''
+
+                if not chunk:
+                    if buf:
+                        with WatchdogTimeout(self.app.watchdog,
+                                             self.app.client_timeout,
+                                             ChunkWriteTimeout):
+                            self.bytes_used_from_backend += len(buf)
+                            yield buf
+                        buf = b''
+                    break
+
+                if self.client_chunk_size is not None:
+                    while len(buf) >= self.client_chunk_size:
+                        client_chunk = buf[:self.client_chunk_size]
+                        buf = buf[self.client_chunk_size:]
+                        with WatchdogTimeout(self.app.watchdog,
+                                             self.app.client_timeout,
+                                             ChunkWriteTimeout):
+                            self.bytes_used_from_backend += \
+                                len(client_chunk)
+                            yield client_chunk
+                else:
+                    with WatchdogTimeout(self.app.watchdog,
+                                         self.app.client_timeout,
+                                         ChunkWriteTimeout):
+                        self.bytes_used_from_backend += len(buf)
+                        yield buf
+                    buf = b''
+
+    def _get_response_parts_iter(self, req):
         try:
-            client_chunk_size = self.client_chunk_size
-            node_timeout = self.app.node_timeout
-            if self.server_type == 'Object':
-                node_timeout = self.app.recoverable_node_timeout
-
-            # This is safe; it sets up a generator but does not call next()
-            # on it, so no IO is performed.
-            parts_iter = [
-                http_response_to_document_iters(
-                    source[0], read_chunk_size=self.app.object_chunk_size)]
-
-            def get_next_doc_part():
-                while True:
-                    try:
-                        # This call to next() performs IO when we have a
-                        # multipart/byteranges response; it reads the MIME
-                        # boundary and part headers.
-                        #
-                        # If we don't have a multipart/byteranges response,
-                        # but just a 200 or a single-range 206, then this
-                        # performs no IO, and either just returns source or
-                        # raises StopIteration.
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            # if StopIteration is raised, it escapes and is
-                            # handled elsewhere
-                            start_byte, end_byte, length, headers, part = next(
-                                parts_iter[0])
-                        return (start_byte, end_byte, length, headers, part)
-                    except ChunkReadTimeout:
-                        new_source, new_node = self._get_source_and_node()
-                        if new_source:
-                            self.app.error_occurred(
-                                node[0], 'Trying to read object during '
-                                'GET (retrying)')
-                            # Close-out the connection as best as possible.
-                            if getattr(source[0], 'swift_conn', None):
-                                close_swift_conn(source[0])
-                            source[0] = new_source
-                            node[0] = new_node
-                            # This is safe; it sets up a generator but does
-                            # not call next() on it, so no IO is performed.
-                            parts_iter[0] = http_response_to_document_iters(
-                                new_source,
-                                read_chunk_size=self.app.object_chunk_size)
-                        else:
-                            raise StopIteration()
-
-            def iter_bytes_from_response_part(part_file, nbytes):
-                buf = b''
-                part_file = ByteCountEnforcer(part_file, nbytes)
-                while True:
-                    try:
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            chunk = part_file.read(self.app.object_chunk_size)
-                            # NB: this append must be *inside* the context
-                            # manager for test.unit.SlowBody to do its thing
-                            buf += chunk
-                            if nbytes is not None:
-                                nbytes -= len(chunk)
-                    except (ChunkReadTimeout, ShortReadError):
-                        exc_type, exc_value, exc_traceback = exc_info()
-                        if self.newest or self.server_type != 'Object':
-                            raise
-                        try:
-                            self.fast_forward(self.bytes_used_from_backend)
-                        except (HTTPException, ValueError):
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                        except RangeAlreadyComplete:
-                            break
-                        buf = b''
-                        new_source, new_node = self._get_source_and_node()
-                        if new_source:
-                            self.app.error_occurred(
-                                node[0], 'Trying to read object during '
-                                'GET (retrying)')
-                            # Close-out the connection as best as possible.
-                            if getattr(source[0], 'swift_conn', None):
-                                close_swift_conn(source[0])
-                            source[0] = new_source
-                            node[0] = new_node
-                            # This is safe; it just sets up a generator but
-                            # does not call next() on it, so no IO is
-                            # performed.
-                            parts_iter[0] = http_response_to_document_iters(
-                                new_source,
-                                read_chunk_size=self.app.object_chunk_size)
-
-                            try:
-                                _junk, _junk, _junk, _junk, part_file = \
-                                    get_next_doc_part()
-                            except StopIteration:
-                                # Tried to find a new node from which to
-                                # finish the GET, but failed. There's
-                                # nothing more we can do here.
-                                six.reraise(exc_type, exc_value, exc_traceback)
-                            part_file = ByteCountEnforcer(part_file, nbytes)
-                        else:
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                    else:
-                        if buf and self.skip_bytes:
-                            if self.skip_bytes < len(buf):
-                                buf = buf[self.skip_bytes:]
-                                self.bytes_used_from_backend += self.skip_bytes
-                                self.skip_bytes = 0
-                            else:
-                                self.skip_bytes -= len(buf)
-                                self.bytes_used_from_backend += len(buf)
-                                buf = b''
-
-                        if not chunk:
-                            if buf:
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += len(buf)
-                                    yield buf
-                                buf = b''
-                            break
-
-                        if client_chunk_size is not None:
-                            while len(buf) >= client_chunk_size:
-                                client_chunk = buf[:client_chunk_size]
-                                buf = buf[client_chunk_size:]
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += \
-                                        len(client_chunk)
-                                    yield client_chunk
-                        else:
-                            with WatchdogTimeout(self.app.watchdog,
-                                                 self.app.client_timeout,
-                                                 ChunkWriteTimeout):
-                                self.bytes_used_from_backend += len(buf)
-                                yield buf
-                            buf = b''
+            self.source_parts_iter = http_response_to_document_iters(
+                self.source, read_chunk_size=self.app.object_chunk_size)

             part_iter = None
             try:
                 while True:
                     start_byte, end_byte, length, headers, part = \
-                        get_next_doc_part()
+                        self.get_next_doc_part()
                     # note: learn_size_from_content_range() sets
                     # self.skip_bytes
                     self.learn_size_from_content_range(
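This hunk is the heart of the change: get_next_doc_part and iter_bytes_from_response_part stop being closures over the single-element lists source = [source] / node = [node] and become methods that read self.source, self.node and self.source_parts_iter. Those lists were the pre-"nonlocal" trick the deleted comment alludes to: a nested function may mutate the contents of an outer name but, before Python 3, could not rebind the name itself. A standalone illustration of the two workarounds (toy counters, not Swift code):

def counter_list_cell():
    count = [0]                  # mutate the cell's contents;
                                 # the name 'count' is never rebound
    def bump():
        count[0] += 1
        return count[0]
    return bump

def counter_nonlocal():
    count = 0
    def bump():
        nonlocal count           # Python 3 can rebind the outer name
        count += 1
        return count
    return bump

bump = counter_list_cell()
assert (bump(), bump()) == (1, 2)
bump = counter_nonlocal()
assert (bump(), bump()) == (1, 2)

Hoisting the state onto self, as this commit does, is a third option, with the added benefit that the two helpers become independently callable and testable methods.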
@@ -1339,7 +1337,7 @@ class GetOrHeadHandler(object):
                                       and start_byte is not None)
                                  else None)
                     part_iter = CooperativeIterator(
-                        iter_bytes_from_response_part(part, byte_count))
+                        self.iter_bytes_from_response_part(part, byte_count))
                     yield {'start_byte': start_byte, 'end_byte': end_byte,
                            'entity_length': length, 'headers': headers,
                            'part_iter': part_iter}
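The per-part byte generator is wrapped in CooperativeIterator, which in Swift periodically yields control so one large response body cannot starve other concurrent greenthreads. Roughly the idea, assuming a simple count-and-sleep scheme (the sketch below is a stand-in, not Swift's actual class, which cooperates with eventlet):

import time

class CooperativeIteratorSketch(object):
    # Pass through items, but pause every `period` items to let
    # other tasks run; time.sleep(0) stands in for an eventlet yield.
    def __init__(self, iterable, period=5):
        self.wrapped = iter(iterable)
        self.period = period
        self.count = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.count >= self.period:
            self.count = 0
            time.sleep(0)        # give other tasks a turn
        self.count += 1
        return next(self.wrapped)

assert list(CooperativeIteratorSketch(range(12), period=4)) == list(range(12))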
@@ -1351,7 +1349,7 @@ class GetOrHeadHandler(object):
                     part_iter.close()

         except ChunkReadTimeout:
-            self.app.exception_occurred(node[0], 'Object',
+            self.app.exception_occurred(self.node, 'Object',
                                         'Trying to read during GET')
             raise
         except ChunkWriteTimeout:
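The ChunkReadTimeout handling now logs against self.node; the surrounding machinery (the fast_forward call in the hunk above) is what lets a proxy GET survive a backend failure mid-body: the handler remembers how many bytes it already delivered and asks the next replica for a Range starting there. A toy model of that resume-by-range idea (made-up fetch/fast_forward helpers, not Swift's):

def fast_forward(range_start, bytes_used):
    # same idea as GetOrHeadHandler.fast_forward(): the next backend
    # request starts where the last connection died
    return range_start + bytes_used

def fetch(data, start):
    # stand-in for a backend GET honoring "Range: bytes=<start>-"
    return data[start:]

object_body = b'abcdefghij'
delivered = fetch(object_body, 0)[:4]      # connection dies after 4 bytes
start = fast_forward(0, len(delivered))    # -> 4
delivered += fetch(object_body, start)     # resume on another replica
assert delivered == object_body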
@@ -1378,8 +1376,8 @@ class GetOrHeadHandler(object):
             raise
         finally:
             # Close-out the connection as best as possible.
-            if getattr(source[0], 'swift_conn', None):
-                close_swift_conn(source[0])
+            if getattr(self.source, 'swift_conn', None):
+                close_swift_conn(self.source)

     @property
     def last_status(self):
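The finally: cleanup is unchanged in behavior; getattr(..., 'swift_conn', None) guards against sources that never had a backend connection attached, and only the lookup target moved to self.source. The idiom in isolation (hypothetical objects, not Swift's types):

class FakeConn(object):
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def close_conn_if_any(source):
    # getattr with a default means sources without a swift_conn
    # attribute (e.g. plain file-like test doubles) are left alone
    conn = getattr(source, 'swift_conn', None)
    if conn:
        conn.close()

class Source(object):
    pass

s = Source()
close_conn_if_any(s)          # no swift_conn: nothing happens
s.swift_conn = FakeConn()
close_conn_if_any(s)
assert s.swift_conn.closed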
@@ -1546,7 +1544,7 @@ class GetOrHeadHandler(object):
                     return source, node
         return None, None

-    def _make_app_iter(self, req, node, source):
+    def _make_app_iter(self, req):
         """
         Returns an iterator over the contents of the source (via its read
         func). There is also quite a bit of cleanup to ensure garbage
@@ -1558,7 +1556,7 @@ class GetOrHeadHandler(object):
         :param node: The node the source is reading from, for logging purposes.
         """

-        ct = source.getheader('Content-Type')
+        ct = self.source.getheader('Content-Type')
         if ct:
             content_type, content_type_attrs = parse_content_type(ct)
             is_multipart = content_type == 'multipart/byteranges'
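Context for the multipart/byteranges check: when a client asks for several ranges, the backend's 206 response is a MIME multipart document, and the boundary needed to split it rides in a Content-Type parameter. A toy parser showing the shape of what parse_content_type returns here (simplified; the real helper also handles quoted parameters):

def parse_content_type_sketch(value):
    # returns the bare type and a list of (key, value) parameter pairs,
    # matching the dict(content_type_attrs)["boundary"] lookup above
    parts = [p.strip() for p in value.split(';')]
    attrs = [tuple(p.split('=', 1)) for p in parts[1:] if '=' in p]
    return parts[0], attrs

ct = 'multipart/byteranges; boundary=c4b2e5b13bae4f41bf5b'
content_type, attrs = parse_content_type_sketch(ct)
assert content_type == 'multipart/byteranges'
assert dict(attrs)['boundary'] == 'c4b2e5b13bae4f41bf5b'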
@@ -1571,7 +1569,7 @@ class GetOrHeadHandler(object):
             # furnished one for us, so we'll just re-use it
             boundary = dict(content_type_attrs)["boundary"]

-        parts_iter = self._get_response_parts_iter(req, node, source)
+        parts_iter = self._get_response_parts_iter(req)

         def add_content_type(response_part):
             response_part["content_type"] = \
@@ -1591,7 +1589,9 @@ class GetOrHeadHandler(object):
             update_headers(res, source.getheaders())
             if req.method == 'GET' and \
                     source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
-                res.app_iter = self._make_app_iter(req, node, source)
+                self.source = source
+                self.node = node
+                res.app_iter = self._make_app_iter(req)
                 # See NOTE: swift_conn at top of file about this.
                 res.swift_conn = source.swift_conn
             if not res.environ:

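Taken together, the proxy-side hunks replace argument-threading with primed state: callers now assign self.source and self.node before calling _make_app_iter(req). The test hunks below follow the same convention. A dummy mock-up of the new calling shape (MiniHandler is hypothetical, not the real handler):

import io

class MiniHandler(object):
    def __init__(self):
        self.source = None
        self.node = None

    def _make_app_iter(self, req):
        # reads its collaborators from self instead of taking them as args
        yield self.source.read()

handler = MiniHandler()
handler.source = io.BytesIO(b'abcd1234')         # prime the state first,
handler.node = 'node1'
app_iter = handler._make_app_iter('GET /a/c/o')  # then build the iterator
assert list(app_iter) == [b'abcd1234']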
@@ -1299,7 +1299,9 @@ class TestFuncs(BaseTest):
             self.app, req, None, Namespace(num_primary_nodes=3), None, None,
             {}, client_chunk_size=8)

-        app_iter = handler._make_app_iter(req, node, source)
+        handler.source = source
+        handler.node = node
+        app_iter = handler._make_app_iter(req)
         client_chunks = list(app_iter)
         self.assertEqual(client_chunks, [
             b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12'])
@@ -1350,7 +1352,9 @@ class TestFuncs(BaseTest):
                 range_headers.append(handler.backend_headers['Range'])
                 return sources.pop(0)

-        app_iter = handler._make_app_iter(req, node, source1)
+        handler.source = source1
+        handler.node = node
+        app_iter = handler._make_app_iter(req)
         with mock.patch.object(handler, '_get_source_and_node',
                                side_effect=mock_get_source_and_node):
             client_chunks = list(app_iter)
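This test primes handler.source/handler.node and then patches _get_source_and_node so the handler "fails over" to the next canned source mid-read. The mock pattern in isolation (hypothetical Handler, not the real test fixture):

import io
from unittest import mock

class Handler(object):           # hypothetical stand-in
    def _get_source_and_node(self):
        raise NotImplementedError

sources = [io.BytesIO(b'5678')]
handler = Handler()
with mock.patch.object(handler, '_get_source_and_node',
                       side_effect=lambda: (sources.pop(0), 'node2')):
    new_source, new_node = handler._get_source_and_node()
assert new_source.read() == b'5678' and new_node == 'node2'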
@@ -1392,7 +1396,9 @@ class TestFuncs(BaseTest):
             self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
             None, {}, client_chunk_size=8)

-        app_iter = handler._make_app_iter(req, node, source1)
+        handler.source = source1
+        handler.node = node
+        app_iter = handler._make_app_iter(req)
         with mock.patch.object(handler, '_get_source_and_node',
                                lambda: (source2, node)):
             client_chunks = list(app_iter)
@@ -1423,7 +1429,9 @@ class TestFuncs(BaseTest):
         handler = GetOrHeadHandler(
             self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
             'some-path', {})
-        app_iter = handler._make_app_iter(req, node, source)
+        handler.source = source
+        handler.node = node
+        app_iter = handler._make_app_iter(req)
         app_iter.close()
         self.app.logger.info.assert_called_once_with(
             'Client disconnected on read of %r', 'some-path')
@@ -1433,7 +1441,9 @@ class TestFuncs(BaseTest):
         handler = GetOrHeadHandler(
             self.app, req, 'Object', Namespace(num_primary_nodes=1), None,
             None, {})
-        app_iter = handler._make_app_iter(req, node, source)
+        handler.source = source
+        handler.node = node
+        app_iter = handler._make_app_iter(req)
         next(app_iter)
         app_iter.close()
         self.app.logger.warning.assert_not_called()