Merge "Improve Keystone v3 token support"

This commit is contained in:
Jenkins 2016-07-15 18:10:41 +00:00 committed by Gerrit Code Review
commit 5ace495562
2 changed files with 95 additions and 16 deletions

View File

@ -243,8 +243,10 @@ class KeystoneAuth(object):
service_roles = list_from_csv(environ.get('HTTP_X_SERVICE_ROLES', ''))
identity = {'user': (environ.get('HTTP_X_USER_ID'),
environ.get('HTTP_X_USER_NAME')),
'tenant': (environ.get('HTTP_X_TENANT_ID'),
environ.get('HTTP_X_TENANT_NAME')),
'tenant': (environ.get('HTTP_X_PROJECT_ID',
environ.get('HTTP_X_TENANT_ID')),
environ.get('HTTP_X_PROJECT_NAME',
environ.get('HTTP_X_TENANT_NAME'))),
'roles': roles,
'service_roles': service_roles}
token_info = environ.get('keystone.token_info', {})

View File

@ -550,7 +550,8 @@ class BaseTestAuthorize(unittest.TestCase):
if not identity:
identity = self._get_identity()
return get_account_for_tenant(self.test_auth,
identity['HTTP_X_TENANT_ID'])
identity.get('HTTP_X_PROJECT_ID') or
identity.get('HTTP_X_TENANT_ID'))
def _get_identity(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=None,
@ -564,13 +565,20 @@ class BaseTestAuthorize(unittest.TestCase):
'HTTP_X_USER_NAME': user_name,
'HTTP_X_USER_DOMAIN_NAME': user_domain_name,
'HTTP_X_USER_DOMAIN_ID': user_domain_id,
'HTTP_X_TENANT_ID': tenant_id,
'HTTP_X_TENANT_NAME': tenant_name,
'HTTP_X_PROJECT_ID': tenant_id,
'HTTP_X_PROJECT_NAME': tenant_name,
'HTTP_X_PROJECT_DOMAIN_ID': project_domain_id,
'HTTP_X_PROJECT_DOMAIN_NAME': project_domain_name,
'HTTP_X_ROLES': roles,
'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
def _get_identity_for_v2(self, **kwargs):
identity = self._get_identity(**kwargs)
for suffix in ['ID', 'NAME']:
identity['HTTP_X_TENANT_{0}'.format(suffix)] = identity.pop(
'HTTP_X_PROJECT_{0}'.format(suffix))
return identity
def _get_env_id(self, tenant_id='tenant_id', tenant_name='tenant_name',
user_id='user_id', user_name='user_name', roles=[],
project_domain_name='domA', project_domain_id='99',
@ -597,7 +605,8 @@ class TestAuthorize(BaseTestAuthorize):
# fake cached account info
info_key = get_cache_key(account)
default_env = {
'REMOTE_USER': identity['HTTP_X_TENANT_ID'],
'REMOTE_USER': (identity.get('HTTP_X_PROJECT_ID') or
identity.get('HTTP_X_TENANT_ID')),
'swift.infocache': {info_key: {'status': 200, 'sysmeta': {}}}}
default_env.update(identity)
if env:
@ -689,7 +698,7 @@ class TestAuthorize(BaseTestAuthorize):
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_name_user_in_roles(self):
identity = self._get_identity()
identity = self._get_identity_for_v2()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_name = identity['HTTP_X_TENANT_NAME']
@ -697,15 +706,33 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (tenant_name, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
def test_authorize_succeeds_for_project_name_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
project_name = identity['HTTP_X_PROJECT_NAME']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (project_name, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_tenant_id_user_in_roles(self):
identity = self._get_identity_for_v2()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
tenant_id = identity['HTTP_X_TENANT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (tenant_id, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_project_id_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
user_id = identity['HTTP_X_USER_ID']
project_id = identity['HTTP_X_PROJECT_ID']
for user in [user_id, user_name, '*']:
acl = '%s:%s' % (project_id, user)
self._check_authenticate(identity=identity, acl=acl)
def test_authorize_succeeds_for_wildcard_tenant_user_in_roles(self):
identity = self._get_identity()
user_name = identity['HTTP_X_USER_NAME']
@ -844,7 +871,7 @@ class TestAuthorize(BaseTestAuthorize):
self.assertEqual(authorize_resp, None)
def test_names_disallowed_in_acls_outside_default_domain(self):
id = self._get_identity(user_domain_id='non-default',
id = self._get_identity_for_v2(user_domain_id='non-default',
project_domain_id='non-default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
@ -859,8 +886,22 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='non-default',
project_domain_id='non-default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_allowed_in_acls_inside_default_domain(self):
id = self._get_identity(user_domain_id='default',
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
@ -872,11 +913,22 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_allowed_in_acls_inside_default_domain_with_config(self):
conf = {'allow_names_in_acls': 'yes'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.test_auth.logger = FakeLogger()
id = self._get_identity(user_domain_id='default',
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
@ -888,11 +940,22 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_names_disallowed_in_acls_inside_default_domain(self):
conf = {'allow_names_in_acls': 'false'}
self.test_auth = keystoneauth.filter_factory(conf)(FakeApp())
self.test_auth.logger = FakeLogger()
id = self._get_identity(user_domain_id='default',
id = self._get_identity_for_v2(user_domain_id='default',
project_domain_id='default')
env = {'keystone.token_info': _fake_token_info(version='3')}
acl = '%s:%s' % (id['HTTP_X_TENANT_NAME'], id['HTTP_X_USER_NAME'])
@ -907,6 +970,20 @@ class TestAuthorize(BaseTestAuthorize):
acl = '%s:%s' % (id['HTTP_X_TENANT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
id = self._get_identity(user_domain_id='default',
project_domain_id='default')
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_NAME'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_NAME'])
self._check_authenticate(acl=acl, identity=id, env=env,
exception=HTTP_FORBIDDEN)
acl = '%s:%s' % (id['HTTP_X_PROJECT_ID'], id['HTTP_X_USER_ID'])
self._check_authenticate(acl=acl, identity=id, env=env)
def test_keystone_identity(self):
user = ('U_ID', 'U_NAME')
roles = ('ROLE1', 'ROLE2')