Merge "slo: refactor GET/HEAD response handling"

This commit is contained in:
Zuul 2023-11-13 20:04:28 +00:00 committed by Gerrit Code Review
commit b0d0c49438
9 changed files with 1036 additions and 434 deletions

View File

@ -334,6 +334,7 @@ import time
import six
from swift.cli.container_deleter import make_delete_jobs
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.middleware.listing_formats import \
MAX_CONTAINER_LISTING_CONTENT_LENGTH
@ -345,15 +346,16 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
RESPONSE_REASONS, str_to_wsgi, bytes_to_wsgi, wsgi_to_str, wsgi_quote
from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
RateLimitedIterator, quote, close_if_possible, closing_if_possible, \
LRUCache, StreamingPile, strict_b64decode, Timestamp, drain_and_close, \
RateLimitedIterator, quote, closing_if_possible, \
LRUCache, StreamingPile, strict_b64decode, Timestamp, friendly_close, \
get_expirer_container, md5
from swift.common.registry import register_swift_info
from swift.common.request_helpers import SegmentedIterable, \
get_sys_meta_prefix, update_etag_is_at_header, resolve_etag_is_at_header, \
get_container_update_override_key, update_ignore_range_header
get_container_update_override_key, update_ignore_range_header, \
get_param
from swift.common.constraints import check_utf8, AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED
from swift.common.wsgi import WSGIContext, make_subrequest, make_env, \
make_pre_authed_request
from swift.common.middleware.bulk import get_response_body, \
@ -530,6 +532,129 @@ def parse_and_validate_input(req_body, req_path):
return parsed_data
def _annotate_segments(segments, logger=None):
"""
Decode any inlined data and update sub_slo segments bytes from content-type
when available; then annotate segment dicts in segments list with
'segment_length'.
N.B. raw_data segments don't have a bytes key and range-segments need to
calculate their length from their range key but afterwards all segments
dicts will have 'segment_length' representing the length of the segment.
"""
for seg_dict in segments:
if 'data' in seg_dict:
seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data'))
segment_length = len(seg_dict['raw_data'])
else:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=logger)
seg_range = seg_dict.get('range')
if seg_range is not None:
# The range is of the form N-M, where N and M are both
# positive decimal integers. We know this because this
# middleware is the only thing that creates the SLO
# manifests stored in the cluster.
range_start, range_end = [
int(x) for x in seg_range.split('-')]
segment_length = (range_end - range_start) + 1
else:
segment_length = int(seg_dict['bytes'])
seg_dict['segment_length'] = segment_length
class RespAttrs(object):
"""
Encapsulate properties of a GET or HEAD response that are pertinent to
handling a potential SLO response.
Instances of this class are typically constructed using the
``from_headers`` method.
:param is_slo: True if the response appears to be an SLO manifest, False
otherwise.
:param timestamp: an instance of :class:`~swift.common.utils.Timestamp`.
:param manifest_etag: the Etag of the manifest object, or None if
``is_slo`` is False.
:param slo_etag: the Etag of the SLO.
:param slo_size: the size of the SLO.
"""
def __init__(self, is_slo, timestamp, manifest_etag, slo_etag, slo_size):
self.is_slo = bool(is_slo)
self.timestamp = Timestamp(timestamp or 0)
# manifest_etag is unambiguous, but json_md5 is even more explicit
self.json_md5 = manifest_etag or ''
self.slo_etag = slo_etag or ''
try:
# even though it's from sysmeta, we have to worry about empty
# values - see test_get_invalid_sysmeta_passthrough
self.slo_size = int(slo_size)
except (ValueError, TypeError):
self.slo_size = -1
self.is_legacy = not self._has_size_and_etag()
def _has_size_and_etag(self):
return self.slo_size > 0 and self.slo_etag
@classmethod
def from_headers(cls, response_headers):
"""
Inspect response headers and extract any resp_attrs we can find.
:param response_headers: list of tuples from a object response
:returns: an instance of RespAttrs to represent the response headers
"""
is_slo = False
timestamp = None
found_etag = None
slo_etag = None
slo_size = None
for header, value in response_headers:
header = header.lower()
if header == 'x-static-large-object':
is_slo = config_true_value(value)
elif header == 'x-backend-timestamp':
timestamp = value
elif header == 'etag':
found_etag = value
elif header == SYSMETA_SLO_ETAG:
slo_etag = value
elif header == SYSMETA_SLO_SIZE:
slo_size = value
manifest_etag = found_etag if is_slo else None
return cls(is_slo, timestamp, manifest_etag, slo_etag, slo_size)
def update_from_segments(self, segments):
"""
Always called if SLO has fetched the manifest response body, for
legacy manifests we'll calculate size/etag values we wouldn't have
gotten from sys-meta headers.
"""
# we only have to set size/etag once; it doesn't matter if we got the
# values from sysmeta headers or segments
if self._has_size_and_etag():
return
calculated_size = 0
calculated_etag = md5(usedforsecurity=False)
for seg_dict in segments:
calculated_size += seg_dict['segment_length']
if 'raw_data' in seg_dict:
r = md5(seg_dict['raw_data'],
usedforsecurity=False).hexdigest()
elif seg_dict.get('range'):
r = '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
else:
r = seg_dict['hash']
calculated_etag.update(r.encode('ascii'))
self.slo_size = calculated_size
self.slo_etag = calculated_etag.hexdigest()
class SloGetContext(WSGIContext):
max_slo_recursion_depth = 10
@ -537,6 +662,8 @@ class SloGetContext(WSGIContext):
def __init__(self, slo):
self.slo = slo
super(SloGetContext, self).__init__(slo.app)
# we'll know more after we look at the response metadata
self.segment_listing_needed = False
def _fetch_sub_slo_segments(self, req, version, acc, con, obj):
"""
@ -571,9 +698,8 @@ class SloGetContext(WSGIContext):
body if len(body) <= 60 else body[:57] + '...'))
try:
with closing_if_possible(sub_resp.app_iter):
return json.loads(b''.join(sub_resp.app_iter))
except ValueError as err:
return self._parse_segments(sub_resp.app_iter)
except HTTPException as err:
raise ListingIterError(
'while fetching %s, JSON-decoding of submanifest %s '
'failed with %s' % (req.path, sub_req.path, err))
@ -584,32 +710,8 @@ class SloGetContext(WSGIContext):
conobj=seg_dict['name'].lstrip('/')
)
def _segment_length(self, seg_dict):
"""
Returns the number of bytes that will be fetched from the specified
segment on a plain GET request for this SLO manifest.
"""
if 'raw_data' in seg_dict:
return len(seg_dict['raw_data'])
seg_range = seg_dict.get('range')
if seg_range is not None:
# The range is of the form N-M, where N and M are both positive
# decimal integers. We know this because this middleware is the
# only thing that creates the SLO manifests stored in the
# cluster.
range_start, range_end = [int(x) for x in seg_range.split('-')]
return (range_end - range_start) + 1
else:
return int(seg_dict['bytes'])
def _segment_listing_iterator(self, req, version, account, segments,
byteranges):
for seg_dict in segments:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(seg_dict,
logger=self.slo.logger)
# We handle the range stuff here so that we can be smart about
# skipping unused submanifests. For example, if our first segment is a
# submanifest referencing 50 MiB total, but start_byte falls in
@ -617,9 +719,6 @@ class SloGetContext(WSGIContext):
#
# If we were to make SegmentedIterable handle all the range
# calculations, we would be unable to make this optimization.
total_length = sum(self._segment_length(seg) for seg in segments)
if not byteranges:
byteranges = [(0, total_length - 1)]
# Cache segments from sub-SLOs in case more than one byterange
# includes data from a particular sub-SLO. We only cache a few sets
@ -646,12 +745,26 @@ class SloGetContext(WSGIContext):
first_byte, last_byte,
cached_fetch_sub_slo_segments,
recursion_depth=1):
"""
Iterable that generates a filtered and annotated stream of segment
dicts describing the sub-segment ranges that would be used by the
SegmentedIterable to construct the bytes for a ranged response.
:param req: original request object
:param version: version
:param account: account
:param segments: segments dictionary
:param first_byte: offset into the large object for the first byte
that is returned to the client
:param last_byte: offset into the large object for the last byte
that is returned to the client
:param cached_fetch_sub_slo_segments: LRU cache used for fetching
sub-segments
:param recursion_depth: max number of recursive sub_slo calls
"""
last_sub_path = None
for seg_dict in segments:
if 'data' in seg_dict:
seg_dict['raw_data'] = strict_b64decode(seg_dict.pop('data'))
seg_length = self._segment_length(seg_dict)
seg_length = seg_dict['segment_length']
if first_byte >= seg_length:
# don't need any bytes from this segment
first_byte -= seg_length
@ -718,50 +831,194 @@ class SloGetContext(WSGIContext):
first_byte -= seg_length
last_byte -= seg_length
def _need_to_refetch_manifest(self, req):
"""
Just because a response shows that an object is a SLO manifest does not
mean that response's body contains the entire SLO manifest. If it
doesn't, we need to make a second request to actually get the whole
thing.
def _is_body_complete(self):
content_range = ''
for header, value in self._response_headers:
if header.lower() == 'content-range':
content_range = value
break
# e.g. Content-Range: bytes 0-14289/14290
match = re.match(r'bytes (\d+)-(\d+)/(\d+)$', content_range)
if not match:
# Malformed or missing, so we don't know what we got.
return False
first_byte, last_byte, length = [int(x) for x in match.groups()]
# If and only if we actually got back the full manifest body, then
# we can avoid re-fetching the object.
return first_byte == 0 and last_byte == length - 1
Note: this assumes that X-Static-Large-Object has already been found.
def _is_manifest_and_need_to_refetch(self, req, resp_attrs,
is_manifest_get):
"""
Check if the segments will be needed to service the request and update
the segment_listing_needed attribute.
:return: boolean indicating if we need to refetch, only if the segments
ARE needed we MAY need to refetch them!
"""
if not resp_attrs.is_slo:
# Not a static large object manifest, maybe an error, regardless
# no refetch needed
return False
if is_manifest_get:
# Any manifest json object response will do
return False
if req.method == 'HEAD':
# We've already looked for SYSMETA_SLO_ETAG/SIZE in the response
# and didn't find them. We have to fetch the whole manifest and
# recompute.
# There may be some cases in the future where a HEAD resp on even a
# modern manifest should refetch, e.g. lp bug #2029174
self.segment_listing_needed = resp_attrs.is_legacy
# it will always be the case that a HEAD must re-fetch iff
# segment_listing_needed
return self.segment_listing_needed
last_resp_status_int = self._get_status_int()
# These are based on etag (or last-modified), but the SLO's etag is
# almost certainly not the manifest object's etag. Still, it's highly
# likely that the submitted If-None-Match won't match the manifest
# object's etag, so we can avoid re-fetching the manifest if we got a
# successful response.
if last_resp_status_int in (412, 304):
# a conditional response from a modern manifest would have an
# accurate SLO etag, AND comparison with the etag-is-at header, but
# for legacy manifests responses (who always need to calculate the
# correct etag, even for if-[un]modified-since errors) we can't say
# what the etag is or if it matches unless we calculate it from
# segments - so we always need them
self.segment_listing_needed = resp_attrs.is_legacy
# if we need them; we can't get them from the error
return self.segment_listing_needed
# This is GET request for an SLO object, if we're going to return a
# successful response we're going to need the segments, but this
# resp_iter may not contain the entire SLO manifest.
self.segment_listing_needed = True
# modern swift object-servers should ignore Range headers on manifests,
# but during upgrade if we get a range response we'll probably have to
# refetch
if last_resp_status_int == 416:
# if the range wasn't satisfiable we need to refetch
return True
elif last_resp_status_int == 206:
# a partial response might included the whole content-range?!
return not self._is_body_complete()
else:
# a good number of error responses would have returned earlier for
# lacking is_slo sys-meta, at this point we've filtered all the
# other response codes, so this is a prefectly normal 200 response,
# no need to refetch
return False
response_status = int(self._response_status[:3])
def _refetch_manifest(self, req, resp_iter, orig_resp_attrs):
req.environ['swift.non_client_disconnect'] = True
friendly_close(resp_iter)
del req.environ['swift.non_client_disconnect']
# These are based on etag, and the SLO's etag is almost certainly not
# the manifest object's etag. Still, it's highly likely that the
# submitted If-None-Match won't match the manifest object's etag, so
# we can avoid re-fetching the manifest if we got a successful
# response.
if ((req.if_match or req.if_none_match) and
not is_success(response_status)):
return True
get_req = make_subrequest(
req.environ, method='GET',
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartGET', swift_source='SLO')
resp_iter = self._app_call(get_req.environ)
new_resp_attrs = RespAttrs.from_headers(self._response_headers)
if new_resp_attrs.timestamp < orig_resp_attrs.timestamp and \
not new_resp_attrs.is_slo:
# Our *orig_resp_attrs* saw *newer* data that indicated it was an
# SLO, but on refetch it's an older object or error; 503 seems
# reasonable?
friendly_close(resp_iter)
raise HTTPServiceUnavailable(request=req)
# else, the caller will know how to return this response
return new_resp_attrs, resp_iter
if req.range and response_status in (206, 416):
content_range = ''
for header, value in self._response_headers:
if header.lower() == 'content-range':
content_range = value
break
# e.g. Content-Range: bytes 0-14289/14290
match = re.match(r'bytes (\d+)-(\d+)/(\d+)$', content_range)
if not match:
# Malformed or missing, so we don't know what we got.
return True
first_byte, last_byte, length = [int(x) for x in match.groups()]
# If and only if we actually got back the full manifest body, then
# we can avoid re-fetching the object.
got_everything = (first_byte == 0 and last_byte == length - 1)
return not got_everything
def _parse_segments(self, resp_iter):
"""
Read the manifest body and parse segments.
return False
:returns: segments
:raises: HTTPServerError
"""
segments = self._get_manifest_read(resp_iter)
_annotate_segments(segments, logger=self.slo.logger)
return segments
def _return_manifest_response(self, req, start_response, resp_iter,
is_format_raw):
if is_format_raw:
json_data = self.convert_segment_listing(resp_iter)
# we've created a new response body
resp_iter = [json_data]
replace_headers = {
# Note that we have to return the large object's content-type
# (not application/json) so it's like what the client sent on
# PUT. Otherwise, server-side copy won't work.
'Content-Length': len(json_data),
'Etag': md5(json_data, usedforsecurity=False).hexdigest(),
}
else:
# we're going to return the manifest resp_iter as-is
replace_headers = {
'Content-Type': 'application/json; charset=utf-8',
}
return self._return_response(req, start_response, resp_iter,
replace_headers)
def _return_slo_response(self, req, start_response, resp_iter, resp_attrs):
if self.segment_listing_needed:
# consume existing resp_iter; we'll create a new one
segments = self._parse_segments(resp_iter)
resp_attrs.update_from_segments(segments)
if req.method == 'HEAD':
resp_iter = []
else:
resp_iter = self._build_resp_iter(req, segments, resp_attrs)
headers = {
'Etag': '"%s"' % resp_attrs.slo_etag,
'X-Manifest-Etag': resp_attrs.json_md5,
# This isn't correct for range requests, but swob will fix it?
'Content-Length': str(resp_attrs.slo_size),
# ignore bogus content-range, make swob figure it out
'Content-Range': None
}
return self._return_response(req, start_response, resp_iter,
replace_headers=headers)
def _return_response(self, req, start_response, resp_iter,
replace_headers):
if req.method == 'HEAD' or self._get_status_int() in (412, 304):
# we should drain HEAD and unmet condition responses since they
# don't have bodies
friendly_close(resp_iter)
resp_iter = b''
resp_headers = HeaderKeyDict(self._response_headers, **replace_headers)
resp = Response(
status=self._response_status,
headers=resp_headers,
app_iter=resp_iter,
request=req,
conditional_response=True,
conditional_etag=resolve_etag_is_at_header(req, resp_headers))
return resp(req.environ, start_response)
def _return_non_slo_response(self, req, start_response, resp_iter):
# our "pass-through" response may have been from a manifest refetch w/o
# range/conditional headers that turned out to be a real object, and
# now we want out. But if the original client request included Range
# or Conditional headers we can trust swob to do the right conversion
# back into a 206/416/304/412 (as long as the response we have is a
# normal successful response and we respect any forwarding middleware's
# etag-is-at header that we stripped off for the refetch!)
resp = Response(
status=self._response_status,
headers=self._response_headers,
app_iter=resp_iter,
request=req,
conditional_response=self._get_status_int() == 200,
conditional_etag=resolve_etag_is_at_header(
req, self._response_headers)
)
return resp(req.environ, start_response)
def handle_slo_get_or_head(self, req, start_response):
"""
@ -774,137 +1031,61 @@ class SloGetContext(WSGIContext):
large object manifest.
:param start_response: WSGI start_response callable
"""
if req.params.get('multipart-manifest') != 'get':
is_manifest_get = get_param(req, 'multipart-manifest') == 'get'
is_format_raw = is_manifest_get and get_param(req, 'format') == 'raw'
if not is_manifest_get:
# If this object is an SLO manifest, we may have saved off the
# large object etag during the original PUT. Send an
# X-Backend-Etag-Is-At header so that, if the SLO etag *was*
# saved, we can trust the object-server to respond appropriately
# to If-Match/If-None-Match requests.
# X-Backend-Etag-Is-At header so that, if the SLO etag *was* saved,
# we can trust the object-server to respond appropriately to
# If-Match/If-None-Match requests.
update_etag_is_at_header(req, SYSMETA_SLO_ETAG)
# Tell the object server that if it's a manifest,
# we want the whole thing
update_ignore_range_header(req, 'X-Static-Large-Object')
# process original request
resp_iter = self._app_call(req.environ)
resp_attrs = RespAttrs.from_headers(self._response_headers)
# the next two calls hide a couple side-effects, sorry:
#
# 1) regardless of the return value the "need_to_refetch" check *may*
# also set self.segment_listing_needed = True (it's commented to
# help you wrap your head around that one, good luck)
# 2) if we refetch, we overwrite the current resp_iter and resp_attrs
# variables, partly because we *might* get back a NOT
# resp_attrs.is_slo response (even if we had one to start), but
# hopefully they're just the manifest resp we needed to refetch!
if self._is_manifest_and_need_to_refetch(req, resp_attrs,
is_manifest_get):
resp_attrs, resp_iter = self._refetch_manifest(
req, resp_iter, resp_attrs)
# make sure this response is for a static large object manifest
slo_marker = slo_etag = slo_size = slo_timestamp = None
for header, value in self._response_headers:
header = header.lower()
if header == SYSMETA_SLO_ETAG:
slo_etag = value
elif header == SYSMETA_SLO_SIZE:
slo_size = value
elif (header == 'x-static-large-object' and
config_true_value(value)):
slo_marker = value
elif header == 'x-backend-timestamp':
slo_timestamp = value
if not resp_attrs.is_slo:
# even if the original resp_attrs may have been SLO we may have
# refetched, this also handles the server error case
return self._return_non_slo_response(
req, start_response, resp_iter)
if slo_marker and slo_etag and slo_size and slo_timestamp:
break
if is_manifest_get:
# manifest pass through doesn't require resp_attrs
return self._return_manifest_response(req, start_response,
resp_iter, is_format_raw)
if not slo_marker:
# Not a static large object manifest. Just pass it through.
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
# this a GET/HEAD response for the SLO object (not the manifest)
return self._return_slo_response(req, start_response, resp_iter,
resp_attrs)
# Handle pass-through request for the manifest itself
if req.params.get('multipart-manifest') == 'get':
if req.params.get('format') == 'raw':
resp_iter = self.convert_segment_listing(
self._response_headers, resp_iter)
else:
new_headers = []
for header, value in self._response_headers:
if header.lower() == 'content-type':
new_headers.append(('Content-Type',
'application/json; charset=utf-8'))
else:
new_headers.append((header, value))
self._response_headers = new_headers
start_response(self._response_status,
self._response_headers,
self._response_exc_info)
return resp_iter
is_conditional = self._response_status.startswith(('304', '412')) and (
req.if_match or req.if_none_match)
if slo_etag and slo_size and (
req.method == 'HEAD' or is_conditional):
# Since we have length and etag, we can respond immediately
resp = Response(
status=self._response_status,
headers=self._response_headers,
app_iter=resp_iter,
request=req,
conditional_etag=resolve_etag_is_at_header(
req, self._response_headers),
conditional_response=True)
resp.headers.update({
'Etag': '"%s"' % slo_etag,
'X-Manifest-Etag': self._response_header_value('etag'),
'Content-Length': slo_size,
})
return resp(req.environ, start_response)
if self._need_to_refetch_manifest(req):
req.environ['swift.non_client_disconnect'] = True
close_if_possible(resp_iter)
del req.environ['swift.non_client_disconnect']
get_req = make_subrequest(
req.environ, method='GET',
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartGET', swift_source='SLO')
resp_iter = self._app_call(get_req.environ)
slo_marker = config_true_value(self._response_header_value(
'x-static-large-object'))
if not slo_marker: # will also catch non-2xx responses
got_timestamp = self._response_header_value(
'x-backend-timestamp') or '0'
if Timestamp(got_timestamp) >= Timestamp(slo_timestamp):
# We've got a newer response available, so serve that.
# Note that if there's data, it's going to be a 200 now,
# not a 206, and we're not going to drop bytes in the
# proxy on the client's behalf. Fortunately, the RFC is
# pretty forgiving for a server; there's no guarantee that
# a Range header will be respected.
resp = Response(
status=self._response_status,
headers=self._response_headers,
app_iter=resp_iter,
request=req,
conditional_etag=resolve_etag_is_at_header(
req, self._response_headers),
conditional_response=is_success(
int(self._response_status[:3])))
return resp(req.environ, start_response)
else:
# We saw newer data that indicated it's an SLO, but
# couldn't fetch the whole thing; 503 seems reasonable?
close_if_possible(resp_iter)
raise HTTPServiceUnavailable(request=req)
# NB: we might have gotten an out-of-date manifest -- that's OK;
# we'll just try to serve the old data
# Any Content-Range from a manifest is almost certainly wrong for the
# full large object.
resp_headers = [(h, v) for h, v in self._response_headers
if not h.lower() == 'content-range']
response = self.get_or_head_response(
req, resp_headers, resp_iter)
return response(req.environ, start_response)
def convert_segment_listing(self, resp_headers, resp_iter):
def convert_segment_listing(self, resp_iter):
"""
Converts the manifest data to match with the format
that was put in through ?multipart-manifest=put
:param resp_headers: response headers
:param resp_iter: a response iterable
:raises HTTPServerError:
:returns: the json-serialized raw format (as bytes)
"""
segments = self._get_manifest_read(resp_iter)
@ -921,108 +1102,36 @@ class SloGetContext(WSGIContext):
json_data = json.dumps(segments, sort_keys=True) # convert to string
if six.PY3:
json_data = json_data.encode('utf-8')
new_headers = []
for header, value in resp_headers:
if header.lower() == 'content-length':
new_headers.append(('Content-Length', len(json_data)))
elif header.lower() == 'etag':
new_headers.append(
('Etag', md5(json_data, usedforsecurity=False)
.hexdigest()))
else:
new_headers.append((header, value))
self._response_headers = new_headers
return [json_data]
return json_data
def _get_manifest_read(self, resp_iter):
with closing_if_possible(resp_iter):
resp_body = b''.join(resp_iter)
try:
segments = json.loads(resp_body)
except ValueError:
raise HTTPServerError('Unable to load SLO manifest')
except ValueError as e:
msg = 'Unable to load SLO manifest'
self.slo.logger.error('%s: %s', msg, e)
raise HTTPServerError(msg)
return segments
def get_or_head_response(self, req, resp_headers, resp_iter):
segments = self._get_manifest_read(resp_iter)
slo_etag = None
content_length = None
response_headers = []
for header, value in resp_headers:
lheader = header.lower()
if lheader == 'etag':
response_headers.append(('X-Manifest-Etag', value))
elif lheader != 'content-length':
response_headers.append((header, value))
def _build_resp_iter(self, req, segments, resp_attrs):
"""
Build a response iterable for a GET request.
if lheader == SYSMETA_SLO_ETAG:
slo_etag = value
elif lheader == SYSMETA_SLO_SIZE:
# it's from sysmeta, so we don't worry about non-integer
# values here
content_length = int(value)
:param req: the request object
:param resp_attrs: the slo attributes
# Prep to calculate content_length & etag if necessary
if slo_etag is None:
calculated_etag = md5(usedforsecurity=False)
if content_length is None:
calculated_content_length = 0
for seg_dict in segments:
# Decode any inlined data; it's important that we do this *before*
# calculating the segment length and etag
if 'data' in seg_dict:
seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data'))
if slo_etag is None:
if 'raw_data' in seg_dict:
r = md5(seg_dict['raw_data'],
usedforsecurity=False).hexdigest()
elif seg_dict.get('range'):
r = '%s:%s;' % (seg_dict['hash'], seg_dict['range'])
else:
r = seg_dict['hash']
calculated_etag.update(r.encode('ascii'))
if content_length is None:
if config_true_value(seg_dict.get('sub_slo')):
override_bytes_from_content_type(
seg_dict, logger=self.slo.logger)
calculated_content_length += self._segment_length(seg_dict)
if slo_etag is None:
slo_etag = calculated_etag.hexdigest()
if content_length is None:
content_length = calculated_content_length
response_headers.append(('Content-Length', str(content_length)))
response_headers.append(('Etag', '"%s"' % slo_etag))
if req.method == 'HEAD':
return self._manifest_head_response(req, response_headers)
else:
return self._manifest_get_response(
req, content_length, response_headers, segments)
def _manifest_head_response(self, req, response_headers):
conditional_etag = resolve_etag_is_at_header(req, response_headers)
return HTTPOk(request=req, headers=response_headers, body=b'',
conditional_etag=conditional_etag,
conditional_response=True)
def _manifest_get_response(self, req, content_length, response_headers,
segments):
:returns: a segmented iterable
"""
if req.range:
byteranges = [
# For some reason, swob.Range.ranges_for_length adds 1 to the
# last byte's position.
(start, end - 1) for start, end
in req.range.ranges_for_length(content_length)]
in req.range.ranges_for_length(resp_attrs.slo_size)]
else:
byteranges = []
byteranges = [(0, resp_attrs.slo_size - 1)]
ver, account, _junk = req.split_path(3, 3, rest_with_last=True)
account = wsgi_to_str(account)
@ -1067,15 +1176,8 @@ class SloGetContext(WSGIContext):
# their Etag/Content Length no longer match the connection
# will drop. In this case a 409 Conflict will be logged in
# the proxy logs and the user will receive incomplete results.
return HTTPConflict(request=req)
conditional_etag = resolve_etag_is_at_header(req, response_headers)
response = Response(request=req, content_length=content_length,
headers=response_headers,
conditional_response=True,
conditional_etag=conditional_etag,
app_iter=segmented_iter)
return response
raise HTTPConflict(request=req)
return segmented_iter
class StaticLargeObject(object):
@ -1524,7 +1626,7 @@ class StaticLargeObject(object):
'/%s/%s/%s' % (vrs, account, str_to_wsgi(obj_name.lstrip('/')))
)
# Just request the last byte of non-SLO objects so we don't waste
# a bunch of resources in drain_and_close() below
# a resources in friendly_close() below
manifest_req = Request.blank('', new_env, range='bytes=-1')
update_ignore_range_header(manifest_req, 'X-Static-Large-Object')
resp = manifest_req.get_response(self.app)
@ -1543,7 +1645,7 @@ class StaticLargeObject(object):
raise HTTPServerError('Unable to load SLO manifest')
else:
# Drain and close GET request (prevents socket leaks)
drain_and_close(resp)
friendly_close(resp)
raise HTTPBadRequest('Not an SLO manifest')
elif resp.status_int == HTTP_NOT_FOUND:
raise HTTPNotFound('SLO manifest not found')
@ -1624,7 +1726,7 @@ class StaticLargeObject(object):
resp.status, resp.body)
return HTTPServiceUnavailable()
# consume the response (should be short)
drain_and_close(resp)
friendly_close(resp)
# Finally, delete the manifest
return self.app

