tests: use subclasses for S3Acl tests

We remove s3api.FakeSwift and replace it with the "normal" FakeSwift.
Additionally the @s3acl decorator is removed and replaced with an
inheritance based pattern.  This simplifies maintenance using more
familiar patterns and improves debugging.

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I55b596a42af01870b49fda22800f7a1293163eb8
This commit is contained in:
Alistair Coles 2023-12-04 11:38:17 +00:00
parent 1c31973d33
commit b07d87c4be
14 changed files with 2086 additions and 2209 deletions

@ -119,13 +119,16 @@ class FakeSwift(object):
* received ``POST /v1/a/c/o?x=y``, if it matches a registered ``POST``,
will update uploaded ``/v1/a/c/o``
"""
ALLOWED_METHODS = [
DEFAULT_ALLOWED_METHODS = [
'PUT', 'POST', 'DELETE', 'GET', 'HEAD', 'OPTIONS', 'REPLICATE',
'SSYNC', 'UPDATE']
container_existence_skip_cache = 0.0
account_existence_skip_cache = 0.0
def __init__(self):
def __init__(self, allowed_methods=None):
self.allowed_methods = set(self.DEFAULT_ALLOWED_METHODS)
if allowed_methods:
self.allowed_methods.update(allowed_methods)
self._calls = []
self.req_bodies = []
self._unclosed_req_keys = defaultdict(int)
@ -136,6 +139,7 @@ class FakeSwift(object):
self.uploaded = {}
# mapping of (method, path) --> (response class, headers, body)
self._responses = {}
self._sticky_headers = {}
self.logger = debug_logger('fake-swift')
self.account_ring = FakeRing()
self.container_ring = FakeRing()
@ -192,7 +196,21 @@ class FakeSwift(object):
# HEAD resp never has body
body = None
return resp_class, HeaderKeyDict(headers), body
try:
is_success = resp_class().is_success
except Exception:
# test_reconciler passes in an exploding response
is_success = False
if is_success and method in ('GET', 'HEAD'):
# update sticky resp headers with headers from registered resp
sticky_headers = self._sticky_headers.get(env['PATH_INFO'], {})
resp_headers = HeaderKeyDict(sticky_headers)
resp_headers.update(headers)
else:
# error responses don't get sticky resp headers
resp_headers = HeaderKeyDict(headers)
return resp_class, resp_headers, body
def _get_policy_index(self, acc, cont):
path = '/v1/%s/%s' % (acc, cont)
@ -219,7 +237,7 @@ class FakeSwift(object):
def __call__(self, env, start_response):
method = env['REQUEST_METHOD']
if method not in self.ALLOWED_METHODS:
if method not in self.allowed_methods:
raise HTTPNotImplemented()
path, acc, cont, obj = self._parse_path(env)
@ -315,6 +333,9 @@ class FakeSwift(object):
return LeakTrackingIter(wsgi_iter, self.mark_closed,
self.mark_read, (method, path))
def clear_calls(self):
del self._calls[:]
def mark_opened(self, key):
self._unclosed_req_keys[key] += 1
self._unread_req_paths[key] += 1
@ -353,6 +374,14 @@ class FakeSwift(object):
def call_count(self):
return len(self._calls)
def update_sticky_response_headers(self, path, headers):
"""
Tests setUp can use this to ensure any successful GET/HEAD response for
a given path will include these headers.
"""
sticky_headers = self._sticky_headers.setdefault(path, {})
sticky_headers.update(headers)
def register(self, method, path, response_class, headers, body=b''):
path = normalize_path(path)
self._responses[(method, path)] = [(response_class, headers, body)]

@ -12,20 +12,24 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from datetime import datetime
import email
import mock
import time
from contextlib import contextmanager
from swift.common import swob
from swift.common.http import is_success
from swift.common.middleware.s3api.s3api import filter_factory
from swift.common.middleware.s3api.etree import fromstring
from swift.common.middleware.s3api.subresource import Owner, encode_acl, \
Grant, User, ACL, PERMISSIONS, AllUsers, AuthenticatedUsers
from test.debug_logger import debug_logger
from test.unit.common.middleware.s3api.helpers import FakeSwift
from test.unit.common.middleware.helpers import FakeSwift
class FakeApp(object):
@ -33,8 +37,9 @@ class FakeApp(object):
account_existence_skip_cache = 0.0
def __init__(self):
self.remote_user = 'authorized'
self._pipeline_final_app = self
self.swift = FakeSwift()
self.swift = FakeSwift(allowed_methods=['TEST'])
self.logger = debug_logger()
def _update_s3_path_info(self, env):
@ -50,26 +55,38 @@ class FakeApp(object):
path = env['PATH_INFO']
env['PATH_INFO'] = path.replace(tenant_user, 'AUTH_' + tenant)
def __call__(self, env, start_response):
@staticmethod
def authorize_cb(req):
# Assume swift owner, if not yet set
req.environ.setdefault('swift_owner', True)
# But then default to blocking authz, to ensure we've replaced
# the default auth system
return swob.HTTPForbidden(request=req)
def handle(self, env):
if 's3api.auth_details' in env:
self._update_s3_path_info(env)
else:
return
if self.remote_user:
env['REMOTE_USER'] = self.remote_user
if env['REQUEST_METHOD'] == 'TEST':
env['swift.authorize'] = self.authorize_cb
else:
env['swift.authorize'] = lambda req: None
def authorize_cb(req):
# Assume swift owner, if not yet set
req.environ.setdefault('REMOTE_USER', 'authorized')
req.environ.setdefault('swift_owner', True)
# But then default to blocking authz, to ensure we've replaced
# the default auth system
return swob.HTTPForbidden(request=req)
env['swift.authorize'] = authorize_cb
if 'swift.authorize_override' in env:
return
def __call__(self, env, start_response):
self.handle(env)
return self.swift(env, start_response)
class S3ApiTestCase(unittest.TestCase):
def __init__(self, name):
unittest.TestCase.__init__(self, name)
@ -100,6 +117,11 @@ class S3ApiTestCase(unittest.TestCase):
self.s3api = filter_factory({}, **self.conf)(self.app)
self.logger = self.s3api.logger = self.swift.logger = debug_logger()
# if you change the registered acl response for /bucket or
# /bucket/object tearDown will complain at you; you can set this to
# True in order to indicate you know what you're doing
self.s3acl_response_modified = False
self.swift.register('HEAD', '/v1/AUTH_test',
swob.HTTPOk, {}, None)
self.swift.register('HEAD', '/v1/AUTH_test/bucket',
@ -110,7 +132,6 @@ class S3ApiTestCase(unittest.TestCase):
swob.HTTPNoContent, {}, None)
self.swift.register('DELETE', '/v1/AUTH_test/bucket',
swob.HTTPNoContent, {}, None)
self.swift.register('GET', '/v1/AUTH_test/bucket/object',
swob.HTTPOk, {'etag': 'object etag'}, "")
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
@ -135,7 +156,7 @@ class S3ApiTestCase(unittest.TestCase):
# register bucket HEAD response with given policy index header
headers = {'X-Backend-Storage-Policy-Index': str(bucket_policy_index)}
self.swift.register('HEAD', '/v1/AUTH_test/' + bucket,
swob.HTTPNoContent, headers, None)
swob.HTTPNoContent, headers)
def _assert_policy_index(self, req_headers, resp_headers, policy_index):
self.assertNotIn('X-Backend-Storage-Policy-Index', req_headers)
@ -213,5 +234,114 @@ class S3ApiTestCase(unittest.TestCase):
else:
return status[0], headers[0], body
@contextmanager
def stubbed_container_info(self, versioning_enabled=False):
"""
some tests might want to opt-out of container_info HEAD requests; e.g.
with self.stubbed_container_info():
status, headers, body = self.call_s3api(req)
"""
fake_info = {'status': 204}
if versioning_enabled:
fake_info['sysmeta'] = {
'versions-container': '\x00versions\x00bucket',
}
with mock.patch('swift.common.middleware.s3api.s3request.'
'get_container_info', return_value=fake_info):
yield
def call_s3api(self, req, **kwargs):
return self.call_app(req, app=self.s3api, **kwargs)
def _gen_test_headers(owner, grants=[], resource='container'):
if not grants:
grants = [Grant(User('test:tester'), 'FULL_CONTROL')]
return encode_acl(resource, ACL(owner, grants))
def _gen_grant(permission):
# generate Grant with a grantee named by "permission"
account_name = '%s:%s' % ('test', permission.lower())
return Grant(User(account_name), permission)
class S3ApiTestCaseAcl(S3ApiTestCase):
def setUp(self):
super(S3ApiTestCaseAcl, self).setUp()
self.s3api.conf.s3_acl = True
# some extra buckets for s3acl tests
buckets = ['bucket', 'public', 'authenticated']
for bucket in buckets:
path = '/v1/AUTH_test/' + bucket
self.swift.register('HEAD', path, swob.HTTPNoContent, {}, None),
self.swift.register('GET', path, swob.HTTPOk, {}, json.dumps([])),
for account in ('AUTH_test', 'AUTH_X'):
self.swift.register('TEST', '/v1/' + account,
swob.HTTPMethodNotAllowed, {}, None)
# setup sticky ACL headers...
grants = [_gen_grant(perm) for perm in PERMISSIONS]
self.default_owner = Owner('test:tester', 'test:tester')
container_headers = _gen_test_headers(self.default_owner, grants)
object_headers = _gen_test_headers(
self.default_owner, grants, 'object')
public_headers = _gen_test_headers(
self.default_owner, [Grant(AllUsers(), 'READ')])
authenticated_headers = _gen_test_headers(
self.default_owner, [Grant(AuthenticatedUsers(), 'READ')],
'bucket')
sticky_s3acl_headers = {
'/v1/AUTH_test/bucket': container_headers,
'/v1/AUTH_test/bucket+segments': container_headers,
'/v1/AUTH_test/bucket/object': object_headers,
'/v1/AUTH_test/public': public_headers,
'/v1/AUTH_test/authenticated': authenticated_headers,
}
for path, headers in sticky_s3acl_headers.items():
self.swift.update_sticky_response_headers(path, headers)
def tearDown(self):
# sanity the test didn't break the the ACLs
swift_path_acl_resp_checks = {
'/v1/AUTH_test/bucket': (
'X-Container-Sysmeta-S3api-Acl', '/bucket',
swob.HTTPNoContent),
'/v1/AUTH_test/bucket/object': (
'X-Object-Sysmeta-S3api-Acl', '/bucket/object', swob.HTTPOk),
}
check_paths = []
for swift_path, (acl, check, resp_class) in \
swift_path_acl_resp_checks.items():
if self.s3acl_response_modified:
# this is expected to reset back to the original sticky headers
self.swift.register('HEAD', swift_path, resp_class, {}, None)
req = swob.Request.blank(swift_path, method='HEAD')
status, headers, body = self.call_app(req)
if is_success(int(status.split()[0])):
self.assertIn(acl, headers,
'In tearDown it seems the test (accidently?) '
'removed the ACL on %s' % swift_path)
check_paths.append(check)
else:
self.fail('test changed resp for %s' % swift_path)
account_expected = {
'test:tester': 200,
'test:other': 403,
}
for account, expected in account_expected.items():
for path in check_paths:
req = swob.Request.blank(path, method='HEAD', headers={
'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(int(status.split()[0]), expected,
'In tearDown it seems the test (accidently?) '
'broke ACL access for %s to %s' % (
account, path))

@ -15,87 +15,6 @@
# This stuff can't live in test/unit/__init__.py due to its swob dependency.
from swift.common import swob
from swift.common.utils import split_path
from swift.common.request_helpers import is_sys_meta
from test.unit.common.middleware.helpers import FakeSwift as BaseFakeSwift
class FakeSwift(BaseFakeSwift):
"""
A good-enough fake Swift proxy server to use in testing middleware.
"""
ALLOWED_METHODS = BaseFakeSwift.ALLOWED_METHODS + ['TEST']
def __init__(self, s3_acl=False):
super(FakeSwift, self).__init__()
self.s3_acl = s3_acl
self.remote_user = 'authorized'
def _fake_auth_middleware(self, env):
if 'swift.authorize_override' in env:
return
if 's3api.auth_details' not in env:
return
tenant_user = env['s3api.auth_details']['access_key']
tenant, user = tenant_user.rsplit(':', 1)
path = env['PATH_INFO']
env['PATH_INFO'] = path.replace(tenant_user, 'AUTH_' + tenant)
if self.remote_user:
env['REMOTE_USER'] = self.remote_user
if env['REQUEST_METHOD'] == 'TEST':
def authorize_cb(req):
# Assume swift owner, if not yet set
req.environ.setdefault('swift_owner', True)
# But then default to blocking authz, to ensure we've replaced
# the default auth system
return swob.HTTPForbidden(request=req)
env['swift.authorize'] = authorize_cb
else:
env['swift.authorize'] = lambda req: None
def __call__(self, env, start_response):
if self.s3_acl:
self._fake_auth_middleware(env)
return super(FakeSwift, self).__call__(env, start_response)
def register(self, method, path, response_class, headers, body):
# assuming the path format like /v1/account/container/object
resource_map = ['account', 'container', 'object']
index = len(list(filter(None, split_path(path, 0, 4, True)[1:]))) - 1
resource = resource_map[index]
if (method, path) in self._responses:
old_headers = self._responses[(method, path)][0][1]
headers = headers.copy()
for key, value in old_headers.items():
if is_sys_meta(resource, key) and key not in headers:
# keep old sysmeta for s3acl
headers.update({key: value})
if body is not None and not isinstance(body, (bytes, list)):
body = body.encode('utf8')
return super(FakeSwift, self).register(
method, path, response_class, headers, body)
def register_unconditionally(self, method, path, response_class, headers,
body):
# register() keeps old sysmeta around, but
# register_unconditionally() keeps nothing.
if body is not None and not isinstance(body, bytes):
body = body.encode('utf8')
self._responses[(method, path)] = [(response_class, headers, body)]
def clear_calls(self):
del self._calls[:]
class UnreadableInput(object):
# Some clients will send neither a Content-Length nor a Transfer-Encoding

@ -26,15 +26,14 @@ from swift.common.middleware.s3api.s3response import InvalidArgument
from swift.common.middleware.s3api.acl_utils import handle_acl_header
from swift.common.utils import md5
from test.unit.common.middleware.s3api import S3ApiTestCase
from test.unit.common.middleware.s3api import S3ApiTestCase, S3ApiTestCaseAcl
from test.unit.common.middleware.s3api.helpers import UnreadableInput
from test.unit.common.middleware.s3api.test_s3_acl import s3acl
class TestS3ApiAcl(S3ApiTestCase):
class BaseS3ApiAcl(object):
def setUp(self):
super(TestS3ApiAcl, self).setUp()
super(BaseS3ApiAcl, self).setUp()
# All ACL API should be called against to existing bucket.
self.swift.register('PUT', '/v1/AUTH_test/bucket',
HTTPAccepted, {}, None)
@ -46,7 +45,6 @@ class TestS3ApiAcl(S3ApiTestCase):
name = elem.find('./AccessControlList/Grant/Grantee/ID').text
self.assertEqual(name, owner)
@s3acl
def test_bucket_acl_GET(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'GET'},
@ -58,6 +56,55 @@ class TestS3ApiAcl(S3ApiTestCase):
self.assertSetEqual(set((('HEAD', '/v1/AUTH_test/bucket'),)),
set(self.swift.calls))
def _test_put_no_body(self, use_content_length=False,
use_transfer_encoding=False, string_to_md5=b''):
content_md5 = base64.b64encode(
md5(string_to_md5, usedforsecurity=False).digest()).strip()
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket?acl',
environ={
'REQUEST_METHOD': 'PUT',
'wsgi.input': fake_input},
headers={
'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body='')
if not use_content_length:
req.environ.pop('CONTENT_LENGTH')
if use_transfer_encoding:
req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader')
self.assertEqual(self._get_error_message(body),
'Your request was missing a required header.')
self.assertIn(b'<MissingHeaderName>x-amz-acl</MissingHeaderName>',
body)
def test_bucket_fails_with_neither_acl_header_nor_xml_PUT(self):
self._test_put_no_body()
self._test_put_no_body(string_to_md5=b'test')
self._test_put_no_body(use_content_length=True)
self._test_put_no_body(use_content_length=True, string_to_md5=b'test')
self._test_put_no_body(use_transfer_encoding=True)
self._test_put_no_body(use_transfer_encoding=True, string_to_md5=b'zz')
def test_object_acl_GET(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
if not self.s3api.conf.s3_acl:
self._check_acl('test:tester', body)
self.assertSetEqual(set((('HEAD', '/v1/AUTH_test/bucket/object'),)),
set(self.swift.calls))
class TestS3ApiAclNoSetup(BaseS3ApiAcl, S3ApiTestCase):
def test_bucket_acl_PUT(self):
elem = Element('AccessControlPolicy')
owner = SubElement(elem, 'Owner')
@ -99,19 +146,6 @@ class TestS3ApiAcl(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
@s3acl(s3acl_only=True)
def test_bucket_canned_acl_PUT_with_s3acl(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'X-AMZ-ACL': 'public-read'})
with mock.patch('swift.common.middleware.s3api.s3request.'
'handle_acl_header') as mock_handler:
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
self.assertEqual(mock_handler.call_count, 0)
def test_bucket_fails_with_both_acl_header_and_xml_PUT(self):
elem = Element('AccessControlPolicy')
owner = SubElement(elem, 'Owner')
@ -135,54 +169,6 @@ class TestS3ApiAcl(S3ApiTestCase):
self.assertEqual(self._get_error_code(body),
'UnexpectedContent')
def _test_put_no_body(self, use_content_length=False,
use_transfer_encoding=False, string_to_md5=b''):
content_md5 = base64.b64encode(
md5(string_to_md5, usedforsecurity=False).digest()).strip()
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket?acl',
environ={
'REQUEST_METHOD': 'PUT',
'wsgi.input': fake_input},
headers={
'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body='')
if not use_content_length:
req.environ.pop('CONTENT_LENGTH')
if use_transfer_encoding:
req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader')
self.assertEqual(self._get_error_message(body),
'Your request was missing a required header.')
self.assertIn(b'<MissingHeaderName>x-amz-acl</MissingHeaderName>',
body)
@s3acl
def test_bucket_fails_with_neither_acl_header_nor_xml_PUT(self):
self._test_put_no_body()
self._test_put_no_body(string_to_md5=b'test')
self._test_put_no_body(use_content_length=True)
self._test_put_no_body(use_content_length=True, string_to_md5=b'test')
self._test_put_no_body(use_transfer_encoding=True)
self._test_put_no_body(use_transfer_encoding=True, string_to_md5=b'zz')
@s3acl
def test_object_acl_GET(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
if not self.s3api.conf.s3_acl:
self._check_acl('test:tester', body)
self.assertSetEqual(set((('HEAD', '/v1/AUTH_test/bucket/object'),)),
set(self.swift.calls))
def test_invalid_xml(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
@ -210,7 +196,30 @@ class TestS3ApiAcl(S3ApiTestCase):
[('X-Container-Read', '.'),
('X-Container-Write', '.')])
@s3acl(s3acl_only=True)
def test_handle_acl_with_invalid_header_string(self):
req = Request.blank('/bucket', headers={'X-Amz-Acl': 'invalid'})
with self.assertRaises(InvalidArgument) as cm:
handle_acl_header(req)
self.assertTrue('argument_name' in cm.exception.info)
self.assertEqual(cm.exception.info['argument_name'], 'x-amz-acl')
self.assertTrue('argument_value' in cm.exception.info)
self.assertEqual(cm.exception.info['argument_value'], 'invalid')
class TestS3ApiAclCommonSetup(BaseS3ApiAcl, S3ApiTestCaseAcl):
def test_bucket_canned_acl_PUT_with_s3acl(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'X-AMZ-ACL': 'public-read'})
with mock.patch('swift.common.middleware.s3api.s3request.'
'handle_acl_header') as mock_handler:
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
self.assertEqual(mock_handler.call_count, 0)
def test_handle_acl_header_with_s3acl(self):
def check_generated_acl_header(acl, targets):
req = Request.blank('/bucket',
@ -227,15 +236,6 @@ class TestS3ApiAcl(S3ApiTestCase):
check_generated_acl_header('private',
['X-Container-Read', 'X-Container-Write'])
def test_handle_acl_with_invalid_header_string(self):
req = Request.blank('/bucket', headers={'X-Amz-Acl': 'invalid'})
with self.assertRaises(InvalidArgument) as cm:
handle_acl_header(req)
self.assertTrue('argument_name' in cm.exception.info)
self.assertEqual(cm.exception.info['argument_name'], 'x-amz-acl')
self.assertTrue('argument_value' in cm.exception.info)
self.assertEqual(cm.exception.info['argument_value'], 'invalid')
if __name__ == '__main__':
unittest.main()

@ -33,15 +33,14 @@ from swift.common.middleware.s3api.subresource import Owner, encode_acl, \
from swift.common.middleware.s3api.s3request import MAX_32BIT_INT
from test.unit.common.middleware.helpers import normalize_path
from test.unit.common.middleware.s3api import S3ApiTestCase
from test.unit.common.middleware.s3api.test_s3_acl import s3acl
from test.unit.common.middleware.s3api import S3ApiTestCase, S3ApiTestCaseAcl
from test.unit.common.middleware.s3api.helpers import UnreadableInput
# Example etag from ProxyFS; note that it is already quote-wrapped
PFS_ETAG = '"pfsv2/AUTH_test/01234567/89abcdef-32"'
class TestS3ApiBucket(S3ApiTestCase):
class BaseS3ApiBucket(object):
def setup_objects(self):
self.objects = (('lily', '2011-01-05T02:19:14.275290', '0', '3909'),
(u'lily-\u062a', '2011-01-05T02:19:14.275290', 0, 390),
@ -127,9 +126,327 @@ class TestS3ApiBucket(S3ApiTestCase):
]))
def setUp(self):
super(TestS3ApiBucket, self).setUp()
super(BaseS3ApiBucket, self).setUp()
self.setup_objects()
def _add_versions_request(self, orig_objects=None, versioned_objects=None,
bucket='junk'):
if orig_objects is None:
orig_objects = self.objects_list
if versioned_objects is None:
versioned_objects = self.versioned_objects
all_versions = versioned_objects + [
dict(i, version_id='null', is_latest=True)
for i in orig_objects]
all_versions.sort(key=lambda o: (
o['name'], '' if o['version_id'] == 'null' else o['version_id']))
self.swift.register(
'GET', '/v1/AUTH_test/%s' % bucket, swob.HTTPOk,
{'Content-Type': 'application/json'}, json.dumps(all_versions))
def _assert_delete_markers(self, elem):
delete_markers = elem.findall('./DeleteMarker')
self.assertEqual(len(delete_markers), 1)
self.assertEqual(delete_markers[0].find('./IsLatest').text, 'false')
self.assertEqual(delete_markers[0].find('./VersionId').text, '2')
self.assertEqual(delete_markers[0].find('./Key').text, 'rose')
def _test_bucket_PUT_with_location(self, root_element):
elem = Element(root_element)
SubElement(elem, 'LocationConstraint').text = 'us-east-1'
xml = tostring(elem)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def _test_method_error_delete(self, path, sw_resp):
self.swift.register('HEAD', '/v1/AUTH_test' + path, sw_resp, {}, None)
return self._test_method_error('DELETE', path, sw_resp)
def test_bucket_GET_error(self):
code = self._test_method_error('GET', '/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('GET', '/bucket', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('GET', '/bucket', swob.HTTPNotFound)
self.assertEqual(code, 'NoSuchBucket')
code = self._test_method_error('GET', '/bucket',
swob.HTTPServiceUnavailable)
self.assertEqual(code, 'ServiceUnavailable')
code = self._test_method_error('GET', '/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
def test_bucket_GET_non_json(self):
# Suppose some middleware accidentally makes it return txt instead
resp_body = b'\n'.join([b'obj%d' % i for i in range(100)])
self.swift.register('GET', '/v1/AUTH_test/bucket', swob.HTTPOk, {},
resp_body)
# When we do our GET...
req = Request.blank('/bucket',
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
# ...there isn't much choice but to error...
self.assertEqual(self._get_error_code(body), 'InternalError')
# ... but we should at least log the body to aid in debugging
self.assertIn(
'Got non-JSON response trying to list /bucket: %r'
% (resp_body[:60] + b'...'),
self.s3api.logger.get_lines_for_level('error'))
def test_bucket_PUT_error(self):
code = self._test_method_error('PUT', '/bucket', swob.HTTPCreated,
headers={'Content-Length': 'a'})
self.assertEqual(code, 'InvalidArgument')
code = self._test_method_error('PUT', '/bucket', swob.HTTPCreated,
headers={'Content-Length': '-1'})
self.assertEqual(code, 'InvalidArgument')
code = self._test_method_error('PUT', '/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('PUT', '/bucket', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted)
self.assertEqual(code, 'BucketAlreadyOwnedByYou')
with mock.patch(
'swift.common.middleware.s3api.s3request.get_container_info',
return_value={'sysmeta': {'s3api-acl': '{"Owner": "nope"}'}}):
code = self._test_method_error(
'PUT', '/bucket', swob.HTTPAccepted)
self.assertEqual(code, 'BucketAlreadyExists')
code = self._test_method_error('PUT', '/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
code = self._test_method_error(
'PUT', '/bucket', swob.HTTPServiceUnavailable)
self.assertEqual(code, 'ServiceUnavailable')
code = self._test_method_error(
'PUT', '/bucket+bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/192.168.11.1', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/bucket.-bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/bucket-.bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error('PUT', '/bucket*', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error('PUT', '/b', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/%s' % ''.join(['b' for x in range(64)]),
swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
def test_bucket_PUT_bucket_already_owned_by_you(self):
self.swift.register(
'PUT', '/v1/AUTH_test/bucket', swob.HTTPAccepted,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '409 Conflict')
self.assertIn(b'BucketAlreadyOwnedByYou', body)
def test_bucket_PUT_first_put_fail(self):
self.swift.register(
'PUT', '/v1/AUTH_test/bucket',
swob.HTTPServiceUnavailable,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '503 Service Unavailable')
# The last call was PUT not POST for acl set
self.assertEqual(self.swift.calls, [
('PUT', '/v1/AUTH_test/bucket'),
])
def test_bucket_PUT(self):
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
# Apparently some clients will include a chunked transfer-encoding
# even with no body
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Transfer-Encoding': 'chunked'})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT',
'wsgi.input': fake_input},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
def test_bucket_PUT_with_location(self):
self._test_bucket_PUT_with_location('CreateBucketConfiguration')
def test_bucket_PUT_with_ami_location(self):
# ec2-ami-tools apparently uses CreateBucketConstraint instead?
self._test_bucket_PUT_with_location('CreateBucketConstraint')
def test_bucket_PUT_with_strange_location(self):
# Even crazier: it doesn't seem to matter
self._test_bucket_PUT_with_location('foo')
def test_bucket_PUT_with_location_error(self):
elem = Element('CreateBucketConfiguration')
SubElement(elem, 'LocationConstraint').text = 'XXX'
xml = tostring(elem)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body),
'InvalidLocationConstraint')
def test_bucket_PUT_with_location_invalid_xml(self):
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body='invalid_xml')
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedXML')
def test_bucket_DELETE_error(self):
code = self._test_method_error_delete('/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error_delete('/bucket', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error_delete('/bucket', swob.HTTPNotFound)
self.assertEqual(code, 'NoSuchBucket')
code = self._test_method_error_delete('/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
# bucket not empty is now validated at s3api
self.swift._responses.get(('HEAD', '/v1/AUTH_test/bucket'))
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1'}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertNotIn('You must delete all versions in the bucket',
self._get_error_message(body))
def test_bucket_DELETE_error_with_enabled_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1',
'X-Container-Sysmeta-Versions-Enabled': 'True'},
None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertIn('You must delete all versions in the bucket',
self._get_error_message(body))
def test_bucket_DELETE_error_with_suspended_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1',
'X-Container-Sysmeta-Versions-Enabled': 'False'},
None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertIn('You must delete all versions in the bucket',
self._get_error_message(body))
def test_bucket_DELETE(self):
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '204')
def test_bucket_DELETE_with_empty_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket+versioning',
swob.HTTPNoContent, {}, None)
self.swift.register('DELETE', '/v1/AUTH_test/bucket+versioning',
swob.HTTPNoContent, {}, None)
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '204')
def test_bucket_DELETE_error_while_segment_bucket_delete(self):
# An error occurred while deleting segment objects
self.swift.register('DELETE', '/v1/AUTH_test/bucket+segments/lily',
swob.HTTPServiceUnavailable, {}, json.dumps([]))
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '503')
called = [(method, path) for method, path, _ in
self.swift.calls_with_headers]
# Don't delete original bucket when error occurred in segment container
self.assertNotIn(('DELETE', '/v1/AUTH_test/bucket'), called)
class TestS3ApiBucketNoACL(BaseS3ApiBucket, S3ApiTestCase):
def test_bucket_HEAD(self):
req = Request.blank('/junk',
environ={'REQUEST_METHOD': 'HEAD'},
@ -194,39 +511,6 @@ class TestS3ApiBucket(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '404')
@s3acl
def test_bucket_GET_error(self):
code = self._test_method_error('GET', '/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('GET', '/bucket', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('GET', '/bucket', swob.HTTPNotFound)
self.assertEqual(code, 'NoSuchBucket')
code = self._test_method_error('GET', '/bucket',
swob.HTTPServiceUnavailable)
self.assertEqual(code, 'ServiceUnavailable')
code = self._test_method_error('GET', '/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
@s3acl
def test_bucket_GET_non_json(self):
# Suppose some middleware accidentally makes it return txt instead
resp_body = b'\n'.join([b'obj%d' % i for i in range(100)])
self.swift.register('GET', '/v1/AUTH_test/bucket', swob.HTTPOk, {},
resp_body)
# When we do our GET...
req = Request.blank('/bucket',
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
# ...there isn't much choice but to error...
self.assertEqual(self._get_error_code(body), 'InternalError')
# ... but we should at least log the body to aid in debugging
self.assertIn(
'Got non-JSON response trying to list /bucket: %r'
% (resp_body[:60] + b'...'),
self.s3api.logger.get_lines_for_level('error'))
def test_bucket_GET(self):
bucket_name = 'junk'
req = Request.blank('/%s' % bucket_name,
@ -765,28 +1049,6 @@ class TestS3ApiBucket(S3ApiTestCase):
self.assertEqual([v.find('./StorageClass').text for v in versions],
['STANDARD' for v in objects])
def _add_versions_request(self, orig_objects=None, versioned_objects=None,
bucket='junk'):
if orig_objects is None:
orig_objects = self.objects_list
if versioned_objects is None:
versioned_objects = self.versioned_objects
all_versions = versioned_objects + [
dict(i, version_id='null', is_latest=True)
for i in orig_objects]
all_versions.sort(key=lambda o: (
o['name'], '' if o['version_id'] == 'null' else o['version_id']))
self.swift.register(
'GET', '/v1/AUTH_test/%s' % bucket, swob.HTTPOk,
{'Content-Type': 'application/json'}, json.dumps(all_versions))
def _assert_delete_markers(self, elem):
delete_markers = elem.findall('./DeleteMarker')
self.assertEqual(len(delete_markers), 1)
self.assertEqual(delete_markers[0].find('./IsLatest').text, 'false')
self.assertEqual(delete_markers[0].find('./VersionId').text, '2')
self.assertEqual(delete_markers[0].find('./Key').text, 'rose')
def test_bucket_GET_with_versions(self):
self._add_versions_request()
req = Request.blank('/junk?versions',
@ -1205,150 +1467,6 @@ class TestS3ApiBucket(S3ApiTestCase):
'?limit=1001&prefix=subdir/&versions=')),
])
@s3acl
def test_bucket_PUT_error(self):
code = self._test_method_error('PUT', '/bucket', swob.HTTPCreated,
headers={'Content-Length': 'a'})
self.assertEqual(code, 'InvalidArgument')
code = self._test_method_error('PUT', '/bucket', swob.HTTPCreated,
headers={'Content-Length': '-1'})
self.assertEqual(code, 'InvalidArgument')
code = self._test_method_error('PUT', '/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('PUT', '/bucket', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted)
self.assertEqual(code, 'BucketAlreadyOwnedByYou')
with mock.patch(
'swift.common.middleware.s3api.s3request.get_container_info',
return_value={'sysmeta': {'s3api-acl': '{"Owner": "nope"}'}}):
code = self._test_method_error(
'PUT', '/bucket', swob.HTTPAccepted)
self.assertEqual(code, 'BucketAlreadyExists')
code = self._test_method_error('PUT', '/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
code = self._test_method_error(
'PUT', '/bucket', swob.HTTPServiceUnavailable)
self.assertEqual(code, 'ServiceUnavailable')
code = self._test_method_error(
'PUT', '/bucket+bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/192.168.11.1', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/bucket.-bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/bucket-.bucket', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error('PUT', '/bucket*', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error('PUT', '/b', swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
code = self._test_method_error(
'PUT', '/%s' % ''.join(['b' for x in range(64)]),
swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
@s3acl(s3acl_only=True)
def test_bucket_PUT_error_non_swift_owner(self):
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted,
env={'swift_owner': False})
self.assertEqual(code, 'AccessDenied')
@s3acl
def test_bucket_PUT_bucket_already_owned_by_you(self):
self.swift.register(
'PUT', '/v1/AUTH_test/bucket', swob.HTTPAccepted,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '409 Conflict')
self.assertIn(b'BucketAlreadyOwnedByYou', body)
@s3acl
def test_bucket_PUT_first_put_fail(self):
self.swift.register(
'PUT', '/v1/AUTH_test/bucket',
swob.HTTPServiceUnavailable,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '503 Service Unavailable')
# The last call was PUT not POST for acl set
self.assertEqual(self.swift.calls, [
('PUT', '/v1/AUTH_test/bucket'),
])
@s3acl
def test_bucket_PUT(self):
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
# Apparently some clients will include a chunked transfer-encoding
# even with no body
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Transfer-Encoding': 'chunked'})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket',
environ={'REQUEST_METHOD': 'PUT',
'wsgi.input': fake_input},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(body, b'')
self.assertEqual(status.split()[0], '200')
self.assertEqual(headers['Location'], '/bucket')
def _test_bucket_PUT_with_location(self, root_element):
elem = Element(root_element)
SubElement(elem, 'LocationConstraint').text = 'us-east-1'
xml = tostring(elem)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
@s3acl
def test_bucket_PUT_with_location(self):
self._test_bucket_PUT_with_location('CreateBucketConfiguration')
@s3acl
def test_bucket_PUT_with_ami_location(self):
# ec2-ami-tools apparently uses CreateBucketConstraint instead?
self._test_bucket_PUT_with_location('CreateBucketConstraint')
@s3acl
def test_bucket_PUT_with_strange_location(self):
# Even crazier: it doesn't seem to matter
self._test_bucket_PUT_with_location('foo')
def test_bucket_PUT_with_mixed_case_location(self):
self.s3api.conf.location = 'RegionOne'
elem = Element('CreateBucketConfiguration')
@ -1385,7 +1503,8 @@ class TestS3ApiBucket(S3ApiTestCase):
self.assertEqual(headers.get('X-Container-Read'), '.r:*,.rlistings')
self.assertNotIn('X-Container-Sysmeta-S3api-Acl', headers)
@s3acl(s3acl_only=True)
class TestS3ApiBucketAcl(BaseS3ApiBucket, S3ApiTestCaseAcl):
def test_bucket_PUT_with_canned_s3acl(self):
account = 'test:tester'
acl = \
@ -1403,144 +1522,10 @@ class TestS3ApiBucket(S3ApiTestCase):
self.assertEqual(headers.get('X-Container-Sysmeta-S3api-Acl'),
acl['x-container-sysmeta-s3api-acl'])
@s3acl
def test_bucket_PUT_with_location_error(self):
elem = Element('CreateBucketConfiguration')
SubElement(elem, 'LocationConstraint').text = 'XXX'
xml = tostring(elem)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body),
'InvalidLocationConstraint')
@s3acl
def test_bucket_PUT_with_location_invalid_xml(self):
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body='invalid_xml')
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedXML')
def _test_method_error_delete(self, path, sw_resp):
self.swift.register('HEAD', '/v1/AUTH_test' + path, sw_resp, {}, None)
return self._test_method_error('DELETE', path, sw_resp)
@s3acl
def test_bucket_DELETE_error(self):
code = self._test_method_error_delete('/bucket', swob.HTTPUnauthorized)
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error_delete('/bucket', swob.HTTPForbidden)
def test_bucket_PUT_error_non_swift_owner(self):
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted,
env={'swift_owner': False})
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error_delete('/bucket', swob.HTTPNotFound)
self.assertEqual(code, 'NoSuchBucket')
code = self._test_method_error_delete('/bucket', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
# bucket not empty is now validated at s3api
self.swift._responses.get(('HEAD', '/v1/AUTH_test/bucket'))
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1'}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertNotIn('You must delete all versions in the bucket',
self._get_error_message(body))
@s3acl
def test_bucket_DELETE_error_with_enabled_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1',
'X-Container-Sysmeta-Versions-Enabled': 'True'},
None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertIn('You must delete all versions in the bucket',
self._get_error_message(body))
@s3acl
def test_bucket_DELETE_error_with_suspended_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': '1',
'X-Container-Sysmeta-Versions-Enabled': 'False'},
None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, _headers, body = self.call_s3api(req)
self.assertEqual('409 Conflict', status)
self.assertEqual('BucketNotEmpty', self._get_error_code(body))
self.assertIn('You must delete all versions in the bucket',
self._get_error_message(body))
@s3acl
def test_bucket_DELETE(self):
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '204')
@s3acl
def test_bucket_DELETE_with_empty_versioning(self):
self.swift.register('HEAD', '/v1/AUTH_test/bucket+versioning',
swob.HTTPNoContent, {}, None)
self.swift.register('DELETE', '/v1/AUTH_test/bucket+versioning',
swob.HTTPNoContent, {}, None)
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '204')
@s3acl
def test_bucket_DELETE_error_while_segment_bucket_delete(self):
# An error occurred while deleting segment objects
self.swift.register('DELETE', '/v1/AUTH_test/bucket+segments/lily',
swob.HTTPServiceUnavailable, {}, json.dumps([]))
# overwrite default HEAD to return x-container-object-count
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
{'X-Container-Object-Count': 0}, None)
req = Request.blank('/bucket',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '503')
called = [(method, path) for method, path, _ in
self.swift.calls_with_headers]
# Don't delete original bucket when error occurred in segment container
self.assertNotIn(('DELETE', '/v1/AUTH_test/bucket'), called)
def _test_bucket_for_s3acl(self, method, account):
req = Request.blank('/bucket',
@ -1550,25 +1535,21 @@ class TestS3ApiBucket(S3ApiTestCase):
return self.call_s3api(req)
@s3acl(s3acl_only=True)
def test_bucket_GET_without_permission(self):
status, headers, body = self._test_bucket_for_s3acl('GET',
'test:other')
self.assertEqual(self._get_error_code(body), 'AccessDenied')
@s3acl(s3acl_only=True)
def test_bucket_GET_with_read_permission(self):
status, headers, body = self._test_bucket_for_s3acl('GET',
'test:read')
self.assertEqual(status.split()[0], '200')
@s3acl(s3acl_only=True)
def test_bucket_GET_with_fullcontrol_permission(self):
status, headers, body = \
self._test_bucket_for_s3acl('GET', 'test:full_control')
self.assertEqual(status.split()[0], '200')
@s3acl(s3acl_only=True)
def test_bucket_GET_with_owner_permission(self):
status, headers, body = self._test_bucket_for_s3acl('GET',
'test:tester')
@ -1582,18 +1563,15 @@ class TestS3ApiBucket(S3ApiTestCase):
return self.call_s3api(req)
@s3acl(s3acl_only=True)
def test_bucket_GET_authenticated_users(self):
status, headers, body = \
self._test_bucket_GET_canned_acl('authenticated')
self.assertEqual(status.split()[0], '200')
@s3acl(s3acl_only=True)
def test_bucket_GET_all_users(self):
status, headers, body = self._test_bucket_GET_canned_acl('public')
self.assertEqual(status.split()[0], '200')
@s3acl(s3acl_only=True)
def test_bucket_DELETE_without_permission(self):
status, headers, body = self._test_bucket_for_s3acl('DELETE',
'test:other')
@ -1602,7 +1580,6 @@ class TestS3ApiBucket(S3ApiTestCase):
called = [method for method, _, _ in self.swift.calls_with_headers]
self.assertNotIn('DELETE', called)
@s3acl(s3acl_only=True)
def test_bucket_DELETE_with_write_permission(self):
status, headers, body = self._test_bucket_for_s3acl('DELETE',
'test:write')
@ -1611,7 +1588,6 @@ class TestS3ApiBucket(S3ApiTestCase):
called = [method for method, _, _ in self.swift.calls_with_headers]
self.assertNotIn('DELETE', called)
@s3acl(s3acl_only=True)
def test_bucket_DELETE_with_fullcontrol_permission(self):
status, headers, body = \
self._test_bucket_for_s3acl('DELETE', 'test:full_control')

@ -1,69 +0,0 @@
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This stuff can't live in test/unit/__init__.py due to its swob dependency.
import unittest
from test.unit.common.middleware.s3api.helpers import FakeSwift
from swift.common.middleware.s3api.utils import sysmeta_header
from swift.common.swob import HeaderKeyDict
from mock import MagicMock
class S3ApiHelperTestCase(unittest.TestCase):
def setUp(self):
self.method = 'HEAD'
self.path = '/v1/AUTH_test/bucket'
def _check_headers(self, swift, method, path, headers):
_, response_headers, _ = swift._responses[(method, path)][0]
self.assertEqual(headers, response_headers)
def test_fake_swift_sysmeta(self):
swift = FakeSwift()
orig_headers = HeaderKeyDict()
orig_headers.update({sysmeta_header('container', 'acl'): 'test',
'x-container-meta-foo': 'bar'})
swift.register(self.method, self.path, MagicMock(), orig_headers, None)
self._check_headers(swift, self.method, self.path, orig_headers)
new_headers = orig_headers.copy()
del new_headers[sysmeta_header('container', 'acl').title()]
swift.register(self.method, self.path, MagicMock(), new_headers, None)
self._check_headers(swift, self.method, self.path, orig_headers)
def test_fake_swift_sysmeta_overwrite(self):
swift = FakeSwift()
orig_headers = HeaderKeyDict()
orig_headers.update({sysmeta_header('container', 'acl'): 'test',
'x-container-meta-foo': 'bar'})
swift.register(self.method, self.path, MagicMock(), orig_headers, None)
self._check_headers(swift, self.method, self.path, orig_headers)
new_headers = orig_headers.copy()
new_headers[sysmeta_header('container', 'acl').title()] = 'bar'
swift.register(self.method, self.path, MagicMock(), new_headers, None)
self.assertFalse(orig_headers == new_headers)
self._check_headers(swift, self.method, self.path, new_headers)
if __name__ == '__main__':
unittest.main()

@ -24,18 +24,17 @@ from swift.common import swob
from swift.common.swob import Request
from test.unit import make_timestamp_iter
from test.unit.common.middleware.s3api import S3ApiTestCase
from test.unit.common.middleware.s3api import S3ApiTestCase, S3ApiTestCaseAcl
from test.unit.common.middleware.s3api.helpers import UnreadableInput
from swift.common.middleware.s3api.etree import fromstring, tostring, \
Element, SubElement
from swift.common.utils import md5
from test.unit.common.middleware.s3api.test_s3_acl import s3acl
class TestS3ApiMultiDelete(S3ApiTestCase):
class BaseS3ApiMultiDelete(object):
def setUp(self):
super(TestS3ApiMultiDelete, self).setUp()
super(BaseS3ApiMultiDelete, self).setUp()
self.swift.register('HEAD', '/v1/AUTH_test/bucket/Key1',
swob.HTTPOk, {}, None)
self.swift.register('HEAD', '/v1/AUTH_test/bucket/Key2',
@ -45,7 +44,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
swob.HTTPOk, {}, None)
self.ts = make_timestamp_iter()
@s3acl
def test_object_multi_DELETE_to_object(self):
elem = Element('Delete')
obj = SubElement(elem, 'Object')
@ -64,7 +62,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
@s3acl
def test_object_multi_DELETE(self):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
swob.HTTPNoContent, {}, None)
@ -109,7 +106,8 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
status, headers, body = self.call_s3api(req)
with self.stubbed_container_info():
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
@ -130,7 +128,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
('DELETE', '/v1/AUTH_test/bucket/business/caf\xc3\xa9'),
])
@s3acl
def test_object_multi_DELETE_with_error(self):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
swob.HTTPNoContent, {}, None)
@ -170,7 +167,8 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
status, headers, body = self.call_s3api(req)
with self.stubbed_container_info():
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
@ -196,7 +194,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
('DELETE', '/v1/AUTH_test/bucket/Key4?multipart-manifest=delete'),
])
@s3acl
def test_object_multi_DELETE_with_non_json(self):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
swob.HTTPNoContent, {}, None)
@ -242,7 +239,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
'Could not parse SLO delete response (200 OK): %s: ' % b'asdf'])
self.s3api.logger.clear()
@s3acl
def test_object_multi_DELETE_quiet(self):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
swob.HTTPNoContent, {}, None)
@ -272,7 +268,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), 0)
@s3acl
def test_object_multi_DELETE_no_key(self):
self.swift.register('DELETE', '/v1/AUTH_test/bucket/Key1',
swob.HTTPNoContent, {}, None)
@ -297,7 +292,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'UserKeyMustBeSpecified')
@s3acl
def test_object_multi_DELETE_versioned_enabled(self):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent, {
@ -344,7 +338,9 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
status, headers, body = self.call_s3api(req)
# XXX versioning_enabled=True not required?
with self.stubbed_container_info():
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
self.assertEqual(self.swift.calls, [
@ -363,7 +359,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
self.assertEqual({'Key1', 'Key2', 'Key3', 'Key4'}, set(
e.findtext('Key') for e in elem.findall('Deleted')))
@s3acl
def test_object_multi_DELETE_versioned_suspended(self):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent, {}, None)
@ -402,7 +397,9 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
status, headers, body = self.call_s3api(req)
# XXX versioning_enabled=True not required?
with self.stubbed_container_info():
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), 3)
@ -421,7 +418,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
('DELETE', '/v1/AUTH_test/bucket/Key3'),
])
@s3acl
def test_object_multi_DELETE_with_invalid_md5(self):
elem = Element('Delete')
for key in ['Key1', 'Key2']:
@ -438,7 +434,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidDigest')
@s3acl
def test_object_multi_DELETE_without_md5(self):
elem = Element('Delete')
for key in ['Key1', 'Key2']:
@ -454,7 +449,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidRequest')
@s3acl
def test_object_multi_DELETE_lots_of_keys(self):
elem = Element('Delete')
for i in range(self.s3api.conf.max_multi_delete_objects):
@ -483,7 +477,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
self.assertEqual(len(elem.findall('Deleted')),
self.s3api.conf.max_multi_delete_objects)
@s3acl
def test_object_multi_DELETE_too_many_keys(self):
elem = Element('Delete')
for i in range(self.s3api.conf.max_multi_delete_objects + 1):
@ -502,7 +495,6 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedXML')
@s3acl
def test_object_multi_DELETE_unhandled_exception(self):
exploding_resp = mock.MagicMock(
side_effect=Exception('kaboom'))
@ -525,61 +517,40 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
self.assertEqual(status.split()[0], '200')
self.assertIn(b'<Error><Key>Key1</Key><Code>Server Error</Code>', body)
def _test_object_multi_DELETE(self, account):
self.keys = ['Key1', 'Key2']
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
swob.HTTPNoContent, {}, None)
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
swob.HTTPNotFound, {}, None)
elem = Element('Delete')
for key in self.keys:
obj = SubElement(elem, 'Object')
SubElement(obj, 'Key').text = key
body = tostring(elem, use_s3ns=False)
content_md5 = (
base64.b64encode(md5(body, usedforsecurity=False).digest())
def _test_no_body(self, use_content_length=False,
use_transfer_encoding=False, string_to_md5=b''):
content_md5 = (base64.b64encode(
md5(string_to_md5, usedforsecurity=False).digest())
.strip())
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket?delete',
environ={
'REQUEST_METHOD': 'POST',
'wsgi.input': fake_input},
headers={
'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body='')
if not use_content_length:
req.environ.pop('CONTENT_LENGTH')
if use_transfer_encoding:
req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self._get_error_code(body), 'MissingRequestBodyError')
req = Request.blank('/bucket?delete',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
req.date = datetime.now()
req.content_type = 'text/plain'
def test_object_multi_DELETE_empty_body(self):
self._test_no_body()
self._test_no_body(string_to_md5=b'test')
self._test_no_body(use_content_length=True)
self._test_no_body(use_content_length=True, string_to_md5=b'test')
self._test_no_body(use_transfer_encoding=True)
self._test_no_body(use_transfer_encoding=True, string_to_md5=b'test')
return self.call_s3api(req)
@s3acl(s3acl_only=True)
def test_object_multi_DELETE_without_permission(self):
status, headers, body = self._test_object_multi_DELETE('test:other')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
errors = elem.findall('Error')
self.assertEqual(len(errors), len(self.keys))
for e in errors:
self.assertTrue(e.find('Key').text in self.keys)
self.assertEqual(e.find('Code').text, 'AccessDenied')
self.assertEqual(e.find('Message').text, 'Access Denied.')
@s3acl(s3acl_only=True)
def test_object_multi_DELETE_with_write_permission(self):
status, headers, body = self._test_object_multi_DELETE('test:write')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
@s3acl(s3acl_only=True)
def test_object_multi_DELETE_with_fullcontrol_permission(self):
status, headers, body = \
self._test_object_multi_DELETE('test:full_control')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
class TestS3ApiMultiDeleteNoAcl(BaseS3ApiMultiDelete, S3ApiTestCase):
def test_object_multi_DELETE_with_system_entity(self):
self.keys = ['Key1', 'Key2']
@ -620,38 +591,61 @@ class TestS3ApiMultiDelete(S3ApiTestCase):
self.assertNotIn(b'root:/root', body)
self.assertIn(b'<Deleted><Key>Key1</Key></Deleted>', body)
def _test_no_body(self, use_content_length=False,
use_transfer_encoding=False, string_to_md5=b''):
content_md5 = (base64.b64encode(
md5(string_to_md5, usedforsecurity=False).digest())
.strip())
with UnreadableInput(self) as fake_input:
req = Request.blank(
'/bucket?delete',
environ={
'REQUEST_METHOD': 'POST',
'wsgi.input': fake_input},
headers={
'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body='')
if not use_content_length:
req.environ.pop('CONTENT_LENGTH')
if use_transfer_encoding:
req.environ['HTTP_TRANSFER_ENCODING'] = 'chunked'
status, headers, body = self.call_s3api(req)
self.assertEqual(status, '400 Bad Request')
self.assertEqual(self._get_error_code(body), 'MissingRequestBodyError')
@s3acl
def test_object_multi_DELETE_empty_body(self):
self._test_no_body()
self._test_no_body(string_to_md5=b'test')
self._test_no_body(use_content_length=True)
self._test_no_body(use_content_length=True, string_to_md5=b'test')
self._test_no_body(use_transfer_encoding=True)
self._test_no_body(use_transfer_encoding=True, string_to_md5=b'test')
class TestS3ApiMultiDeleteAcl(BaseS3ApiMultiDelete, S3ApiTestCaseAcl):
def _test_object_multi_DELETE(self, account):
self.keys = ['Key1', 'Key2']
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[0],
swob.HTTPNoContent, {}, None)
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket/%s' % self.keys[1],
swob.HTTPNotFound, {}, None)
elem = Element('Delete')
for key in self.keys:
obj = SubElement(elem, 'Object')
SubElement(obj, 'Key').text = key
body = tostring(elem, use_s3ns=False)
content_md5 = (
base64.b64encode(md5(body, usedforsecurity=False).digest())
.strip())
req = Request.blank('/bucket?delete',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header(),
'Content-MD5': content_md5},
body=body)
req.date = datetime.now()
req.content_type = 'text/plain'
return self.call_s3api(req)
def test_object_multi_DELETE_without_permission(self):
status, headers, body = self._test_object_multi_DELETE('test:other')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
errors = elem.findall('Error')
self.assertEqual(len(errors), len(self.keys))
for e in errors:
self.assertTrue(e.find('Key').text in self.keys)
self.assertEqual(e.find('Code').text, 'AccessDenied')
self.assertEqual(e.find('Message').text, 'Access Denied.')
def test_object_multi_DELETE_with_write_permission(self):
status, headers, body = self._test_object_multi_DELETE('test:write')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
def test_object_multi_DELETE_with_fullcontrol_permission(self):
status, headers, body = \
self._test_object_multi_DELETE('test:full_control')
self.assertEqual(status.split()[0], '200')
elem = fromstring(body)
self.assertEqual(len(elem.findall('Deleted')), len(self.keys))
if __name__ == '__main__':

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -14,94 +14,17 @@
# limitations under the License.
import unittest
import functools
import sys
import traceback
from mock import patch, MagicMock
from swift.common import swob
from swift.common.swob import Request
from swift.common.utils import json
from swift.common.middleware.s3api.etree import tostring, Element, SubElement
from swift.common.middleware.s3api.subresource import ACL, ACLPrivate, User, \
encode_acl, AuthenticatedUsers, AllUsers, Owner, Grant, PERMISSIONS
from test.unit.common.middleware.s3api.test_s3api import S3ApiTestCase
from test.unit.common.middleware.s3api.exceptions import NotMethodException
from test.unit.common.middleware.s3api import FakeSwift
Owner, Grant
from test.unit.common.middleware.s3api import S3ApiTestCaseAcl
XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
def s3acl(func=None, s3acl_only=False, versioning_enabled=True):
"""
NOTE: s3acl decorator needs an instance of s3api testing framework.
(i.e. An instance for first argument is necessary)
"""
if func is None:
return functools.partial(
s3acl,
s3acl_only=s3acl_only,
versioning_enabled=versioning_enabled)
@functools.wraps(func)
def s3acl_decorator(*args, **kwargs):
if not args and not kwargs:
raise NotMethodException('Use s3acl decorator for a method')
def call_func(failing_point=''):
try:
# For maintainability, we patch 204 status for every
# get_container_info. if you want, we can rewrite the
# statement easily with nested decorator like as:
#
# @s3acl
# @patch(xxx)
# def test_xxxx(self)
fake_info = {'status': 204}
if versioning_enabled:
fake_info['sysmeta'] = {
'versions-container': '\x00versions\x00bucket',
}
with patch('swift.common.middleware.s3api.s3request.'
'get_container_info', return_value=fake_info):
func(*args, **kwargs)
except AssertionError:
# Make traceback message to clarify the assertion
exc_type, exc_instance, exc_traceback = sys.exc_info()
formatted_traceback = ''.join(traceback.format_tb(
exc_traceback))
message = '\n%s\n%s' % (formatted_traceback,
exc_type.__name__)
if exc_instance.args:
message += ':\n%s' % (exc_instance.args[0],)
message += failing_point
raise exc_type(message)
instance = args[0]
if not s3acl_only:
call_func()
instance.swift._calls = []
instance.s3api.conf.s3_acl = True
instance.swift.s3_acl = True
owner = Owner('test:tester', 'test:tester')
generate_s3acl_environ('test', instance.swift, owner)
call_func(' (fail at s3_acl)')
return s3acl_decorator
def _gen_test_headers(owner, grants=[], resource='container'):
if not grants:
grants = [Grant(User('test:tester'), 'FULL_CONTROL')]
return encode_acl(resource, ACL(owner, grants))
def _make_xml(grantee):
owner = 'test:tester'
permission = 'READ'
@ -116,69 +39,7 @@ def _make_xml(grantee):
return tostring(elem)
def generate_s3acl_environ(account, swift, owner):
def gen_grant(permission):
# generate Grant with a grantee named by "permission"
account_name = '%s:%s' % (account, permission.lower())
return Grant(User(account_name), permission)
grants = [gen_grant(perm) for perm in PERMISSIONS]
container_headers = _gen_test_headers(owner, grants)
object_headers = _gen_test_headers(owner, grants, 'object')
object_body = 'hello'
object_headers['Content-Length'] = len(object_body)
# TEST method is used to resolve a tenant name
swift.register('TEST', '/v1/AUTH_test', swob.HTTPMethodNotAllowed,
{}, None)
swift.register('TEST', '/v1/AUTH_X', swob.HTTPMethodNotAllowed,
{}, None)
# for bucket
swift.register('HEAD', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
container_headers, None)
swift.register('HEAD', '/v1/AUTH_test/bucket+segments', swob.HTTPNoContent,
container_headers, None)
swift.register('PUT', '/v1/AUTH_test/bucket',
swob.HTTPCreated, {}, None)
swift.register('GET', '/v1/AUTH_test/bucket', swob.HTTPNoContent,
container_headers, json.dumps([]))
swift.register('POST', '/v1/AUTH_test/bucket',
swob.HTTPNoContent, {}, None)
swift.register('DELETE', '/v1/AUTH_test/bucket',
swob.HTTPNoContent, {}, None)
# necessary for canned-acl tests
public_headers = _gen_test_headers(owner, [Grant(AllUsers(), 'READ')])
swift.register('GET', '/v1/AUTH_test/public', swob.HTTPNoContent,
public_headers, json.dumps([]))
authenticated_headers = _gen_test_headers(
owner, [Grant(AuthenticatedUsers(), 'READ')], 'bucket')
swift.register('GET', '/v1/AUTH_test/authenticated',
swob.HTTPNoContent, authenticated_headers,
json.dumps([]))
# for object
swift.register('HEAD', '/v1/AUTH_test/bucket/object', swob.HTTPOk,
object_headers, None)
class TestS3ApiS3Acl(S3ApiTestCase):
def setUp(self):
super(TestS3ApiS3Acl, self).setUp()
self.s3api.conf.s3_acl = True
self.swift.s3_acl = True
account = 'test'
owner_name = '%s:tester' % account
self.default_owner = Owner(owner_name, owner_name)
generate_s3acl_environ(account, self.swift, self.default_owner)
def tearDown(self):
self.s3api.conf.s3_acl = False
class TestS3ApiS3Acl(S3ApiTestCaseAcl):
def test_bucket_acl_PUT_with_other_owner(self):
req = Request.blank('/bucket?acl',
@ -521,42 +382,6 @@ class TestS3ApiS3Acl(S3ApiTestCase):
status, headers, body = self._test_object_acl_PUT('test:tester')
self.assertEqual(status.split()[0], '200')
def test_s3acl_decorator(self):
@s3acl
def non_class_s3acl_error():
raise TypeError()
class FakeClass(object):
def __init__(self):
self.s3api = MagicMock()
self.swift = FakeSwift()
@s3acl
def s3acl_error(self):
raise TypeError()
@s3acl
def s3acl_assert_fail(self):
assert False
@s3acl(s3acl_only=True)
def s3acl_s3only_error(self):
if self.s3api.conf.s3_acl:
raise TypeError()
@s3acl(s3acl_only=True)
def s3acl_s3only_no_error(self):
if not self.s3api.conf.s3_acl:
raise TypeError()
fake_class = FakeClass()
self.assertRaises(NotMethodException, non_class_s3acl_error)
self.assertRaises(TypeError, fake_class.s3acl_error)
self.assertRaises(AssertionError, fake_class.s3acl_assert_fail)
self.assertRaises(TypeError, fake_class.s3acl_s3only_error)
self.assertIsNone(fake_class.s3acl_s3only_no_error())
if __name__ == '__main__':
unittest.main()

@ -40,7 +40,7 @@ from keystoneauth1.access import AccessInfoV2
from test.debug_logger import debug_logger, FakeStatsdClient
from test.unit.common.middleware.s3api import S3ApiTestCase
from test.unit.common.middleware.s3api.helpers import FakeSwift
from test.unit.common.middleware.helpers import FakeSwift
from test.unit.common.middleware.s3api.test_s3token import \
GOOD_RESPONSE_V2, GOOD_RESPONSE_V3
from swift.common.middleware.s3api.s3request import SigV4Request, S3Request
@ -1440,7 +1440,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
self.s3api.logger.logger.statsd_client.get_increment_counts())
def test_s3api_with_only_s3_token(self):
self.swift = FakeSwift()
self.swift = FakeSwift(allowed_methods=['TEST'])
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.s3_token = S3Token(
@ -1470,7 +1470,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
req.environ['swift.backend_path'])
def test_s3api_with_only_s3_token_v3(self):
self.swift = FakeSwift()
self.swift = FakeSwift(allowed_methods=['TEST'])
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.s3_token = S3Token(
@ -1500,7 +1500,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
req.environ['swift.backend_path'])
def test_s3api_with_s3_token_and_auth_token(self):
self.swift = FakeSwift()
self.swift = FakeSwift(allowed_methods=['TEST'])
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.auth_token = AuthProtocol(
@ -1555,7 +1555,7 @@ class TestS3ApiMiddleware(S3ApiTestCase):
statsd_client.get_increment_counts())
def test_s3api_with_only_s3_token_in_s3acl(self):
self.swift = FakeSwift()
self.swift = FakeSwift(allowed_methods=['TEST'])
self.keystone_auth = KeystoneAuth(
self.swift, {'operator_roles': 'swift-user'})
self.s3_token = S3Token(

@ -95,7 +95,6 @@ class TestRequest(S3ApiTestCase):
def setUp(self):
super(TestRequest, self).setUp()
self.s3api.conf.s3_acl = True
self.swift.s3_acl = True
@patch('swift.common.middleware.s3api.acl_handlers.ACL_MAP', Fake_ACL_MAP)
@patch('swift.common.middleware.s3api.s3request.S3AclRequest.authenticate',
@ -122,7 +121,6 @@ class TestRequest(S3ApiTestCase):
def test_get_response_without_s3_acl(self):
self.s3api.conf.s3_acl = False
self.swift.s3_acl = False
mock_get_resp, m_check_permission, s3_resp = \
self._test_get_response('HEAD')
self.assertFalse(hasattr(s3_resp, 'bucket_acl'))
@ -1005,7 +1003,6 @@ class TestSigV4Request(S3ApiTestCase):
def setUp(self):
super(TestSigV4Request, self).setUp()
self.s3api.conf.s3_acl = True
self.swift.s3_acl = True
def test_init_header_authorization(self):
environ = {

@ -19,8 +19,7 @@ from swift.common import swob
from swift.common.swob import Request
from swift.common.utils import json
from test.unit.common.middleware.s3api.test_s3_acl import s3acl
from test.unit.common.middleware.s3api import S3ApiTestCase
from test.unit.common.middleware.s3api import S3ApiTestCase, S3ApiTestCaseAcl
from swift.common.middleware.s3api.etree import fromstring
from swift.common.middleware.s3api.subresource import ACL, Owner, encode_acl
@ -36,7 +35,7 @@ def create_bucket_list_json(buckets):
return json.dumps(bucket_list)
class TestS3ApiService(S3ApiTestCase):
class BaseS3ApiService(object):
def setup_buckets(self):
self.buckets = (('apple', 1, 200), ('orange', 3, 430))
bucket_list = create_bucket_list_json(self.buckets)
@ -44,22 +43,10 @@ class TestS3ApiService(S3ApiTestCase):
bucket_list)
def setUp(self):
super(TestS3ApiService, self).setUp()
super(BaseS3ApiService, self).setUp()
self.setup_buckets()
def test_service_GET_error(self):
code = self._test_method_error(
'GET', '', swob.HTTPUnauthorized, expected_xml_tags=(
'Code', 'Message', 'AWSAccessKeyId', 'StringToSign',
'StringToSignBytes', 'SignatureProvided'))
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('GET', '', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('GET', '', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
@s3acl
def test_service_GET(self):
req = Request.blank('/',
environ={'REQUEST_METHOD': 'GET'},
@ -83,7 +70,6 @@ class TestS3ApiService(S3ApiTestCase):
for i in self.buckets:
self.assertTrue(i[0] in names)
@s3acl
def test_service_GET_subresource(self):
req = Request.blank('/?acl',
environ={'REQUEST_METHOD': 'GET'},
@ -107,6 +93,20 @@ class TestS3ApiService(S3ApiTestCase):
for i in self.buckets:
self.assertTrue(i[0] in names)
class TestS3ApiServiceNoAcl(BaseS3ApiService, S3ApiTestCase):
def test_service_GET_error(self):
code = self._test_method_error(
'GET', '', swob.HTTPUnauthorized, expected_xml_tags=(
'Code', 'Message', 'AWSAccessKeyId', 'StringToSign',
'StringToSignBytes', 'SignatureProvided'))
self.assertEqual(code, 'SignatureDoesNotMatch')
code = self._test_method_error('GET', '', swob.HTTPForbidden)
self.assertEqual(code, 'AccessDenied')
code = self._test_method_error('GET', '', swob.HTTPServerError)
self.assertEqual(code, 'InternalError')
def test_service_GET_with_blind_resource(self):
buckets = (('apple', 1, 200), ('orange', 3, 430),
('apple+segment', 1, 200))
@ -137,6 +137,9 @@ class TestS3ApiService(S3ApiTestCase):
for i in expected:
self.assertIn(i[0], names)
class TestS3ApiServiceAcl(BaseS3ApiService, S3ApiTestCaseAcl):
def _test_service_GET_for_check_bucket_owner(self, buckets):
self.s3api.conf.check_bucket_owner = True
bucket_list = create_bucket_list_json(buckets)
@ -149,7 +152,6 @@ class TestS3ApiService(S3ApiTestCase):
'Date': self.get_date_header()})
return self.call_s3api(req)
@s3acl(s3acl_only=True)
def test_service_GET_without_bucket(self):
bucket_list = []
for var in range(0, 10):
@ -168,7 +170,6 @@ class TestS3ApiService(S3ApiTestCase):
buckets = resp_buckets.iterchildren('Bucket')
self.assertEqual(len(list(buckets)), 0)
@s3acl(s3acl_only=True)
def test_service_GET_without_owner_bucket(self):
bucket_list = []
for var in range(0, 10):
@ -190,7 +191,6 @@ class TestS3ApiService(S3ApiTestCase):
buckets = resp_buckets.iterchildren('Bucket')
self.assertEqual(len(list(buckets)), 0)
@s3acl(s3acl_only=True)
def test_service_GET_bucket_list(self):
bucket_list = []
for var in range(0, 10):

@ -16,12 +16,40 @@
import unittest
from swift.common.storage_policy import POLICIES
from swift.common.swob import Request, HTTPOk, HTTPNotFound, HTTPCreated
from swift.common.swob import Request, HTTPOk, HTTPNotFound, \
HTTPCreated, HeaderKeyDict, HTTPException
from swift.common import request_helpers as rh
from swift.common.middleware.s3api.utils import sysmeta_header
from test.unit.common.middleware.helpers import FakeSwift
class TestFakeSwift(unittest.TestCase):
def test_allowed_methods(self):
def assert_allowed(swift, method):
path = '/v1/a/c/o'
swift.register(method, path, HTTPOk, {}, None)
req = Request.blank(path)
req.method = method
self.assertEqual(200, req.get_response(swift).status_int)
def assert_disallowed(swift, method):
path = '/v1/a/c/o'
swift.register(method, path, HTTPOk, {}, None)
req = Request.blank(path)
req.method = method
with self.assertRaises(HTTPException) as cm:
req.get_response(swift)
self.assertEqual(501, cm.exception.status_int)
for method in ('PUT', 'POST', 'DELETE', 'GET', 'HEAD', 'OPTIONS',
'REPLICATE', 'SSYNC', 'UPDATE'):
assert_allowed(FakeSwift(), method)
assert_allowed(FakeSwift(allowed_methods=['TEST']), 'TEST')
assert_disallowed(FakeSwift(), 'TEST')
assert_allowed(FakeSwift(allowed_methods=['TEST']), 'TEST')
def test_not_registered(self):
swift = FakeSwift()
@ -692,3 +720,114 @@ class TestFakeSwiftMultipleResponses(unittest.TestCase):
resp = req.get_response(swift)
self.assertEqual(200, resp.status_int)
self.assertEqual('Baz', resp.headers['X-Foo'])
class TestFakeSwiftStickyHeaders(unittest.TestCase):
def setUp(self):
self.swift = FakeSwift()
self.path = '/v1/AUTH_test/bucket'
def _check_headers(self, method, path, exp_headers):
captured_headers = {}
def start_response(status, resp_headers):
self.assertEqual(status, '200 OK')
captured_headers.update(resp_headers)
env = {'REQUEST_METHOD': method, 'PATH_INFO': path}
body_iter = self.swift(env, start_response)
b''.join(body_iter)
captured_headers.pop('Content-Type')
self.assertEqual(exp_headers, captured_headers)
def test_sticky_headers(self):
sticky_headers = HeaderKeyDict({
sysmeta_header('container', 'acl'): 'test',
'x-container-meta-foo': 'bar',
})
self.swift.update_sticky_response_headers(self.path, sticky_headers)
# register a response for this path with no headers
self.swift.register('GET', self.path, HTTPOk, {}, None)
self._check_headers('HEAD', self.path, sticky_headers)
self._check_headers('GET', self.path, sticky_headers)
# sticky headers are not applied to PUT, POST, DELETE
self.swift.register('PUT', self.path, HTTPOk, {}, None)
self._check_headers('PUT', self.path, {})
self.swift.register('POST', self.path, HTTPOk, {}, None)
self._check_headers('POST', self.path, {})
self.swift.register('DELETE', self.path, HTTPOk, {}, None)
self._check_headers('DELETE', self.path, {})
def test_sticky_headers_match_path(self):
other_path = self.path + '-other'
sticky_headers = HeaderKeyDict({
sysmeta_header('container', 'acl'): 'test',
'x-container-meta-foo': 'bar',
})
sticky_headers_other = HeaderKeyDict({
'x-container-meta-foo': 'other',
})
self.swift.update_sticky_response_headers(self.path, sticky_headers)
self.swift.update_sticky_response_headers(other_path,
sticky_headers_other)
self.swift.register('GET', self.path, HTTPOk, {}, None)
self.swift.register('GET', other_path, HTTPOk, {}, None)
self._check_headers('HEAD', self.path, sticky_headers)
self._check_headers('GET', other_path, sticky_headers_other)
def test_sticky_headers_update(self):
sticky_headers = HeaderKeyDict({
sysmeta_header('container', 'acl'): 'test',
'x-container-meta-foo': 'bar'
})
exp_headers = sticky_headers.copy()
self.swift.update_sticky_response_headers(self.path, sticky_headers)
self.swift.register('HEAD', self.path, HTTPOk, {}, None)
self._check_headers('HEAD', self.path, exp_headers)
# check that FakeSwift made a *copy*
sticky_headers['x-container-meta-foo'] = 'changed'
self._check_headers('HEAD', self.path, exp_headers)
# check existing are updated not replaced
sticky_headers = HeaderKeyDict({
sysmeta_header('container', 'acl'): 'test-modified',
'x-container-meta-bar': 'foo'
})
exp_headers.update(sticky_headers)
self.swift.update_sticky_response_headers(self.path, sticky_headers)
self._check_headers('HEAD', self.path, exp_headers)
def test_sticky_headers_add_to_response_headers(self):
sticky_headers = HeaderKeyDict({
'x-container-meta-foo': 'bar',
})
self.swift.update_sticky_response_headers(self.path, sticky_headers)
# register a response with another header
self.swift.register('HEAD', self.path, HTTPOk, {
'x-backend-storage-policy-index': '1',
}, None)
self._check_headers('HEAD', self.path, HeaderKeyDict({
'x-container-meta-foo': 'bar',
'x-backend-storage-policy-index': '1',
}))
def test_sticky_headers_overwritten_by_response_header(self):
sticky_headers = HeaderKeyDict({
'x-container-meta-foo': 'bar',
'x-backend-storage-policy-index': '0',
})
self.swift.update_sticky_response_headers(self.path, sticky_headers)
# register a response with a different value for a sticky header
self.swift.register('HEAD', self.path, HTTPOk, {
'x-container-meta-foo': 'different',
}, None)
self._check_headers('HEAD', self.path, HeaderKeyDict({
'x-container-meta-foo': 'different',
'x-backend-storage-policy-index': '0',
}))
if __name__ == '__main__':
unittest.main()