s3_acl: Require swift_owner authz to create buckets

Otherwise, users can create buckets in accounts they don't own.

Change-Id: I13d557c32b12529ef1087c52f7af302a33d33acb
This commit is contained in:
Tim Burke 2018-06-25 15:40:30 -07:00 committed by Thiago da Silva
parent dd760bff0a
commit 51b885b3b5
6 changed files with 31 additions and 8 deletions

View File

@ -53,7 +53,7 @@ import sys
from swift.common.middleware.s3api.subresource import ACL, Owner, encode_acl
from swift.common.middleware.s3api.s3response import MissingSecurityHeader, \
MalformedACLError, UnexpectedContent
MalformedACLError, UnexpectedContent, AccessDenied
from swift.common.middleware.s3api.etree import fromstring, XMLSyntaxError, \
DocumentInvalid
from swift.common.middleware.s3api.utils import MULTIUPLOAD_SUFFIX, \
@ -209,6 +209,9 @@ class BucketAclHandler(BaseAclHandler):
req_acl = ACL.from_headers(self.req.headers,
Owner(self.user_id, self.user_id))
if not self.req.environ.get('swift_owner'):
raise AccessDenied()
# To avoid overwriting the existing bucket's ACL, we send PUT
# request first before setting the ACL to make sure that the target
# container does not exist.

View File

@ -1341,6 +1341,9 @@ class S3AclRequest(S3Request):
# tempauth
self.user_id = self.access_key
sw_req.environ.get('swift.authorize', lambda req: None)(sw_req)
self.environ['swift_owner'] = sw_req.environ.get('swift_owner', False)
# Need to skip S3 authorization on subsequent requests to prevent
# overwriting the account in PATH_INFO
del self.headers['Authorization']

View File

@ -151,7 +151,11 @@ class TestS3ApiBucket(S3ApiBase):
self.conn.make_request('PUT', 'bucket')
status, headers, body = self.conn.make_request('PUT', 'bucket')
self.assertEqual(get_error_code(body), 'BucketAlreadyExists')
# If the user can't create buckets, they shouldn't even know
# whether the bucket exists. For some reason, though, when s3_acl
# is disabled, we translate 403 -> BucketAlreadyExists??
self.assertIn(get_error_code(body),
('AccessDenied', 'BucketAlreadyExists'))
def test_put_bucket_with_LocationConstraint(self):
bucket = 'bucket'

View File

@ -106,7 +106,8 @@ class S3ApiTestCase(unittest.TestCase):
elem = fromstring(body, 'Error')
return elem.find('./Message').text
def _test_method_error(self, method, path, response_class, headers={}):
def _test_method_error(self, method, path, response_class, headers={},
env={}):
if not path.startswith('/'):
path = '/' + path # add a missing slash before the path
@ -117,8 +118,8 @@ class S3ApiTestCase(unittest.TestCase):
self.swift.register(method, uri, response_class, headers, None)
headers.update({'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header()})
req = swob.Request.blank(path, environ={'REQUEST_METHOD': method},
headers=headers)
env.update({'REQUEST_METHOD': method})
req = swob.Request.blank(path, environ=env, headers=headers)
status, headers, body = self.call_s3api(req)
return self._get_error_code(body)

View File

@ -53,9 +53,15 @@ class FakeSwift(object):
env['REMOTE_USER'] = 'authorized'
if env['REQUEST_METHOD'] == 'TEST':
# AccessDenied by default at s3acl authenticate
env['swift.authorize'] = \
lambda req: swob.HTTPForbidden(request=req)
def authorize_cb(req):
# Assume swift owner, if not yet set
req.environ.setdefault('swift_owner', True)
# But then default to blocking authz, to ensure we've replaced
# the default auth system
return swob.HTTPForbidden(request=req)
env['swift.authorize'] = authorize_cb
else:
env['swift.authorize'] = lambda req: None

View File

@ -498,6 +498,12 @@ class TestS3ApiBucket(S3ApiTestCase):
swob.HTTPCreated)
self.assertEqual(code, 'InvalidBucketName')
@s3acl(s3acl_only=True)
def test_bucket_PUT_error_non_owner(self):
code = self._test_method_error('PUT', '/bucket', swob.HTTPAccepted,
env={'swift_owner': False})
self.assertEqual(code, 'AccessDenied')
@s3acl
def test_bucket_PUT(self):
req = Request.blank('/bucket',