View File

@ -36,7 +36,8 @@ from swift.common.swob import HTTPBadRequest, \
HTTPServiceUnavailable, Range, is_chunked, multi_range_iterator, \
HTTPPreconditionFailed, wsgi_to_bytes, wsgi_unquote, wsgi_to_str
from swift.common.utils import split_path, validate_device_partition, \
close_if_possible, maybe_multipart_byteranges_to_document_iters, \
close_if_possible, friendly_close, \
maybe_multipart_byteranges_to_document_iters, \
multipart_byteranges_to_document_iters, parse_content_type, \
parse_content_range, csv_append, list_from_csv, Spliterator, quote, \
RESERVED, config_true_value, md5, CloseableChain, select_ip_port
@ -742,7 +743,10 @@ class SegmentedIterable(object):
for x in mri:
yield x
finally:
self.close()
# Spliterator and multi_range_iterator can't possibly know we've
# consumed the whole of the app_iter, but we want to read/close the
# final segment response
friendly_close(self.app_iter)
def validate_first_segment(self):
"""

View File

@ -55,7 +55,7 @@ from six.moves import urllib
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import UTC, reiterate, split_path, Timestamp, pairs, \
close_if_possible, closing_if_possible, config_true_value, drain_and_close
close_if_possible, closing_if_possible, config_true_value, friendly_close
from swift.common.exceptions import InvalidTimestamp
@ -1395,12 +1395,15 @@ class Response(object):
if empty_resp is not None:
self.status = empty_resp
self.content_length = 0
# the existing successful response and it's app_iter have been
# determined to not meet the conditions of the reqeust, the
# response app_iter should be closed but not drained.
close_if_possible(app_iter)
return [b'']
if self.request and self.request.method == 'HEAD':
# We explicitly do NOT want to set self.content_length to 0 here
drain_and_close(app_iter) # be friendly to our app_iter
friendly_close(app_iter) # be friendly to our app_iter
return [b'']
if self.conditional_response and self.request and \

View File

@ -170,6 +170,11 @@ LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \
'{trans_time:.4f} "{additional_info}" {pid} ' \
'{policy_index}'
DEFAULT_LOCK_TIMEOUT = 10
# this is coupled with object-server.conf's network_chunk_size; if someone is
# running that unreasonably small they may find this number inefficient, but in
# the more likely case they've increased the value to optimize high througput
# transfers this will still cut off the transfer after the first chunk.
DEFAULT_DRAIN_LIMIT = 65536
class InvalidHashPathConfigError(ValueError):
@ -4018,7 +4023,7 @@ def closing_if_possible(maybe_closable):
close_if_possible(maybe_closable)
def drain_and_close(response_or_app_iter):
def drain_and_close(response_or_app_iter, read_limit=None):
"""
Drain and close a swob or WSGI response.
@ -4028,9 +4033,26 @@ def drain_and_close(response_or_app_iter):
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
if app_iter is None: # for example, if we used the Response.body property
return
for _chunk in app_iter:
pass
close_if_possible(app_iter)
bytes_read = 0
with closing_if_possible(app_iter):
for chunk in app_iter:
bytes_read += len(chunk)
if read_limit is not None and bytes_read >= read_limit:
break
def friendly_close(resp):
"""
Close a swob or WSGI response and maybe drain it.
It's basically free to "read" a HEAD or HTTPException response - the bytes
are probably already in our network buffers. For a larger response we
could possibly burn a lot of CPU/network trying to drain an un-used
response. This method will read up to DEFAULT_DRAIN_LIMIT bytes to avoid
logging a 499 in the proxy when it would otherwise be easy to just throw
away the small/empty body.
"""
return drain_and_close(resp, read_limit=DEFAULT_DRAIN_LIMIT)
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'

