Refactor Bulk middleware to handle long running requests

Change-Id: I8ea0ff86518d453597faae44ec3918298e2d5147
This commit is contained in:
David Goetz 2013-04-30 14:45:46 -07:00
parent 407e08fa30
commit af2607c457
7 changed files with 327 additions and 224 deletions

View File

@ -362,8 +362,9 @@ use = egg:swift#proxy_logging
[filter:bulk]
use = egg:swift#bulk
# max_containers_per_extraction = 10000
# max_failed_files = 1000
# max_deletes_per_request = 1000
# max_failed_extractions = 1000
# max_deletes_per_request = 10000
# yield_frequency = 60
# Note: Put after auth in the pipeline.
[filter:container-quotas]

View File

@ -16,11 +16,12 @@
import tarfile
from urllib import quote, unquote
from xml.sax import saxutils
from time import time
from swift.common.swob import Request, HTTPBadGateway, \
HTTPCreated, HTTPBadRequest, HTTPNotFound, HTTPUnauthorized, HTTPOk, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPNotAcceptable, \
HTTPLengthRequired, wsgify
from swift.common.utils import json
HTTPLengthRequired, HTTPException, HTTPServerError, wsgify
from swift.common.utils import json, get_logger
from swift.common.constraints import check_utf8, MAX_FILE_SIZE
from swift.common.http import HTTP_BAD_REQUEST, HTTP_UNAUTHORIZED, \
HTTP_NOT_FOUND
@ -44,25 +45,18 @@ ACCEPTABLE_FORMATS = ['text/plain', 'application/json', 'application/xml',
def get_response_body(data_format, data_dict, error_list):
"""
Returns a properly formatted response body according to format.
Returns a properly formatted response body according to format. Handles
json and xml, otherwise will return text/plain. Note: xml response does not
include xml declaration.
:params data_format: resulting format
:params data_dict: generated data about results.
:params error_list: list of quoted filenames that failed
"""
if data_format == 'text/plain':
output = ''
for key in sorted(data_dict.keys()):
output += '%s: %s\n' % (key, data_dict[key])
output += 'Errors:\n'
output += '\n'.join(
['%s, %s' % (name, status)
for name, status in error_list])
return output
if data_format == 'application/json':
data_dict['Errors'] = error_list
return json.dumps(data_dict)
if data_format.endswith('/xml'):
output = '<?xml version="1.0" encoding="UTF-8"?>\n<delete>\n'
if data_format and data_format.endswith('/xml'):
output = '<delete>\n'
for key in sorted(data_dict.keys()):
xml_key = key.replace(' ', '_').lower()
output += '<%s>%s</%s>\n' % (xml_key, data_dict[key], xml_key)
@ -74,7 +68,15 @@ def get_response_body(data_format, data_dict, error_list):
name, status in error_list])
output += '</errors>\n</delete>\n'
return output
raise HTTPNotAcceptable('Invalid output type')
output = ''
for key in sorted(data_dict.keys()):
output += '%s: %s\n' % (key, data_dict[key])
output += 'Errors:\n'
output += '\n'.join(
['%s, %s' % (name, status)
for name, status in error_list])
return output
class Bulk(object):
@ -106,13 +108,31 @@ class Bulk(object):
Only regular files will be uploaded. Empty directories, symlinks, etc will
not be uploaded.
If all valid files were uploaded successfully will return an HTTPCreated
response. If any files failed to be created will return an HTTPBadGateway
response. In both cases the response body will specify the number of files
successfully uploaded and a list of the files that failed. The return body
will be formatted in the way specified in the request's Accept header.
Acceptable formats are text/plain, application/json, application/xml, and
text/xml.
The response from bulk operations functions differently from other swift
responses. This is because a short request body sent from the client could
result in many operations on the proxy server and precautions need to be
made to prevent the request from timing out due to lack of activity. To
this end, the client will always receive a 200 Ok response, regardless of
the actual success of the call. The body of the response must be parsed to
determine the actual success of the operation. In addition to this the
client may receive zero or more whitespace characters prepended to the
actual response body while the proxy server is completing the request.
The format of the response body defaults to text/plain but can be either
json or xml depending on the Accept header. Acceptable formats are
text/plain, application/json, application/xml, and text/xml. An example
body is as follows:
{"Response Code": "201 Created",
"Response Body": "",
"Errors": [],
"Number Files Created": 10}
If all valid files were uploaded successfully the Response Code will be a
201 Created. If any files failed to be created the response code
corresponds to the subrequest's error. Possible codes are 400, 401, 502 (on
server errors), etc. In both cases the response body will specify the
number of files successfully uploaded and a list of the files that failed.
There are proxy logs created for each file (which becomes a subrequest) in
the tar. The subrequest's proxy log will have a swift.source set to "EA"
@ -127,7 +147,7 @@ class Bulk(object):
single request. Responds to DELETE requests with query parameter
?bulk-delete set. The Content-Type should be set to text/plain.
The body of the DELETE request will be a newline separated list of url
encoded objects to delete. You can only delete 1000 (configurable) objects
encoded objects to delete. You can delete 10,000 (configurable) objects
per request. The objects specified in the DELETE request body must be URL
encoded and in the form:
@ -137,10 +157,21 @@ class Bulk(object):
/container_name
If all items were successfully deleted (or did not exist), will return an
HTTPOk. If any failed to delete, will return an HTTPBadGateway. In
both cases the response body will specify the number of items
successfully deleted, not found, and a list of those that failed.
The response is similar to bulk deletes as in every response will be a 200
Ok and you must parse the response body for acutal results. An example
response is:
{"Number Not Found": 0,
"Response Code": "200 OK",
"Response Body": "",
"Errors": [],
"Number Deleted": 6}
If all items were successfully deleted (or did not exist), the Response
Code will be a 200 Ok. If any failed to delete, the response code
corresponds to the subrequest's error. Possible codes are 400, 401, 502 (on
server errors), etc. In all cases the response body will specify the number
of items successfully deleted, not found, and a list of those that failed.
The return body will be formatted in the way specified in the request's
Accept header. Acceptable formats are text/plain, application/json,
application/xml, and text/xml.
@ -155,12 +186,14 @@ class Bulk(object):
def __init__(self, app, conf):
self.app = app
self.logger = get_logger(conf, log_route='bulk')
self.max_containers = int(
conf.get('max_containers_per_extraction', 10000))
self.max_failed_extractions = int(
conf.get('max_failed_extractions', 1000))
self.max_deletes_per_request = int(
conf.get('max_deletes_per_request', 1000))
conf.get('max_deletes_per_request', 10000))
self.yield_frequency = int(conf.get('yield_frequency', 60))
def create_container(self, req, container_path):
"""
@ -213,32 +246,53 @@ class Bulk(object):
raise HTTPBadRequest('Invalid File Name')
return objs_to_delete
def handle_delete(self, req, objs_to_delete=None, user_agent='BulkDelete',
swift_source='BD'):
def handle_delete_iter(self, req, objs_to_delete=None,
user_agent='BulkDelete', swift_source='BD'):
"""
A generator that can be assigned to a swob Response's app_iter which,
when iterated over, will delete the objects specified in request body.
Will occasionally yield whitespace while request is being processed.
When the request is completed will yield a response body that can be
parsed to determine success. See above documentation for details.
:params req: a swob Request
:raises HTTPException: on unhandled errors
:returns: a swob Response
:params objs_to_delete: a list of dictionaries that specifies the
objects to be deleted. If None, uses
self.get_objs_to_delete to query request.
"""
last_yield = time()
separator = ''
failed_files = []
resp_dict = {'Response Status': HTTPOk().status,
'Response Body': '',
'Number Deleted': 0,
'Number Not Found': 0}
try:
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
if not out_content_type:
raise HTTPNotAcceptable(request=req)
if out_content_type.endswith('/xml'):
yield '<?xml version="1.0" encoding="UTF-8"?>\n'
try:
vrs, account, _junk = req.split_path(2, 3, True)
except ValueError:
return HTTPNotFound(request=req)
raise HTTPNotFound(request=req)
incoming_format = req.headers.get('Content-Type')
if incoming_format and not incoming_format.startswith('text/plain'):
if incoming_format and \
not incoming_format.startswith('text/plain'):
# For now only accept newline separated object names
return HTTPNotAcceptable(request=req)
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
if not out_content_type:
return HTTPNotAcceptable(request=req)
raise HTTPNotAcceptable(request=req)
if objs_to_delete is None:
objs_to_delete = self.get_objs_to_delete(req)
failed_files = []
success_count = not_found_count = 0
failed_file_response_type = HTTPBadRequest
req.environ['eventlet.minimum_write_chunk_size'] = 0
for obj_to_delete in objs_to_delete:
if last_yield + self.yield_frequency < time():
separator = '\r\n\r\n'
last_yield = time()
yield ' '
obj_to_delete = obj_to_delete.strip().lstrip('/')
if not obj_to_delete:
continue
@ -257,55 +311,80 @@ class Bulk(object):
delete_obj_req = Request.blank(delete_path, new_env)
resp = delete_obj_req.get_response(self.app)
if resp.status_int // 100 == 2:
success_count += 1
resp_dict['Number Deleted'] += 1
elif resp.status_int == HTTP_NOT_FOUND:
not_found_count += 1
resp_dict['Number Not Found'] += 1
elif resp.status_int == HTTP_UNAUTHORIZED:
return HTTPUnauthorized(request=req)
failed_files.append([quote(delete_path),
HTTP_UNAUTHORIZED])
raise HTTPUnauthorized(request=req)
else:
if resp.status_int // 100 == 5:
failed_file_response_type = HTTPBadGateway
failed_files.append([quote(delete_path), resp.status])
resp_body = get_response_body(
out_content_type,
{'Number Deleted': success_count,
'Number Not Found': not_found_count},
failed_files)
if (success_count or not_found_count) and not failed_files:
return HTTPOk(resp_body, content_type=out_content_type)
if failed_files:
return failed_file_response_type(
resp_body, content_type=out_content_type)
return HTTPBadRequest('Invalid bulk delete.')
resp_dict['Response Status'] = \
failed_file_response_type().status
elif not (resp_dict['Number Deleted'] or
resp_dict['Number Not Found']):
resp_dict['Response Status'] = HTTPBadRequest().status
resp_dict['Response Body'] = 'Invalid bulk delete.'
def handle_extract(self, req, compress_type):
except HTTPException, err:
resp_dict['Response Status'] = err.status
resp_dict['Response Body'] = err.body
except Exception:
self.logger.exception('Error in bulk delete.')
resp_dict['Response Status'] = HTTPServerError().status
yield separator + get_response_body(out_content_type,
resp_dict, failed_files)
def handle_extract_iter(self, req, compress_type):
"""
A generator that can be assigned to a swob Response's app_iter which,
when iterated over, will extract and PUT the objects pulled from the
request body. Will occasionally yield whitespace while request is being
processed. When the request is completed will yield a response body
that can be parsed to determine success. See above documentation for
details.
:params req: a swob Request
:params compress_type: specifying the compression type of the tar.
Accepts '', 'gz, or 'bz2'
:raises HTTPException: on unhandled errors
:returns: a swob response to request
"""
success_count = 0
resp_dict = {'Response Status': HTTPCreated().status,
'Response Body': '', 'Number Files Created': 0}
failed_files = []
last_yield = time()
separator = ''
existing_containers = set()
try:
out_content_type = req.accept.best_match(ACCEPTABLE_FORMATS)
if not out_content_type:
return HTTPNotAcceptable(request=req)
raise HTTPNotAcceptable(request=req)
if out_content_type.endswith('/xml'):
yield '<?xml version="1.0" encoding="UTF-8"?>\n'
if req.content_length is None and \
req.headers.get('transfer-encoding', '').lower() != 'chunked':
return HTTPLengthRequired(request=req)
req.headers.get('transfer-encoding',
'').lower() != 'chunked':
raise HTTPLengthRequired(request=req)
try:
vrs, account, extract_base = req.split_path(2, 3, True)
except ValueError:
return HTTPNotFound(request=req)
raise HTTPNotFound(request=req)
extract_base = extract_base or ''
extract_base = extract_base.rstrip('/')
try:
tar = tarfile.open(mode='r|' + compress_type,
fileobj=req.body_file)
failed_response_type = HTTPBadRequest
req.environ['eventlet.minimum_write_chunk_size'] = 0
while True:
if last_yield + self.yield_frequency < time():
separator = '\r\n\r\n'
last_yield = time()
yield ' '
tar_info = tar.next()
if tar_info is None or \
len(failed_files) >= self.max_failed_extractions:
@ -340,7 +419,7 @@ class Bulk(object):
existing_containers.add(container)
except CreateContainerError, err:
if err.status_int == HTTP_UNAUTHORIZED:
return HTTPUnauthorized(request=req)
raise HTTPUnauthorized(request=req)
failed_files.append([
quote(destination[:MAX_PATH_LENGTH]),
err.status])
@ -351,7 +430,7 @@ class Bulk(object):
HTTP_BAD_REQUEST])
continue
if len(existing_containers) > self.max_containers:
return HTTPBadRequest(
raise HTTPBadRequest(
'More than %d base level containers in tar.' %
self.max_containers)
@ -366,41 +445,55 @@ class Bulk(object):
create_obj_req = Request.blank(destination, new_env)
resp = create_obj_req.get_response(self.app)
if resp.status_int // 100 == 2:
success_count += 1
resp_dict['Number Files Created'] += 1
else:
if resp.status_int == HTTP_UNAUTHORIZED:
return HTTPUnauthorized(request=req)
failed_files.append([
quote(destination[:MAX_PATH_LENGTH]),
HTTP_UNAUTHORIZED])
raise HTTPUnauthorized(request=req)
if resp.status_int // 100 == 5:
failed_response_type = HTTPBadGateway
failed_files.append([
quote(destination[:MAX_PATH_LENGTH]), resp.status])
resp_body = get_response_body(
out_content_type,
{'Number Files Created': success_count},
failed_files)
if success_count and not failed_files:
return HTTPCreated(resp_body, content_type=out_content_type)
if failed_files:
return HTTPBadGateway(resp_body, content_type=out_content_type)
return HTTPBadRequest('Invalid Tar File: No Valid Files')
resp_dict['Response Status'] = failed_response_type().status
elif not resp_dict['Number Files Created']:
resp_dict['Response Status'] = HTTPBadRequest().status
resp_dict['Response Body'] = 'Invalid Tar File: No Valid Files'
except HTTPException, err:
resp_dict['Response Status'] = err.status
resp_dict['Response Body'] = err.body
except tarfile.TarError, tar_error:
return HTTPBadRequest('Invalid Tar File: %s' % tar_error)
resp_dict['Response Status'] = HTTPBadRequest().status,
resp_dict['Response Body'] = 'Invalid Tar File: %s' % tar_error
except Exception:
self.logger.exception('Error in extract archive.')
resp_dict['Response Status'] = HTTPServerError().status
yield separator + get_response_body(
out_content_type, resp_dict, failed_files)
@wsgify
def __call__(self, req):
extract_type = req.params.get('extract-archive')
resp = None
if extract_type is not None and req.method == 'PUT':
archive_type = {
'tar': '', 'tar.gz': 'gz',
'tar.bz2': 'bz2'}.get(extract_type.lower().strip('.'))
if archive_type is not None:
return self.handle_extract(req, archive_type)
resp = HTTPOk(request=req)
resp.app_iter = self.handle_extract_iter(req, archive_type)
else:
return HTTPBadRequest("Unsupported archive format")
resp = HTTPBadRequest("Unsupported archive format")
if 'bulk-delete' in req.params and req.method == 'DELETE':
return self.handle_delete(req)
resp = HTTPOk(request=req)
resp.app_iter = self.handle_delete_iter(req)
return self.app
return resp or self.app
def filter_factory(global_conf, **local_conf):

View File

@ -138,8 +138,10 @@ from cStringIO import StringIO
from datetime import datetime
import mimetypes
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, wsgify
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
HTTPOk, HTTPPreconditionFailed, wsgify
from swift.common.utils import json, get_logger, config_true_value
from swift.common.constraints import check_utf8
from swift.common.middleware.bulk import get_response_body, \
ACCEPTABLE_FORMATS, Bulk
@ -303,8 +305,15 @@ class StaticLargeObject(object):
successful, will delete the manifest file.
:params req: a swob.Request with an obj in path
:raises HTTPServerError: on invalid manifest
:returns: swob.Response on failure, otherwise self.app
:returns: swob.Response whose app_iter set to Bulk.handle_delete_iter
"""
if not check_utf8(req.path_info):
raise HTTPPreconditionFailed(
request=req, body='Invalid UTF8 or contains NULL')
try:
vrs, account, container, obj = req.split_path(4, 4, True)
except ValueError:
raise HTTPBadRequest('Not an SLO manifest')
new_env = req.environ.copy()
new_env['REQUEST_METHOD'] = 'GET'
del(new_env['wsgi.input'])
@ -321,17 +330,17 @@ class StaticLargeObject(object):
raise HTTPBadRequest('Not an SLO manifest')
try:
manifest = json.loads(get_man_resp.body)
# append the manifest file for deletion at the end
manifest.append(
{'name': '/'.join(['', container, obj]).decode('utf-8')})
except ValueError:
raise HTTPServerError('Invalid manifest file')
delete_resp = self.bulk_deleter.handle_delete(
resp = HTTPOk(request=req)
resp.app_iter = self.bulk_deleter.handle_delete_iter(
req,
objs_to_delete=[o['name'].encode('utf-8') for o in manifest],
user_agent='MultipartDELETE', swift_source='SLO')
if delete_resp.status_int // 100 == 2:
# delete the manifest file itself
return self.app
else:
return delete_resp
return resp
return get_man_resp
@wsgify

View File

@ -189,7 +189,7 @@ class SegmentedIterable(object):
'obj': self.controller.object_name, 'err': err})
err.swift_logged = True
self.response.status_int = HTTP_CONFLICT
raise StopIteration('Invalid manifiest segment')
raise
except (Exception, Timeout), err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception(_(

View File

@ -111,6 +111,11 @@ class TestUntar(unittest.TestCase):
self.app.calls = 0
rmtree(self.testdir)
def handle_extract_and_iter(self, req, compress_format):
resp_body = ''.join(
self.bulk.handle_extract_iter(req, compress_format))
return resp_body
def test_create_container_for_path(self):
req = Request.blank('/')
self.assertEquals(
@ -147,8 +152,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(
os.path.join(self.testdir, 'tar_works.tar' + extension))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, compress_format)
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, compress_format)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Files Created'], 6)
# test out xml
@ -157,18 +162,19 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(
os.path.join(self.testdir, 'tar_works.tar' + extension))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, compress_format)
self.assertEquals(resp.status_int, 201)
resp_body = self.handle_extract_and_iter(req, compress_format)
self.assert_('<response_status>201 Created</response_status>' in
resp_body)
self.assert_('<number_files_created>6</number_files_created>' in
resp.body)
resp_body)
# test out nonexistent format
req = Request.blank('/tar_works/acc/cont/',
headers={'Accept': 'good_xml'})
req.environ['wsgi.input'] = open(
os.path.join(self.testdir, 'tar_works.tar' + extension))
resp = self.bulk.handle_extract(req, compress_format)
self.assertEquals(resp.status_int, 406)
resp_body = self.handle_extract_and_iter(req, compress_format)
self.assert_('Response Status: 406' in resp_body)
def test_extract_call(self):
base_name = 'base_works_gz'
@ -198,7 +204,8 @@ class TestUntar(unittest.TestCase):
os.path.join(self.testdir, 'tar_works.tar.gz'))
req.headers['transfer-encoding'] = 'Chunked'
req.method = 'PUT'
self.bulk(req.environ, fake_start_response)
app_iter = self.bulk(req.environ, fake_start_response)
resp_body = ''.join([i for i in app_iter])
self.assertEquals(self.app.calls, 7)
self.app.calls = 0
@ -221,18 +228,19 @@ class TestUntar(unittest.TestCase):
req.headers['transfer-encoding'] = 'Chunked'
req.environ['wsgi.input'] = open(
os.path.join(self.testdir, 'tar_works.tar'))
t = self.bulk(req.environ, fake_start_response)
app_iter = self.bulk(req.environ, fake_start_response)
resp_body = ''.join([i for i in app_iter])
self.assertEquals(self.app.calls, 7)
def test_bad_container(self):
req = Request.blank('/invalid/', body='')
resp = self.bulk.handle_extract(req, '')
self.assertEquals(resp.status_int, 404)
resp_body = self.handle_extract_and_iter(req, '')
self.assertTrue('404 Not Found' in resp_body)
def test_content_length_required(self):
req = Request.blank('/create_cont_fail/acc/cont')
resp = self.bulk.handle_extract(req, '')
self.assertEquals(resp.status_int, 411)
resp_body = self.handle_extract_and_iter(req, '')
self.assertTrue('411 Length Required' in resp_body)
def build_tar(self, dir_tree=None):
if not dir_tree:
@ -260,8 +268,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Files Created'], 4)
def test_extract_tar_fail_cont_401(self):
@ -270,8 +278,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
self.assertEquals(resp.status_int, 401)
resp_body = self.handle_extract_and_iter(req, '')
self.assertTrue('Response Status: 401 Unauthorized' in resp_body)
def test_extract_tar_fail_obj_401(self):
self.build_tar()
@ -279,8 +287,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
self.assertEquals(resp.status_int, 401)
resp_body = self.handle_extract_and_iter(req, '')
self.assertTrue('Response Status: 401 Unauthorized' in resp_body)
def test_extract_tar_fail_obj_name_len(self):
self.build_tar()
@ -289,8 +297,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Files Created'], 4)
self.assertEquals(resp_data['Errors'][0][0],
'/tar_works/acc/cont/base_fails1/' + ('f' * 101))
@ -301,8 +309,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, 'gz')
self.assertEquals(resp.status_int, 400)
resp_body = self.handle_extract_and_iter(req, 'gz')
self.assert_('400 Bad Request' in resp_body)
self.assertEquals(self.app.calls, 0)
def test_extract_tar_fail_max_file_name_length(self):
@ -314,8 +322,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assertEquals(self.app.calls, 5)
self.assertEquals(resp_data['Errors'][0][0],
'/tar_works/acc/cont/base_fails1/' + ('f' * 101))
@ -336,8 +344,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(
os.path.join(self.testdir, 'tar_works.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assert_(resp_data['Errors'][0][1].startswith('413'))
def test_extract_tar_fail_max_cont(self):
@ -351,9 +359,9 @@ class TestUntar(unittest.TestCase):
body = open(os.path.join(self.testdir, 'tar_fails.tar')).read()
req = Request.blank('/tar_works/acc/', body=body)
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_body = self.handle_extract_and_iter(req, '')
self.assertEquals(self.app.calls, 3)
self.assertEquals(resp.status_int, 400)
self.assert_('400 Bad Request' in resp_body)
def test_extract_tar_fail_create_cont(self):
dir_tree = [{'base_fails1': [
@ -367,8 +375,8 @@ class TestUntar(unittest.TestCase):
req.environ['wsgi.input'] = open(os.path.join(self.testdir,
'tar_fails.tar'))
req.headers['transfer-encoding'] = 'chunked'
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assertEquals(self.app.calls, 4)
self.assertEquals(len(resp_data['Errors']), 5)
@ -384,14 +392,15 @@ class TestUntar(unittest.TestCase):
raise ValueError('Test')
with patch.object(self.bulk, 'create_container', bad_create):
resp = self.bulk.handle_extract(req, '')
resp_data = json.loads(resp.body)
resp_body = self.handle_extract_and_iter(req, '')
resp_data = json.loads(resp_body)
self.assertEquals(self.app.calls, 0)
self.assertEquals(len(resp_data['Errors']), 5)
def test_get_response_body(self):
self.assertRaises(
HTTPException, bulk.get_response_body, 'badformat', {}, [])
txt_body = bulk.get_response_body(
'bad_formay', {'hey': 'there'}, [['json > xml', '202 Accepted']])
self.assert_('hey: there' in txt_body)
xml_body = bulk.get_response_body(
'text/xml', {'hey': 'there'}, [['json > xml', '202 Accepted']])
self.assert_('&gt' in xml_body)
@ -407,35 +416,35 @@ class TestDelete(unittest.TestCase):
self.app.calls = 0
self.app.delete_paths = []
def handle_delete_and_iter(self, req):
resp_body = ''.join(self.bulk.handle_delete_iter(req))
return resp_body
def test_bulk_delete_works(self):
req = Request.blank('/delete_works/AUTH_Acc', body='/c/f\n/c/f404',
headers={'Accept': 'application/json'})
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
resp_body = self.handle_delete_and_iter(req)
self.assertEquals(
self.app.delete_paths,
['/delete_works/AUTH_Acc/c/f', '/delete_works/AUTH_Acc/c/f404'])
self.assertEquals(self.app.calls, 2)
resp_data = json.loads(resp.body)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Deleted'], 1)
self.assertEquals(resp_data['Number Not Found'], 1)
def test_bulk_delete_bad_accept_and_content_type(self):
def test_bulk_delete_bad_content_type(self):
req = Request.blank('/delete_works/AUTH_Acc',
headers={'Accept': 'badformat'})
req.method = 'DELETE'
req.environ['wsgi.input'] = StringIO('/c/f\n/c/f404')
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 406)
req = Request.blank('/delete_works/AUTH_Acc',
headers={'Accept': 'application/json',
'Content-Type': 'text/xml'})
req.method = 'DELETE'
req.environ['wsgi.input'] = StringIO('/c/f\n/c/f404')
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 406)
resp_body = self.handle_delete_and_iter(req)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Response Status'], '406 Not Acceptable')
def test_bulk_delete_call(self):
def fake_start_response(*args, **kwargs):
@ -444,7 +453,8 @@ class TestDelete(unittest.TestCase):
req.method = 'DELETE'
req.headers['Transfer-Encoding'] = 'chunked'
req.environ['wsgi.input'] = StringIO('/c/f')
self.bulk(req.environ, fake_start_response)
list(self.bulk(req.environ,
fake_start_response)) # iterate over whole resp
self.assertEquals(
self.app.delete_paths, ['/delete_works/AUTH_Acc/c/f'])
self.assertEquals(self.app.calls, 1)
@ -474,14 +484,14 @@ class TestDelete(unittest.TestCase):
body='/c/f\n\n\n/c/f404\n\n\n/c/%2525',
headers={'Accept': 'application/json'})
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
resp_body = self.handle_delete_and_iter(req)
self.assertEquals(
self.app.delete_paths,
['/delete_works/AUTH_Acc/c/f',
'/delete_works/AUTH_Acc/c/f404',
'/delete_works/AUTH_Acc/c/%25'])
self.assertEquals(self.app.calls, 3)
resp_data = json.loads(resp.body)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Deleted'], 2)
self.assertEquals(resp_data['Number Not Found'], 1)
@ -491,23 +501,8 @@ class TestDelete(unittest.TestCase):
data = '\n\n' * self.bulk.max_deletes_per_request
req.environ['wsgi.input'] = StringIO(data)
req.content_length = len(data)
try:
self.bulk.handle_delete(req)
except HTTPException, err:
self.assertEquals(err.status_int, 413)
else:
self.fail('413 not raised')
def test_bulk_delete_raised_error(self):
def fake_start_response(*args, **kwargs):
self.assertTrue(args[0].startswith('413'))
req = Request.blank('/delete_works/AUTH_Acc?bulk-delete')
req.method = 'DELETE'
data = '\n\n' * self.bulk.max_deletes_per_request
req.environ['wsgi.input'] = StringIO(data)
req.content_length = len(data)
self.bulk(req.environ, fake_start_response)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('413 Request Entity Too Large' in resp_body)
def test_bulk_delete_works_unicode(self):
body = (u'/c/ obj \u2661\r\n'.encode('utf8') +
@ -516,14 +511,14 @@ class TestDelete(unittest.TestCase):
req = Request.blank('/delete_works/AUTH_Acc', body=body,
headers={'Accept': 'application/json'})
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
resp_body = self.handle_delete_and_iter(req)
self.assertEquals(
self.app.delete_paths,
['/delete_works/AUTH_Acc/c/ obj \xe2\x99\xa1',
'/delete_works/AUTH_Acc/c/ objbadutf8'])
self.assertEquals(self.app.calls, 2)
resp_data = json.loads(resp.body)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Deleted'], 1)
self.assertEquals(len(resp_data['Errors']), 2)
self.assertEquals(
@ -535,36 +530,37 @@ class TestDelete(unittest.TestCase):
def test_bulk_delete_no_body(self):
req = Request.blank('/unauth/AUTH_acc/')
self.assertRaises(HTTPException, self.bulk.handle_delete, req)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('411 Length Required' in resp_body)
def test_bulk_delete_no_files_in_body(self):
req = Request.blank('/unauth/AUTH_acc/', body=' ')
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 400)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('400 Bad Request' in resp_body)
def test_bulk_delete_unauth(self):
req = Request.blank('/unauth/AUTH_acc/', body='/c/f\n')
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 401)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('401 Unauthorized' in resp_body)
def test_bulk_delete_500_resp(self):
req = Request.blank('/broke/AUTH_acc/', body='/c/f\n')
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 502)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('502 Bad Gateway' in resp_body)
def test_bulk_delete_bad_path(self):
req = Request.blank('/delete_cont_fail/')
resp = self.bulk.handle_delete(req)
self.assertEquals(resp.status_int, 404)
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('404 Not Found' in resp_body)
def test_bulk_delete_container_delete(self):
req = Request.blank('/delete_cont_fail/AUTH_Acc', body='c\n',
headers={'Accept': 'application/json'})
req.method = 'DELETE'
resp = self.bulk.handle_delete(req)
resp_data = json.loads(resp.body)
resp_body = self.handle_delete_and_iter(req)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Deleted'], 0)
self.assertEquals(resp_data['Errors'][0][1], '409 Conflict')
@ -575,8 +571,8 @@ class TestDelete(unittest.TestCase):
data = '/c/f\nc/' + ('1' * bulk.MAX_PATH_LENGTH) + '\n/c/f'
req.environ['wsgi.input'] = StringIO(data)
req.headers['Transfer-Encoding'] = 'chunked'
resp = self.bulk.handle_delete(req)
resp_data = json.loads(resp.body)
resp_body = self.handle_delete_and_iter(req)
resp_data = json.loads(resp_body)
self.assertEquals(resp_data['Number Deleted'], 2)
self.assertEquals(resp_data['Errors'][0][1], '400 Bad Request')
@ -584,12 +580,8 @@ class TestDelete(unittest.TestCase):
body = '/c/f\nc/' + ('123456' * bulk.MAX_PATH_LENGTH) + '\n'
req = Request.blank('/delete_works/AUTH_Acc', body=body)
req.method = 'DELETE'
try:
self.bulk.handle_delete(req)
except HTTPException, err:
self.assertEquals(err.status_int, 400)
else:
self.fail('400 not raised')
resp_body = self.handle_delete_and_iter(req)
self.assertTrue('400 Bad Request' in resp_body)
if __name__ == '__main__':
unittest.main()

View File

@ -351,7 +351,8 @@ class TestStaticLargeObject(unittest.TestCase):
req = Request.blank(
'/test_delete/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
app_iter = self.slo(req.environ, fake_start_response)
list(app_iter) # iterate through whole response
self.assertEquals(self.app.calls, 4)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete/A/c/man'),
@ -383,7 +384,8 @@ class TestStaticLargeObject(unittest.TestCase):
req = Request.blank(
'/test_delete_bad/A/c/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
app_iter = self.slo(req.environ, fake_start_response)
list(app_iter) # iterate through whole response
self.assertEquals(self.app.calls, 2)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad/A/c/man'),

View File

@ -40,7 +40,7 @@ from swift.account import server as account_server
from swift.container import server as container_server
from swift.obj import server as object_server
from swift.common import ring
from swift.common.exceptions import ChunkReadTimeout
from swift.common.exceptions import ChunkReadTimeout, SloSegmentError
from swift.common.constraints import MAX_META_NAME_LENGTH, \
MAX_META_VALUE_LENGTH, MAX_META_COUNT, MAX_META_OVERALL_SIZE, \
MAX_FILE_SIZE, MAX_ACCOUNT_NAME_LENGTH, MAX_CONTAINER_NAME_LENGTH
@ -1204,9 +1204,11 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'Aa') # dropped connection
self.assertEqual(resp.content_length, 4) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertRaises(SloSegmentError, lambda: resp.body)
# dropped connection, exception is caught by eventlet as it is
# iterating over response
self.assertEqual(
requested,
@ -1260,9 +1262,11 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'Aa') # dropped connection
self.assertEqual(resp.content_length, 4) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertRaises(SloSegmentError, lambda: resp.body)
# dropped connection, exception is caught by eventlet as it is
# iterating over response
self.assertEqual(
requested,
@ -1320,9 +1324,11 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/manifest')
resp = controller.GET(req)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, 'Aa') # dropped connection
self.assertEqual(resp.content_length, 6) # content incomplete
self.assertEqual(resp.content_type, 'text/html')
self.assertRaises(SloSegmentError, lambda: resp.body)
# dropped connection, exception is caught by eventlet as it is
# iterating over response
self.assertEqual(
requested,