Static Large Object Support

DocImpact

Change-Id: I7edaa5e44208ab451f7f7566b64bb571b8eea1f9
David Goetz 2013-02-13 12:31:55 -08:00
parent 569bd1e4f6
commit 5d73da158b
19 changed files with 1340 additions and 137 deletions

View File

@ -194,3 +194,11 @@ Container Quotas
:members:
:show-inheritance:
.. _slo-doc:
Static Large Objects
====================
.. automodule:: swift.common.middleware.slo
:members:
:show-inheritance:

View File

@ -13,9 +13,13 @@ special manifest file is created that, when downloaded, sends all the segments
concatenated as a single object. This also offers much greater upload speed
with the possibility of parallel uploads of the segments.
-------------------------------------
Using ``swift`` for Segmented Objects
-------------------------------------
---------------------
Dynamic Large Objects
---------------------
---------------
Using ``swift``
---------------
The quickest way to try out this feature is to use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library. You can use the ``-S``
@ -96,6 +100,19 @@ Here's an example using ``curl`` with tiny 1-byte segments::
curl -H 'X-Auth-Token: <token>' \
http://<storage_url>/container/myobject
--------------------
Static Large Objects
--------------------
----------
Direct API
----------
SLO support centers around the user generated manifest file. After the user
has uploaded the segments into their account a manifest file needs to be
built and uploaded. All object segments, except the last, must be at least
1 MB (by default) in size. Please see the SLO docs (:ref:`slo-doc`) for
further details.
----------------
Additional Notes
----------------
@ -117,11 +134,11 @@ Additional Notes
* The response's ``ETag`` for a ``GET`` or ``HEAD`` on the manifest file will
be the MD5 sum of the concatenated string of ETags for each of the segments
in the ``<container>/<prefix>`` listing, dynamically. Usually in Swift the
ETag is the MD5 sum of the contents of the object, and that holds true for
each segment independently. But, it's not feasible to generate such an ETag
for the manifest itself, so this method was chosen to at least offer change
detection.
in the manifest (for DLO, from the listing ``<container>/<prefix>``).
Usually in Swift the ETag is the MD5 sum of the contents of the object, and
that holds true for each segment independently. But it's not meaningful to
generate such an ETag for the manifest itself so this method was chosen to
at least offer change detection.
.. note::
@ -134,8 +151,8 @@ Additional Notes
History
-------
Large object support has gone through various iterations before settling on
this implementation.
Dynamic large object support has gone through various iterations before
settling on this implementation.
The primary factor driving the limitation of object size in swift is
maintaining balance among the partitions of the ring. To maintain an even
@ -168,20 +185,32 @@ The current "user manifest" design was chosen in order to provide a
transparent download of large objects to the client and still provide the
uploading client a clean API to support segmented uploads.
Alternative "explicit" user manifest options were discussed which would have
required a pre-defined format for listing the segments to "finalize" the
segmented upload. While this may offer some potential advantages, it was
decided that pushing an added burden onto the client which could potentially
limit adoption should be avoided in favor of a simpler "API" (essentially just
the format of the 'X-Object-Manifest' header).
To meet as many use cases as possible, swift supports two types of large
object manifests. Dynamic and static large object manifests both support
the same idea of allowing the user to upload many segments to be later
downloaded as a single file.
During development it was noted that this "implicit" user manifest approach
which is based on the path prefix can be potentially affected by the eventual
consistency window of the container listings, which could theoretically cause
a GET on the manifest object to return an invalid whole object for that short
term. In reality you're unlikely to encounter this scenario unless you're
running very high concurrency uploads against a small testing environment
which isn't running the object-updaters or container-replicators.
Dynamic large objects rely on a container listing to provide the manifest.
This has the advantage of allowing the user to add/remove segments from the
manifest at any time. It has the disadvantage of relying on eventually
consistent container listings. All three copies of the container dbs must
be updated for a complete list to be guaranteed. Also, all segments must
be in a single container, which can limit concurrent upload speed.
Like all of swift, Large Object Support is living feature which will continue
to improve and may change over time.
Static large objects rely on a user provided manifest file. A user can
upload objects into multiple containers and then reference those objects
(segments) in a self generated manifest file. Future GETs to that file will
download the concatenation of the specified segments. This has the advantage of
being able to immediately download the complete object once the manifest has
been successfully PUT. Being able to upload segments into separate containers
also improves concurrent upload speed. It has the disadvantage that the
manifest is finalized once PUT. Any change to it means it has to be replaced.
Between these two methods the user has great flexibility in how they choose
to upload large objects to swift and retrieve them. Swift does not, however,
stop the user from harming themselves. In both cases the segments are
deletable by the user at any time. If a segment is deleted by mistake, a
dynamic large object, having no way of knowing it was ever there, will
silently skip the deleted file and the user will get an incomplete file. A
static large object will, on failing to retrieve an object specified in the
manifest, drop the connection and the user will receive partial results.
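
The ETag rule from the Additional Notes hunk (the manifest's ETag is the MD5 of the concatenated segment ETags, not of the object's bytes) can be illustrated with a small sketch. The helper names `segment_etag` and `large_object_etag` are hypothetical and only model the rule as the text states it; they are not Swift functions.

```python
import hashlib


def segment_etag(data):
    """MD5 hex digest of one object's contents (the usual Swift ETag)."""
    return hashlib.md5(data).hexdigest()


def large_object_etag(segment_etags):
    """MD5 hex digest of the segment ETag strings joined in manifest order."""
    joined = "".join(segment_etags)
    return hashlib.md5(joined.encode("ascii")).hexdigest()


# Three tiny 1-byte segments, as in the curl example earlier in the docs.
segments = [b"a", b"b", b"c"]
etags = [segment_etag(s) for s in segments]

manifest_etag = large_object_etag(etags)       # what a GET on the manifest reports
whole_etag = segment_etag(b"".join(segments))  # MD5 of the concatenated bytes

# The two values differ, so the manifest ETag offers change detection
# but cannot be used to verify the downloaded bytes directly.
print(manifest_etag != whole_etag)
```

Because the digest covers the ETags in order, reordering or replacing any segment changes the manifest's ETag even though no byte of the manifest is hashed.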

View File

@ -60,7 +60,7 @@ use = egg:swift#object
# Comma separated list of headers that can be set in metadata on an object.
# This list is in addition to X-Object-Meta-* headers and cannot include
# Content-Type, etag, Content-Length, or deleted
# allowed_headers = Content-Disposition, Content-Encoding, X-Delete-At, X-Object-Manifest
# allowed_headers = Content-Disposition, Content-Encoding, X-Delete-At, X-Object-Manifest, X-Static-Large-Object
# auto_create_account_prefix = .
[filter:healthcheck]

View File

@ -34,7 +34,7 @@
# eventlet_debug = false
[pipeline:main]
pipeline = catch_errors healthcheck proxy-logging cache ratelimit tempauth container-quotas proxy-logging proxy-server
pipeline = catch_errors healthcheck proxy-logging cache slo ratelimit tempauth container-quotas proxy-logging proxy-server
[app:proxy-server]
use = egg:swift#proxy
@ -94,6 +94,10 @@ use = egg:swift#proxy
# If the timing sorting_method is used, the timings will only be valid for
# the number of seconds configured by timing_expiry.
# timing_expiry = 300
# If set to false will treat objects with X-Static-Large-Object header set
# as a regular object on GETs, i.e. will return that object's contents. Should
# be set to false if slo is not used in pipeline.
# allow_static_large_object = true
[filter:tempauth]
use = egg:swift#tempauth
@ -348,3 +352,10 @@ use = egg:swift#bulk
# Note: Put after auth in the pipeline.
[filter:container-quotas]
use = egg:swift#container_quotas
# Note: Put before both ratelimit and auth in the pipeline.
[filter:slo]
use = egg:swift#slo
# max_manifest_segments = 1000
# max_manifest_size = 2097152
# min_segment_size = 1048576
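
A minimal sketch of how the segment-count and segment-size limits above might be applied to a manifest, assuming the default values shown. `check_manifest_limits` is a hypothetical helper written for illustration, not part of Swift; it takes only the list of segment sizes and does not cover `max_manifest_size`, which bounds the size of the manifest body itself.

```python
MAX_MANIFEST_SEGMENTS = 1000   # default for max_manifest_segments
MIN_SEGMENT_SIZE = 1048576     # default for min_segment_size (1 MiB)


def check_manifest_limits(sizes,
                          max_segments=MAX_MANIFEST_SEGMENTS,
                          min_size=MIN_SEGMENT_SIZE):
    """Return a list of problem strings; an empty list means within limits."""
    problems = []
    if len(sizes) > max_segments:
        problems.append("number of segments must be <= %d" % max_segments)
    last = len(sizes) - 1
    for index, size in enumerate(sizes):
        # Same condition as the middleware in this commit: only a final
        # segment may be small, and a lone segment (index 0) is not exempt.
        if size < min_size and (index == 0 or index < last):
            problems.append(
                "segment %d is smaller than %d bytes" % (index, min_size))
    return problems


print(check_manifest_limits([1048576, 1048576, 10]))  # small last segment is fine
print(check_manifest_limits([10, 1048576]))           # small first segment is not
```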

View File

@ -106,6 +106,7 @@ setup(
'filter_factory',
'proxy_logging=swift.common.middleware.proxy_logging:'
'filter_factory',
'slo=swift.common.middleware.slo:filter_factory',
],
},
)

View File

@ -42,6 +42,8 @@ MAX_META_VALUE_LENGTH = constraints_conf_int('max_meta_value_length', 256)
MAX_META_COUNT = constraints_conf_int('max_meta_count', 90)
#: Max overall size of metadata
MAX_META_OVERALL_SIZE = constraints_conf_int('max_meta_overall_size', 4096)
#: Max size of any header
MAX_HEADER_SIZE = constraints_conf_int('max_header_size', 8192)
#: Max object name length
MAX_OBJECT_NAME_LENGTH = constraints_conf_int('max_object_name_length', 1024)
#: Max object list length of a get request for a container
@ -68,12 +70,14 @@ def check_metadata(req, target_type):
:param req: request object
:param target_type: str: one of: object, container, or account: indicates
which type the target storage for the metadata is
:raises HTTPBadRequest: bad metadata
:returns: HTTPBadRequest with bad metadata otherwise None
"""
prefix = 'x-%s-meta-' % target_type.lower()
meta_count = 0
meta_size = 0
for key, value in req.headers.iteritems():
if isinstance(value, basestring) and len(value) > MAX_HEADER_SIZE:
return HTTPBadRequest('Header Line Too Long')
if not key.lower().startswith(prefix):
continue
key = key[len(prefix):]
@ -108,11 +112,11 @@ def check_object_creation(req, object_name):
:param req: HTTP request object
:param object_name: name of object to be created
:raises HTTPRequestEntityTooLarge: the object is too large
:raises HTTPLengthRequered: missing content-length header and not
a chunked request
:raises HTTPBadRequest: missing or bad content-type header, or
bad metadata
:returns HTTPRequestEntityTooLarge: the object is too large
:returns HTTPLengthRequired: missing content-length header and not
a chunked request
:returns HTTPBadRequest: missing or bad content-type header, or
bad metadata
"""
if req.content_length and req.content_length > MAX_FILE_SIZE:
return HTTPRequestEntityTooLarge(body='Your request is too large.',

View File

@ -102,3 +102,7 @@ class ListingIterNotAuthorized(ListingIterError):
def __init__(self, aresp):
self.aresp = aresp
class SloSegmentError(SwiftException):
pass

View File

@ -42,6 +42,41 @@ ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml',
'text/xml']
def get_response_body(data_format, data_dict, error_list):
"""
Returns a properly formatted response body according to format.
:params data_format: resulting format
:params data_dict: generated data about results.
:params error_list: list of quoted filenames that failed
"""
if data_format == 'text/plain':
output = ''
for key in sorted(data_dict.keys()):
output += '%s: %s\n' % (key, data_dict[key])
output += 'Errors:\n'
output += '\n'.join(
['%s, %s' % (name, status)
for name, status in error_list])
return output
if data_format == 'application/json':
data_dict['Errors'] = error_list
return json.dumps(data_dict)
if data_format.endswith('/xml'):
output = '<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n'
for key in sorted(data_dict.keys()):
xml_key = key.replace(' ', '_').lower()
output += '<%s>%s</%s>\n' % (xml_key, data_dict[key], xml_key)
output += '<errors>\n'
output += '\n'.join(
['<object>'
'<name>%s</name><status>%s</status>'
'</object>' % (saxutils.escape(name), status) for
name, status in error_list])
output += '</errors>\n</delete>\n'
return output
raise HTTPNotAcceptable('Invalid output type')
class Bulk(object):
"""
Middleware that will do many operations on a single request.
@ -165,7 +200,7 @@ class Bulk(object):
self.max_deletes_per_request)
if '\n' in line:
obj_to_delete, line = line.split('\n', 1)
objs_to_delete.append(obj_to_delete)
objs_to_delete.append(unquote(obj_to_delete))
else:
data = req.body_file.read(MAX_PATH_LENGTH)
if data:
@ -173,46 +208,13 @@ class Bulk(object):
else:
data_remaining = False
if line.strip():
objs_to_delete.append(line)
objs_to_delete.append(unquote(line))
if len(line) > MAX_PATH_LENGTH * 2:
raise HTTPBadRequest('Invalid File Name')
return objs_to_delete
def get_response_body(self, data_format, data_dict, error_list):
"""
Returns a properly formatted response body according to format.
:params data_format: resulting format
:params data_dict: generated data about results.
:params error_list: list of quoted filenames that failed
"""
if data_format == 'text/plain':
output = ''
for key in sorted(data_dict.keys()):
output += '%s: %s\n' % (key, data_dict[key])
output += 'Errors:\n'
output += '\n'.join(
['%s, %s' % (name, status)
for name, status in error_list])
return output
if data_format == 'application/json':
data_dict['Errors'] = error_list
return json.dumps(data_dict)
if data_format.endswith('/xml'):
output = '<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n'
for key in sorted(data_dict.keys()):
xml_key = key.replace(' ', '_').lower()
output += '<%s>%s</%s>\n' % (xml_key, data_dict[key], xml_key)
output += '<errors>\n'
output += '\n'.join(
['<object>'
'<name>%s</name><status>%s</status>'
'</object>' % (saxutils.escape(name), status) for
name, status in error_list])
output += '</errors>\n</delete>\n'
return output
raise HTTPNotAcceptable('Invalid output type')
def handle_delete(self, req):
def handle_delete(self, req, objs_to_delete=None, user_agent='BulkDelete',
swift_source='BD'):
"""
:params req: a swob Request
:raises HTTPException: on unhandled errors
@ -231,7 +233,8 @@ class Bulk(object):
if not out_content_type:
return HTTPNotAcceptable(request=req)
objs_to_delete = self.get_objs_to_delete(req)
if objs_to_delete is None:
objs_to_delete = self.get_objs_to_delete(req)
failed_files = []
success_count = not_found_count = 0
failed_file_response_type = HTTPBadRequest
@ -239,7 +242,6 @@ class Bulk(object):
obj_to_delete = obj_to_delete.strip().lstrip('/')
if not obj_to_delete:
continue
obj_to_delete = unquote(obj_to_delete)
delete_path = '/'.join(['', vrs, account, obj_to_delete])
if not check_utf8(delete_path):
failed_files.append([quote(delete_path),
@ -250,8 +252,8 @@ class Bulk(object):
del(new_env['wsgi.input'])
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s BulkDelete' % req.environ.get('HTTP_USER_AGENT')
new_env['swift.source'] = 'BD'
'%s %s' % (req.environ.get('HTTP_USER_AGENT'), user_agent)
new_env['swift.source'] = swift_source
delete_obj_req = Request.blank(delete_path, new_env)
resp = delete_obj_req.get_response(self.app)
if resp.status_int // 100 == 2:
@ -265,7 +267,7 @@ class Bulk(object):
failed_file_response_type = HTTPBadGateway
failed_files.append([quote(delete_path), resp.status])
resp_body = self.get_response_body(
resp_body = get_response_body(
out_content_type,
{'Number Deleted': success_count,
'Number Not Found': not_found_count},
@ -371,7 +373,7 @@ class Bulk(object):
failed_files.append([
quote(destination[:MAX_PATH_LENGTH]), resp.status])
resp_body = self.get_response_body(
resp_body = get_response_body(
out_content_type,
{'Number Files Created': success_count},
failed_files)

View File

@ -0,0 +1,355 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Middleware that will provide Static Large Object (SLO) support.
This feature is very similar to Dynamic Large Object (DLO) support in that
it allows the user to upload many objects concurrently and afterwards
download them as a single object. It is different in that it does not rely
on eventually consistent container listings to do so. Instead, a user
defined manifest of the object segments is used.
----------------------
Uploading the Manifest
----------------------
After the user has uploaded the objects to be concatenated a manifest is
uploaded. The request must be a PUT with the query parameter::
?multipart-manifest=put
The body of this request will be an ordered list of files in
json data format. The data to be supplied for each segment is::
path: the path to the segment (not including account)
/container/object_name
etag: the etag given back when the segment was PUT
size_bytes: the size of the segment in bytes
The format of the list will be::
json:
[{"path": "/cont/object",
"etag": "etagoftheobjectsegment",
"size_bytes": 1048576}, ...]
The number of object segments is limited to a configurable amount, default
1000. Each segment, except for the final one, must be at least 1 megabyte
(configurable). On upload, the middleware will head every segment passed in and
verify the size and etag of each. If any of the objects do not match (not
found, size/etag mismatch, below minimum size) then the user will receive a 4xx
error response.
If everything matches, a json manifest generated from the user input is
sent to object servers with an extra "X-Static-Large-Object: True" header
and a modified Content-Type. The parameter: swift_bytes=$total_size will be
appended to the existing Content-Type, where total_size is the sum of all
the included segments' size_bytes. This extra parameter will be hidden from
the user.
Manifest files can reference objects in separate containers, which
will improve concurrent upload speed. Objects can be referenced by
multiple manifests.
-------------------------
Retrieving a Large Object
-------------------------
A GET request to the manifest object will return the concatenation of the
objects from the manifest much like DLO. If any of the segments from the
manifest are not found or their Etag/Content Length no longer match, the
connection will drop. In this case a 409 Conflict will be logged in the proxy
logs and the user will receive incomplete results.
The headers from this GET or HEAD request will return the metadata attached
to the manifest object itself with some exceptions::
Content-Length: the total size of the SLO (the sum of the sizes of
the segments in the manifest)
X-Static-Large-Object: True
Etag: the etag of the SLO (generated the same way as DLO)
A GET request with the query parameter::
?multipart-manifest=get
will return the actual manifest file itself. This is generated json and does
not match the data sent from the original multipart-manifest=put. This call's
main purpose is for debugging.
When the manifest object is uploaded you are more or less guaranteed that
every segment in the manifest exists and matches the specifications.
However, there is nothing that prevents the user from breaking the
SLO download by deleting/replacing a segment referenced in the manifest. It is
left to the user to use caution in handling the segments.
-----------------------
Deleting a Large Object
-----------------------
A DELETE request will just delete the manifest object itself.
A DELETE with a query parameter::
?multipart-manifest=delete
will delete all the segments referenced in the manifest and then, if
successful, the manifest itself. The failure response will be similar to
the bulk delete middleware.
------------------------
Modifying a Large Object
------------------------
PUTs / POSTs will work as expected; a PUT, for example, will just overwrite
the manifest object.
------------------
Container Listings
------------------
In a container listing the size listed for SLO manifest objects will be the
total_size of the concatenated segments in the manifest. The overall
X-Container-Bytes-Used for the container (and subsequently for the account)
will not reflect total_size of the manifest but the actual size of the json
data stored. The reason for this somewhat confusing discrepancy is we want the
container listing to reflect the size of the manifest object when it is
downloaded. We do not, however, want to count the bytes-used twice (for both
the manifest and the segments it's referring to) in the container and account
metadata which can be used for stats purposes.
"""
from urllib import quote
from cStringIO import StringIO
from datetime import datetime
import mimetypes
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, wsgify
from swift.common.utils import json, get_logger, config_true_value
from swift.common.middleware.bulk import get_response_body, \
ACCEPTABLE_FORMATS, Bulk
def parse_input(raw_data):
"""
Given the raw request body, will parse it as json and return a list of
dictionaries
:raises: HTTPException on parse errors
:returns: a list of dictionaries on success
"""
try:
parsed_data = json.loads(raw_data)
except ValueError:
raise HTTPBadRequest("Manifest must be valid json.")
req_keys = set(['path', 'etag', 'size_bytes'])
try:
for seg_dict in parsed_data:
if (set(seg_dict.keys()) != req_keys or
'/' not in seg_dict['path'].lstrip('/')):
raise HTTPBadRequest('Invalid SLO Manifest File')
except (AttributeError, TypeError):
raise HTTPBadRequest('Invalid SLO Manifest File')
return parsed_data
class StaticLargeObject(object):
"""
StaticLargeObject Middleware
See above for a full description.
The proxy logs created for any subrequests made will have swift.source set
to "SLO".
:param app: The next WSGI filter or app in the paste.deploy chain.
:param conf: The configuration dict for the middleware.
"""
def __init__(self, app, conf):
self.conf = conf
self.app = app
self.logger = get_logger(conf, log_route='slo')
self.max_manifest_segments = int(self.conf.get('max_manifest_segments',
1000))
self.max_manifest_size = int(self.conf.get('max_manifest_size',
1024 * 1024 * 2))
self.min_segment_size = int(self.conf.get('min_segment_size',
1024 * 1024))
self.bulk_deleter = Bulk(
app, {'max_deletes_per_request': self.max_manifest_segments})
def handle_multipart_put(self, req):
"""
Will handle the PUT of a SLO manifest.
Heads every object in the manifest to check that it is valid and, if so,
will save a manifest generated from the user input.
:params req: a swob.Request with an obj in path
:raises: HttpException on errors
"""
try:
vrs, account, container, obj = req.split_path(1, 4, True)
except ValueError:
return self.app
if req.content_length > self.max_manifest_size:
raise HTTPRequestEntityTooLarge(
"Manifest File > %d bytes" % self.max_manifest_size)
if req.headers.get('X-Copy-From'):
raise HTTPMethodNotAllowed(
'Multipart Manifest PUTs cannot be Copy requests')
parsed_data = parse_input(req.body_file.read(self.max_manifest_size))
problem_segments = []
if len(parsed_data) > self.max_manifest_segments:
raise HTTPRequestEntityTooLarge(
'Number segments must be <= %d' % self.max_manifest_segments)
total_size = 0
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
if not out_content_type:
out_content_type = 'text/plain'
data_for_storage = []
for index, seg_dict in enumerate(parsed_data):
obj_path = '/'.join(
['', vrs, account, seg_dict['path'].lstrip('/')])
try:
seg_size = int(seg_dict['size_bytes'])
except (ValueError, TypeError):
raise HTTPBadRequest('Invalid Manifest File')
if seg_size < self.min_segment_size and \
(index == 0 or index < len(parsed_data) - 1):
raise HTTPBadRequest(
'Each segment, except the last, must be larger than '
'%d bytes.' % self.min_segment_size)
new_env = req.environ.copy()
if isinstance(obj_path, unicode):
obj_path = obj_path.encode('utf-8')
new_env['PATH_INFO'] = obj_path
new_env['REQUEST_METHOD'] = 'HEAD'
new_env['swift.source'] = 'SLO'
del(new_env['wsgi.input'])
del(new_env['QUERY_STRING'])
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s MultipartPUT' % req.environ.get('HTTP_USER_AGENT')
head_seg_resp = \
Request.blank(obj_path, new_env).get_response(self.app)
if head_seg_resp.status_int // 100 == 2:
total_size += seg_size
if seg_size != head_seg_resp.content_length:
problem_segments.append([quote(obj_path), 'Size Mismatch'])
if seg_dict['etag'] != head_seg_resp.etag:
problem_segments.append([quote(obj_path), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
else:
# shouldn't happen
last_modified = datetime.now()
last_modified_formatted = \
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
data_for_storage.append(
{'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': seg_size,
'hash': seg_dict['etag'],
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted})
else:
problem_segments.append([quote(obj_path),
head_seg_resp.status])
if problem_segments:
resp_body = get_response_body(
out_content_type, {}, problem_segments)
raise HTTPBadRequest(resp_body, content_type=out_content_type)
env = req.environ
if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'
env['swift.content_type_overriden'] = True
env['CONTENT_TYPE'] += ";swift_bytes=%d" % total_size
env['HTTP_X_STATIC_LARGE_OBJECT'] = 'True'
json_data = json.dumps(data_for_storage)
env['CONTENT_LENGTH'] = str(len(json_data))
env['wsgi.input'] = StringIO(json_data)
return self.app
def handle_multipart_delete(self, req):
"""
Will delete all the segments in the SLO manifest and then, if
successful, will delete the manifest file.
:params req: a swob.Request with an obj in path
:raises HTTPServerError: on invalid manifest
:returns: swob.Response on failure, otherwise self.app
"""
new_env = req.environ.copy()
new_env['REQUEST_METHOD'] = 'GET'
del(new_env['wsgi.input'])
new_env['QUERY_STRING'] = 'multipart-manifest=get'
new_env['CONTENT_LENGTH'] = 0
new_env['HTTP_USER_AGENT'] = \
'%s MultipartDELETE' % req.environ.get('HTTP_USER_AGENT')
new_env['swift.source'] = 'SLO'
get_man_resp = \
Request.blank('', new_env).get_response(self.app)
if get_man_resp.status_int // 100 == 2:
if not config_true_value(
get_man_resp.headers.get('X-Static-Large-Object')):
raise HTTPBadRequest('Not an SLO manifest')
try:
manifest = json.loads(get_man_resp.body)
except ValueError:
raise HTTPServerError('Invalid manifest file')
delete_resp = self.bulk_deleter.handle_delete(
req,
objs_to_delete=[o['name'].encode('utf-8') for o in manifest],
user_agent='MultipartDELETE', swift_source='SLO')
if delete_resp.status_int // 100 == 2:
# delete the manifest file itself
return self.app
else:
return delete_resp
return get_man_resp
@wsgify
def __call__(self, req):
"""
WSGI entry point
"""
try:
vrs, account, container, obj = req.split_path(1, 4, True)
except ValueError:
return self.app
if obj:
if req.method == 'PUT' and \
req.params.get('multipart-manifest') == 'put':
return self.handle_multipart_put(req)
if req.method == 'DELETE' and \
req.params.get('multipart-manifest') == 'delete':
return self.handle_multipart_delete(req)
return self.app
def filter_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
def slo_filter(app):
return StaticLargeObject(app, conf)
return slo_filter
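
From the client side, the manifest format described in the module docstring above could be assembled roughly as follows. This is a sketch under stated assumptions: `build_slo_manifest` and `manifest_put_url` are hypothetical helper names, the storage URL is a placeholder, and the ETags and sizes are taken as given (the values returned when each segment was PUT). No request is sent.

```python
import json


def build_slo_manifest(segments):
    """Build the JSON body for a ?multipart-manifest=put request.

    segments: ordered list of (path, etag, size_bytes) tuples, where path
    is "/container/object_name" (no account).
    """
    entries = []
    for path, etag, size_bytes in segments:
        # Like the middleware's parse_input, require container and object.
        if "/" not in path.lstrip("/"):
            raise ValueError("path must be /container/object: %r" % path)
        entries.append({"path": path,
                        "etag": etag,
                        "size_bytes": int(size_bytes)})
    return json.dumps(entries)


def manifest_put_url(storage_url, container, obj):
    """URL to PUT the manifest body to (names assumed already URL-safe)."""
    return "%s/%s/%s?multipart-manifest=put" % (
        storage_url.rstrip("/"), container, obj)


body = build_slo_manifest([
    ("/cont_a/seg_1", "etagofsegment1", 1048576),
    ("/cont_b/seg_2", "etagofsegment2", 512),   # final segment may be small
])
url = manifest_put_url("http://example.invalid/v1/AUTH_test", "cont", "big")
print(url)
print(body)
```

Segments may live in different containers, as in the example, since each entry carries its own container in `path`.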

View File

@ -646,7 +646,10 @@ def _req_environ_property(environ_field):
return self.environ.get(environ_field, None)
def setter(self, value):
self.environ[environ_field] = value
if isinstance(value, unicode):
self.environ[environ_field] = value.encode('utf-8')
else:
self.environ[environ_field] = value
return property(getter, setter, doc=("Get and set the %s property "
"in the WSGI environment") % environ_field)
@ -730,6 +733,8 @@ class Request(object):
"""
headers = headers or {}
environ = environ or {}
if isinstance(path, unicode):
path = path.encode('utf-8')
parsed_path = urlparse.urlparse(path)
server_name = 'localhost'
if parsed_path.netloc:

View File

@ -305,6 +305,26 @@ class ContainerController(object):
return HTTPNotAcceptable(request=req)
return HTTPNoContent(request=req, headers=headers, charset='utf-8')
def derive_content_type_metadata(self, content_type, size):
"""
Will check the last parameter and if it starts with 'swift_bytes=' will
strip it off. Returns either the passed in content_type and size
or the content_type without the swift_bytes param and its value as
the new size.
:params content_type: Content Type from db
:params size: # bytes from db, an int
:returns: tuple: content_type, size
"""
if ';' in content_type:
new_content_type, param = content_type.rsplit(';', 1)
if param.lstrip().startswith('swift_bytes='):
key, value = param.split('=')
try:
return new_content_type, int(value)
except ValueError:
self.logger.exception("Invalid swift_bytes")
return content_type, size
@public
@timing_stats()
def GET(self, req):
@ -375,6 +395,8 @@ class ContainerController(object):
# python isoformat() doesn't include msecs when zero
if len(created_at) < len("1970-01-01T00:00:00.000000"):
created_at += ".000000"
content_type, size = self.derive_content_type_metadata(
content_type, size)
data.append({'last_modified': created_at, 'bytes': size,
'content_type': content_type, 'hash': etag,
'name': name})
@ -393,6 +415,8 @@ class ContainerController(object):
xml_output.append('<subdir name="%s"><name>%s</name>'
'</subdir>' % (name, name))
else:
content_type, size = self.derive_content_type_metadata(
content_type, size)
content_type = saxutils.escape(content_type)
xml_output.append(
'<object><name>%s</name><hash>%s</hash>'
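
The `swift_bytes` handling added to the container listing can be tried in isolation. The function below is a standalone sketch of the same idea under a hypothetical name, `split_swift_bytes`; it slices off the parameter name instead of calling `split('=')` and silently keeps the stored values on a malformed number, so it is not a copy of the method in the diff.

```python
def split_swift_bytes(content_type, size):
    """Strip a trailing ';swift_bytes=N' parameter from content_type.

    Returns (content_type, size). When the last parameter is swift_bytes
    with an integer value, that value replaces size; otherwise both inputs
    are returned unchanged.
    """
    marker = "swift_bytes="
    if ";" in content_type:
        base, param = content_type.rsplit(";", 1)
        param = param.strip()
        if param.startswith(marker):
            try:
                return base, int(param[len(marker):])
            except ValueError:
                pass  # malformed value: fall through and keep stored data
    return content_type, size


# A manifest stored as 120 bytes of JSON that describes a 3 MiB object.
print(split_swift_bytes("application/octet-stream;swift_bytes=3145728", 120))
# A regular object with an ordinary parameter is left alone.
print(split_swift_bytes("text/plain; charset=utf-8", 42))
```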

View File

@ -417,6 +417,7 @@ class ObjectController(object):
content-encoding,
x-delete-at,
x-object-manifest,
x-static-large-object,
'''
self.allowed_headers = set(
i.strip().lower() for i in

View File

@ -43,11 +43,11 @@ from swift.common.constraints import check_metadata, check_object_creation, \
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
ListingIterNotAuthorized, ListingIterError
ListingIterNotAuthorized, ListingIterError, SloSegmentError
from swift.common.http import is_success, is_client_error, HTTP_CONTINUE, \
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, \
HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_NOT_FOUND, HTTP_CONFLICT, \
HTTP_INTERNAL_SERVER_ERROR, HTTP_SERVICE_UNAVAILABLE, \
HTTP_INSUFFICIENT_STORAGE
HTTP_INSUFFICIENT_STORAGE, HTTP_OK
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
@ -65,6 +65,27 @@ def segment_listing_iter(listing):
yield seg_dict
def copy_headers_into(from_r, to_r):
"""
Will copy desired headers from from_r to to_r
:params from_r: a swob Request or Response
:params to_r: a swob Request or Response
"""
for k, v in from_r.headers.items():
if k.lower().startswith('x-object-meta-'):
to_r.headers[k] = v
def check_content_type(req):
if not req.environ.get('swift.content_type_overriden') and \
';' in req.headers.get('content-type', ''):
for param in req.headers['content-type'].split(';')[1:]:
if param.lstrip().startswith('swift_'):
return HTTPBadRequest("Invalid Content-Type, "
"swift_* is not a valid parameter name.")
return None
class SegmentedIterable(object):
"""
Iterable that returns the object contents for a segmented object in Swift.
@ -74,7 +95,9 @@ class SegmentedIterable(object):
status would have already been sent to the client).
:param controller: The ObjectController instance to work with.
:param container: The container the object segments are within.
:param container: The container the object segments are within. If
container is None will derive container from elements
in listing using split('/', 1).
:param listing: The listing of object segments to iterate over; this may
be an iterator or list that returns dicts with 'name' and
'bytes' keys.
@ -82,10 +105,12 @@ class SegmentedIterable(object):
any (default: None)
"""
def __init__(self, controller, container, listing, response=None):
def __init__(self, controller, container, listing, response=None,
is_slo=False):
self.controller = controller
self.container = container
self.listing = segment_listing_iter(listing)
self.is_slo = is_slo
self.segment = 0
self.segment_dict = None
self.segment_peek = None
@ -103,22 +128,27 @@ class SegmentedIterable(object):
"""
Loads the self.segment_iter with the next object segment's contents.
:raises: StopIteration when there are no more object segments.
:raises: StopIteration when there are no more object segments or a
segment no longer matches the SLO manifest specification.
"""
try:
self.segment += 1
self.segment_dict = self.segment_peek or self.listing.next()
self.segment_peek = None
if self.container is None:
container, obj = \
self.segment_dict['name'].lstrip('/').split('/', 1)
else:
container, obj = self.container, self.segment_dict['name']
partition, nodes = self.controller.app.object_ring.get_nodes(
self.controller.account_name, self.container,
self.segment_dict['name'])
path = '/%s/%s/%s' % (self.controller.account_name, self.container,
self.segment_dict['name'])
self.controller.account_name, container, obj)
path = '/%s/%s/%s' % (self.controller.account_name, container, obj)
req = Request.blank(path)
if self.seek:
req.range = 'bytes=%s-' % self.seek
self.seek = 0
if self.segment > self.controller.app.rate_limit_after_segment:
if not self.is_slo and self.segment > \
self.controller.app.rate_limit_after_segment:
sleep(max(self.next_get_time - time.time(), 0))
self.next_get_time = time.time() + \
1.0 / self.controller.app.rate_limit_segments_per_sec
@ -128,15 +158,41 @@ class SegmentedIterable(object):
self.controller.iter_nodes(partition, nodes,
self.controller.app.object_ring),
path, len(nodes))
if self.is_slo and resp.status_int == HTTP_NOT_FOUND:
raise SloSegmentError(_(
'Could not load object segment %(path)s:'
' %(status)s') % {'path': path, 'status': resp.status_int})
if not is_success(resp.status_int):
raise Exception(_(
'Could not load object segment %(path)s:'
' %(status)s') % {'path': path, 'status': resp.status_int})
if self.is_slo:
if (resp.content_length != self.segment_dict['bytes'] or
resp.etag != self.segment_dict['hash']):
raise SloSegmentError(_(
'Object segment no longer valid: '
'%(path)s etag: %(r_etag)s != %(s_etag)s or '
'size: %(r_size)s != %(s_size)s') %
{'path': path, 'r_etag': resp.etag,
's_etag': self.segment_dict['hash'],
'r_size': resp.content_length,
's_size': self.segment_dict['bytes']})
self.segment_iter = resp.app_iter
# See NOTE: swift_conn at top of file about this.
self.segment_iter_swift_conn = getattr(resp, 'swift_conn', None)
except StopIteration:
raise
except SloSegmentError, err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.error(_(
'ERROR: While processing manifest '
'/%(acc)s/%(cont)s/%(obj)s, %(err)s'),
{'acc': self.controller.account_name,
'cont': self.controller.container_name,
'obj': self.controller.object_name, 'err': err})
err.swift_logged = True
self.response.status_int = HTTP_CONFLICT
raise StopIteration('Invalid manifest segment')
except (Exception, Timeout), err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception(_(
@ -183,7 +239,7 @@ class SegmentedIterable(object):
def app_iter_range(self, start, stop):
"""
Non-standard iterator function for use with Webob in serving Range
Non-standard iterator function for use with Swob in serving Range
requests more quickly. This will skip over segments and do a range
request on the first segment to return data from, if needed.
@ -342,7 +398,49 @@ class ObjectController(Controller):
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
if 'x-object-manifest' in resp.headers:
if ';' in resp.headers.get('content-type', ''):
# strip off swift_bytes from content-type
content_type, check_extra_meta = \
resp.headers['content-type'].rsplit(';', 1)
if check_extra_meta.lstrip().startswith('swift_bytes='):
resp.content_type = content_type
large_object = None
if config_true_value(resp.headers.get('x-static-large-object')) and \
req.params.get('multipart-manifest') != 'get' and \
self.app.allow_static_large_object:
large_object = 'SLO'
listing_page1 = ()
listing = []
lcontainer = None # container name is included in listing
if resp.status_int == HTTP_OK and \
req.method == 'GET' and not req.range:
try:
listing = json.loads(resp.body)
except ValueError:
listing = []
else:
# need to make a second request to get whole manifest
new_req = req.copy_get()
new_req.method = 'GET'
new_req.range = None
nodes = self.app.sort_nodes(nodes)
new_resp = self.GETorHEAD_base(
new_req, _('Object'), partition,
self.iter_nodes(partition, nodes, self.app.object_ring),
req.path_info, len(nodes))
if new_resp.status_int // 100 == 2:
try:
listing = json.loads(new_resp.body)
except ValueError:
listing = []
else:
return HTTPServiceUnavailable(
"Unable to load SLO manifest", request=req)
if 'x-object-manifest' in resp.headers and \
req.params.get('multipart-manifest') != 'get':
large_object = 'DLO'
lcontainer, lprefix = \
resp.headers['x-object-manifest'].split('/', 1)
lcontainer = unquote(lcontainer)
@ -362,6 +460,7 @@ class ObjectController(Controller):
except StopIteration:
listing_page1 = listing = ()
if large_object:
if len(listing_page1) >= CONTAINER_LISTING_LIMIT:
resp = Response(headers=resp.headers, request=req,
conditional_response=True)
@ -381,27 +480,35 @@ class ObjectController(Controller):
return head_response
else:
resp.app_iter = SegmentedIterable(
self, lcontainer, listing, resp)
self, lcontainer, listing, resp,
is_slo=(large_object == 'SLO'))
else:
# For objects with a reasonable number of segments, we'll serve
# them with a set content-length and computed etag.
if listing:
listing = list(listing)
content_length = sum(o['bytes'] for o in listing)
last_modified = max(o['last_modified'] for o in listing)
last_modified = datetime(*map(int, re.split('[^\d]',
last_modified)[:-1]))
etag = md5(
''.join(o['hash'] for o in listing)).hexdigest()
try:
content_length = sum(o['bytes'] for o in listing)
last_modified = \
max(o['last_modified'] for o in listing)
last_modified = datetime(*map(int, re.split('[^\d]',
last_modified)[:-1]))
etag = md5(
''.join(o['hash'] for o in listing)).hexdigest()
except KeyError:
return HTTPServerError('Invalid Manifest File',
request=req)
else:
content_length = 0
last_modified = resp.last_modified
etag = md5().hexdigest()
resp = Response(headers=resp.headers, request=req,
conditional_response=True)
resp.app_iter = SegmentedIterable(self, lcontainer, listing,
resp)
resp.app_iter = SegmentedIterable(
self, lcontainer, listing, resp,
is_slo=(large_object == 'SLO'))
resp.content_length = content_length
resp.last_modified = last_modified
resp.etag = etag
@ -449,6 +556,10 @@ class ObjectController(Controller):
self.object_name))
req.headers['X-Fresh-Metadata'] = 'true'
req.environ['swift_versioned_copy'] = True
if req.environ.get('QUERY_STRING'):
req.environ['QUERY_STRING'] += '&multipart-manifest=get'
else:
req.environ['QUERY_STRING'] = 'multipart-manifest=get'
resp = self.PUT(req)
# Older editions returned 202 Accepted on object POSTs, so we'll
# convert any 201 Created responses to that for compatibility with
@ -658,7 +769,8 @@ class ObjectController(Controller):
req.headers['Content-Type'] = guessed_type or \
'application/octet-stream'
content_type_manually_set = False
error_response = check_object_creation(req, self.object_name)
error_response = check_object_creation(req, self.object_name) or \
check_content_type(req)
if error_response:
return error_response
if object_versions and not req.environ.get('swift_versioned_copy'):
@ -744,12 +856,14 @@ class ObjectController(Controller):
source_resp.headers['Content-Type']
if not config_true_value(
new_req.headers.get('x-fresh-metadata', 'false')):
for k, v in source_resp.headers.items():
if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v
for k, v in req.headers.items():
if k.lower().startswith('x-object-meta-'):
new_req.headers[k] = v
copy_headers_into(source_resp, new_req)
copy_headers_into(req, new_req)
# copy over x-static-large-object for POSTs and manifest copies
if 'X-Static-Large-Object' in source_resp.headers and \
req.params.get('multipart-manifest') == 'get':
new_req.headers['X-Static-Large-Object'] = \
source_resp.headers['X-Static-Large-Object']
req = new_req
node_iter = self.iter_nodes(partition, nodes, self.app.object_ring)
pile = GreenPile(len(nodes))
@ -867,9 +981,7 @@ class ObjectController(Controller):
if 'last-modified' in source_resp.headers:
resp.headers['X-Copied-From-Last-Modified'] = \
source_resp.headers['last-modified']
for k, v in req.headers.items():
if k.lower().startswith('x-object-meta-'):
resp.headers[k] = v
copy_headers_into(req, resp)
resp.last_modified = float(req.headers['X-Timestamp'])
return resp
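
The container derivation and the computed content length and ETag in the changes above can be sketched in isolation. This is an illustrative sketch only, not part of the commit: it assumes listing entries carry the 'name', 'hash' and 'bytes' keys shown above, and the helper names split_segment_name and summarize_listing are hypothetical.

```python
from hashlib import md5


def split_segment_name(name, container=None):
    """Resolve a listing entry's name to (container, object)."""
    if container is None:
        # SLO case: the container is embedded in the segment name.
        container, obj = name.lstrip('/').split('/', 1)
    else:
        # DLO case: the container is fixed and the name is the object.
        obj = name
    return container, obj


def summarize_listing(listing):
    """Return (content_length, etag) for a list of segment dicts."""
    content_length = sum(o['bytes'] for o in listing)
    # The ETag is the MD5 of the concatenated segment ETags.
    joined = ''.join(o['hash'] for o in listing)
    etag = md5(joined.encode('utf-8')).hexdigest()
    return content_length, etag
```

A missing 'bytes' or 'hash' key raises KeyError here, which is the condition the proxy change above maps to an 'Invalid Manifest File' error response.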

View File

@ -113,6 +113,8 @@ class Application(object):
self.node_timings = {}
self.timing_expiry = int(conf.get('timing_expiry', 300))
self.sorting_method = conf.get('sorting_method', 'shuffle').lower()
self.allow_static_large_object = config_true_value(
conf.get('allow_static_large_object', 'true'))
def get_controller(self, path):
"""

View File

@ -390,8 +390,8 @@ class TestUntar(unittest.TestCase):
def test_get_response_body(self):
self.assertRaises(
HTTPException, self.bulk.get_response_body, 'badformat', {}, [])
xml_body = self.bulk.get_response_body(
HTTPException, bulk.get_response_body, 'badformat', {}, [])
xml_body = bulk.get_response_body(
'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']])
self.assert_('&gt' in xml_body)
@ -461,18 +461,20 @@ class TestDelete(unittest.TestCase):
results = self.bulk.get_objs_to_delete(req)
self.assertEquals(results, ['1', '2', '3'])
def test_bulk_delete_works_extra_newlines(self):
def test_bulk_delete_works_extra_newlines_extra_quoting(self):
req = Request.blank('/delete_works/AUTH_Acc',
body='/c/f\n\n\n/c/f404\n\n\n',
body='/c/f\n\n\n/c/f404\n\n\n/c/%2525',
headers={'Accept': 'application/json'})
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
self.assertEquals(
self.app.delete_paths,
['/delete_works/AUTH_Acc/c/f', '/delete_works/AUTH_Acc/c/f404'])
self.assertEquals(self.app.calls, 2)
['/delete_works/AUTH_Acc/c/f',
'/delete_works/AUTH_Acc/c/f404',
'/delete_works/AUTH_Acc/c/%25'])
self.assertEquals(self.app.calls, 3)
resp_data = json.loads(resp.body)
self.assertEquals(resp_data['Number Deleted'], 1)
self.assertEquals(resp_data['Number Deleted'], 2)
self.assertEquals(resp_data['Number Not Found'], 1)
def test_bulk_delete_too_many_newlines(self):

View File

@ -0,0 +1,383 @@
# Copyright (c) 2013 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from mock import patch
from swift.common.middleware import slo
from swift.common.utils import json
from swift.common.constraints import MAX_META_VALUE_LENGTH
from swift.common.swob import Request, Response, HTTPException, \
HTTPRequestEntityTooLarge
class FakeApp(object):
def __init__(self):
self.calls = 0
self.req_method_paths = []
def __call__(self, env, start_response):
self.calls += 1
if env['PATH_INFO'] == '/':
return Response(status=200, body='passed')(env, start_response)
if env['PATH_INFO'].startswith('/test_good/'):
j, v, a, cont, obj = env['PATH_INFO'].split('/')
if obj == 'a_2':
return Response(status=400)(env, start_response)
cont_len = 100
if obj == 'small_object':
cont_len = 10
return Response(
status=200,
headers={'etag': 'etagoftheobjectsegment',
'Content-Length': cont_len})(env, start_response)
if env['PATH_INFO'].startswith('/test_good_check/'):
j, v, a, cont, obj = env['PATH_INFO'].split('/')
etag, size = obj.split('_')
last_mod = 'Fri, 01 Feb 2012 20:38:36 GMT'
if obj == 'a_1':
last_mod = ''
return Response(
status=200,
headers={'etag': etag, 'Last-Modified': last_mod,
'Content-Length': size})(env, start_response)
if env['PATH_INFO'].startswith('/test_get/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True',
'Content-Type': 'html;swift_bytes=55'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_get_broke_json/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data[:-5])(env, start_response)
if env['PATH_INFO'].startswith('/test_get_bad_json/'):
bad_data = json.dumps(
[{'name': '/c/a_1', 'something': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=bad_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_get_not_slo/'):
return Response(status=200, body='lalala')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_404/'):
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=404)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_bad_json/'):
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body='bad json')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_bad_man/'):
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200, body='')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_bad/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
if env['PATH_INFO'].endswith('/c/a_1'):
return Response(status=401)(env, start_response)
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
test_xml_data = '''<?xml version="1.0" encoding="UTF-8"?>
<static_large_object>
<object_segment>
<path>/cont/object</path>
<etag>etagoftheobjectsegment</etag>
<size_bytes>100</size_bytes>
</object_segment>
</static_large_object>
'''
test_json_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
def fake_start_response(*args, **kwargs):
pass
class TestStaticLargeObject(unittest.TestCase):
def setUp(self):
self.app = FakeApp()
self.slo = slo.filter_factory({})(self.app)
self.slo.min_segment_size = 1
def tearDown(self):
pass
def test_handle_multipart_no_obj(self):
req = Request.blank('/')
resp_iter = self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(''.join(resp_iter), 'passed')
def test_parse_input(self):
self.assertRaises(HTTPException, slo.parse_input, 'some non json')
data = json.dumps(
[{'path': '/cont/object', 'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
self.assertEquals('/cont/object',
slo.parse_input(data)[0]['path'])
bad_data = json.dumps([{'path': '/cont/object', 'size_bytes': 100}])
self.assertRaises(HTTPException, slo.parse_input, bad_data)
def test_put_manifest_too_quick_fail(self):
req = Request.blank('/v/a/c/o')
req.content_length = self.slo.max_manifest_size + 1
try:
self.slo.handle_multipart_put(req)
except HTTPException, e:
pass
self.assertEquals(e.status_int, 413)
with patch.object(self.slo, 'max_manifest_segments', 0):
req = Request.blank('/v/a/c/o', body=test_json_data)
e = None
try:
self.slo.handle_multipart_put(req)
except HTTPException, e:
pass
self.assertEquals(e.status_int, 413)
with patch.object(self.slo, 'min_segment_size', 1000):
req = Request.blank('/v/a/c/o', body=test_json_data)
try:
self.slo.handle_multipart_put(req)
except HTTPException, e:
pass
self.assertEquals(e.status_int, 400)
req = Request.blank('/v/a/c/o', headers={'X-Copy-From': 'lala'})
try:
self.slo.handle_multipart_put(req)
except HTTPException, e:
pass
self.assertEquals(e.status_int, 405)
# ignores requests to /
req = Request.blank(
'/?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data)
self.assertEquals(self.slo.handle_multipart_put(req), self.app)
def test_handle_multipart_put_success(self):
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
self.slo(req.environ, fake_start_response)
self.assertTrue('X-Static-Large-Object' in req.headers)
def test_handle_multipart_put_success_allow_small_last_segment(self):
with patch.object(self.slo, 'min_segment_size', 50):
test_json_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100},
{'path': '/cont/small_object',
'etag': 'etagoftheobjectsegment',
'size_bytes': 10}])
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
self.slo(req.environ, fake_start_response)
self.assertTrue('X-Static-Large-Object' in req.headers)
def test_handle_multipart_put_success_unicode(self):
test_json_data = json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
self.slo(req.environ, fake_start_response)
self.assertTrue('X-Static-Large-Object' in req.headers)
self.assertTrue(req.environ['PATH_INFO'], '/cont/object\xe2\x99\xa4')
def test_handle_multipart_put_no_xml(self):
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_xml_data)
no_xml = self.slo(req.environ, fake_start_response)
self.assertEquals(no_xml, ['Manifest must be valid json.'])
def test_handle_multipart_put_bad_data(self):
bad_data = json.dumps([{'path': '/cont/object',
'etag': 'etagoftheobj',
'size_bytes': 'lala'}])
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
self.assertRaises(HTTPException, self.slo.handle_multipart_put, req)
for bad_data in [
json.dumps([{'path': '/cont', 'etag': 'etagoftheobj',
'size_bytes': 100}]),
json.dumps('asdf'), json.dumps(None), json.dumps(5),
'not json', '1234', None, '', json.dumps({'path': None}),
json.dumps([{'path': '/c/o', 'etag': None,
'size_bytes': 12}]),
json.dumps([{'path': '/c/o', 'etag': 'asdf',
'size_bytes': 'sd'}]),
json.dumps([{'path': 12, 'etag': 'etagoftheobj',
'size_bytes': 100}]),
json.dumps([{'path': u'/cont/object\u2661',
'etag': 'etagoftheobj', 'size_bytes': 100}]),
json.dumps([{'path': 12, 'size_bytes': 100}]),
json.dumps([{'path': 12, 'size_bytes': 100}]),
json.dumps([{'path': None, 'etag': 'etagoftheobj',
'size_bytes': 100}])]:
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
self.assertRaises(HTTPException, self.slo.handle_multipart_put,
req)
def test_handle_multipart_put_check_data(self):
good_data = json.dumps(
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'}])
req = Request.blank(
'/test_good_check/A/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
self.slo.handle_multipart_put(req)
self.assertEquals(self.app.calls, 2)
self.assert_(req.environ['CONTENT_TYPE'].endswith(';swift_bytes=3'))
manifest_data = json.loads(req.environ['wsgi.input'].read())
self.assertEquals(len(manifest_data), 2)
self.assertEquals(manifest_data[0]['hash'], 'a')
self.assertEquals(manifest_data[0]['bytes'], 1)
self.assert_(not manifest_data[0]['last_modified'].startswith('2012'))
self.assert_(manifest_data[1]['last_modified'].startswith('2012'))
def test_handle_multipart_put_check_data_bad(self):
bad_data = json.dumps(
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
{'path': '/c/a_2', 'etag': 'a', 'size_bytes': '1'},
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'}])
req = Request.blank(
'/test_good/A/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Accept': 'application/json'},
body=bad_data)
try:
self.slo.handle_multipart_put(req)
except HTTPException, e:
self.assertEquals(self.app.calls, 3)
data = json.loads(e.body)
errors = data['Errors']
self.assertEquals(errors[0][0], '/test_good/A/c/a_1')
self.assertEquals(errors[0][1], 'Size Mismatch')
self.assertEquals(errors[2][1], '400 Bad Request')
self.assertEquals(errors[-1][0], '/test_good/A/d/b_2')
self.assertEquals(errors[-1][1], 'Etag Mismatch')
else:
self.assert_(False)
def test_handle_multipart_delete_man(self):
req = Request.blank(
'/test_good/A/c/man', environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
def test_handle_multipart_delete_whole_404(self):
req = Request.blank(
'/test_delete_404/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_404/A/c/man')])
def test_handle_multipart_delete_whole(self):
req = Request.blank(
'/test_delete/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 4)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete/A/c/man'),
('DELETE', '/test_delete/A/c/a_1'),
('DELETE', '/test_delete/A/d/b_2'),
('DELETE', '/test_delete/A/c/man')])
def test_handle_multipart_delete_bad_manifest(self):
req = Request.blank(
'/test_delete_bad_man/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad_man/A/c/man')])
self.assertEquals(resp, ['Not an SLO manifest'])
def test_handle_multipart_delete_bad_json(self):
req = Request.blank(
'/test_delete_bad_json/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
resp = self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad_json/A/c/man')])
self.assertEquals(resp, ['Invalid manifest file'])
def test_handle_multipart_delete_whole_bad(self):
req = Request.blank(
'/test_delete_bad/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 2)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad/A/c/man'),
('DELETE', '/test_delete_bad/A/c/a_1')])
if __name__ == '__main__':
unittest.main()
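
The manifest bodies used in the tests above share one shape: a JSON list of dicts with 'path', 'etag' and 'size_bytes' keys, sent in a PUT with ?multipart-manifest=put. A minimal sketch of building such a body follows. The helper build_manifest_body is hypothetical and is not part of swift or this commit.

```python
import json


def build_manifest_body(segments):
    """Serialize (path, etag, size_bytes) tuples as a manifest PUT body."""
    return json.dumps([
        {'path': path, 'etag': etag, 'size_bytes': size_bytes}
        for path, etag, size_bytes in segments])


# Same single-segment manifest as test_json_data in the tests above.
body = build_manifest_body([('/cont/object', 'etagoftheobjectsegment', 100)])
```

Each path has the form /container/object, so segments may live in different containers, as in the '/c/a_1' and '/d/b_2' entries used by the tests.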

View File

@ -203,5 +203,13 @@ class TestConstraints(unittest.TestCase):
valid_utf8_str]:
self.assertTrue(constraints.check_utf8(true_argument))
def test_validate_bad_meta(self):
req = Request.blank(
'/v/a/c/o',
headers={'x-object-meta-hello':
'ab' * constraints.MAX_HEADER_SIZE})
self.assertEquals(constraints.check_metadata(req, 'object').status_int,
HTTP_BAD_REQUEST)
if __name__ == '__main__':
unittest.main()

View File

@ -19,6 +19,7 @@ import unittest
import datetime
import re
from StringIO import StringIO
from urllib import quote
import swift.common.swob
@ -492,6 +493,14 @@ class TestRequest(unittest.TestCase):
except ValueError, err:
self.assertEquals(str(err), 'Invalid path: o%0An%20e')
def test_unicode_path(self):
req = swift.common.swob.Request.blank(u'/\u2661')
self.assertEquals(req.path, quote(u'/\u2661'.encode('utf-8')))
def test_unicode_query(self):
req = swift.common.swob.Request.blank(u'/')
req.query_string = u'x=\u2661'
self.assertEquals(req.params['x'], u'\u2661'.encode('utf-8'))
class TestStatusMap(unittest.TestCase):

View File

@ -233,13 +233,19 @@ def fake_http_connect(*code_iter, **kwargs):
return FakeConn(100)
def getheaders(self):
etag = self.etag
if not etag:
if isinstance(self.body, str):
etag = '"' + md5(self.body).hexdigest() + '"'
else:
etag = '"68b329da9893e34099c7d8ad5cb9c940"'
headers = {'content-length': len(self.body),
'content-type': 'x-application/test',
'x-timestamp': self.timestamp,
'last-modified': self.timestamp,
'x-object-meta-test': 'testing',
'etag':
self.etag or '"68b329da9893e34099c7d8ad5cb9c940"',
'etag': etag,
'x-works': 'yes',
'x-account-container-count': 12345}
if not self.timestamp:
@ -952,27 +958,29 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(test_errors, [])
def test_GET_manifest_no_segments(self):
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # GET manifest
simplejson.dumps([])) # GET empty listing
for hdict in [{"X-Object-Manifest": "segments/seg"},
{"X-Static-Large-Object": "True"}]:
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # GET manifest
simplejson.dumps([])) # GET empty listing
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET empty listing
headers={"X-Object-Manifest": "segments/seg"},
body_iter=response_bodies)
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET empty listing
headers=hdict,
body_iter=response_bodies)
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, '')
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, '')
def test_GET_manifest_limited_listing(self):
listing1 = [{"hash": "454dfc73af632012ce3e6217dc464241",
@ -1024,6 +1032,7 @@ class TestObjectController(unittest.TestCase):
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
@ -1079,6 +1088,229 @@ class TestObjectController(unittest.TestCase):
swift.proxy.controllers.obj.CONTAINER_LISTING_LIMIT = \
_orig_container_listing_limit
def test_GET_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'Bb') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET seg01
200, # GET seg02
headers={"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'AaBb')
self.assertEqual(resp.content_length, 4)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}]])
def test_GET_bad_etag_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "invalidhash",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'Bb') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET seg01
200, # GET seg02
headers={"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'Aa') # dropped connection
self.assertEqual(resp.content_length, 4) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}]])
def test_GET_bad_404_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"},
{"hash": "invalidhash",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg03",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
'') # GET seg02
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # GET manifest
200, # GET seg01
404, # GET seg02
headers={"X-Static-Large-Object": "True",
'content-type': 'text/html; swift_bytes=4'},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'Aa') # dropped connection
self.assertEqual(resp.content_length, 6) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/seg02', {}],
['GET', '/a/d2/seg02', {}],
['GET', '/a/d2/seg02', {}]]) # 2nd segment not found
def test_HEAD_manifest_slo(self):
listing = [{"hash": "454dfc73af632012ce3e6217dc464241",
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "474bab96c67528d42d5c0c52b35228eb",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 2,
"name": "/d2/seg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # HEAD manifest
simplejson.dumps(listing)) # GET manifest
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
200, # HEAD manifest
200, # GET manifest
headers={"X-Static-Large-Object": "True"},
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest',
environ={'REQUEST_METHOD': 'HEAD'})
resp = controller.HEAD(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['HEAD', '/a/c/manifest', {}],
['GET', '/a/c/manifest', {}]])
def test_PUT_auto_content_type(self):
with save_globals():
controller = proxy_server.ObjectController(self.app, 'account',
@ -1197,6 +1429,17 @@ class TestObjectController(unittest.TestCase):
res = controller.PUT(req)
self.assertEquals(res.status_int, 413)
def test_PUT_bad_content_type(self):
with save_globals():
set_http_connect(201, 201, 201)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
req = Request.blank('/a/c/o', {}, headers={
'Content-Length': 0, 'Content-Type': 'foo/bar;swift_hey=45'})
self.app.update_request(req)
res = controller.PUT(req)
self.assertEquals(res.status_int, 400)
def test_PUT_getresponse_exceptions(self):
with save_globals():