swift/test/unit/common/middleware/s3api/test_s3_acl.py
Alistair Coles b07d87c4be tests: use subclasses for S3Acl tests
We remove s3api.FakeSwift and replace it with the "normal" FakeSwift.
Additionally the @s3acl decorator is removed and replaced with an
inheritance based pattern.  This simplifies maintenance using more
familiar patterns and improves debugging.

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I55b596a42af01870b49fda22800f7a1293163eb8
2023-12-14 16:59:53 +00:00

388 lines
19 KiB
Python

# Copyright (c) 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from swift.common.swob import Request
from swift.common.middleware.s3api.etree import tostring, Element, SubElement
from swift.common.middleware.s3api.subresource import ACL, ACLPrivate, User, \
Owner, Grant
from test.unit.common.middleware.s3api import S3ApiTestCaseAcl
XMLNS_XSI = 'http://www.w3.org/2001/XMLSchema-instance'
def _make_xml(grantee):
owner = 'test:tester'
permission = 'READ'
elem = Element('AccessControlPolicy')
elem_owner = SubElement(elem, 'Owner')
SubElement(elem_owner, 'ID').text = owner
SubElement(elem_owner, 'DisplayName').text = owner
acl_list_elem = SubElement(elem, 'AccessControlList')
elem_grant = SubElement(acl_list_elem, 'Grant')
elem_grant.append(grantee)
SubElement(elem_grant, 'Permission').text = permission
return tostring(elem)
class TestS3ApiS3Acl(S3ApiTestCaseAcl):
def test_bucket_acl_PUT_with_other_owner(self):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=tostring(
ACLPrivate(
Owner(id='test:other',
name='test:other')).elem()))
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'AccessDenied')
def test_object_acl_PUT_xml_error(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body="invalid xml")
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedACLError')
def test_canned_acl_private(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'private'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
self.assertIn('REMOTE_USER', req.environ)
def test_canned_acl_public_read(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'public-read'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_canned_acl_public_read_write(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'public-read-write'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_canned_acl_authenticated_read(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'authenticated-read'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_canned_acl_bucket_owner_read(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'bucket-owner-read'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_canned_acl_bucket_owner_full_control(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'bucket-owner-full-control'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_invalid_canned_acl(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-acl': 'invalid'})
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidArgument')
def _test_grant_header(self, permission):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-' + permission:
'id=test:tester'})
return self.call_s3api(req)
def test_grant_read(self):
status, headers, body = self._test_grant_header('read')
self.assertEqual(status.split()[0], '200')
def test_grant_write(self):
status, headers, body = self._test_grant_header('write')
self.assertEqual(status.split()[0], '200')
def test_grant_read_acp(self):
status, headers, body = self._test_grant_header('read-acp')
self.assertEqual(status.split()[0], '200')
def test_grant_write_acp(self):
status, headers, body = self._test_grant_header('write-acp')
self.assertEqual(status.split()[0], '200')
def test_grant_full_control(self):
status, headers, body = self._test_grant_header('full-control')
self.assertEqual(status.split()[0], '200')
def test_grant_invalid_permission(self):
status, headers, body = self._test_grant_header('invalid')
self.assertEqual(self._get_error_code(body), 'MissingSecurityHeader')
def test_grant_with_both_header_and_xml(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-full-control':
'id=test:tester'},
body=tostring(
ACLPrivate(
Owner(id='test:tester',
name='test:tester')).elem()))
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'UnexpectedContent')
def test_grant_with_both_header_and_canned_acl(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-full-control':
'id=test:tester',
'x-amz-acl': 'public-read'})
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidRequest')
def test_grant_email(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read': 'emailAddress=a@b.c'})
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'NotImplemented')
def test_grant_email_xml(self):
grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'AmazonCustomerByEmail')
SubElement(grantee, 'EmailAddress').text = 'Grantees@email.com'
xml = _make_xml(grantee=grantee)
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'NotImplemented')
def test_grant_invalid_group_xml(self):
grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'Invalid')
xml = _make_xml(grantee=grantee)
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'MalformedACLError')
def test_grant_authenticated_users(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read':
'uri="http://acs.amazonaws.com/groups/'
'global/AuthenticatedUsers"'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_grant_all_users(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read':
'uri="http://acs.amazonaws.com/groups/'
'global/AllUsers"'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_grant_all_users_with_uppercase_type(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read':
'URI="http://acs.amazonaws.com/groups/'
'global/AllUsers"'})
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '200')
def test_grant_invalid_uri(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read':
'uri="http://localhost/"'})
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidArgument')
def test_grant_invalid_uri_xml(self):
grantee = Element('Grantee', nsmap={'xsi': XMLNS_XSI})
grantee.set('{%s}type' % XMLNS_XSI, 'Group')
SubElement(grantee, 'URI').text = 'invalid'
xml = _make_xml(grantee)
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()},
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidArgument')
def test_grant_invalid_target(self):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(),
'x-amz-grant-read': 'key=value'})
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'InvalidArgument')
def _test_bucket_acl_GET(self, account):
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header()})
return self.call_s3api(req)
def test_bucket_acl_GET_without_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:other')
self.assertEqual(self._get_error_code(body), 'AccessDenied')
def test_bucket_acl_GET_with_read_acp_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:read_acp')
self.assertEqual(status.split()[0], '200')
def test_bucket_acl_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:full_control')
self.assertEqual(status.split()[0], '200')
def test_bucket_acl_GET_with_owner_permission(self):
status, headers, body = self._test_bucket_acl_GET('test:tester')
self.assertEqual(status.split()[0], '200')
def _test_bucket_acl_PUT(self, account, permission='FULL_CONTROL'):
acl = ACL(self.default_owner, [Grant(User(account), permission)])
req = Request.blank('/bucket?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header()},
body=tostring(acl.elem()))
return self.call_s3api(req)
def test_bucket_acl_PUT_without_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:other')
self.assertEqual(self._get_error_code(body), 'AccessDenied')
def test_bucket_acl_PUT_with_write_acp_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:write_acp')
self.assertEqual(status.split()[0], '200')
def test_bucket_acl_PUT_with_fullcontrol_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:full_control')
self.assertEqual(status.split()[0], '200')
def test_bucket_acl_PUT_with_owner_permission(self):
status, headers, body = self._test_bucket_acl_PUT('test:tester')
self.assertEqual(status.split()[0], '200')
def _test_object_acl_GET(self, account):
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header()})
return self.call_s3api(req)
def test_object_acl_GET_without_permission(self):
status, headers, body = self._test_object_acl_GET('test:other')
self.assertEqual(self._get_error_code(body), 'AccessDenied')
def test_object_acl_GET_with_read_acp_permission(self):
status, headers, body = self._test_object_acl_GET('test:read_acp')
self.assertEqual(status.split()[0], '200')
def test_object_acl_GET_with_fullcontrol_permission(self):
status, headers, body = self._test_object_acl_GET('test:full_control')
self.assertEqual(status.split()[0], '200')
def test_object_acl_GET_with_owner_permission(self):
status, headers, body = self._test_object_acl_GET('test:tester')
self.assertEqual(status.split()[0], '200')
def _test_object_acl_PUT(self, account, permission='FULL_CONTROL'):
acl = ACL(self.default_owner, [Grant(User(account), permission)])
req = Request.blank('/bucket/object?acl',
environ={'REQUEST_METHOD': 'PUT'},
headers={'Authorization': 'AWS %s:hmac' % account,
'Date': self.get_date_header()},
body=tostring(acl.elem()))
return self.call_s3api(req)
def test_object_acl_PUT_without_permission(self):
status, headers, body = self._test_object_acl_PUT('test:other')
self.assertEqual(self._get_error_code(body), 'AccessDenied')
def test_object_acl_PUT_with_write_acp_permission(self):
status, headers, body = self._test_object_acl_PUT('test:write_acp')
self.assertEqual(status.split()[0], '200')
def test_object_acl_PUT_with_fullcontrol_permission(self):
status, headers, body = self._test_object_acl_PUT('test:full_control')
self.assertEqual(status.split()[0], '200')
def test_object_acl_PUT_with_owner_permission(self):
status, headers, body = self._test_object_acl_PUT('test:tester')
self.assertEqual(status.split()[0], '200')
if __name__ == '__main__':
unittest.main()