diff --git a/swift/common/middleware/keystoneauth.py b/swift/common/middleware/keystoneauth.py index a78e66fd06..b7cdd347cb 100644 --- a/swift/common/middleware/keystoneauth.py +++ b/swift/common/middleware/keystoneauth.py @@ -140,10 +140,34 @@ class KeystoneAuth(object): """Check reseller prefix.""" return account == self._get_account_for_tenant(tenant_id) + def _authorize_cross_tenant(self, user, tenant_id, tenant_name, roles): + """ Check cross-tenant ACLs + + Match tenant_id:user, tenant_name:user, and *:user. + + :param user: The user name from the identity token. + :param tenant_id: The tenant ID from the identity token. + :param tenant_name: The tenant name from the identity token. + :param roles: The given container ACL. + + :returns: True if tenant_id:user, tenant_name:user, or *:user matches + the given ACL. False otherwise. + + """ + wildcard_tenant_match = '*:%s' % (user) + tenant_id_user_match = '%s:%s' % (tenant_id, user) + tenant_name_user_match = '%s:%s' % (tenant_name, user) + + return (wildcard_tenant_match in roles + or tenant_id_user_match in roles + or tenant_name_user_match in roles) + def authorize(self, req): env = req.environ env_identity = env.get('keystone.identity', {}) tenant_id, tenant_name = env_identity.get('tenant') + user = env_identity.get('user', '') + referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None)) try: part = swift_utils.split_path(req.path, 1, 4, True) @@ -161,6 +185,13 @@ class KeystoneAuth(object): req.environ['swift_owner'] = True return + # cross-tenant authorization + if self._authorize_cross_tenant(user, tenant_id, tenant_name, roles): + log_msg = 'user %s:%s, %s:%s, or *:%s allowed in ACL authorizing' + self.logger.debug(log_msg % (tenant_name, user, + tenant_id, user, user)) + return + # Check if a user tries to access an account that does not match their # token if not self._reseller_check(account, tenant_id): @@ -181,13 +212,10 @@ class KeystoneAuth(object): return # If user is of the same name of the tenant then make owner of it. - user = env_identity.get('user', '') if self.is_admin and user == tenant_name: req.environ['swift_owner'] = True return - referrers, roles = swift_acl.parse_acl(getattr(req, 'acl', None)) - authorized = self._authorize_unconfirmed_identity(req, obj, referrers, roles) if authorized: @@ -195,14 +223,6 @@ class KeystoneAuth(object): elif authorized is not None: return self.denied_response(req) - # Allow ACL at individual user level (tenant:user format) - # For backward compatibility, check for ACL in tenant_id:user format - if ('%s:%s' % (tenant_name, user) in roles - or '%s:%s' % (tenant_id, user) in roles): - log_msg = 'user %s:%s or %s:%s allowed in ACL authorizing' - self.logger.debug(log_msg % (tenant_name, user, tenant_id, user)) - return - # Check if we have the role in the userroles and allow it for user_role in user_roles: if user_role in roles: diff --git a/test/unit/common/middleware/test_keystoneauth.py b/test/unit/common/middleware/test_keystoneauth.py index 16fd8a4396..a40898b184 100644 --- a/test/unit/common/middleware/test_keystoneauth.py +++ b/test/unit/common/middleware/test_keystoneauth.py @@ -227,5 +227,23 @@ class TestAuthorize(unittest.TestCase): acl = '%s:%s' % (identity['tenant'][0], identity['user']) self._check_authenticate(identity=identity, acl=acl) + def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self): + identity = self._get_identity() + acl = '*:%s' % (identity['user']) + self._check_authenticate(identity=identity, acl=acl) + + def test_cross_tenant_authorization_success(self): + self.assertTrue(self.test_auth._authorize_cross_tenant('userA', + 'tenantID', 'tenantNAME', ['tenantID:userA'])) + self.assertTrue(self.test_auth._authorize_cross_tenant('userA', + 'tenantID', 'tenantNAME', ['tenantNAME:userA'])) + self.assertTrue(self.test_auth._authorize_cross_tenant('userA', + 'tenantID', 'tenantNAME', ['*:userA'])) + + def test_cross_tenant_authorization_failure(self): + self.assertFalse(self.test_auth._authorize_cross_tenant('userA', + 'tenantID', 'tenantNAME', ['tenantXYZ:userA'])) + + if __name__ == '__main__': unittest.main()