diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index fa2add6f74..76ef469d94 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -742,7 +742,7 @@ use = egg:swift#bulk [filter:slo] use = egg:swift#slo # max_manifest_segments = 1000 -# max_manifest_size = 2097152 +# max_manifest_size = 8388608 # # Rate limiting applies only to segments smaller than this size (bytes). # rate_limit_under_size = 1048576 diff --git a/swift/common/middleware/dlo.py b/swift/common/middleware/dlo.py index 91728a5989..4c4ce00bff 100644 --- a/swift/common/middleware/dlo.py +++ b/swift/common/middleware/dlo.py @@ -188,16 +188,20 @@ class GetContext(WSGIContext): if isinstance(seg_name, six.text_type): seg_name = seg_name.encode("utf-8") - # (obj path, etag, size, first byte, last byte) - yield ("/" + "/".join((version, account, container, - seg_name)), - # We deliberately omit the etag and size here; - # SegmentedIterable will check size and etag if - # specified, but we don't want it to. DLOs only care - # that the objects' names match the specified prefix. - None, None, - (None if first_byte <= 0 else first_byte), - (None if last_byte >= seg_length - 1 else last_byte)) + # We deliberately omit the etag and size here; + # SegmentedIterable will check size and etag if + # specified, but we don't want it to. DLOs only care + # that the objects' names match the specified prefix. + # SegmentedIterable will instead check that the data read + # from each segment matches the response headers. + _path = "/".join(["", version, account, container, seg_name]) + _first = None if first_byte <= 0 else first_byte + _last = None if last_byte >= seg_length - 1 else last_byte + yield { + 'path': _path, + 'first_byte': _first, + 'last_byte': _last + } first_byte = max(first_byte - seg_length, -1) last_byte = max(last_byte - seg_length, -1) diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 2421cdf105..b4ada96526 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -1,4 +1,4 @@ -# Copyright (c) 2013 OpenStack Foundation +# Copyright (c) 2018 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,7 +32,7 @@ uploaded. The request must be a ``PUT`` with the query parameter:: ?multipart-manifest=put The body of this request will be an ordered list of segment descriptions in -JSON format. The data to be supplied for each segment is: +JSON format. The data to be supplied for each segment is either: =========== ======================================================== Key Description @@ -44,27 +44,47 @@ etag (optional) the ETag given back when the segment object size_bytes (optional) the size of the complete segment object in bytes range (optional) the (inclusive) range within the object to - use as a segment. If omitted, the entire object is used. + use as a segment. If omitted, the entire object is used =========== ======================================================== +Or: + +=========== ======================================================== +Key Description +=========== ======================================================== +data base64-encoded data to be returned +=========== ======================================================== + +.. note:: + At least one object-backed segment must be included. If you'd like + to create a manifest consisting purely of data segments, consider + uploading a normal object instead. + The format of the list will be:: [{"path": "/cont/object", "etag": "etagoftheobjectsegment", "size_bytes": 10485760, "range": "1048576-2097151"}, + {"data": base64.b64encode("interstitial data")}, + {"path": "/cont/another-object", ...}, ...] -The number of object segments is limited to a configurable amount, default -1000. Each segment must be at least 1 byte. On upload, the middleware will -head every segment passed in to verify: +The number of object-backed segments is limited to ``max_manifest_segments`` +(configurable in proxy-server.conf, default 1000). Each segment must be at +least 1 byte. On upload, the middleware will head every object-backed segment +passed in to verify: 1. the segment exists (i.e. the ``HEAD`` was successful); 2. the segment meets minimum size requirements; 3. if the user provided a non-null ``etag``, the etag matches; 4. if the user provided a non-null ``size_bytes``, the size_bytes matches; and 5. if the user provided a ``range``, it is a singular, syntactically correct - range that is satisfiable given the size of the object. + range that is satisfiable given the size of the object referenced. + +For inlined data segments, the middleware verifies each is valid, non-empty +base64-encoded binary data. Note that data segments *do not* count against +``max_manifest_segments``. Note that the ``etag`` and ``size_bytes`` keys are optional; if omitted, the verification is not performed. If any of the objects fail to verify (not @@ -148,13 +168,16 @@ above manifest would be:: echo -n 'etagoftheobjectsegmentone:1-2;etagoftheobjectsegmenttwo:3-4;' \ | md5sum +For the purposes of Etag computations, inlined data segments are considered to +have an etag of the md5 of the raw data (i.e., *not* base64-encoded). + ------------------- Range Specification ------------------- Users now have the ability to specify ranges for SLO segments. -Users can now include an optional ``range`` field in segment descriptions +Users can include an optional ``range`` field in segment descriptions to specify which bytes from the underlying object should be used for the segment data. Only one range may be specified per segment. @@ -177,11 +200,28 @@ finally bytes 2095104 through 2097152 (i.e., the last 2048 bytes) of .. note:: - The minimum sized range is 1 byte. This is the same as the minimum segment size. +------------------------- +Inline Data Specification +------------------------- + +When uploading a manifest, users can include 'data' segments that should +be included along with objects. The data in these segments must be +base64-encoded binary data and will be included in the etag of the +resulting large object exactly as if that data had been uploaded and +referenced as separate objects. + +.. note:: + + This feature is primarily aimed at reducing the need for storing + many tiny objects, and as such any supplied data must fit within + the maximum manifest size (default is 8MiB). This maximum size + can be configured via ``max_manifest_size`` in proxy-server.conf. + + ------------------------- Retrieving a Large Object ------------------------- @@ -272,6 +312,7 @@ the manifest and the segments it's referring to) in the container and account metadata which can be used for stats and billing purposes. """ +import base64 from collections import defaultdict from datetime import datetime import json @@ -289,7 +330,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ from swift.common.utils import get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ register_swift_info, RateLimitedIterator, quote, close_if_possible, \ - closing_if_possible, LRUCache, StreamingPile + closing_if_possible, LRUCache, StreamingPile, strict_b64decode from swift.common.request_helpers import SegmentedIterable, \ get_sys_meta_prefix, update_etag_is_at_header from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS @@ -299,15 +340,17 @@ from swift.common.middleware.bulk import get_response_body, \ ACCEPTABLE_FORMATS, Bulk -DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 * 1024 # 1 MiB +DEFAULT_RATE_LIMIT_UNDER_SIZE = 1024 ** 2 # 1 MiB DEFAULT_MAX_MANIFEST_SEGMENTS = 1000 -DEFAULT_MAX_MANIFEST_SIZE = 1024 * 1024 * 2 # 2 MiB +DEFAULT_MAX_MANIFEST_SIZE = 8 * (1024 ** 2) # 8 MiB DEFAULT_YIELD_FREQUENCY = 10 -REQUIRED_SLO_KEYS = set(['path']) -OPTIONAL_SLO_KEYS = set(['range', 'etag', 'size_bytes']) -ALLOWED_SLO_KEYS = REQUIRED_SLO_KEYS | OPTIONAL_SLO_KEYS +SLO_KEYS = { + # required: optional + 'data': set(), + 'path': {'range', 'etag', 'size_bytes'}, +} SYSMETA_SLO_ETAG = get_sys_meta_prefix('object') + 'slo-etag' SYSMETA_SLO_SIZE = get_sys_meta_prefix('object') + 'slo-size' @@ -318,8 +361,8 @@ def parse_and_validate_input(req_body, req_path): Given a request body, parses it and returns a list of dictionaries. The output structure is nearly the same as the input structure, but it - is not an exact copy. Given a valid input dictionary ``d_in``, its - corresponding output dictionary ``d_out`` will be as follows: + is not an exact copy. Given a valid object-backed input dictionary + ``d_in``, its corresponding output dictionary ``d_out`` will be as follows: * d_out['etag'] == d_in['etag'] @@ -333,8 +376,10 @@ def parse_and_validate_input(req_body, req_path): corresponding swob.Range object. If d_in does not have a key 'range', neither will d_out. - :raises HTTPException: on parse errors or semantic errors (e.g. bogus - JSON structure, syntactically invalid ranges) + Inlined data dictionaries will have any extraneous padding stripped. + + :raises: HTTPException on parse errors or semantic errors (e.g. bogus + JSON structure, syntactically invalid ranges) :returns: a list of dictionaries on success """ @@ -356,15 +401,19 @@ def parse_and_validate_input(req_body, req_path): errors.append("Index %d: not a JSON object" % seg_index) continue - missing_keys = [k for k in REQUIRED_SLO_KEYS if k not in seg_dict] - if missing_keys: + for required in SLO_KEYS: + if required in seg_dict: + segment_type = required + break + else: errors.append( - "Index %d: missing keys %s" + "Index %d: expected keys to include one of %s" % (seg_index, - ", ".join('"%s"' % (mk,) for mk in sorted(missing_keys)))) + " or ".join(repr(required) for required in SLO_KEYS))) continue - extraneous_keys = [k for k in seg_dict if k not in ALLOWED_SLO_KEYS] + allowed_keys = SLO_KEYS[segment_type].union([segment_type]) + extraneous_keys = [k for k in seg_dict if k not in allowed_keys] if extraneous_keys: errors.append( "Index %d: extraneous keys %s" @@ -373,61 +422,84 @@ def parse_and_validate_input(req_body, req_path): for ek in sorted(extraneous_keys)))) continue - if not isinstance(seg_dict['path'], six.string_types): - errors.append("Index %d: \"path\" must be a string" % seg_index) - continue - if not (seg_dict.get('etag') is None or - isinstance(seg_dict['etag'], six.string_types)): - errors.append('Index %d: "etag" must be a string or null ' - '(if provided)' % seg_index) - continue - - if '/' not in seg_dict['path'].strip('/'): - errors.append( - "Index %d: path does not refer to an object. Path must be of " - "the form /container/object." % seg_index) - continue - - seg_size = seg_dict.get('size_bytes') - if seg_size is not None: - try: - seg_size = int(seg_size) - seg_dict['size_bytes'] = seg_size - except (TypeError, ValueError): - errors.append("Index %d: invalid size_bytes" % seg_index) + if segment_type == 'path': + if not isinstance(seg_dict['path'], six.string_types): + errors.append("Index %d: \"path\" must be a string" % + seg_index) continue - if seg_size < 1 and seg_index != (len(parsed_data) - 1): + if not (seg_dict.get('etag') is None or + isinstance(seg_dict['etag'], six.string_types)): + errors.append('Index %d: "etag" must be a string or null ' + '(if provided)' % seg_index) + continue + + if '/' not in seg_dict['path'].strip('/'): + errors.append( + "Index %d: path does not refer to an object. Path must " + "be of the form /container/object." % seg_index) + continue + + seg_size = seg_dict.get('size_bytes') + if seg_size is not None: + try: + seg_size = int(seg_size) + seg_dict['size_bytes'] = seg_size + except (TypeError, ValueError): + errors.append("Index %d: invalid size_bytes" % seg_index) + continue + if seg_size < 1 and seg_index != (len(parsed_data) - 1): + errors.append("Index %d: too small; each segment must be " + "at least 1 byte." + % (seg_index,)) + continue + + obj_path = '/'.join(['', vrs, account, + seg_dict['path'].lstrip('/')]) + if req_path == quote(obj_path): + errors.append( + "Index %d: manifest must not include itself as a segment" + % seg_index) + continue + + if seg_dict.get('range'): + try: + seg_dict['range'] = Range('bytes=%s' % seg_dict['range']) + except ValueError: + errors.append("Index %d: invalid range" % seg_index) + continue + + if len(seg_dict['range'].ranges) > 1: + errors.append("Index %d: multiple ranges " + "(only one allowed)" % seg_index) + continue + + # If the user *told* us the object's size, we can check range + # satisfiability right now. If they lied about the size, we'll + # fail that validation later. + if (seg_size is not None and 1 != len( + seg_dict['range'].ranges_for_length(seg_size))): + errors.append("Index %d: unsatisfiable range" % seg_index) + continue + + elif segment_type == 'data': + # Validate that the supplied data is non-empty and base64-encoded + try: + data = strict_b64decode(seg_dict['data']) + except ValueError: + errors.append( + "Index %d: data must be valid base64" % seg_index) + continue + if len(data) < 1: errors.append("Index %d: too small; each segment must be " "at least 1 byte." % (seg_index,)) continue + # re-encode to normalize padding + seg_dict['data'] = base64.b64encode(data) - obj_path = '/'.join(['', vrs, account, seg_dict['path'].lstrip('/')]) - if req_path == quote(obj_path): - errors.append( - "Index %d: manifest must not include itself as a segment" - % seg_index) - continue - - if seg_dict.get('range'): - try: - seg_dict['range'] = Range('bytes=%s' % seg_dict['range']) - except ValueError: - errors.append("Index %d: invalid range" % seg_index) - continue - - if len(seg_dict['range'].ranges) > 1: - errors.append("Index %d: multiple ranges (only one allowed)" - % seg_index) - continue - - # If the user *told* us the object's size, we can check range - # satisfiability right now. If they lied about the size, we'll - # fail that validation later. - if (seg_size is not None and - len(seg_dict['range'].ranges_for_length(seg_size)) != 1): - errors.append("Index %d: unsatisfiable range" % seg_index) - continue + if parsed_data and all('data' in d for d in parsed_data): + errors.append("Inline data segments require at least one " + "object-backed segment.") if errors: error_message = "".join(e + "\n" for e in errors) @@ -472,11 +544,20 @@ class SloGetContext(WSGIContext): 'while fetching %s, JSON-decoding of submanifest %s ' 'failed with %s' % (req.path, sub_req.path, err)) + def _segment_path(self, version, account, seg_dict): + return "/{ver}/{acc}/{conobj}".format( + ver=version, acc=account, + conobj=seg_dict['name'].lstrip('/') + ) + def _segment_length(self, seg_dict): """ Returns the number of bytes that will be fetched from the specified segment on a plain GET request for this SLO manifest. """ + if 'raw_data' in seg_dict: + return len(seg_dict['raw_data']) + seg_range = seg_dict.get('range') if seg_range is not None: # The range is of the form N-M, where N and M are both positive @@ -484,7 +565,7 @@ class SloGetContext(WSGIContext): # only thing that creates the SLO manifests stored in the # cluster. range_start, range_end = [int(x) for x in seg_range.split('-')] - return range_end - range_start + 1 + return (range_end - range_start) + 1 else: return int(seg_dict['bytes']) @@ -533,6 +614,9 @@ class SloGetContext(WSGIContext): recursion_depth=1): last_sub_path = None for seg_dict in segments: + if 'data' in seg_dict: + seg_dict['raw_data'] = strict_b64decode(seg_dict.pop('data')) + seg_length = self._segment_length(seg_dict) if first_byte >= seg_length: # don't need any bytes from this segment @@ -544,16 +628,25 @@ class SloGetContext(WSGIContext): # no bytes are needed from this or any future segment return + if 'raw_data' in seg_dict: + yield dict(seg_dict, + first_byte=max(0, first_byte), + last_byte=min(seg_length - 1, last_byte)) + first_byte -= seg_length + last_byte -= seg_length + continue + seg_range = seg_dict.get('range') if seg_range is None: range_start, range_end = 0, seg_length - 1 else: - # We already validated and supplied concrete values - # for the range on upload + # This simple parsing of the range is valid because we already + # validated and supplied concrete values for the range + # during SLO manifest creation range_start, range_end = map(int, seg_range.split('-')) if config_true_value(seg_dict.get('sub_slo')): - # do this check here so that we can avoid fetching this last + # Do this check here so that we can avoid fetching this last # manifest before raising the exception if recursion_depth >= self.max_slo_recursion_depth: raise ListingIterError( @@ -568,7 +661,7 @@ class SloGetContext(WSGIContext): last_sub_path = sub_path # Use the existing machinery to slice into the sub-SLO. - for sub_seg_dict, sb, eb in self._byterange_listing_iterator( + for sub_seg_dict in self._byterange_listing_iterator( req, version, account, sub_segments, # This adjusts first_byte and last_byte to be # relative to the sub-SLO. @@ -577,13 +670,13 @@ class SloGetContext(WSGIContext): cached_fetch_sub_slo_segments, recursion_depth=recursion_depth + 1): - yield sub_seg_dict, sb, eb + yield sub_seg_dict else: if isinstance(seg_dict['name'], six.text_type): seg_dict['name'] = seg_dict['name'].encode("utf-8") - yield (seg_dict, - max(0, first_byte) + range_start, - min(range_end, range_start + last_byte)) + yield dict(seg_dict, + first_byte=max(0, first_byte) + range_start, + last_byte=min(range_end, range_start + last_byte)) first_byte -= seg_length last_byte -= seg_length @@ -741,6 +834,8 @@ class SloGetContext(WSGIContext): segments = self._get_manifest_read(resp_iter) for seg_dict in segments: + if 'data' in seg_dict: + continue seg_dict.pop('content_type', None) seg_dict.pop('last_modified', None) seg_dict.pop('sub_slo', None) @@ -774,7 +869,6 @@ class SloGetContext(WSGIContext): def get_or_head_response(self, req, resp_headers, resp_iter): segments = self._get_manifest_read(resp_iter) - slo_etag = None content_length = None response_headers = [] @@ -789,21 +883,38 @@ class SloGetContext(WSGIContext): elif lheader not in ('etag', 'content-length'): response_headers.append((header, value)) - if slo_etag is None or content_length is None: - etag = md5() - content_length = 0 - for seg_dict in segments: - if seg_dict.get('range'): - etag.update('%s:%s;' % (seg_dict['hash'], - seg_dict['range'])) - else: - etag.update(seg_dict['hash']) + # Prep to calculate content_length & etag if necessary + if slo_etag is None: + calculated_etag = md5() + if content_length is None: + calculated_content_length = 0 + for seg_dict in segments: + # Decode any inlined data; it's important that we do this *before* + # calculating the segment length and etag + if 'data' in seg_dict: + seg_dict['raw_data'] = base64.b64decode(seg_dict.pop('data')) + + if slo_etag is None: + if 'raw_data' in seg_dict: + calculated_etag.update( + md5(seg_dict['raw_data']).hexdigest()) + elif seg_dict.get('range'): + calculated_etag.update( + '%s:%s;' % (seg_dict['hash'], seg_dict['range'])) + else: + calculated_etag.update(seg_dict['hash']) + + if content_length is None: if config_true_value(seg_dict.get('sub_slo')): override_bytes_from_content_type( seg_dict, logger=self.slo.logger) - content_length += self._segment_length(seg_dict) - slo_etag = etag.hexdigest() + calculated_content_length += self._segment_length(seg_dict) + + if slo_etag is None: + slo_etag = calculated_etag.hexdigest() + if content_length is None: + content_length = calculated_content_length response_headers.append(('Content-Length', str(content_length))) response_headers.append(('Etag', '"%s"' % slo_etag)) @@ -833,9 +944,13 @@ class SloGetContext(WSGIContext): plain_listing_iter = self._segment_listing_iterator( req, ver, account, segments, byteranges) - def is_small_segment((seg_dict, start_byte, end_byte)): - start = 0 if start_byte is None else start_byte - end = int(seg_dict['bytes']) - 1 if end_byte is None else end_byte + def ratelimit_predicate(seg_dict): + if 'raw_data' in seg_dict: + return False # it's already in memory anyway + start = seg_dict.get('start_byte') or 0 + end = seg_dict.get('end_byte') + if end is None: + end = int(seg_dict['bytes']) - 1 is_small = (end - start + 1) < self.slo.rate_limit_under_size return is_small @@ -843,17 +958,14 @@ class SloGetContext(WSGIContext): plain_listing_iter, self.slo.rate_limit_segments_per_sec, limit_after=self.slo.rate_limit_after_segment, - ratelimit_if=is_small_segment) + ratelimit_if=ratelimit_predicate) - # self._segment_listing_iterator gives us 3-tuples of (segment dict, - # start byte, end byte), but SegmentedIterable wants (obj path, etag, - # size, start byte, end byte), so we clean that up here + # data segments are already in the correct format, but object-backed + # segments need a path key added segment_listing_iter = ( - ("/{ver}/{acc}/{conobj}".format( - ver=ver, acc=account, conobj=seg_dict['name'].lstrip('/')), - seg_dict['hash'], int(seg_dict['bytes']), - start_byte, end_byte) - for seg_dict, start_byte, end_byte in ratelimited_listing_iter) + seg_dict if 'raw_data' in seg_dict else + dict(seg_dict, path=self._segment_path(ver, account, seg_dict)) + for seg_dict in ratelimited_listing_iter) segmented_iter = SegmentedIterable( req, self.slo.app, segment_listing_iter, @@ -964,9 +1076,10 @@ class StaticLargeObject(object): req.path) problem_segments = [] - if len(parsed_data) > self.max_manifest_segments: + object_segments = [seg for seg in parsed_data if 'path' in seg] + if len(object_segments) > self.max_manifest_segments: raise HTTPRequestEntityTooLarge( - 'Number of segments must be <= %d' % + 'Number of object-backed segments must be <= %d' % self.max_manifest_segments) try: out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS) @@ -974,10 +1087,15 @@ class StaticLargeObject(object): out_content_type = 'text/plain' # Ignore invalid header if not out_content_type: out_content_type = 'text/plain' - data_for_storage = [] + data_for_storage = [None] * len(parsed_data) + total_size = 0 path2indices = defaultdict(list) for index, seg_dict in enumerate(parsed_data): - path2indices[seg_dict['path']].append(index) + if 'data' in seg_dict: + data_for_storage[index] = seg_dict + total_size += len(base64.b64decode(seg_dict['data'])) + else: + path2indices[seg_dict['path']].append(index) def do_head(obj_name): obj_path = '/'.join(['', vrs, account, @@ -1023,30 +1141,45 @@ class StaticLargeObject(object): problem_segments.append( [quote(obj_name), 'Too small; each segment must be at least 1 byte.']) - if seg_dict.get('size_bytes') is not None and \ - seg_dict['size_bytes'] != head_seg_resp.content_length: + + _size_bytes = seg_dict.get('size_bytes') + size_mismatch = ( + _size_bytes is not None and + _size_bytes != head_seg_resp.content_length + ) + if size_mismatch: problem_segments.append([quote(obj_name), 'Size Mismatch']) - if seg_dict.get('etag') is not None and \ - seg_dict['etag'] != head_seg_resp.etag: + + _etag = seg_dict.get('etag') + etag_mismatch = ( + _etag is not None and + _etag != head_seg_resp.etag + ) + if etag_mismatch: problem_segments.append([quote(obj_name), 'Etag Mismatch']) + if head_seg_resp.last_modified: last_modified = head_seg_resp.last_modified else: # shouldn't happen last_modified = datetime.now() - last_modified_formatted = \ - last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f') - seg_data = {'name': '/' + seg_dict['path'].lstrip('/'), - 'bytes': head_seg_resp.content_length, - 'hash': head_seg_resp.etag, - 'content_type': head_seg_resp.content_type, - 'last_modified': last_modified_formatted} + last_modified_formatted = last_modified.strftime( + '%Y-%m-%dT%H:%M:%S.%f' + ) + seg_data = { + 'name': '/' + seg_dict['path'].lstrip('/'), + 'bytes': head_seg_resp.content_length, + 'hash': head_seg_resp.etag, + 'content_type': head_seg_resp.content_type, + 'last_modified': last_modified_formatted + } if seg_dict.get('range'): seg_data['range'] = seg_dict['range'] if config_true_value( head_seg_resp.headers.get('X-Static-Large-Object')): seg_data['sub_slo'] = True + return segment_length, seg_data heartbeat = config_true_value(req.params.get('heartbeat')) @@ -1059,10 +1192,8 @@ class StaticLargeObject(object): ('Content-Type', out_content_type), ]) separator = '\r\n\r\n' - data_for_storage = [None] * len(parsed_data) - def resp_iter(): - total_size = 0 + def resp_iter(total_size=total_size): # wsgi won't propagate start_response calls until some data has # been yielded so make sure first heartbeat is sent immediately if heartbeat: @@ -1102,7 +1233,10 @@ class StaticLargeObject(object): slo_etag = md5() for seg_data in data_for_storage: - if seg_data.get('range'): + if 'data' in seg_data: + raw_data = base64.b64decode(seg_data['data']) + slo_etag.update(md5(raw_data).hexdigest()) + elif seg_data.get('range'): slo_etag.update('%s:%s;' % (seg_data['hash'], seg_data['range'])) else: @@ -1183,6 +1317,8 @@ class StaticLargeObject(object): raise HTTPBadRequest( 'Too many buffered slo segments to delete.') seg_data = segments.pop(0) + if 'data' in seg_data: + continue if seg_data.get('sub_slo'): try: segments.extend( diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index 1ac348dec5..ec6ceb2c92 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -354,12 +354,25 @@ class SegmentedIterable(object): def _coalesce_requests(self): start_time = time.time() - pending_req = None - pending_etag = None - pending_size = None + pending_req = pending_etag = pending_size = None try: - for seg_path, seg_etag, seg_size, first_byte, last_byte \ - in self.listing_iter: + for seg_dict in self.listing_iter: + if 'raw_data' in seg_dict: + if pending_req: + yield pending_req, pending_etag, pending_size + + to_yield = seg_dict['raw_data'][ + seg_dict['first_byte']:seg_dict['last_byte'] + 1] + yield to_yield, None, len(seg_dict['raw_data']) + pending_req = pending_etag = pending_size = None + continue + + seg_path, seg_etag, seg_size, first_byte, last_byte = ( + seg_dict['path'], seg_dict.get('hash'), + seg_dict.get('bytes'), + seg_dict['first_byte'], seg_dict['last_byte']) + if seg_size is not None: + seg_size = int(seg_size) first_byte = first_byte or 0 go_to_end = last_byte is None or ( seg_size is not None and last_byte == seg_size - 1) @@ -441,7 +454,18 @@ class SegmentedIterable(object): bytes_left = self.response_body_length try: - for seg_req, seg_etag, seg_size in self._coalesce_requests(): + for data_or_req, seg_etag, seg_size in self._coalesce_requests(): + if isinstance(data_or_req, bytes): + chunk = data_or_req # ugly, awful overloading + if bytes_left is None: + yield chunk + elif bytes_left >= len(chunk): + yield chunk + bytes_left -= len(chunk) + else: + yield chunk[:bytes_left] + continue + seg_req = data_or_req seg_resp = seg_req.get_response(self.app) if not is_success(seg_resp.status_int): close_if_possible(seg_resp.app_iter) diff --git a/test/functional/test_slo.py b/test/functional/test_slo.py index d12a2185c5..78d478e3c5 100644 --- a/test/functional/test_slo.py +++ b/test/functional/test_slo.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import email.parser import hashlib import itertools @@ -205,6 +206,28 @@ class TestSloEnv(BaseEnv): 'size_bytes': None, 'range': '-1048578'}, ]), parms={'multipart-manifest': 'put'}) + file_item = cls.container.file("mixed-object-data-manifest") + file_item.write( + json.dumps([ + {'data': base64.b64encode('APRE' * 8)}, + {'path': seg_info['seg_a']['path']}, + {'data': base64.b64encode('APOS' * 16)}, + {'path': seg_info['seg_b']['path']}, + {'data': base64.b64encode('BPOS' * 32)}, + {'data': base64.b64encode('CPRE' * 64)}, + {'path': seg_info['seg_c']['path']}, + {'data': base64.b64encode('CPOS' * 8)}, + ]), parms={'multipart-manifest': 'put'} + ) + + file_item = cls.container.file("nested-data-manifest") + file_item.write( + json.dumps([ + {'path': '%s/%s' % (cls.container.name, + "mixed-object-data-manifest")} + ]), parms={'multipart-manifest': 'put'} + ) + class TestSlo(Base): env = TestSloEnv @@ -681,6 +704,25 @@ class TestSlo(Base): self.assertEqual('application/octet-stream', actual['content_type']) self.assertEqual(copied.etag, actual['hash']) + # Test copy manifest including data segments + source = self.env.container.file("mixed-object-data-manifest") + source_contents = source.read(parms={'multipart-manifest': 'get'}) + source_json = json.loads(source_contents) + source.copy( + self.env.container.name, + "copied-mixed-object-data-manifest", + parms={'multipart-manifest': 'get'}) + + copied = self.env.container.file("copied-mixed-object-data-manifest") + copied_contents = copied.read(parms={'multipart-manifest': 'get'}) + try: + copied_json = json.loads(copied_contents) + except ValueError: + self.fail("COPY didn't copy the manifest (invalid json on GET)") + self.assertEqual(source_contents, copied_contents) + self.assertEqual(copied_json[0], + {'data': base64.b64encode('APRE' * 8)}) + def test_slo_copy_the_manifest_updating_metadata(self): source = self.env.container.file("manifest-abcde") source.content_type = 'application/octet-stream' @@ -1115,6 +1157,56 @@ class TestSlo(Base): self.assertEqual('d', contents[-2]) self.assertEqual('e', contents[-1]) + def test_slo_data_segments(self): + # len('APRE' * 8) == 32 + # len('APOS' * 16) == 64 + # len('BPOS' * 32) == 128 + # len('CPRE' * 64) == 256 + # len(a_pre + seg_a + post_a) == 32 + 1024 ** 2 + 64 + # len(seg_b + post_b) == 1024 ** 2 + 128 + # len(c_pre + seg_c) == 256 + 1024 ** 2 + # len(total) == 3146208 + + for file_name in ("mixed-object-data-manifest", + "nested-data-manifest"): + file_item = self.env.container.file(file_name) + file_contents = file_item.read(size=3 * 1024 ** 2 + 456, + offset=28) + grouped_file_contents = [ + (char, sum(1 for _char in grp)) + for char, grp in itertools.groupby(file_contents)] + self.assertEqual([ + ('A', 1), + ('P', 1), + ('R', 1), + ('E', 1), + ('a', 1024 * 1024), + ] + [ + ('A', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ] * 16 + [ + ('b', 1024 * 1024), + ] + [ + ('B', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ] * 32 + [ + ('C', 1), + ('P', 1), + ('R', 1), + ('E', 1), + ] * 64 + [ + ('c', 1024 * 1024), + ] + [ + ('C', 1), + ('P', 1), + ('O', 1), + ('S', 1), + ], grouped_file_contents) + class TestSloUTF8(Base2, TestSlo): pass diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index 7dcf43d95f..e960796e94 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -16,12 +16,15 @@ from six.moves import range +import base64 import hashlib import json import time import unittest + from mock import patch from StringIO import StringIO + from swift.common import swob, utils from swift.common.header_key_dict import HeaderKeyDict from swift.common.middleware import slo @@ -707,6 +710,29 @@ class TestSloPutManifest(SloTestCase): status, headers, body = self.call_slo(req) self.assertEqual(status, '201 Created') + def test_handle_multipart_put_invalid_data(self): + def do_test(bad_data): + test_json_data = json.dumps([{'path': '/cont/object', + 'etag': 'etagoftheobjectsegment', + 'size_bytes': 100}, + {'data': bad_data}]) + req = Request.blank('/v1/a/c/o', body=test_json_data) + with self.assertRaises(HTTPException) as catcher: + self.slo.handle_multipart_put(req, fake_start_response) + self.assertEqual(catcher.exception.status_int, 400) + + do_test('invalid') # insufficient padding + do_test(12345) + do_test(0) + do_test(True) + do_test(False) + do_test(None) + do_test({}) + do_test([]) + # Empties are no good, either + do_test('') + do_test('====') + def test_handle_multipart_put_success_unicode(self): test_json_data = json.dumps([{'path': u'/cont/object\u2661', 'etag': 'etagoftheobjectsegment', @@ -2367,8 +2393,10 @@ class TestSloGetManifest(SloTestCase): 'bytes=0-3,8-11']) # we set swift.source for everything but the first request self.assertIsNone(self.app.swift_sources[0]) - self.assertEqual(self.app.swift_sources[1:], - ['SLO'] * (len(self.app.swift_sources) - 1)) + self.assertEqual( + self.app.swift_sources[1:], + ['SLO'] * (len(self.app.swift_sources) - 1) + ) self.assertEqual(md5hex(''.join([ md5hex('a' * 5), ':0-3;', md5hex('a' * 5), ':1-4;', @@ -2654,18 +2682,21 @@ class TestSloGetManifest(SloTestCase): 'Etag': 'man%d' % i}, manifest_json) + submanifest_bytes = 6 for i in range(19, 0, -1): manifest_data = [ {'name': '/gettest/obj%d' % i, 'hash': md5hex('body%02d' % i), 'bytes': '6', 'content_type': 'text/plain'}, + {'data': base64.b64encode('-' * 3)}, {'name': '/gettest/man%d' % (i + 1), 'hash': 'man%d' % (i + 1), 'sub_slo': True, - 'bytes': len(manifest_json), + 'bytes': submanifest_bytes, 'content_type': 'application/json'}] + submanifest_bytes += 9 manifest_json = json.dumps(manifest_data) self.app.register( 'GET', '/v1/AUTH_test/gettest/man%d' % i, @@ -2683,8 +2714,10 @@ class TestSloGetManifest(SloTestCase): # we don't know at header-sending time that things are going to go # wrong, so we end up with a 200 and a truncated body self.assertEqual(status, '200 OK') - self.assertEqual(body, ('body01body02body03body04body05' + - 'body06body07body08body09body10')) + self.assertEqual(headers['Content-Length'], str(9 * 19 + 6)) + self.assertEqual(body, ( + 'body01---body02---body03---body04---body05---' + + 'body06---body07---body08---body09---body10---')) # but the error shows up in logs self.assertEqual(self.slo.logger.get_lines_for_level('error'), [ "While processing manifest '/v1/AUTH_test/gettest/man1', " @@ -3057,6 +3090,275 @@ class TestSloGetManifest(SloTestCase): 'gettest/not_exists_obj' ]) + def test_leading_data_segment(self): + slo_etag = md5hex( + md5hex('preamble') + + md5hex('a' * 5) + ) + preamble = base64.b64encode('preamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-preamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'data': preamble + }, { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }]) + ) + + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-preamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'preambleaaaaa') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '13'), headers) + + def test_trailing_data_segment(self): + slo_etag = md5hex( + md5hex('a' * 5) + + md5hex('postamble') + ) + postamble = base64.b64encode('postamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-postamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, { + 'data': postamble + }]) + ) + + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-postamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'aaaaapostamble') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '14'), headers) + + def test_data_segment_sandwich(self): + slo_etag = md5hex( + md5hex('preamble') + + md5hex('a' * 5) + + md5hex('postamble') + ) + preamble = base64.b64encode('preamble') + postamble = base64.b64encode('postamble') + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-single-prepostamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([{ + 'data': preamble, + }, { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, { + 'data': postamble + }]) + ) + + # Test the whole SLO + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'preambleaaaaapostamble') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '22'), headers) + + # Test complete preamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=0-7'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'preamble') + + # Test range within preamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-5'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'reamb') + + # Test complete postamble only + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=13-21'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'postamble') + + # Test partial pre and postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=4-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'mbleaaaaapost') + + # Test partial preamble and first byte of data + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=1-8'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'reamblea') + + # Test last byte of segment data and partial postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-single-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'apost') + + def test_bunches_of_data_segments(self): + slo_etag = md5hex( + md5hex('ABCDEF') + + md5hex('a' * 5) + + md5hex('123456') + + md5hex('GHIJKL') + + md5hex('b' * 10) + + md5hex('7890@#') + ) + self.app.register( + 'GET', '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + swob.HTTPOk, + { + 'Content-Type': 'application/json', + 'X-Static-Large-Object': 'true' + }, + json.dumps([ + { + 'data': base64.b64encode('ABCDEF'), + }, + { + 'name': '/gettest/a_5', + 'hash': md5hex('a' * 5), + 'content_type': 'text/plain', + 'bytes': '5', + }, + { + 'data': base64.b64encode('123456') + }, + { + 'data': base64.b64encode('GHIJKL'), + }, + { + 'name': '/gettest/b_10', + 'hash': md5hex('b' * 10), + 'content_type': 'text/plain', + 'bytes': '10', + }, + { + 'data': base64.b64encode('7890@#') + } + ]) + ) + + # Test the whole SLO + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('200 OK', status) + self.assertEqual(body, 'ABCDEFaaaaa123456GHIJKLbbbbbbbbbb7890@#') + self.assertIn(('Etag', '"%s"' % slo_etag), headers) + self.assertIn(('Content-Length', '39'), headers) + + # Test last byte first pre-amble to first byte of second postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=5-33'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'Faaaaa123456GHIJKLbbbbbbbbbb7') + + # Test only second complete preamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=17-22'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, 'GHIJKL') + + # Test only first complete postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=11-16'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '123456') + + # Test only range within first postamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-15'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '2345') + + # Test only range within first postamble and second preamble + req = Request.blank( + '/v1/AUTH_test/gettest/manifest-multi-prepostamble', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Range': 'bytes=12-18'}) + status, headers, body = self.call_slo(req) + + self.assertEqual('206 Partial Content', status) + self.assertEqual(body, '23456GH') + class TestSloConditionalGetOldManifest(SloTestCase): slo_data = [ @@ -3324,7 +3626,7 @@ class TestSwiftInfo(unittest.TestCase): self.assertEqual(swift_info['slo'].get('max_manifest_size'), mware.max_manifest_size) self.assertEqual(1000, mware.max_manifest_segments) - self.assertEqual(2097152, mware.max_manifest_size) + self.assertEqual(8388608, mware.max_manifest_size) self.assertEqual(1048576, mware.rate_limit_under_size) self.assertEqual(10, mware.rate_limit_after_segment) self.assertEqual(1, mware.rate_limit_segments_per_sec)