View File

@ -20,6 +20,7 @@ import json
import time
import unittest
import mock
from mock import patch
import six
@ -2254,9 +2255,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
expected_app_calls = [('HEAD', '/v1/AUTH_test/headtest/man')]
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_get_manifest_passthrough(self):
@ -2301,9 +2299,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_match_etag_not_matching(self):
req = Request.blank(
@ -2323,9 +2318,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_none_match_etag_matching_with_override(self):
req = Request.blank(
@ -2349,9 +2341,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_if_match_etag_not_matching_with_override(self):
req = Request.blank(
@ -2375,9 +2364,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_app_calls.append(('GET', '/v1/AUTH_test/headtest/man'))
self.assertEqual(self.app.calls, expected_app_calls)
# XXX swob isn't draining the backend resp_iter on conditional error
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/headtest/man')] = 1
def test_head_manifest_is_efficient(self):
req = Request.blank(
@ -2402,9 +2388,6 @@ class TestSloHeadOldManifest(SloGETorHEADTestCase):
# might do it anyway.
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
self.assertEqual(self.app.calls, expected_calls)
@ -2826,10 +2809,6 @@ class TestSloGetManifests(SloGETorHEADTestCase):
self.assertIsNone(self.app.swift_sources[0])
self.assertEqual(self.app.swift_sources[1:],
['SLO'] * (len(self.app.swift_sources) - 1))
# XXX SegmntedIterable stops reading after the last byte of the last
# segment in app_iter_ranges
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')] = 1
def test_multiple_ranges_including_suffix_get_manifest(self):
req = Request.blank(
@ -2890,10 +2869,6 @@ class TestSloGetManifests(SloGETorHEADTestCase):
None, # b_10
'bytes=0-2,14-', # c_15
None]) # d_20
# XXX SegmntedIterable stops reading after the last byte of the last
# segment in app_iter_ranges
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')] = 1
class TestSloGetOldManifests(TestSloGetManifests):
@ -3052,9 +3027,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
self.assertEqual('bytes=100000-199999', self.app.headers[0]['Range'])
self.assertNotIn('Range', self.app.headers[1])
self.assertEqual('bytes=100000-199999', self.app.headers[2]['Range'])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_fails(self):
# new swift would have ignored the range and got the whole
@ -3078,10 +3050,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees 404 it closes that one too before returning an error.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
def test_old_swift_range_get_beyond_manifest_refetch_finds_old(self):
# new swift would have ignored the range and got the whole
@ -3105,11 +3073,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees older data it closes that one too before returning an
# error.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
def test_old_swift_range_get_beyond_manifest_refetch_small_non_slo(self):
# new swift would have ignored the range and got the whole
@ -3132,11 +3095,10 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch, and then
# when it sees older data it closes that one too before returning an
# error.
# swob is converting the successful non-slo response to conditional
# error and closing our unconditionally refetched resp_iter
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 2
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_big_non_slo(self):
# new swift would have ignored the range and got the whole
@ -3162,9 +3124,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_beyond_manifest_refetch_tombstone(self):
# new swift would have ignored the range and got the whole
@ -3188,9 +3147,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
# retry the first one
('GET', '/v1/AUTH_test/gettest/big_manifest'),
])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/big_manifest')] = 1
def test_old_swift_range_get_bogus_content_range(self):
# Just a little paranoia; Swift currently sends back valid
@ -3232,9 +3188,6 @@ class TestOldSwiftWithRanges(SloGETorHEADTestCase):
('GET', '/v1/AUTH_test/gettest/b_10?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/c_15?multipart-manifest=get'),
('GET', '/v1/AUTH_test/gettest/d_20?multipart-manifest=get')])
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_old_swift_range_get_includes_whole_range_manifest(self):
# If the first range GET results in retrieval of the entire manifest
@ -3387,10 +3340,6 @@ class TestSloRangeRequests(SloGETorHEADTestCase):
headers={'Range': 'bytes=100-200'})
status, headers, body = self.call_slo(req)
self.assertEqual(status, '416 Requested Range Not Satisfiable')
# XXX it seems we validate the first segment before handing the
# resp_iter to swob; who decides it can't serve the given range.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get')] = 1
def test_get_segment_with_non_ascii_path(self):
segment_body = u"a møøse once bit my sister".encode("utf-8")
@ -4081,14 +4030,20 @@ class TestSloErrors(SloGETorHEADTestCase):
self.assertEqual('200 OK', status)
self.assertEqual(body, b'aaaaa')
if six.PY2:
error = "No JSON object could be decoded"
else:
error = "Expecting value: line 1 column 2 (char 1)"
self.assertEqual(self.slo.logger.get_lines_for_level('error'), [
lines = self.slo.logger.get_lines_for_level('error')
self.assertEqual(lines, [
mock.ANY,
'while fetching /v1/AUTH_test/gettest/manifest-abcd, '
'JSON-decoding of submanifest /v1/AUTH_test/gettest/manifest-bc '
'failed with %s' % error
'failed with 500 Internal Error'
])
self.assertIn(lines[0], [
# py2
'Unable to load SLO manifest: '
'No JSON object could be decoded',
# py3
'Unable to load SLO manifest: '
'Expecting value: line 1 column 2 (char 1)',
])
def test_mismatched_etag(self):
@ -4254,9 +4209,6 @@ class TestSloErrors(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-badetag'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-badetag')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_first_segment_mismatched_size(self):
@ -4301,9 +4253,6 @@ class TestSloErrors(SloGETorHEADTestCase):
if not self.modern_manifest_headers:
expected_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-badsize'))
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('HEAD', '/v1/AUTH_test/gettest/manifest-badsize')] = 1
self.assertEqual(self.app.calls, expected_calls)
@patch('swift.common.request_helpers.time')
@ -4746,16 +4695,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
if self.modern_manifest_headers:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
else:
# XXX it seems we validate the first segment before handing the
# resp_iter to swob; who decides it can't serve the given range.
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
def test_if_none_match_mismatches(self):
req = Request.blank(
@ -4800,9 +4739,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd')
)
# XXX slo isn't draining the orig resp_iter after refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
@ -4849,9 +4785,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-None-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_none_match_matches_no_alternate_etag(self):
@ -4891,10 +4824,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/alt_00'
'?multipart-manifest=get')] = 1
else:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_none_match_mismatches_alternate_etag(self):
@ -4956,10 +4885,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
if not self.modern_manifest_headers:
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_if_match_mismatches(self):
req = Request.blank(
@ -4997,9 +4922,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
for headers in self.app.headers[1:]:
self.assertNotIn('If-Match', headers)
self.assertNotIn('X-Backend-Etag-Is-At', headers)
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_if_match_mismatches_manifest_json_md5(self):
req = Request.blank(
@ -5028,10 +4950,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# reading the remaining segments
self.expected_unread_requests[('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
else:
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_match_matches_alternate_etag(self):
@ -5078,9 +4996,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls = [('GET', '/v1/AUTH_test/c/manifest-alt')]
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'X-Object-Sysmeta-Alt-Etag,x-object-sysmeta-slo-etag')
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if not self.modern_manifest_headers:
expected_app_calls.extend([
# Needed to re-fetch because if-match can't find slo-etag
@ -5328,9 +5243,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_calls)
def test_if_match_matches_alternate_etag_non_slo_after_refetch(self):
# XXX 2/250 tests requiring this header sounds like a bug
self.app._responses[('GET', '/v1/AUTH_test/c/manifest-alt')][1][
'X-Backend-Timestamp'] = '1234'
self.app.register_next_response(
'GET', '/v1/AUTH_test/c/manifest-alt',
swob.HTTPOk, {'Content-Length': '25',
@ -5352,10 +5264,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertIn('X-Object-Sysmeta-Alt-Etag',
self.app.headers[0]['X-Backend-Etag-Is-At'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if self.modern_manifest_headers:
# and since the response includes modern sysmeta, slo trusts the
# 412 w/o refetch
@ -5384,16 +5292,10 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# wrapping middleware
self.assertEqual('alt-object-etag',
headers['X-Object-Sysmeta-Alt-Etag'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
def test_if_match_mismatches_alternate_etag_non_slo_after_refetch(self):
# XXX 2/~250 tests requiring this header sounds like a bug
self.app._responses[('GET', '/v1/AUTH_test/c/manifest-alt')][1][
'X-Backend-Timestamp'] = '1234'
self.app.register_next_response(
'GET', '/v1/AUTH_test/c/manifest-alt',
swob.HTTPOk, {'Content-Length': '25',
@ -5415,10 +5317,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertIn('X-Object-Sysmeta-Alt-Etag',
self.app.headers[0]['X-Backend-Etag-Is-At'])
# XXX swob doesn't drain the resp_iter on conditional error
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
if self.modern_manifest_headers:
# and since the response includes modern sysmeta, slo trusts the
# 412 w/o refetch
@ -5450,7 +5348,7 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
# swob is converting the successful non-slo response to conditional
# error and closing our unconditionally refetched resp_iter
self.expected_unread_requests[
('GET', '/v1/AUTH_test/c/manifest-alt')] += 1
('GET', '/v1/AUTH_test/c/manifest-alt')] = 1
self.assertEqual(self.app.calls, expected_app_calls)
@ -5476,9 +5374,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
expected_app_calls.append(
('GET', '/v1/AUTH_test/gettest/manifest-abcd'),
)
# XXX the orig resp_iter doesn't get closed
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
# and then fetch the segments
expected_app_calls.extend([
('GET', '/v1/AUTH_test/gettest/manifest-bc'),
@ -5515,9 +5410,6 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
self.assertEqual(self.app.calls, expected_app_calls)
self.assertEqual(self.app.headers[0].get('X-Backend-Etag-Is-At'),
'x-object-sysmeta-slo-etag')
# XXX slo isn't closing the orig resp_iter on refetch
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/manifest-abcd')] = 1
def test_range_resume_download(self):
req = Request.blank(
@ -5592,9 +5484,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Modified-Since': 'Mon, 23 Oct 2023 10:05:32 GMT',
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# nope, that was the last time it was changed
self.assertEqual(status, '304 Not Modified')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before lettting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_modified_since_now(self):
now = datetime.now()
@ -5605,9 +5520,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Modified-Since': last_modified,
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# nope, that was the last time it was changed
self.assertEqual(status, '304 Not Modified')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before lettting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_unmodified_since_ancient_date(self):
req = swob.Request.blank(
@ -5616,9 +5554,32 @@ class TestSloConditionalGetOldManifest(SloGETorHEADTestCase):
'If-Unmodified-Since': 'Fri, 01 Feb 2012 20:38:36 GMT',
})
status, headers, body = self.call_slo(req)
# XXX see lp bug #2040178
self.assertEqual(status, '500 Internal Error')
self.assertEqual(b'Unable to load SLO manifest', body)
# oh it's *definately* been modified since then!
self.assertEqual(status, '412 Precondition Failed')
self.assertEqual(headers['X-Static-Large-Object'], 'true')
self.assertEqual(headers['Etag'],
'"%s"' % self.manifest_last_modified_slo_etag)
self.assertEqual(headers['X-Manifest-Etag'],
self.manifest_last_modified_json_md5)
self.assertEqual(headers['Content-Length'], '0')
self.assertEqual('Mon, 23 Oct 2023 10:05:32 GMT',
headers['Last-Modified'])
self.assertEqual(b'', body)
expected_calls = [
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
]
if not self.modern_manifest_headers:
# N.B. legacy manifests must refetch for accurate Etag, and then we
# validate first segment before lettting swob return the error
expected_calls.extend([
('GET', '/v1/AUTH_test/c/manifest-last-modified'),
('GET', '/v1/AUTH_test/gettest/a_5?multipart-manifest=get'),
])
# we don't drain the segment's resp_iter if validation fails
self.expected_unread_requests[
('GET', '/v1/AUTH_test/gettest/a_5'
'?multipart-manifest=get')] = 1
self.assertEqual(self.app.calls, expected_calls)
def test_if_unmodified_since_last_modified(self):
req = swob.Request.blank(
@ -5929,5 +5890,352 @@ class TestNonSloPassthrough(SloGETorHEADTestCase):
])
class TestRespAttrs(unittest.TestCase):
def test_init_calculates_is_legacy(self):
attrs = slo.RespAttrs(True, 123456789.12345,
'manifest-etag', 'slo-etag', 999)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertIsInstance(attrs.timestamp, Timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('slo-etag', attrs.slo_etag)
self.assertEqual(999, attrs.slo_size)
# we gave it etag and size!
self.assertTrue(attrs._has_size_and_etag())
self.assertFalse(attrs.is_legacy)
def test_init_converts_timestamps_from_strings(self):
attrs = slo.RespAttrs(True, '123456789.12345',
'manifest-etag', 'slo-etag', 999)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertIsInstance(attrs.timestamp, Timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('slo-etag', attrs.slo_etag)
self.assertEqual(999, attrs.slo_size)
# we gave it etag and size!
self.assertTrue(attrs._has_size_and_etag())
self.assertFalse(attrs.is_legacy)
def test_default_types(self):
attrs = slo.RespAttrs(None, None, None, None, None)
# types are correct, values are default/place-holders
self.assertTrue(attrs.is_slo is False) # not None!
self.assertEqual(0, attrs.timestamp)
self.assertIsInstance(attrs.timestamp, Timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
# we didn't provide etag & size
self.assertFalse(attrs._has_size_and_etag())
self.assertTrue(attrs.is_legacy)
def test_init_with_no_sysmeta(self):
now = Timestamp.now()
attrs = slo.RespAttrs(True, now.normal, None, None, None)
self.assertTrue(attrs.is_slo)
self.assertEqual(now, attrs.timestamp)
self.assertIsInstance(attrs.timestamp, Timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
# we didn't provide etag & size
self.assertFalse(attrs._has_size_and_etag())
self.assertTrue(attrs.is_legacy)
def test_init_with_no_sysmeta_offset(self):
now = Timestamp.now(offset=123)
attrs = slo.RespAttrs(True, now.internal, None, None, None)
self.assertTrue(attrs.is_slo)
self.assertEqual(now, attrs.timestamp)
self.assertIsInstance(attrs.timestamp, Timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
# we didn't provide etag & size
self.assertFalse(attrs._has_size_and_etag())
self.assertTrue(attrs.is_legacy)
def test_from_empty_headers(self):
attrs = slo.RespAttrs.from_headers([])
self.assertFalse(attrs.is_slo)
self.assertEqual(0, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
def test_from_only_timestamp(self):
now = Timestamp.now(offset=1)
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', now.internal),
('X-Irrelevant', 'ignored')])
self.assertFalse(attrs.is_slo)
self.assertEqual(now, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
def test_legacy_slo_sysmeta(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'manifest-etag'),
('X-Static-lARGE-Object', 'yes')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
def test_partial_modern_sysmeta(self):
# missing slo etag
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'manifest-etag'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-size', '1234')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(1234, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
# missing slo size
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'manifest-etag'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-etag', 'slo-etag')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('slo-etag', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
# missing manifest etag
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-size', '1234'),
('x-object-sysmeta-slo-etag', 'slo-etag')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('slo-etag', attrs.slo_etag)
self.assertEqual(1234, attrs.slo_size)
# missing Etag might be some kind of bug, but it has all sysmeta
self.assertFalse(attrs.is_legacy)
def test_invalid_sysmeta(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-size', 'huge!')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('X-Static-lARGE-Object', 'yes'),
('e-TAG', 'wrong!'),
('x-object-sysmeta-slo-size', '')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
def test_from_valid_sysmeta(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'manifest-etag'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-etag', 'slo-tag'),
('x-object-sysmeta-slo-size', '1234')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('slo-tag', attrs.slo_etag)
self.assertEqual(1234, attrs.slo_size)
self.assertFalse(attrs.is_legacy)
def test_from_regular_object(self):
now = Timestamp.now()
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', now.normal),
('Etag', 'object-etag')])
self.assertFalse(attrs.is_slo)
self.assertEqual(now, attrs.timestamp)
# N.B. we only set manifest_etag on slo objects
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
def test_non_slo_with_sysmeta(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('X-Static-lARGE-Object', 'false')])
self.assertFalse(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'segment-etag'),
('x-object-sysmeta-slo-etag', 'tag'),
('x-object-sysmeta-slo-size', '1234')])
# this is NOT an SLO
self.assertFalse(attrs.is_slo)
self.assertEqual('', attrs.json_md5)
# ... but we set these based on the sysmeta values
self.assertEqual('tag', attrs.slo_etag)
self.assertEqual(1234, attrs.slo_size)
self.assertEqual(123456789.12345, attrs.timestamp)
# I hope someday a non-slo with slo sysmeta *will* be just a legacy,
# see lp bug #2035158
self.assertFalse(attrs.is_legacy)
def _legacy_from_headers(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('Etag', 'manifest-etag'),
('X-Static-lARGE-Object', 'yes')])
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('manifest-etag', attrs.json_md5)
self.assertEqual('', attrs.slo_etag)
self.assertEqual(-1, attrs.slo_size)
self.assertTrue(attrs.is_legacy)
return attrs
def test_update_from_segments(self):
attrs = self._legacy_from_headers()
segments = [
{'hash': 'abc', 'bytes': 2},
{'hash': 'def', 'bytes': 3},
]
slo._annotate_segments(segments)
attrs.update_from_segments(segments)
exp_etag = md5('abcdef'.encode('ascii'), usedforsecurity=False)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
self.assertEqual(5, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_raw_data(self):
attrs = self._legacy_from_headers()
raw_data = b'something'
segments = [
{'hash': 'abc', 'bytes': 2},
{'data': base64.b64encode(raw_data)},
]
slo._annotate_segments(segments)
attrs.update_from_segments(segments)
raw_data_checksum = md5(raw_data).hexdigest()
exp_etag = md5(('abc' + raw_data_checksum).encode('ascii'),
usedforsecurity=False)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
self.assertEqual(11, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_range(self):
attrs = self._legacy_from_headers()
segments = [
{'hash': 'abc', 'bytes': 2},
{'hash': 'def', 'range': '1-2'},
]
slo._annotate_segments(segments)
attrs.update_from_segments(segments)
exp_etag = md5('abcdef:1-2;'.encode('ascii'), usedforsecurity=False)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
self.assertEqual(4, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_sub_slo(self):
attrs = self._legacy_from_headers()
content_type = 'application/octet-stream'
content_type += ";swift_bytes=%d" % 5
segments = [
{'hash': 'abc', 'bytes': 2},
{'hash': '123', 'sub_slo': True, 'content_type': content_type},
]
slo._annotate_segments(segments)
attrs.update_from_segments(segments)
exp_etag = md5('abc123'.encode('ascii'), usedforsecurity=False)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
self.assertEqual(7, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertTrue(attrs.is_legacy)
def test_update_from_segments_with_sub_slo_range(self):
attrs = self._legacy_from_headers()
content_type = 'application/octet-stream'
content_type += ";swift_bytes=%d" % 5
segments = [
{'hash': 'abc', 'bytes': 2},
{'hash': '123', 'sub_slo': True, 'content_type': content_type,
'range': '2-4'},
]
slo._annotate_segments(segments)
attrs.update_from_segments(segments)
exp_etag = md5('abc123:2-4;'.encode('ascii'), usedforsecurity=False)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual(exp_etag.hexdigest(), attrs.slo_etag)
self.assertEqual(5, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertTrue(attrs.is_legacy)
def test_update_from_segments_not_legacy(self):
attrs = slo.RespAttrs.from_headers(
[('X-Backend-Timestamp', '123456789.12345'),
('X-Static-lARGE-Object', 'yes'),
('x-object-sysmeta-slo-etag', 'tag'),
('x-object-sysmeta-slo-size', '1234')])
segments = 'not even json; does not matter'
attrs.update_from_segments(segments)
self.assertTrue(attrs.is_slo)
self.assertEqual(123456789.12345, attrs.timestamp)
self.assertEqual('tag', attrs.slo_etag)
self.assertEqual(1234, attrs.slo_size)
# N.B. it's still a legacy manifest
self.assertFalse(attrs.is_legacy)
if __name__ == '__main__':
unittest.main()

View File

@ -759,7 +759,3 @@ class TestSegmentedIterable(unittest.TestCase):
b'--bound--',
])
self.assertEqual(expected, body)
# XXX Spliterator stops SegementedIterable from asking to exhasut the
# segment response after it gets the last byte in app_iter_ranges
self.expected_unread_requests[
('GET', '/a/c/seg2?multipart-manifest=get')] = 1

View File

@ -22,6 +22,7 @@ import time
from io import BytesIO
import mock
import six
from six.moves.urllib.parse import quote
@ -1263,6 +1264,72 @@ class TestResponse(unittest.TestCase):
'read': 1,
})
def test_swob_drains_small_HEAD_resp_iter(self):
tracking = {
'closed': 0,
'read': 0,
}
def mark_closed(*args):
tracking['closed'] += 1
def mark_read(*args):
tracking['read'] += 1
def test_app(environ, start_response):
start_response('200 OK', [])
body = [b'hello', b'world']
return LeakTrackingIter(body, mark_closed, mark_read, None)
req = swob.Request.blank('/', method='HEAD')
status, headers, app_iter = req.call_application(test_app)
resp = swob.Response(status=status, headers=dict(headers),
app_iter=app_iter)
# sanity, swob drains small HEAD responses
output_iter = resp(req.environ, lambda *_: None)
with utils.closing_if_possible(output_iter):
# regardless what the app returns swob's HEAD response is empty
body = b''.join(output_iter)
self.assertEqual(body, b'')
self.assertEqual(tracking, {
'closed': 1,
'read': 1,
})
def test_swob_closes_large_HEAD_resp_iter(self):
tracking = {
'closed': 0,
'read': 0,
}
def mark_closed(*args):
tracking['closed'] += 1
def mark_read(*args):
tracking['read'] += 1
def test_app(environ, start_response):
start_response('200 OK', [])
body = [b'hello', b'world']
return LeakTrackingIter(body, mark_closed, mark_read, None)
req = swob.Request.blank('/', method='HEAD')
status, headers, app_iter = req.call_application(test_app)
resp = swob.Response(status=status, headers=dict(headers),
app_iter=app_iter)
# N.B. if we call next a third time (i.e. len(helloworld) < read_limit)
# then leak tracker will notice StopIteration and count it drained.
with mock.patch.object(utils, 'DEFAULT_DRAIN_LIMIT', 10):
output_iter = resp(req.environ, lambda *_: None)
with utils.closing_if_possible(output_iter):
# regardless what the app returns swob's HEAD response is empty
body = b''.join(output_iter)
self.assertEqual(body, b'')
self.assertEqual(tracking, {
'closed': 1,
'read': 0,
})
def test_call_preserves_closeability(self):
def test_app(environ, start_response):
start_response('200 OK', [])

View File

@ -545,13 +545,108 @@ class TestUtils(unittest.TestCase):
yield 'y'
drained[0] = True
utils.drain_and_close(gen())
g = gen()
utils.drain_and_close(g)
self.assertTrue(drained[0])
self.assertIsNone(g.gi_frame)
utils.drain_and_close(Response(status=200, body=b'Some body'))
drained = [False]
utils.drain_and_close(Response(status=200, app_iter=gen()))
self.assertTrue(drained[0])
def test_drain_and_close_with_limit(self):
def gen():
yield 'a' * 5
yield 'a' * 4
yield 'a' * 3
drained[0] = True
drained = [False]
g = gen()
utils.drain_and_close(g, read_limit=13)
self.assertTrue(drained[0])
self.assertIsNone(g.gi_frame)
drained = [False]
g = gen()
utils.drain_and_close(g, read_limit=12)
# this would need *one more* call to next
self.assertFalse(drained[0])
self.assertIsNone(g.gi_frame)
drained = [False]
# not even close to the whole thing
g = gen()
utils.drain_and_close(g, read_limit=3)
self.assertFalse(drained[0])
self.assertIsNone(g.gi_frame)
drained = [False]
# default is to drain; no limit!
g = gen()
utils.drain_and_close(g)
self.assertIsNone(g.gi_frame)
self.assertTrue(drained[0])
def test_friendly_close_small_body(self):
def small_body_iter():
yield 'a small body'
drained[0] = True
drained = [False]
utils.friendly_close(small_body_iter())
self.assertTrue(drained[0])
def test_friendly_close_large_body(self):
def large_body_iter():
for i in range(10):
chunk = chr(97 + i) * 64 * 2 ** 10
yielded_chunks.append(chunk)
yield chunk
drained[0] = True
drained = [False]
yielded_chunks = []
utils.friendly_close(large_body_iter())
self.assertFalse(drained[0])
self.assertEqual(['a' * 65536], yielded_chunks)
def test_friendly_close_exploding_body(self):
class ExplodingBody(object):
def __init__(self):
self.yielded_chunks = []
self.close_calls = []
self._body = self._exploding_iter()
def _exploding_iter(self):
chunk = 'a' * 63 * 2 ** 10
self.yielded_chunks.append(chunk)
yield chunk
raise Exception('kaboom!')
def __iter__(self):
return self
def __next__(self):
return next(self._body)
next = __next__ # py2
def close(self):
self.close_calls.append(True)
body = ExplodingBody()
with self.assertRaises(Exception) as ctx:
utils.friendly_close(body)
self.assertEqual('kaboom!', str(ctx.exception))
self.assertEqual(['a' * 64512], body.yielded_chunks)
self.assertEqual([True], body.close_calls)
def test_backwards(self):
# Test swift.common.utils.backward

View File

@ -1623,10 +1623,12 @@ class TestWSGIContext(unittest.TestCase):
r = Request.blank('/')
it = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
r = Request.blank('/')
it = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '404 Not Found')
self.assertEqual(wc._get_status_int(), 404)
self.assertEqual(b''.join(it), b'Ok\n')
def test_app_iter_is_closable(self):
@ -1645,6 +1647,7 @@ class TestWSGIContext(unittest.TestCase):
r = Request.blank('/')
iterable = wc._app_call(r.environ)
self.assertEqual(wc._response_status, '200 OK')
self.assertEqual(wc._get_status_int(), 200)
iterator = iter(iterable)
self.assertEqual(b'aaaaa', next(iterator))
@ -1665,6 +1668,7 @@ class TestWSGIContext(unittest.TestCase):
it = wc._app_call(r.environ)
wc.update_content_length(35)
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
self.assertEqual(wc._response_headers, [('Content-Length', '35')])
@ -1680,6 +1684,7 @@ class TestWSGIContext(unittest.TestCase):
it = wc._app_call(r.environ)
wc._response_headers.append(('X-Trans-Id', 'txn'))
self.assertEqual(wc._response_status, '200 Ok')
self.assertEqual(wc._get_status_int(), 200)
self.assertEqual(b''.join(it), b'Ok\n')
self.assertEqual(wc._response_headers, [
('Content-Length', '3'),