Merge "Reorganize SLO unit tests"

This commit is contained in:
Jenkins 2013-11-27 21:09:41 +00:00 committed by Gerrit Code Review
commit 7ea00f30bc

View File

@ -14,155 +14,15 @@
# limitations under the License.
import unittest
from copy import deepcopy
from mock import patch
from hashlib import md5
from swift.common import swob
from swift.common.middleware import slo
from swift.common.utils import json
from swift.common.swob import Request, Response, HTTPException
from swift.common.utils import json, split_path
from swift.common.swob import Request, HTTPException
class FakeApp(object):
def __init__(self):
self.calls = 0
self.req_method_paths = []
def __call__(self, env, start_response):
self.calls += 1
if env['PATH_INFO'] == '/':
return Response(status=200, body='passed')(env, start_response)
if env['PATH_INFO'].startswith('/test_good/'):
j, v, a, cont, obj = env['PATH_INFO'].split('/')
if obj == 'a_2':
return Response(status=400)(env, start_response)
cont_len = 100
if obj == 'small_object':
cont_len = 10
headers = {'etag': 'etagoftheobjectsegment',
'Content-Length': cont_len}
if obj == 'slob':
headers['X-Static-Large-Object'] = 'true'
return Response(status=200, headers=headers)(env, start_response)
if env['PATH_INFO'].startswith('/test_good_check/'):
j, v, a, cont, obj = env['PATH_INFO'].split('/')
etag, size = obj.split('_')
last_mod = 'Fri, 01 Feb 2012 20:38:36 GMT'
if obj == 'a_1':
last_mod = ''
return Response(
status=200,
headers={'etag': etag, 'Last-Modified': last_mod,
'Content-Length': size})(env, start_response)
if env['PATH_INFO'].startswith('/test_get/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True',
'Content-Type': 'html;swift_bytes=55'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_get_broke_json/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data[:-5])(env, start_response)
if env['PATH_INFO'].startswith('/test_get_bad_json/'):
bad_data = json.dumps(
[{'name': '/c/a_1', 'something': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'bytes': '2'}])
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=bad_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_get_not_slo/'):
return Response(status=200, body='lalala')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_404/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
if env['PATH_INFO'].endswith('/c/man_404'):
return Response(status=404)(env, start_response)
if env['PATH_INFO'].endswith('/c/a_1'):
return Response(status=404)(env, start_response)
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_nested/'):
nested_data = json.dumps(
[{'name': '/b/b_2', 'hash': 'a', 'bytes': '1'},
{'name': '/c/c_3', 'hash': 'b', 'bytes': '2'}])
good_data = json.dumps(
[{'name': '/a/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/a/sub_nest', 'hash': 'a', 'sub_slo': True,
'bytes': len(nested_data)},
{'name': '/d/d_3', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
if 'sub_nest' in env['PATH_INFO']:
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=nested_data)(env, start_response)
else:
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_nested_404/'):
good_data = json.dumps(
[{'name': '/a/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/a/sub_nest', 'hash': 'a', 'bytes': '2',
'sub_slo': True},
{'name': '/d/d_3', 'hash': 'b', 'bytes': '3'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
if 'sub_nest' in env['PATH_INFO']:
return Response(status=404)(env, start_response)
else:
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_bad_json/'):
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body='bad json')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_bad_man/'):
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
return Response(status=200, body='')(env, start_response)
if env['PATH_INFO'].startswith('/test_delete_401/'):
good_data = json.dumps(
[{'name': '/c/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/d/b_2', 'hash': 'b', 'bytes': '2'}])
self.req_method_paths.append((env['REQUEST_METHOD'],
env['PATH_INFO']))
if env['PATH_INFO'].endswith('/d/b_2'):
return Response(status=401)(env, start_response)
return Response(status=200,
headers={'X-Static-Large-Object': 'True'},
body=good_data)(env, start_response)
test_xml_data = '''<?xml version="1.0" encoding="UTF-8"?>
<static_large_object>
<object_segment>
@ -181,25 +41,97 @@ def fake_start_response(*args, **kwargs):
pass
class TestStaticLargeObject(unittest.TestCase):
class FakeSwift(object):
def __init__(self):
self.calls = []
self.req_method_paths = []
self.uploaded = {}
# mapping of (method, path) --> (response class, headers, body)
self._responses = {}
def __call__(self, env, start_response):
method = env['REQUEST_METHOD']
path = env['PATH_INFO']
_, acc, cont, obj = split_path(env['PATH_INFO'], 0, 4,
rest_with_last=True)
self.calls.append((method, path))
try:
resp_class, raw_headers, body = self._responses[(method, path)]
headers = swob.HeaderKeyDict(raw_headers)
except KeyError:
if method == 'GET' and obj and path in self.uploaded:
resp_class = swob.HTTPOk
headers, body = self.uploaded[path]
else:
raise
# simulate object PUT
if method == 'PUT' and obj:
input = env['wsgi.input'].read()
etag = md5(input).hexdigest()
headers.setdefault('Etag', etag)
headers.setdefault('Content-Length', len(input))
# keep it for subsequent GET requests later
self.uploaded[path] = (deepcopy(headers), input)
if "CONTENT_TYPE" in env:
self.uploaded[path][0]['Content-Type'] = env["CONTENT_TYPE"]
return resp_class(headers=headers, body=body)(env, start_response)
@property
def call_count(self):
return len(self.calls)
def register(self, method, path, response_class, headers, body):
self._responses[(method, path)] = (response_class, headers, body)
class SloTestCase(unittest.TestCase):
def setUp(self):
self.app = FakeApp()
self.app = FakeSwift()
self.slo = slo.filter_factory({})(self.app)
self.slo.min_segment_size = 1
def tearDown(self):
pass
def call_app(self, req, app=None):
if app is None:
app = self.app
status = [None]
headers = [None]
def start_response(s, h, ei=None):
status[0] = s
headers[0] = h
body = ''.join(app(req.environ, start_response))
return status[0], headers[0], body
def call_slo(self, req):
return self.call_app(req, app=self.slo)
class TestSloMiddleware(SloTestCase):
def setUp(self):
super(TestSloMiddleware, self).setUp()
self.app.register(
'GET', '/', swob.HTTPOk, {}, 'passed')
self.app.register(
'PUT', '/', swob.HTTPOk, {}, 'passed')
def test_handle_multipart_no_obj(self):
req = Request.blank('/')
resp_iter = self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.calls, [('GET', '/')])
self.assertEquals(''.join(resp_iter), 'passed')
def test_slo_header_assigned(self):
req = Request.blank(
'/v/a/c/o', headers={'x-static-large-object': "true"})
'/v1/a/c/o', headers={'x-static-large-object': "true"})
resp = self.slo(req.environ, fake_start_response)
self.assert_(
resp[0].startswith('X-Static-Large-Object is a reserved header'))
@ -212,11 +144,62 @@ class TestStaticLargeObject(unittest.TestCase):
self.assertEquals('/cont/object',
slo.parse_input(data)[0]['path'])
bad_data = json.dumps([{'path': '/cont/object', 'size_bytes': 100}])
self.assertRaises(HTTPException, slo.parse_input, bad_data)
class TestSloPutManifest(SloTestCase):
def setUp(self):
super(TestSloPutManifest, self).setUp()
self.app.register(
'GET', '/', swob.HTTPOk, {}, 'passed')
self.app.register(
'PUT', '/', swob.HTTPOk, {}, 'passed')
self.app.register(
'HEAD', '/v1/AUTH_test/cont/object',
swob.HTTPOk,
{'Content-Length': '100', 'Etag': 'etagoftheobjectsegment'},
None)
self.app.register(
'HEAD', '/v1/AUTH_test/cont/object\xe2\x99\xa1',
swob.HTTPOk,
{'Content-Length': '100', 'Etag': 'etagoftheobjectsegment'},
None)
self.app.register(
'HEAD', '/v1/AUTH_test/cont/small_object',
swob.HTTPOk,
{'Content-Length': '10', 'Etag': 'etagoftheobjectsegment'},
None)
self.app.register(
'PUT', '/v1/AUTH_test/c/man', swob.HTTPCreated, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/c/man', swob.HTTPNoContent, {}, None)
self.app.register(
'HEAD', '/v1/AUTH_test/checktest/a_1',
swob.HTTPOk,
{'Content-Length': '1', 'Etag': 'a'},
None)
self.app.register(
'HEAD', '/v1/AUTH_test/checktest/badreq',
swob.HTTPBadRequest, {}, None)
self.app.register(
'HEAD', '/v1/AUTH_test/checktest/b_2',
swob.HTTPOk,
{'Content-Length': '2', 'Etag': 'b',
'Last-Modified': 'Fri, 01 Feb 2012 20:38:36 GMT'},
None)
self.app.register(
'HEAD', '/v1/AUTH_test/checktest/slob',
swob.HTTPOk,
{'X-Static-Large-Object': 'true', 'Etag': 'slob-etag'},
None)
self.app.register(
'PUT', '/v1/AUTH_test/checktest/man_3', swob.HTTPCreated, {}, None)
def test_put_manifest_too_quick_fail(self):
req = Request.blank('/v/a/c/o')
req = Request.blank('/v1/a/c/o')
req.content_length = self.slo.max_manifest_size + 1
try:
self.slo.handle_multipart_put(req, fake_start_response)
@ -225,7 +208,7 @@ class TestStaticLargeObject(unittest.TestCase):
self.assertEquals(e.status_int, 413)
with patch.object(self.slo, 'max_manifest_segments', 0):
req = Request.blank('/v/a/c/o', body=test_json_data)
req = Request.blank('/v1/a/c/o', body=test_json_data)
e = None
try:
self.slo.handle_multipart_put(req, fake_start_response)
@ -234,14 +217,14 @@ class TestStaticLargeObject(unittest.TestCase):
self.assertEquals(e.status_int, 413)
with patch.object(self.slo, 'min_segment_size', 1000):
req = Request.blank('/v/a/c/o', body=test_json_data)
req = Request.blank('/v1/a/c/o', body=test_json_data)
try:
self.slo.handle_multipart_put(req, fake_start_response)
except HTTPException as e:
pass
self.assertEquals(e.status_int, 400)
req = Request.blank('/v/a/c/o', headers={'X-Copy-From': 'lala'})
req = Request.blank('/v1/a/c/o', headers={'X-Copy-From': 'lala'})
try:
self.slo.handle_multipart_put(req, fake_start_response)
except HTTPException as e:
@ -258,7 +241,7 @@ class TestStaticLargeObject(unittest.TestCase):
def test_handle_multipart_put_success(self):
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
'/v1/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
@ -279,7 +262,7 @@ class TestStaticLargeObject(unittest.TestCase):
'etag': 'etagoftheobjectsegment',
'size_bytes': 10}])
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
'/v1/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
@ -291,13 +274,13 @@ class TestStaticLargeObject(unittest.TestCase):
'etag': 'etagoftheobjectsegment',
'size_bytes': 100}])
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
'/v1/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
body=test_json_data)
self.assertTrue('X-Static-Large-Object' not in req.headers)
self.slo(req.environ, fake_start_response)
self.assertTrue('X-Static-Large-Object' in req.headers)
self.assertTrue(req.environ['PATH_INFO'], '/cont/object\xe2\x99\xa4')
self.assertTrue(req.environ['PATH_INFO'], '/cont/object\xe2\x99\xa1')
def test_handle_multipart_put_no_xml(self):
req = Request.blank(
@ -322,9 +305,9 @@ class TestStaticLargeObject(unittest.TestCase):
'size_bytes': 100}]),
json.dumps('asdf'), json.dumps(None), json.dumps(5),
'not json', '1234', None, '', json.dumps({'path': None}),
json.dumps([{'path': '/c/o', 'etag': None,
json.dumps([{'path': '/cont/object', 'etag': None,
'size_bytes': 12}]),
json.dumps([{'path': '/c/o', 'etag': 'asdf',
json.dumps([{'path': '/cont/object', 'etag': 'asdf',
'size_bytes': 'sd'}]),
json.dumps([{'path': 12, 'etag': 'etagoftheobj',
'size_bytes': 100}]),
@ -335,22 +318,29 @@ class TestStaticLargeObject(unittest.TestCase):
json.dumps([{'path': None, 'etag': 'etagoftheobj',
'size_bytes': 100}])]:
req = Request.blank(
'/test_good/AUTH_test/c/man?multipart-manifest=put',
'/v1/AUTH_test/c/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
self.assertRaises(HTTPException, self.slo.handle_multipart_put,
req, fake_start_response)
def test_handle_multipart_put_check_data(self):
good_data = json.dumps(
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'}])
[{'path': '/checktest/a_1', 'etag': 'a', 'size_bytes': '1'},
{'path': '/checktest/b_2', 'etag': 'b', 'size_bytes': '2'}])
req = Request.blank(
'/test_good_check/A/c/man_3?multipart-manifest=put',
'/v1/AUTH_test/checktest/man_3?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
self.slo.handle_multipart_put(req, fake_start_response)
self.assertEquals(self.app.calls, 3)
self.assert_(req.environ['CONTENT_TYPE'].endswith(';swift_bytes=3'))
manifest_data = json.loads(req.environ['wsgi.input'].read())
status, headers, body = self.call_slo(req)
self.assertEquals(self.app.call_count, 3)
# go behind SLO's back and see what actually got stored
req = Request.blank(
'/v1/AUTH_test/checktest/man_3?multipart-manifest=get',
environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_app(req)
headers = dict(headers)
manifest_data = json.loads(body)
self.assert_(headers['Content-Type'].endswith(';swift_bytes=3'))
self.assertEquals(len(manifest_data), 2)
self.assertEquals(manifest_data[0]['hash'], 'a')
self.assertEquals(manifest_data[0]['bytes'], 1)
@ -359,49 +349,156 @@ class TestStaticLargeObject(unittest.TestCase):
def test_handle_multipart_put_check_data_bad(self):
bad_data = json.dumps(
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
{'path': '/c/a_2', 'etag': 'a', 'size_bytes': '1'},
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'},
{'path': '/d/slob', 'etag': 'a', 'size_bytes': '2'}])
[{'path': '/checktest/a_1', 'etag': 'a', 'size_bytes': '2'},
{'path': '/checktest/badreq', 'etag': 'a', 'size_bytes': '1'},
{'path': '/checktest/b_2', 'etag': 'not-b', 'size_bytes': '2'},
{'path': '/checktest/slob', 'etag': 'not-slob',
'size_bytes': '2'}])
req = Request.blank(
'/test_good/A/c/man?multipart-manifest=put',
'/v1/AUTH_test/checktest/man?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Accept': 'application/json'},
body=bad_data)
try:
self.slo.handle_multipart_put(req, fake_start_response)
except HTTPException as e:
self.assertEquals(self.app.calls, 4)
data = json.loads(e.body)
errors = data['Errors']
self.assertEquals(errors[0][0], '/c/a_1')
status, headers, body = self.call_slo(req)
self.assertEquals(self.app.call_count, 4)
errors = json.loads(body)['Errors']
self.assertEquals(len(errors), 5)
self.assertEquals(errors[0][0], '/checktest/a_1')
self.assertEquals(errors[0][1], 'Size Mismatch')
self.assertEquals(errors[2][0], '/c/a_2')
self.assertEquals(errors[2][1], '400 Bad Request')
self.assertEquals(errors[4][0], '/d/b_2')
self.assertEquals(errors[1][0], '/checktest/badreq')
self.assertEquals(errors[1][1], '400 Bad Request')
self.assertEquals(errors[2][0], '/checktest/b_2')
self.assertEquals(errors[2][1], 'Etag Mismatch')
self.assertEquals(errors[3][0], '/checktest/slob')
self.assertEquals(errors[3][1], 'Size Mismatch')
self.assertEquals(errors[4][0], '/checktest/slob')
self.assertEquals(errors[4][1], 'Etag Mismatch')
self.assertEquals(errors[-1][0], '/d/slob')
self.assertEquals(errors[-1][1], 'Etag Mismatch')
else:
self.assert_(False)
class TestSloDeleteManifest(SloTestCase):
def setUp(self):
super(TestSloDeleteManifest, self).setUp()
_submanifest_data = json.dumps(
[{'name': '/deltest/b_2', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/c_3', 'hash': 'b', 'bytes': '2'}])
self.app.register(
'GET', '/v1/AUTH_test/deltest/man_404',
swob.HTTPNotFound, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/man',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/gone', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/b_2', 'hash': 'b', 'bytes': '2'}]))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/man',
swob.HTTPNoContent, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/man-all-there',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/b_2', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/c_3', 'hash': 'b', 'bytes': '2'}]))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/gone',
swob.HTTPNotFound, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/a_1',
swob.HTTPOk, {'Content-Length': '1'}, 'a')
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/a_1',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/b_2',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/c_3',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/d_3',
swob.HTTPNoContent, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/manifest-with-submanifest',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/a_1',
'hash': 'a', 'bytes': '1'},
{'name': '/deltest/submanifest', 'sub_slo': True,
'hash': 'submanifest-etag',
'bytes': len(_submanifest_data)},
{'name': '/deltest/d_3',
'hash': 'd', 'bytes': '3'}]))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/manifest-with-submanifest',
swob.HTTPNoContent, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/submanifest',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
_submanifest_data)
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/submanifest',
swob.HTTPNoContent, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/manifest-missing-submanifest',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest/missing-submanifest',
'hash': 'a', 'bytes': '2', 'sub_slo': True},
{'name': '/deltest/d_3', 'hash': 'd', 'bytes': '3'}]))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/manifest-missing-submanifest',
swob.HTTPNoContent, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/missing-submanifest',
swob.HTTPNotFound, {}, None)
self.app.register(
'GET', '/v1/AUTH_test/deltest/manifest-badjson',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
"[not {json (at ++++all")
self.app.register(
'GET', '/v1/AUTH_test/deltest/manifest-with-unauth-segment',
swob.HTTPOk, {'Content-Type': 'application/json',
'X-Static-Large-Object': 'true'},
json.dumps([{'name': '/deltest/a_1', 'hash': 'a', 'bytes': '1'},
{'name': '/deltest-unauth/q_17',
'hash': '11', 'bytes': '17'}]))
self.app.register(
'DELETE', '/v1/AUTH_test/deltest/manifest-with-unauth-segment',
swob.HTTPNoContent, {}, None)
self.app.register(
'DELETE', '/v1/AUTH_test/deltest-unauth/q_17',
swob.HTTPUnauthorized, {}, None)
def test_handle_multipart_delete_man(self):
req = Request.blank(
'/test_good/A/c/man', environ={'REQUEST_METHOD': 'DELETE'})
'/v1/AUTH_test/deltest/man',
environ={'REQUEST_METHOD': 'DELETE'})
self.slo(req.environ, fake_start_response)
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.call_count, 1)
def test_handle_multipart_delete_whole_404(self):
req = Request.blank(
'/test_delete_404/A/c/man_404?multipart-manifest=delete',
'/v1/AUTH_test/deltest/man_404?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_404/A/c/man_404')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/man_404')])
self.assertEquals(resp_data['Response Status'], '200 OK')
self.assertEquals(resp_data['Response Body'], '')
self.assertEquals(resp_data['Number Deleted'], 0)
@ -410,68 +507,66 @@ class TestStaticLargeObject(unittest.TestCase):
def test_handle_multipart_delete_segment_404(self):
req = Request.blank(
'/test_delete_404/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/man?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 4)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_404/A/c/man'),
('DELETE', '/test_delete_404/A/c/a_1'),
('DELETE', '/test_delete_404/A/d/b_2'),
('DELETE', '/test_delete_404/A/c/man')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/man'),
('DELETE', '/v1/AUTH_test/deltest/gone'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/man')])
self.assertEquals(resp_data['Response Status'], '200 OK')
self.assertEquals(resp_data['Number Deleted'], 2)
self.assertEquals(resp_data['Number Not Found'], 1)
def test_handle_multipart_delete_whole(self):
req = Request.blank(
'/test_delete/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/man-all-there?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
app_iter = self.slo(req.environ, fake_start_response)
list(app_iter) # iterate through whole response
self.assertEquals(self.app.calls, 4)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete/A/c/man'),
('DELETE', '/test_delete/A/c/a_1'),
('DELETE', '/test_delete/A/d/b_2'),
('DELETE', '/test_delete/A/c/man')])
self.call_slo(req)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/man-all-there'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/man-all-there')])
def test_handle_multipart_delete_nested(self):
req = Request.blank(
'/test_delete_nested/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/manifest-with-submanifest?' +
'multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE'})
app_iter = self.slo(req.environ, fake_start_response)
list(app_iter) # iterate through whole response
self.assertEquals(self.app.calls, 8)
self.call_slo(req)
self.assertEquals(
set(self.app.req_method_paths),
set([('GET', '/test_delete_nested/A/c/man'),
('GET', '/test_delete_nested/A/a/sub_nest'),
('DELETE', '/test_delete_nested/A/a/a_1'),
('DELETE', '/test_delete_nested/A/b/b_2'),
('DELETE', '/test_delete_nested/A/c/c_3'),
('DELETE', '/test_delete_nested/A/a/sub_nest'),
('DELETE', '/test_delete_nested/A/d/d_3'),
('DELETE', '/test_delete_nested/A/c/man')]))
set(self.app.calls),
set([('GET', '/v1/AUTH_test/deltest/manifest-with-submanifest'),
('GET', '/v1/AUTH_test/deltest/submanifest'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest/b_2'),
('DELETE', '/v1/AUTH_test/deltest/c_3'),
('DELETE', '/v1/AUTH_test/deltest/submanifest'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-with-submanifest')]))
def test_handle_multipart_delete_nested_404(self):
req = Request.blank(
'/test_delete_nested_404/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/manifest-missing-submanifest' +
'?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 5)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_nested_404/A/c/man'),
('DELETE', '/test_delete_nested_404/A/a/a_1'),
('GET', '/test_delete_nested_404/A/a/sub_nest'),
('DELETE', '/test_delete_nested_404/A/d/d_3'),
('DELETE', '/test_delete_nested_404/A/c/man')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('GET', '/v1/AUTH_test/deltest/' +
'missing-submanifest'),
('DELETE', '/v1/AUTH_test/deltest/d_3'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-missing-submanifest')])
self.assertEquals(resp_data['Response Status'], '200 OK')
self.assertEquals(resp_data['Response Body'], '')
self.assertEquals(resp_data['Number Deleted'], 3)
@ -480,60 +575,59 @@ class TestStaticLargeObject(unittest.TestCase):
def test_handle_multipart_delete_not_a_manifest(self):
req = Request.blank(
'/test_delete_bad_man/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/a_1?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad_man/A/c/man')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/a_1')])
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Response Body'], '')
self.assertEquals(resp_data['Number Deleted'], 0)
self.assertEquals(resp_data['Number Not Found'], 0)
self.assertEquals(resp_data['Errors'],
[['/c/man', 'Not an SLO manifest']])
[['/deltest/a_1', 'Not an SLO manifest']])
def test_handle_multipart_delete_bad_json(self):
req = Request.blank(
'/test_delete_bad_json/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/manifest-badjson?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 1)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_bad_json/A/c/man')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/manifest-badjson')])
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Response Body'], '')
self.assertEquals(resp_data['Number Deleted'], 0)
self.assertEquals(resp_data['Number Not Found'], 0)
self.assertEquals(resp_data['Errors'],
[['/c/man', 'Unable to load SLO manifest']])
[['/deltest/manifest-badjson',
'Unable to load SLO manifest']])
def test_handle_multipart_delete_401(self):
req = Request.blank(
'/test_delete_401/A/c/man?multipart-manifest=delete',
'/v1/AUTH_test/deltest/manifest-with-unauth-segment' +
'?multipart-manifest=delete',
environ={'REQUEST_METHOD': 'DELETE',
'HTTP_ACCEPT': 'application/json'})
app_iter = self.slo(req.environ, fake_start_response)
app_iter = list(app_iter) # iterate through whole response
resp_data = json.loads(app_iter[0])
self.assertEquals(self.app.calls, 4)
self.assertEquals(self.app.req_method_paths,
[('GET', '/test_delete_401/A/c/man'),
('DELETE', '/test_delete_401/A/c/a_1'),
('DELETE', '/test_delete_401/A/d/b_2'),
('DELETE', '/test_delete_401/A/c/man')])
status, response, body = self.call_slo(req)
resp_data = json.loads(body)
self.assertEquals(self.app.calls,
[('GET', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment'),
('DELETE', '/v1/AUTH_test/deltest/a_1'),
('DELETE', '/v1/AUTH_test/deltest-unauth/q_17'),
('DELETE', '/v1/AUTH_test/deltest/' +
'manifest-with-unauth-segment')])
self.assertEquals(resp_data['Response Status'], '400 Bad Request')
self.assertEquals(resp_data['Response Body'], '')
self.assertEquals(resp_data['Number Deleted'], 2)
self.assertEquals(resp_data['Number Not Found'], 0)
self.assertEquals(resp_data['Errors'],
[['/d/b_2', '401 Unauthorized']])
[['/deltest-unauth/q_17', '401 Unauthorized']])
if __name__ == '__main__':
unittest.main()