Jon Snitow
* Introduce a new privileged account header: X-Account-Access-Control
* Introduce JSON-based version 2 ACL syntax -- see below for discussion
* Implement account ACL authorization in TempAuth

X-Account-Access-Control Header
-------------------------------

Accounts now have a new privileged header to represent ACLs or any
other form of account-level access control. The value of the header
is an opaque string to be interpreted by the auth system, but it must
be a JSON-encoded dictionary. A reference implementation is given in
TempAuth, with the knowledge that historically other auth systems
often use TempAuth as a starting point.

The reference implementation describes three levels of account
access: "admin", "read-write", and "read-only". Adding new access
control features in a future patch (e.g. "write-only" account access)
will automatically be forward- and backward-compatible, due to the
JSON dictionary header format.

The privileged X-Account-Access-Control header may only be read or
written by a user with "swift_owner" status, traditionally the
account owner but now also any user on the "admin" ACL.

Access Levels:

Read-only access is intended to indicate to the auth system that this
list of identities can read everything (except privileged headers) in
the account. Specifically, a user with read-only account access can
get a list of containers in the account, list the contents of any
container, retrieve any object, and see the (non-privileged) headers
of the account, any container, or any object.

Read-write access is intended to indicate to the auth system that
this list of identities can read or write (or create) any container.
A user with read-write account access can create new containers, set
any unprivileged container headers, overwrite objects, delete
containers, etc. A read-write user can NOT set account headers (or
perform any PUT/POST/DELETE requests on the account).

Admin access is intended to indicate to the auth system that this
list of identities has "swift_owner" privileges. A user with admin
account access can do anything the account owner can, including
setting account headers and any privileged headers -- and thus
changing the value of X-Account-Access-Control and thereby granting
read-only, read-write, or admin access to other users.

The auth system is responsible for making decisions based on this
header, if it chooses to support its use. Therefore the above access
level descriptions are necessarily advisory only for other auth
systems.

When setting the value of the header, callers are urged to use the
new format_acl() method, described below.

New ACL Format
--------------

The account ACLs introduce a new format for ACLs, rather than reusing
the existing format from X-Container-Read/X-Container-Write. There
are several reasons for this:

* Container ACL format does not support Unicode
* Container ACLs have a different structure than account ACLs
  + account ACLs have no concept of referrers or rlistings
  + accounts have additional "admin" access level
  + account access levels are structured as admin > rw > ro, which
    seems more appropriate for how people access accounts, rather
    than reusing container ACLs' orthogonal read and write access

In addition, the container ACL syntax is a bit arbitrary and highly
custom, so instead of parsing additional custom syntax, I'd rather
propose a next version and introduce a means for migration.
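To make the contrast with the container ACL format concrete, here is a minimal sketch of a JSON-encoded account ACL holding the three access levels. It uses only the standard json module rather than the patch's format_acl() helper; the identities and the serialization options are assumptions chosen for illustration, not necessarily what TempAuth emits.

```python
import json

# Hypothetical identities, keyed by the three access levels named in this
# commit message. The non-ASCII name illustrates Unicode support, and
# '.rlistings' is just an ordinary name here, since account ACLs have no
# concept of referrers or rlistings.
acl = {
    'admin': ['test:admin'],
    'read-write': ['test:tester'],
    'read-only': [u'test:\u00fcser', '.rlistings'],
}

# json.dumps escapes non-ASCII characters by default, so the result is
# plain ASCII and can be carried in an HTTP header value.
header_value = json.dumps(acl, sort_keys=True)

# Parsing recovers the original identities, Unicode included.
round_tripped = json.loads(header_value)
print(header_value)
```

Because the value is an ordinary JSON dictionary, any JSON parser can read it back without custom syntax rules.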
The V2 ACL syntax has the following benefits:

* JSON is a well-known standard syntax with parsers in all languages
* no artificial value restrictions (you can grant access to a user
  named ".rlistings" if you want)
* forward and backward compatibility: you may have extraneous keys,
  but your attempt to parse the header won't raise an exception

I've introduced hooks in parse_acl and format_acl which currently
default to the old V1 syntax but tolerate the V2 syntax and can
easily be flipped to default to V2. I'm not changing the default or
adding code to rewrite V1 ACLs to V2, because this patch has suffered
a lot of scope creep already, but this seems like a sensible
milestone in the migration.

TempAuth Account ACL Implementation
-----------------------------------

As stated above, core Swift is responsible for privileging the
X-Account-Access-Control header (making it only accessible to
swift_owners), for translating it to -sysmeta-* headers to trigger
persistence by the account server, and for including the header in
the responses to requests by privileged users. Core Swift puts no
expectation on the *content* of this header. Auth systems (including
TempAuth) are responsible for defining the content of the header and
taking action based on it.

In addition to the changes described above, this patch defines a
format to be used by TempAuth for these headers in the
common.middleware.acl module, in the methods format_v2_acl() and
parse_v2_acl(). This patch also teaches TempAuth to take action based
on the header contents. TempAuth now sets swift_owner=True if the
user is on the Admin ACL, authorizes GET/HEAD/OPTIONS requests if the
user is on any ACL, authorizes PUT/POST/DELETE requests if the user
is on the admin or read-write ACL, etc.

Note that the action of setting swift_owner=True triggers core Swift
to add or strip the privileged headers from the responses. Core Swift
(not the auth system) is responsible for that.
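The decision rules described above can be sketched roughly as follows. This is a stand-alone illustration, not TempAuth's actual code: the function name sketch_authorize, the on_account flag, and the sample identities are all hypothetical, and real group handling in TempAuth may differ.

```python
import json

READ_METHODS = ('GET', 'HEAD', 'OPTIONS')
WRITE_METHODS = ('PUT', 'POST', 'DELETE')


def sketch_authorize(acl_header, user_groups, method, on_account=False):
    """Return (allowed, swift_owner) for one request.

    Hypothetical helper, not TempAuth's real code. `on_account` is True
    when the request targets the account itself rather than a container
    or object inside it.
    """
    acl = json.loads(acl_header) if acl_header else {}
    groups = set(user_groups)

    def listed(level):
        # Unknown keys in the dictionary are simply never looked up.
        return bool(groups.intersection(acl.get(level, [])))

    if listed('admin'):
        # Users on the admin ACL are treated like the account owner.
        return True, True
    if method in READ_METHODS:
        return listed('read-write') or listed('read-only'), False
    if method in WRITE_METHODS and not on_account:
        # Read-write users may write containers and objects only.
        return listed('read-write'), False
    return False, False


# Sample header with made-up identities.
header = json.dumps({'admin': ['a:boss'],
                     'read-write': ['a:dev'],
                     'read-only': ['a:guest']})
print(sketch_authorize(header, ['a:dev'], 'PUT'))
```

Returning the swift_owner flag alongside the decision mirrors the split described in the message: the auth system only sets the flag, and core Swift acts on it when adding or stripping privileged headers.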
DocImpact: Documentation for the new ACL usage and format appears in
summary form in doc/source/overview_auth.rst, and in more detail in
swift/common/middleware/tempauth.py in the TempAuth class docstring.
I leave it to the Swift doc team to determine whether more is needed.

Change-Id: I836a99eaaa6bb0e92dc03e1ca46a474522e6e826
1111 lines
48 KiB
Python
# Copyright (c) 2011 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from contextlib import contextmanager
from base64 import b64encode
from time import time

from swift.common.middleware import tempauth as auth
from swift.common.middleware.acl import format_acl
from swift.common.swob import Request, Response
from swift.common.utils import split_path

NO_CONTENT_RESP = (('204 No Content', {}, ''),)   # mock server response


class FakeMemcache(object):

    def __init__(self):
        self.store = {}

    def get(self, key):
        return self.store.get(key)

    def set(self, key, value, time=0):
        self.store[key] = value
        return True

    def incr(self, key, time=0):
        self.store[key] = self.store.setdefault(key, 0) + 1
        return self.store[key]

    @contextmanager
    def soft_lock(self, key, timeout=0, retries=5):
        yield True

    def delete(self, key):
        try:
            del self.store[key]
        except Exception:
            pass
        return True


class FakeApp(object):
    """Fake WSGI app that replays canned (status, headers, body) tuples."""

    def __init__(self, status_headers_body_iter=None, acl=None, sync_key=None):
        self.calls = 0
        self.status_headers_body_iter = status_headers_body_iter
        if not self.status_headers_body_iter:
            self.status_headers_body_iter = iter([('404 Not Found', {}, '')])
        self.acl = acl
        self.sync_key = sync_key

    def __call__(self, env, start_response):
        self.calls += 1
        self.request = Request(env)
        if self.acl:
            self.request.acl = self.acl
        if self.sync_key:
            self.request.environ['swift_sync_key'] = self.sync_key
        if 'swift.authorize' in env:
            resp = env['swift.authorize'](self.request)
            if resp:
                return resp(env, start_response)
        # The next() builtin works on both Python 2.6+ and Python 3
        status, headers, body = next(self.status_headers_body_iter)
        return Response(status=status, headers=headers,
                        body=body)(env, start_response)


class FakeConn(object):
    """Fake HTTP connection that replays canned responses."""

    def __init__(self, status_headers_body_iter=None):
        self.calls = 0
        self.status_headers_body_iter = status_headers_body_iter
        if not self.status_headers_body_iter:
            self.status_headers_body_iter = iter([('404 Not Found', {}, '')])

    def request(self, method, path, headers):
        self.calls += 1
        self.request_path = path
        # The next() builtin works on both Python 2.6+ and Python 3
        self.status, self.headers, self.body = \
            next(self.status_headers_body_iter)
        self.status, self.reason = self.status.split(' ', 1)
        self.status = int(self.status)

    def getresponse(self):
        return self

    def read(self):
        body = self.body
        self.body = ''
        return body


class TestAuth(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self.test_auth = auth.filter_factory({})(FakeApp())
|
|
|
|
def _make_request(self, path, **kwargs):
|
|
req = Request.blank(path, **kwargs)
|
|
req.environ['swift.cache'] = FakeMemcache()
|
|
return req
|
|
|
|
def test_reseller_prefix_init(self):
|
|
app = FakeApp()
|
|
ath = auth.filter_factory({})(app)
|
|
self.assertEquals(ath.reseller_prefix, 'AUTH_')
|
|
ath = auth.filter_factory({'reseller_prefix': 'TEST'})(app)
|
|
self.assertEquals(ath.reseller_prefix, 'TEST_')
|
|
ath = auth.filter_factory({'reseller_prefix': 'TEST_'})(app)
|
|
self.assertEquals(ath.reseller_prefix, 'TEST_')
|
|
|
|
def test_auth_prefix_init(self):
|
|
app = FakeApp()
|
|
ath = auth.filter_factory({})(app)
|
|
self.assertEquals(ath.auth_prefix, '/auth/')
|
|
ath = auth.filter_factory({'auth_prefix': ''})(app)
|
|
self.assertEquals(ath.auth_prefix, '/auth/')
|
|
ath = auth.filter_factory({'auth_prefix': '/'})(app)
|
|
self.assertEquals(ath.auth_prefix, '/auth/')
|
|
ath = auth.filter_factory({'auth_prefix': '/test/'})(app)
|
|
self.assertEquals(ath.auth_prefix, '/test/')
|
|
ath = auth.filter_factory({'auth_prefix': '/test'})(app)
|
|
self.assertEquals(ath.auth_prefix, '/test/')
|
|
ath = auth.filter_factory({'auth_prefix': 'test/'})(app)
|
|
self.assertEquals(ath.auth_prefix, '/test/')
|
|
ath = auth.filter_factory({'auth_prefix': 'test'})(app)
|
|
self.assertEquals(ath.auth_prefix, '/test/')
|
|
|
|
def test_top_level_deny(self):
|
|
req = self._make_request('/')
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
self.test_auth.denied_response)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="unknown"')
|
|
|
|
def test_anon(self):
|
|
req = self._make_request('/v1/AUTH_account')
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
self.test_auth.authorize)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_account"')
|
|
|
|
def test_anon_badpath(self):
|
|
req = self._make_request('/v1')
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="unknown"')
|
|
|
|
def test_override_asked_for_but_not_allowed(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'allow_overrides': 'false'})(FakeApp())
|
|
req = self._make_request('/v1/AUTH_account',
|
|
environ={'swift.authorize_override': True})
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_account"')
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
self.test_auth.authorize)
|
|
|
|
def test_override_asked_for_and_allowed(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'allow_overrides': 'true'})(FakeApp())
|
|
req = self._make_request('/v1/AUTH_account',
|
|
environ={'swift.authorize_override': True})
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 404)
|
|
self.assertTrue('swift.authorize' not in req.environ)
|
|
|
|
def test_override_default_allowed(self):
|
|
req = self._make_request('/v1/AUTH_account',
|
|
environ={'swift.authorize_override': True})
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 404)
|
|
self.assertTrue('swift.authorize' not in req.environ)
|
|
|
|
def test_auth_deny_non_reseller_prefix(self):
|
|
req = self._make_request('/v1/BLAH_account',
|
|
headers={'X-Auth-Token': 'BLAH_t'})
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="BLAH_account"')
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
self.test_auth.denied_response)
|
|
|
|
def test_auth_deny_non_reseller_prefix_no_override(self):
|
|
fake_authorize = lambda x: Response(status='500 Fake')
|
|
req = self._make_request('/v1/BLAH_account',
|
|
headers={'X-Auth-Token': 'BLAH_t'},
|
|
environ={'swift.authorize': fake_authorize}
|
|
)
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 500)
|
|
self.assertEquals(req.environ['swift.authorize'], fake_authorize)
|
|
|
|
def test_auth_no_reseller_prefix_deny(self):
|
|
# Ensures that when we have no reseller prefix, we don't deny a request
|
|
# outright but set up a denial swift.authorize and pass the request on
|
|
# down the chain.
|
|
local_app = FakeApp()
|
|
local_auth = auth.filter_factory({'reseller_prefix': ''})(local_app)
|
|
req = self._make_request('/v1/account',
|
|
headers={'X-Auth-Token': 't'})
|
|
resp = req.get_response(local_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="account"')
|
|
self.assertEquals(local_app.calls, 1)
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
local_auth.denied_response)
|
|
|
|
def test_auth_reseller_prefix_with_s3_deny(self):
|
|
# Ensures that when we have a reseller prefix and using a middleware
|
|
# relying on Http-Authorization (for example swift3), we don't deny a
|
|
# request outright but set up a denial swift.authorize and pass the
|
|
# request on down the chain.
|
|
local_app = FakeApp()
|
|
local_auth = auth.filter_factory({'reseller_prefix': 'PRE'})(local_app)
|
|
req = self._make_request('/v1/account',
|
|
headers={'X-Auth-Token': 't',
|
|
'Authorization': 'AWS user:pw'})
|
|
resp = req.get_response(local_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(local_app.calls, 1)
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
local_auth.denied_response)
|
|
|
|
def test_auth_no_reseller_prefix_no_token(self):
|
|
# Check that normally we set up a call back to our authorize.
|
|
local_auth = auth.filter_factory({'reseller_prefix': ''})(FakeApp())
|
|
req = self._make_request('/v1/account')
|
|
resp = req.get_response(local_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="account"')
|
|
self.assertEquals(req.environ['swift.authorize'],
|
|
local_auth.authorize)
|
|
# Now make sure we don't override an existing swift.authorize when we
|
|
# have no reseller prefix.
|
|
local_auth = \
|
|
auth.filter_factory({'reseller_prefix': ''})(FakeApp())
|
|
local_authorize = lambda req: Response('test')
|
|
req = self._make_request('/v1/account', environ={'swift.authorize':
|
|
local_authorize})
|
|
resp = req.get_response(local_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertEquals(req.environ['swift.authorize'], local_authorize)
|
|
|
|
def test_auth_fail(self):
|
|
resp = self._make_request(
|
|
'/v1/AUTH_cfa',
|
|
headers={'X-Auth-Token': 'AUTH_t'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_cfa"')
|
|
|
|
def test_authorize_bad_path(self):
|
|
req = self._make_request('/badpath')
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="unknown"')
|
|
req = self._make_request('/badpath')
|
|
req.remote_user = 'act:usr,act,AUTH_cfa'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_authorize_account_access(self):
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act,AUTH_cfa'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_authorize_acl_group_access(self):
|
|
self.test_auth = auth.filter_factory({})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 3)))
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = 'act'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = 'act:usr'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = 'act2'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
req = self._make_request('/v1/AUTH_cfa')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = 'act:usr2'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_deny_cross_reseller(self):
|
|
# Tests that cross-reseller is denied, even if ACLs/group names match
|
|
req = self._make_request('/v1/OTHER_cfa')
|
|
req.remote_user = 'act:usr,act,AUTH_cfa'
|
|
req.acl = 'act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_authorize_acl_referer_after_user_groups(self):
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr'
|
|
req.acl = '.r:*,act:usr'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
|
|
def test_authorize_acl_referrer_access(self):
|
|
self.test_auth = auth.filter_factory({})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 6)))
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr,act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = '.r:*,.rlistings'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = '.r:*' # No listings allowed
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr,act'
|
|
req.acl = '.r:.example.com,.rlistings'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.remote_user = 'act:usr,act'
|
|
req.referer = 'http://www.example.com/index.html'
|
|
req.acl = '.r:.example.com,.rlistings'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_cfa"')
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.acl = '.r:*,.rlistings'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.acl = '.r:*' # No listings allowed
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_cfa"')
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.acl = '.r:.example.com,.rlistings'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="AUTH_cfa"')
|
|
req = self._make_request('/v1/AUTH_cfa/c')
|
|
req.referer = 'http://www.example.com/index.html'
|
|
req.acl = '.r:.example.com,.rlistings'
|
|
self.assertEquals(self.test_auth.authorize(req), None)
|
|
|
|
def test_detect_reseller_request(self):
|
|
req = self._make_request('/v1/AUTH_admin',
|
|
headers={'X-Auth-Token': 'AUTH_t'})
|
|
cache_key = 'AUTH_/token/AUTH_t'
|
|
cache_entry = (time() + 3600, '.reseller_admin')
|
|
req.environ['swift.cache'].set(cache_key, cache_entry)
|
|
req.get_response(self.test_auth)
|
|
self.assertTrue(req.environ.get('reseller_request', False))
|
|
|
|
def test_account_put_permissions(self):
|
|
self.test_auth = auth.filter_factory({})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 4)))
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'PUT'})
|
|
req.remote_user = 'act:usr,act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'PUT'})
|
|
req.remote_user = 'act:usr,act,AUTH_other'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
# Even PUTs to your own account as account admin should fail
|
|
req = self._make_request('/v1/AUTH_old',
|
|
environ={'REQUEST_METHOD': 'PUT'})
|
|
req.remote_user = 'act:usr,act,AUTH_old'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'PUT'})
|
|
req.remote_user = 'act:usr,act,.reseller_admin'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp, None)
|
|
|
|
# .super_admin is not something the middleware should ever see or care
|
|
# about
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'PUT'})
|
|
req.remote_user = 'act:usr,act,.super_admin'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_account_delete_permissions(self):
|
|
self.test_auth = auth.filter_factory({})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 4)))
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'DELETE'})
|
|
req.remote_user = 'act:usr,act'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'DELETE'})
|
|
req.remote_user = 'act:usr,act,AUTH_other'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
# Even DELETEs to your own account as account admin should fail
|
|
req = self._make_request('/v1/AUTH_old',
|
|
environ={'REQUEST_METHOD': 'DELETE'})
|
|
req.remote_user = 'act:usr,act,AUTH_old'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'DELETE'})
|
|
req.remote_user = 'act:usr,act,.reseller_admin'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp, None)
|
|
|
|
# .super_admin is not something the middleware should ever see or care
|
|
# about
|
|
req = self._make_request('/v1/AUTH_new',
|
|
environ={'REQUEST_METHOD': 'DELETE'})
|
|
req.remote_user = 'act:usr,act,.super_admin'
|
|
resp = self.test_auth.authorize(req)
|
|
self.assertEquals(resp.status_int, 403)
|
|
|
|
def test_get_token_success(self):
|
|
# Example of how to simulate the auth transaction
|
|
test_auth = auth.filter_factory({'user_ac_user': 'testing'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'ac:user', 'X-Auth-Key': 'testing'})
|
|
resp = req.get_response(test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertTrue(resp.headers['x-storage-url'].endswith('/v1/AUTH_ac'))
|
|
self.assertTrue(resp.headers['x-auth-token'].startswith('AUTH_'))
|
|
self.assertTrue(len(resp.headers['x-auth-token']) > 10)
|
|
|
|
def test_use_token_success(self):
|
|
# Example of how to simulate an authorized request
|
|
test_auth = auth.filter_factory({'user_acct_user': 'testing'})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 1)))
|
|
req = self._make_request('/v1/AUTH_acct',
|
|
headers={'X-Auth-Token': 'AUTH_t'})
|
|
cache_key = 'AUTH_/token/AUTH_t'
|
|
cache_entry = (time() + 3600, 'AUTH_acct')
|
|
req.environ['swift.cache'].set(cache_key, cache_entry)
|
|
resp = req.get_response(test_auth)
|
|
self.assertEquals(resp.status_int, 204)
|
|
|
|
def test_get_token_fail(self):
|
|
resp = self._make_request('/auth/v1.0').get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="unknown"')
|
|
resp = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'act:usr',
|
|
'X-Auth-Key': 'key'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertTrue('Www-Authenticate' in resp.headers)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="act"')
|
|
|
|
def test_get_token_fail_invalid_x_auth_user_format(self):
|
|
resp = self._make_request(
|
|
'/auth/v1/act/auth',
|
|
headers={'X-Auth-User': 'usr',
|
|
'X-Auth-Key': 'key'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="act"')
|
|
|
|
def test_get_token_fail_non_matching_account_in_request(self):
|
|
resp = self._make_request(
|
|
'/auth/v1/act/auth',
|
|
headers={'X-Auth-User': 'act2:usr',
|
|
'X-Auth-Key': 'key'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="act"')
|
|
|
|
def test_get_token_fail_bad_path(self):
|
|
resp = self._make_request(
|
|
'/auth/v1/act/auth/invalid',
|
|
headers={'X-Auth-User': 'act:usr',
|
|
'X-Auth-Key': 'key'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 400)
|
|
|
|
def test_get_token_fail_missing_key(self):
|
|
resp = self._make_request(
|
|
'/auth/v1/act/auth',
|
|
headers={'X-Auth-User': 'act:usr'}).get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 401)
|
|
self.assertEquals(resp.headers.get('Www-Authenticate'),
|
|
'Swift realm="act"')
|
|
|
|
def test_object_name_containing_slash(self):
|
|
test_auth = auth.filter_factory({'user_acct_user': 'testing'})(
|
|
FakeApp(iter(NO_CONTENT_RESP * 1)))
|
|
req = self._make_request('/v1/AUTH_acct/cont/obj/name/with/slash',
|
|
headers={'X-Auth-Token': 'AUTH_t'})
|
|
cache_key = 'AUTH_/token/AUTH_t'
|
|
cache_entry = (time() + 3600, 'AUTH_acct')
|
|
req.environ['swift.cache'].set(cache_key, cache_entry)
|
|
resp = req.get_response(test_auth)
|
|
self.assertEquals(resp.status_int, 204)
|
|
|
|
def test_storage_url_default(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
del req.environ['HTTP_HOST']
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertEquals(resp.headers['x-storage-url'],
|
|
'http://bob:1234/v1/AUTH_test')
|
|
|
|
def test_storage_url_based_on_host(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
req.environ['HTTP_HOST'] = 'somehost:5678'
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertEquals(resp.headers['x-storage-url'],
|
|
'http://somehost:5678/v1/AUTH_test')
|
|
|
|
def test_storage_url_overridden_scheme(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing',
|
|
'storage_url_scheme': 'fake'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
req.environ['HTTP_HOST'] = 'somehost:5678'
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertEquals(resp.headers['x-storage-url'],
|
|
'fake://somehost:5678/v1/AUTH_test')
|
|
|
|
def test_use_old_token_from_memcached(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing',
|
|
'storage_url_scheme': 'fake'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
req.environ['HTTP_HOST'] = 'somehost:5678'
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
|
|
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
|
|
(time() + 180, 'test,test:tester'))
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertEquals(resp.headers['x-auth-token'], 'uuid_token')
|
|
|
|
def test_old_token_overdate(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing',
|
|
'storage_url_scheme': 'fake'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
req.environ['HTTP_HOST'] = 'somehost:5678'
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
|
|
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
|
|
(0, 'test,test:tester'))
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertNotEquals(resp.headers['x-auth-token'], 'uuid_token')
|
|
self.assertEquals(resp.headers['x-auth-token'][:7], 'AUTH_tk')
|
|
|
|
def test_old_token_with_old_data(self):
|
|
self.test_auth = \
|
|
auth.filter_factory({'user_test_tester': 'testing',
|
|
'storage_url_scheme': 'fake'})(FakeApp())
|
|
req = self._make_request(
|
|
'/auth/v1.0',
|
|
headers={'X-Auth-User': 'test:tester', 'X-Auth-Key': 'testing'})
|
|
req.environ['HTTP_HOST'] = 'somehost:5678'
|
|
req.environ['SERVER_NAME'] = 'bob'
|
|
req.environ['SERVER_PORT'] = '1234'
|
|
req.environ['swift.cache'].set('AUTH_/user/test:tester', 'uuid_token')
|
|
req.environ['swift.cache'].set('AUTH_/token/uuid_token',
|
|
(time() + 99, 'test,test:tester,.role'))
|
|
resp = req.get_response(self.test_auth)
|
|
self.assertEquals(resp.status_int, 200)
|
|
self.assertNotEquals(resp.headers['x-auth-token'], 'uuid_token')
|
|
self.assertEquals(resp.headers['x-auth-token'][:7], 'AUTH_tk')
|
|
|
|
    def test_reseller_admin_is_owner(self):
        orig_authorize = self.test_auth.authorize
        owner_values = []

        def mitm_authorize(req):
            rv = orig_authorize(req)
            owner_values.append(req.environ.get('swift_owner', False))
            return rv

        self.test_auth.authorize = mitm_authorize

        req = self._make_request('/v1/AUTH_cfa',
                                 headers={'X-Auth-Token': 'AUTH_t'})
        req.remote_user = '.reseller_admin'
        self.test_auth.authorize(req)
        self.assertEquals(owner_values, [True])

    def test_admin_is_owner(self):
        orig_authorize = self.test_auth.authorize
        owner_values = []

        def mitm_authorize(req):
            rv = orig_authorize(req)
            owner_values.append(req.environ.get('swift_owner', False))
            return rv

        self.test_auth.authorize = mitm_authorize

        req = self._make_request(
            '/v1/AUTH_cfa',
            headers={'X-Auth-Token': 'AUTH_t'})
        req.remote_user = 'AUTH_cfa'
        self.test_auth.authorize(req)
        self.assertEquals(owner_values, [True])

    def test_regular_is_not_owner(self):
        orig_authorize = self.test_auth.authorize
        owner_values = []

        def mitm_authorize(req):
            rv = orig_authorize(req)
            owner_values.append(req.environ.get('swift_owner', False))
            return rv

        self.test_auth.authorize = mitm_authorize

        req = self._make_request(
            '/v1/AUTH_cfa/c',
            headers={'X-Auth-Token': 'AUTH_t'})
        req.remote_user = 'act:usr'
        self.test_auth.authorize(req)
        self.assertEquals(owner_values, [False])

    def test_sync_request_success(self):
        self.test_auth.app = FakeApp(iter(NO_CONTENT_RESP * 1),
                                     sync_key='secret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456'})
        req.remote_addr = '127.0.0.1'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 204)

    def test_sync_request_fail_key(self):
        self.test_auth.app = FakeApp(sync_key='secret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'wrongsecret',
                     'x-timestamp': '123.456'})
        req.remote_addr = '127.0.0.1'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 401)
        self.assertEquals(resp.headers.get('Www-Authenticate'),
                          'Swift realm="AUTH_cfa"')

        self.test_auth.app = FakeApp(sync_key='othersecret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456'})
        req.remote_addr = '127.0.0.1'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 401)
        self.assertEquals(resp.headers.get('Www-Authenticate'),
                          'Swift realm="AUTH_cfa"')

        self.test_auth.app = FakeApp(sync_key=None)
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456'})
        req.remote_addr = '127.0.0.1'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 401)
        self.assertEquals(resp.headers.get('Www-Authenticate'),
                          'Swift realm="AUTH_cfa"')

    def test_sync_request_fail_no_timestamp(self):
        self.test_auth.app = FakeApp(sync_key='secret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret'})
        req.remote_addr = '127.0.0.1'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 401)
        self.assertEquals(resp.headers.get('Www-Authenticate'),
                          'Swift realm="AUTH_cfa"')

    def test_sync_request_success_lb_sync_host(self):
        self.test_auth.app = FakeApp(iter(NO_CONTENT_RESP * 1),
                                     sync_key='secret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456',
                     'x-forwarded-for': '127.0.0.1'})
        req.remote_addr = '127.0.0.2'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 204)

        self.test_auth.app = FakeApp(iter(NO_CONTENT_RESP * 1),
                                     sync_key='secret')
        req = self._make_request(
            '/v1/AUTH_cfa/c/o',
            environ={'REQUEST_METHOD': 'DELETE'},
            headers={'x-container-sync-key': 'secret',
                     'x-timestamp': '123.456',
                     'x-cluster-client-ip': '127.0.0.1'})
        req.remote_addr = '127.0.0.2'
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 204)

    def test_options_call(self):
        req = self._make_request('/v1/AUTH_cfa/c/o',
                                 environ={'REQUEST_METHOD': 'OPTIONS'})
        resp = self.test_auth.authorize(req)
        self.assertEquals(resp, None)

    def test_get_user_group(self):
        app = FakeApp()
        ath = auth.filter_factory({})(app)

        ath.users = {'test:tester': {'groups': ['.admin']}}
        groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
        self.assertEquals(groups, 'test,test:tester,AUTH_test')

        ath.users = {'test:tester': {'groups': []}}
        groups = ath._get_user_groups('test', 'test:tester', 'AUTH_test')
        self.assertEquals(groups, 'test,test:tester')

    def test_auth_scheme(self):
        req = self._make_request('/v1/BLAH_account',
                                 headers={'X-Auth-Token': 'BLAH_t'})
        resp = req.get_response(self.test_auth)
        self.assertEquals(resp.status_int, 401)
        self.assertTrue('Www-Authenticate' in resp.headers)
        self.assertEquals(resp.headers.get('Www-Authenticate'),
                          'Swift realm="BLAH_account"')


class TestParseUserCreation(unittest.TestCase):
    def test_parse_user_creation(self):
        auth_filter = auth.filter_factory({
            'reseller_prefix': 'ABC',
            'user_test_tester3': 'testing',
            'user_has_url': 'urlly .admin http://a.b/v1/DEF_has',
            'user_admin_admin': 'admin .admin .reseller_admin',
        })(FakeApp())
        self.assertEquals(auth_filter.users, {
            'admin:admin': {
                'url': '$HOST/v1/ABC_admin',
                'groups': ['.admin', '.reseller_admin'],
                'key': 'admin'
            }, 'test:tester3': {
                'url': '$HOST/v1/ABC_test',
                'groups': [],
                'key': 'testing'
            }, 'has:url': {
                'url': 'http://a.b/v1/DEF_has',
                'groups': ['.admin'],
                'key': 'urlly'
            },
        })

    def test_base64_encoding(self):
        auth_filter = auth.filter_factory({
            'reseller_prefix': 'ABC',
            'user64_%s_%s' % (
                b64encode('test').rstrip('='),
                b64encode('tester3').rstrip('=')):
            'testing .reseller_admin',
            'user64_%s_%s' % (
                b64encode('user_foo').rstrip('='),
                b64encode('ab').rstrip('=')):
            'urlly .admin http://a.b/v1/DEF_has',
        })(FakeApp())
        self.assertEquals(auth_filter.users, {
            'test:tester3': {
                'url': '$HOST/v1/ABC_test',
                'groups': ['.reseller_admin'],
                'key': 'testing'
            }, 'user_foo:ab': {
                'url': 'http://a.b/v1/DEF_has',
                'groups': ['.admin'],
                'key': 'urlly'
            },
        })

    def test_key_with_no_value(self):
        self.assertRaises(ValueError, auth.filter_factory({
            'user_test_tester3': 'testing',
            'user_bob_bobby': '',
            'user_admin_admin': 'admin .admin .reseller_admin',
        }), FakeApp())


class TestAccountAcls(unittest.TestCase):
    def _make_request(self, path, **kwargs):
        # Our TestAccountAcls default request will have a valid auth token
        version, acct, _ = split_path(path, 1, 3, True)
        headers = kwargs.pop('headers', {'X-Auth-Token': 'AUTH_t'})
        user_groups = kwargs.pop('user_groups', 'AUTH_firstacct')

        # The account being accessed will have account ACLs
        acl = {'admin': ['AUTH_admin'], 'read-write': ['AUTH_rw'],
               'read-only': ['AUTH_ro']}
        header_data = {'core-access-control':
                       format_acl(version=2, acl_dict=acl)}
        acls = kwargs.pop('acls', header_data)

        req = Request.blank(path, headers=headers, **kwargs)

        # Authorize the token by populating the request's cache
        req.environ['swift.cache'] = FakeMemcache()
        cache_key = 'AUTH_/token/AUTH_t'
        cache_entry = (time() + 3600, user_groups)
        req.environ['swift.cache'].set(cache_key, cache_entry)

        # Pretend get_account_info returned ACLs in sysmeta, and we cached that
        cache_key = 'account/%s' % acct
        cache_entry = {'sysmeta': acls}
        req.environ['swift.cache'].set(cache_key, cache_entry)

        return req

    def test_account_acl_success(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 1)))

        # admin (not a swift admin) wants to read from otheracct
        req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_admin")

        # The request returned by _make_request should be allowed
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 204)

    def test_account_acl_failures(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp())

        # If I'm not authed as anyone on the ACLs, I shouldn't get in
        req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_bob")
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 403)

        # If the target account has no ACLs, a non-owner shouldn't get in
        req = self._make_request('/v1/AUTH_otheract', user_groups="AUTH_admin",
                                 acls={})
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 403)

    def test_admin_privileges(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 18)))

        for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/container',
                       '/v1/AUTH_otheracct/container/obj'):
            for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
                # Admin ACL user can do anything
                req = self._make_request(target, user_groups="AUTH_admin",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 204)

                # swift_owner should be set to True
                if method != 'OPTIONS':
                    self.assertTrue(req.environ.get('swift_owner'))

    def test_readwrite_privileges(self):
        test_auth = auth.filter_factory({'user_rw_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 15)))

        for target in ('/v1/AUTH_otheracct',):
            for method in ('GET', 'HEAD', 'OPTIONS'):
                # Read-Write user can read account data
                req = self._make_request(target, user_groups="AUTH_rw",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 204)

                # swift_owner should NOT be set to True
                self.assertFalse(req.environ.get('swift_owner'))

            # RW user should NOT be able to PUT, POST, or DELETE to the account
            for method in ('PUT', 'POST', 'DELETE'):
                req = self._make_request(target, user_groups="AUTH_rw",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 403)

        # RW user should be able to GET, PUT, POST, or DELETE to containers
        # and objects
        for target in ('/v1/AUTH_otheracct/c', '/v1/AUTH_otheracct/c/o'):
            for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
                req = self._make_request(target, user_groups="AUTH_rw",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 204)

    def test_readonly_privileges(self):
        test_auth = auth.filter_factory({'user_ro_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 9)))

        # ReadOnly user should NOT be able to PUT, POST, or DELETE to account,
        # container, or object
        for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/cont',
                       '/v1/AUTH_otheracct/cont/obj'):
            for method in ('GET', 'HEAD', 'OPTIONS'):
                req = self._make_request(target, user_groups="AUTH_ro",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 204)
                # swift_owner should NOT be set to True for the ReadOnly ACL
                self.assertFalse(req.environ.get('swift_owner'))
            for method in ('PUT', 'POST', 'DELETE'):
                req = self._make_request(target, user_groups="AUTH_ro",
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(resp.status_int, 403)
                # swift_owner should NOT be set to True for the ReadOnly ACL
                self.assertFalse(req.environ.get('swift_owner'))

    def test_user_gets_best_acl(self):
        test_auth = auth.filter_factory({'user_acct_username': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 18)))

        mygroups = "AUTH_acct,AUTH_ro,AUTH_something,AUTH_admin"
        for target in ('/v1/AUTH_otheracct', '/v1/AUTH_otheracct/container',
                       '/v1/AUTH_otheracct/container/obj'):
            for method in ('GET', 'HEAD', 'OPTIONS', 'PUT', 'POST', 'DELETE'):
                # Admin ACL user can do anything
                req = self._make_request(target, user_groups=mygroups,
                                         environ={'REQUEST_METHOD': method})
                resp = req.get_response(test_auth)
                self.assertEquals(
                    resp.status_int, 204, "%s (%s) - expected 204, got %d" %
                    (target, method, resp.status_int))

                # swift_owner should be set to True
                if method != 'OPTIONS':
                    self.assertTrue(req.environ.get('swift_owner'))

    def test_acl_syntax_verification(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 3)))

        good_headers = {'X-Auth-Token': 'AUTH_t'}
        good_acl = '{"read-only":["a","b"]}'
        bad_acl = 'syntactically invalid acl -- this does not parse as JSON'
        wrong_acl = '{"other-auth-system":["valid","json","but","wrong"]}'
        bad_value_acl = '{"read-write":["fine"],"admin":"should be a list"}'
        target = '/v1/AUTH_firstacct'

        # no acls -- no problem!
        req = self._make_request(target, headers=good_headers)
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 204)

        # syntactically valid acls should go through
        update = {'x-account-access-control': good_acl}
        req = self._make_request(target, headers=dict(good_headers, **update))
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 204)

        errmsg = 'X-Account-Access-Control invalid: %s'
        # syntactically invalid acls get a 400
        update = {'x-account-access-control': bad_acl}
        req = self._make_request(target, headers=dict(good_headers, **update))
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 400)
        self.assertEquals(errmsg % "Syntax error", resp.body[:46])

        # syntactically valid acls with bad keys also get a 400
        update = {'x-account-access-control': wrong_acl}
        req = self._make_request(target, headers=dict(good_headers, **update))
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 400)
        self.assertEquals(errmsg % "Key '", resp.body[:39])

        # acls with good keys but bad values also get a 400
        update = {'x-account-access-control': bad_value_acl}
        req = self._make_request(target, headers=dict(good_headers, **update))
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 400)
        self.assertEquals(errmsg % "Value", resp.body[:39])

    def test_acls_propagate_to_sysmeta(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 3)))

        sysmeta_hdr = 'x-account-sysmeta-core-access-control'
        target = '/v1/AUTH_firstacct'
        good_headers = {'X-Auth-Token': 'AUTH_t'}
        good_acl = '{"read-only":["a","b"]}'

        # no acls -- no problem!
        req = self._make_request(target, headers=good_headers)
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 204)
        self.assertEqual(None, req.headers.get(sysmeta_hdr))

        # syntactically valid acls should go through
        update = {'x-account-access-control': good_acl}
        req = self._make_request(target, headers=dict(good_headers, **update))
        resp = req.get_response(test_auth)
        self.assertEquals(resp.status_int, 204)
        self.assertEqual(good_acl, req.headers.get(sysmeta_hdr))

    def test_bad_acls_get_denied(self):
        test_auth = auth.filter_factory({'user_admin_user': 'testing'})(
            FakeApp(iter(NO_CONTENT_RESP * 3)))

        target = '/v1/AUTH_firstacct'
        good_headers = {'X-Auth-Token': 'AUTH_t'}
        bad_acls = (
            'syntax error',
            '{"bad_key":"should_fail"}',
            '{"admin":"not a list, should fail"}',
            '{"admin":["valid"],"read-write":"not a list, should fail"}',
        )

        for bad_acl in bad_acls:
            hdrs = dict(good_headers, **{'x-account-access-control': bad_acl})
            req = self._make_request(target, headers=hdrs)
            resp = req.get_response(test_auth)
            self.assertEquals(resp.status_int, 400)


class TestUtilityMethods(unittest.TestCase):
    def test_account_acls_bad_path_raises_exception(self):
        auth_inst = auth.filter_factory({})(FakeApp())
        req = Request({'PATH_INFO': '/'})
        self.assertRaises(ValueError, auth_inst.account_acls, req)


if __name__ == '__main__':
    unittest.main()
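
For readers of this diff, the behaviour that TestAccountAcls checks can be summarised in a small standalone sketch. This is not TempAuth's code: `validate_acl`, `best_level`, and `is_allowed` are hypothetical names, the error strings only loosely resemble the prefixes asserted above, and the sketch leaves out the account owner, reseller admins, and TempAuth's unconditional handling of OPTIONS. It only assumes what the commit message and tests state: the header is a JSON dictionary keyed by "admin", "read-write", and "read-only", each value is a list, and a user gets the strongest level any of their groups appears on.

```python
import json

# Hypothetical model of the account ACL rules exercised by the tests above.
ACCESS_LEVELS = ('admin', 'read-write', 'read-only')  # strongest first
READ_METHODS = ('GET', 'HEAD', 'OPTIONS')


def validate_acl(header_value):
    """Return None if the header looks like a valid ACL, else an error string."""
    try:
        acl = json.loads(header_value)
    except ValueError:
        return 'Syntax error'
    if not isinstance(acl, dict):
        return 'Syntax error'
    for key, value in acl.items():
        if key not in ACCESS_LEVELS:
            return 'Key %r not recognized' % key
        if not isinstance(value, list):
            return 'Value for key %r must be a list' % key
    return None


def best_level(acl, user_groups):
    """Return the strongest access level held by any of the user's groups."""
    for level in ACCESS_LEVELS:
        if set(acl.get(level, [])) & set(user_groups):
            return level
    return None


def is_allowed(level, method, path_depth):
    """Decide access; path_depth is 1 for account, 2 container, 3 object."""
    if level == 'admin':
        return True
    if level == 'read-write':
        # Reads are allowed anywhere; writes only below the account level.
        return method in READ_METHODS or path_depth > 1
    if level == 'read-only':
        return method in READ_METHODS
    return False
```

With the ACL used by `_make_request`, a user whose groups are `AUTH_acct,AUTH_ro,AUTH_something,AUTH_admin` resolves to "admin" in this model, which mirrors what test_user_gets_best_acl expects, while a read-write user is refused PUT, POST, and DELETE on the account path but allowed them on containers and objects.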