granting and revoking privs to users and groups
The domain and groups are dropped for keystone v2. project is required for keystone v2 add a test that makes sure domains that don't exist raise an error Change-Id: I3313690c0f0bbf0fcd9fe1db2e46dcd3fb6dd3d0
This commit is contained in:
parent
b2deeefc8b
commit
e38eea3d8c
@ -0,0 +1,3 @@
|
||||
---
|
||||
features:
|
||||
- add granting and revoking of roles from groups and users
|
@ -682,6 +682,26 @@ class RoleDelete(task_manager.Task):
|
||||
return client.keystone_client.roles.delete(**self.args)
|
||||
|
||||
|
||||
class RoleAddUser(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.keystone_client.roles.add_user_role(**self.args)
|
||||
|
||||
|
||||
class RoleGrantUser(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.keystone_client.roles.grant(**self.args)
|
||||
|
||||
|
||||
class RoleRemoveUser(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.keystone_client.roles.remove_user_role(**self.args)
|
||||
|
||||
|
||||
class RoleRevokeUser(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.keystone_client.roles.revoke(**self.args)
|
||||
|
||||
|
||||
class RoleAssignmentList(task_manager.Task):
|
||||
def main(self, client):
|
||||
return client.keystone_client.role_assignments.list(**self.args)
|
||||
|
@ -448,6 +448,17 @@ def normalize_role_assignments(assignments):
|
||||
return new_assignments
|
||||
|
||||
|
||||
def normalize_roles(roles):
|
||||
"""Normalize Identity roles."""
|
||||
ret = [
|
||||
dict(
|
||||
id=role.get('id'),
|
||||
name=role.get('name'),
|
||||
) for role in roles
|
||||
]
|
||||
return meta.obj_list_to_dict(ret)
|
||||
|
||||
|
||||
def valid_kwargs(*valid_args):
|
||||
# This decorator checks if argument passed as **kwargs to a function are
|
||||
# present in valid_args.
|
||||
|
@ -1545,6 +1545,161 @@ class OperatorCloud(openstackcloud.OpenStackCloud):
|
||||
|
||||
return True
|
||||
|
||||
def _get_grant_revoke_params(self, role, user=None, group=None,
|
||||
project=None, domain=None):
|
||||
role = self.get_role(role)
|
||||
if role is None:
|
||||
return {}
|
||||
data = {'role': role.id}
|
||||
|
||||
# domain and group not available in keystone v2.0
|
||||
keystone_version = self.cloud_config.get_api_version('identity')
|
||||
is_keystone_v2 = keystone_version.startswith('2')
|
||||
|
||||
filters = {}
|
||||
if not is_keystone_v2 and domain:
|
||||
filters['domain_id'] = data['domain'] = \
|
||||
self.get_domain(domain)['id']
|
||||
|
||||
if user:
|
||||
data['user'] = self.get_user(user, filters=filters)
|
||||
|
||||
if project:
|
||||
# drop domain in favor of project
|
||||
data.pop('domain', None)
|
||||
data['project'] = self.get_project(project, filters=filters)
|
||||
|
||||
if not is_keystone_v2 and group:
|
||||
data['group'] = self.get_group(group, filters=filters)
|
||||
|
||||
return data
|
||||
|
||||
def grant_role(self, name_or_id, user=None, group=None,
|
||||
project=None, domain=None, wait=False, timeout=60):
|
||||
"""Grant a role to a user.
|
||||
|
||||
:param string name_or_id: The name or id of the role.
|
||||
:param string user: The name or id of the user.
|
||||
:param string group: The name or id of the group. (v3)
|
||||
:param string project: The name or id of the project.
|
||||
:param string domain: The name or id of the domain. (v3)
|
||||
:param bool wait: Wait for role to be granted
|
||||
:param int timeout: Timeout to wait for role to be granted
|
||||
|
||||
NOTE: for wait and timeout, sometimes granting roles is not
|
||||
instantaneous for granting roles.
|
||||
|
||||
NOTE: project is required for keystone v2
|
||||
|
||||
:returns: True if the role is assigned, otherwise False
|
||||
|
||||
:raise OpenStackCloudException: if the role cannot be granted
|
||||
"""
|
||||
data = self._get_grant_revoke_params(name_or_id, user, group,
|
||||
project, domain)
|
||||
filters = data.copy()
|
||||
if not data:
|
||||
raise OpenStackCloudException(
|
||||
'Role {0} not found.'.format(name_or_id))
|
||||
|
||||
if data.get('user') is not None and data.get('group') is not None:
|
||||
raise OpenStackCloudException(
|
||||
'Specify either a group or a user, not both')
|
||||
if data.get('user') is None and data.get('group') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify either a user or a group')
|
||||
if self.cloud_config.get_api_version('identity').startswith('2') and \
|
||||
data.get('project') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify project for keystone v2')
|
||||
|
||||
if self.list_role_assignments(filters=filters):
|
||||
self.log.debug('Assignment already exists')
|
||||
return False
|
||||
|
||||
with _utils.shade_exceptions(
|
||||
"Error granting access to role: {0}".format(
|
||||
data)):
|
||||
if self.cloud_config.get_api_version('identity').startswith('2'):
|
||||
data['tenant'] = data.pop('project')
|
||||
self.manager.submitTask(_tasks.RoleAddUser(**data))
|
||||
else:
|
||||
if data.get('project') is None and data.get('domain') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify either a domain or project')
|
||||
self.manager.submitTask(_tasks.RoleGrantUser(**data))
|
||||
if wait:
|
||||
for count in _utils._iterate_timeout(
|
||||
timeout,
|
||||
"Timeout waiting for role to be granted"):
|
||||
if self.list_role_assignments(filters=filters):
|
||||
break
|
||||
return True
|
||||
|
||||
def revoke_role(self, name_or_id, user=None, group=None,
|
||||
project=None, domain=None, wait=False, timeout=60):
|
||||
"""Revoke a role from a user.
|
||||
|
||||
:param string name_or_id: The name or id of the role.
|
||||
:param string user: The name or id of the user.
|
||||
:param string group: The name or id of the group. (v3)
|
||||
:param string project: The name or id of the project.
|
||||
:param string domain: The id of the domain. (v3)
|
||||
:param bool wait: Wait for role to be revoked
|
||||
:param int timeout: Timeout to wait for role to be revoked
|
||||
|
||||
NOTE: for wait and timeout, sometimes revoking roles is not
|
||||
instantaneous for revoking roles.
|
||||
|
||||
NOTE: project is required for keystone v2
|
||||
|
||||
:returns: True if the role is revoke, otherwise False
|
||||
|
||||
:raise OpenStackCloudException: if the role cannot be removed
|
||||
"""
|
||||
data = self._get_grant_revoke_params(name_or_id, user, group,
|
||||
project, domain)
|
||||
filters = data.copy()
|
||||
|
||||
if not data:
|
||||
raise OpenStackCloudException(
|
||||
'Role {0} not found.'.format(name_or_id))
|
||||
|
||||
if data.get('user') is not None and data.get('group') is not None:
|
||||
raise OpenStackCloudException(
|
||||
'Specify either a group or a user, not both')
|
||||
if data.get('user') is None and data.get('group') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify either a user or a group')
|
||||
if self.cloud_config.get_api_version('identity').startswith('2') and \
|
||||
data.get('project') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify project for keystone v2')
|
||||
|
||||
if not self.list_role_assignments(filters=filters):
|
||||
self.log.debug('Assignment does not exist')
|
||||
return False
|
||||
|
||||
with _utils.shade_exceptions(
|
||||
"Error revoking access to role: {0}".format(
|
||||
data)):
|
||||
if self.cloud_config.get_api_version('identity').startswith('2'):
|
||||
data['tenant'] = data.pop('project')
|
||||
self.manager.submitTask(_tasks.RoleRemoveUser(**data))
|
||||
else:
|
||||
if data.get('project') is None \
|
||||
and data.get('domain') is None:
|
||||
raise OpenStackCloudException(
|
||||
'Must specify either a domain or project')
|
||||
self.manager.submitTask(_tasks.RoleRevokeUser(**data))
|
||||
if wait:
|
||||
for count in _utils._iterate_timeout(
|
||||
timeout,
|
||||
"Timeout waiting for role to be revoked"):
|
||||
if not self.list_role_assignments(filters=filters):
|
||||
break
|
||||
return True
|
||||
|
||||
def list_hypervisors(self):
|
||||
"""List all hypervisors
|
||||
|
||||
|
@ -60,8 +60,9 @@ class FakeImage(object):
|
||||
|
||||
|
||||
class FakeProject(object):
|
||||
def __init__(self, id):
|
||||
def __init__(self, id, domain_id=None):
|
||||
self.id = id
|
||||
self.domain_id = domain_id or 'default'
|
||||
|
||||
|
||||
class FakeServer(object):
|
||||
@ -94,10 +95,12 @@ class FakeService(object):
|
||||
|
||||
|
||||
class FakeUser(object):
|
||||
def __init__(self, id, email, name):
|
||||
def __init__(self, id, email, name, domain_id=None):
|
||||
self.id = id
|
||||
self.email = email
|
||||
self.name = name
|
||||
if domain_id is not None:
|
||||
self.domain_id = domain_id
|
||||
|
||||
|
||||
class FakeVolume(object):
|
||||
@ -194,11 +197,11 @@ class FakeRole(object):
|
||||
|
||||
|
||||
class FakeGroup(object):
|
||||
def __init__(self, id, name, description, domain=None):
|
||||
def __init__(self, id, name, description, domain_id=None):
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.description = description
|
||||
self.domain = domain
|
||||
self.domain_id = domain_id or 'default'
|
||||
|
||||
|
||||
class FakeHypervisor(object):
|
||||
|
@ -33,8 +33,44 @@ class TestIdentity(base.TestCase):
|
||||
self.cloud = operator_cloud(cloud='devstack-admin')
|
||||
self.role_prefix = 'test_role' + ''.join(
|
||||
random.choice(string.ascii_lowercase) for _ in range(5))
|
||||
self.user_prefix = self.getUniqueString('user')
|
||||
self.group_prefix = self.getUniqueString('group')
|
||||
|
||||
self.addCleanup(self._cleanup_users)
|
||||
self.identity_version = \
|
||||
self.cloud.cloud_config.get_api_version('identity')
|
||||
if self.identity_version not in ('2', '2.0'):
|
||||
self.addCleanup(self._cleanup_groups)
|
||||
self.addCleanup(self._cleanup_roles)
|
||||
|
||||
def _cleanup_groups(self):
|
||||
exception_list = list()
|
||||
for group in self.cloud.list_groups():
|
||||
if group['name'].startswith(self.group_prefix):
|
||||
try:
|
||||
self.cloud.delete_group(group['id'])
|
||||
except Exception as e:
|
||||
exception_list.append(str(e))
|
||||
continue
|
||||
|
||||
if exception_list:
|
||||
# Raise an error: we must make users aware that something went
|
||||
# wrong
|
||||
raise OpenStackCloudException('\n'.join(exception_list))
|
||||
|
||||
def _cleanup_users(self):
|
||||
exception_list = list()
|
||||
for user in self.cloud.list_users():
|
||||
if user['name'].startswith(self.user_prefix):
|
||||
try:
|
||||
self.cloud.delete_user(user['id'])
|
||||
except Exception as e:
|
||||
exception_list.append(str(e))
|
||||
continue
|
||||
|
||||
if exception_list:
|
||||
raise OpenStackCloudException('\n'.join(exception_list))
|
||||
|
||||
def _cleanup_roles(self):
|
||||
exception_list = list()
|
||||
for role in self.cloud.list_roles():
|
||||
@ -48,6 +84,13 @@ class TestIdentity(base.TestCase):
|
||||
if exception_list:
|
||||
raise OpenStackCloudException('\n'.join(exception_list))
|
||||
|
||||
def _create_user(self, **kwargs):
|
||||
domain_id = None
|
||||
if self.identity_version not in ('2', '2.0'):
|
||||
domain = self.cloud.get_domain('default')
|
||||
domain_id = domain['id']
|
||||
return self.cloud.create_user(domain_id=domain_id, **kwargs)
|
||||
|
||||
def test_list_roles(self):
|
||||
roles = self.cloud.list_roles()
|
||||
self.assertIsNotNone(roles)
|
||||
@ -84,7 +127,7 @@ class TestIdentity(base.TestCase):
|
||||
# need to make this test a little more specific, and add more for testing
|
||||
# filtering functionality.
|
||||
def test_list_role_assignments(self):
|
||||
if self.cloud.cloud_config.get_api_version('identity') in ('2', '2.0'):
|
||||
if self.identity_version in ('2', '2.0'):
|
||||
self.skipTest("Identity service does not support role assignments")
|
||||
assignments = self.cloud.list_role_assignments()
|
||||
self.assertIsInstance(assignments, list)
|
||||
@ -94,6 +137,118 @@ class TestIdentity(base.TestCase):
|
||||
user = self.cloud.get_user('demo')
|
||||
project = self.cloud.get_project('demo')
|
||||
assignments = self.cloud.list_role_assignments(
|
||||
filters={'user': user.id, 'project': project.id})
|
||||
filters={'user': user['id'], 'project': project['id']})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) > 0)
|
||||
|
||||
def test_grant_revoke_role_user_project(self):
|
||||
user_name = self.user_prefix + '_user_project'
|
||||
user_email = 'nobody@nowhere.com'
|
||||
role_name = self.role_prefix + '_grant_user_project'
|
||||
role = self.cloud.create_role(role_name)
|
||||
user = self._create_user(name=user_name,
|
||||
email=user_email,
|
||||
default_project='demo')
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
role_name, user=user['id'], project='demo', wait=True))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'user': user['id'],
|
||||
'project': self.cloud.get_project('demo')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 1)
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
role_name, user=user['id'], project='demo', wait=True))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'user': user['id'],
|
||||
'project': self.cloud.get_project('demo')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 0)
|
||||
|
||||
def test_grant_revoke_role_group_project(self):
|
||||
if self.identity_version in ('2', '2.0'):
|
||||
self.skipTest("Identity service does not support group")
|
||||
role_name = self.role_prefix + '_grant_group_project'
|
||||
role = self.cloud.create_role(role_name)
|
||||
group_name = self.group_prefix + '_group_project'
|
||||
group = self.cloud.create_group(name=group_name,
|
||||
description='test group',
|
||||
domain='default')
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
role_name, group=group['id'], project='demo'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'group': group['id'],
|
||||
'project': self.cloud.get_project('demo')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 1)
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
role_name, group=group['id'], project='demo'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'group': group['id'],
|
||||
'project': self.cloud.get_project('demo')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 0)
|
||||
|
||||
def test_grant_revoke_role_user_domain(self):
|
||||
if self.identity_version in ('2', '2.0'):
|
||||
self.skipTest("Identity service does not support domain")
|
||||
role_name = self.role_prefix + '_grant_user_domain'
|
||||
role = self.cloud.create_role(role_name)
|
||||
user_name = self.user_prefix + '_user_domain'
|
||||
user_email = 'nobody@nowhere.com'
|
||||
user = self._create_user(name=user_name,
|
||||
email=user_email,
|
||||
default_project='demo')
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
role_name, user=user['id'], domain='default'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'user': user['id'],
|
||||
'domain': self.cloud.get_domain('default')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 1)
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
role_name, user=user['id'], domain='default'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'user': user['id'],
|
||||
'domain': self.cloud.get_domain('default')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 0)
|
||||
|
||||
def test_grant_revoke_role_group_domain(self):
|
||||
if self.identity_version in ('2', '2.0'):
|
||||
self.skipTest("Identity service does not support domain or group")
|
||||
role_name = self.role_prefix + '_grant_group_domain'
|
||||
role = self.cloud.create_role(role_name)
|
||||
group_name = self.group_prefix + '_group_domain'
|
||||
group = self.cloud.create_group(name=group_name,
|
||||
description='test group',
|
||||
domain='default')
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
role_name, group=group['id'], domain='default'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'group': group['id'],
|
||||
'domain': self.cloud.get_domain('default')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 1)
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
role_name, group=group['id'], domain='default'))
|
||||
assignments = self.cloud.list_role_assignments({
|
||||
'role': role['id'],
|
||||
'group': group['id'],
|
||||
'domain': self.cloud.get_domain('default')['id']
|
||||
})
|
||||
self.assertIsInstance(assignments, list)
|
||||
self.assertTrue(len(assignments) == 0)
|
||||
|
859
shade/tests/unit/test_role_assignment.py
Normal file
859
shade/tests/unit/test_role_assignment.py
Normal file
@ -0,0 +1,859 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mock import patch
|
||||
import os_client_config as occ
|
||||
from shade import OperatorCloud, operator_cloud
|
||||
from shade.exc import OpenStackCloudException, OpenStackCloudTimeout
|
||||
from shade.meta import obj_to_dict
|
||||
from shade.tests import base, fakes
|
||||
import testtools
|
||||
|
||||
|
||||
class TestRoleAssignment(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRoleAssignment, self).setUp()
|
||||
self.cloud = operator_cloud(validate=False)
|
||||
self.fake_role = obj_to_dict(fakes.FakeRole('12345', 'test'))
|
||||
self.fake_user = obj_to_dict(fakes.FakeUser('12345',
|
||||
'test@nobody.org',
|
||||
'test',
|
||||
domain_id='test-domain'))
|
||||
self.fake_group = obj_to_dict(fakes.FakeGroup('12345',
|
||||
'test',
|
||||
'test group',
|
||||
domain_id='test-domain'))
|
||||
self.fake_project = obj_to_dict(
|
||||
fakes.FakeProject('12345', domain_id='test-domain'))
|
||||
self.fake_domain = obj_to_dict(fakes.FakeDomain('test-domain',
|
||||
'test',
|
||||
'test domain',
|
||||
enabled=True))
|
||||
self.user_project_assignment = obj_to_dict({
|
||||
'role': {
|
||||
'id': self.fake_role['id']
|
||||
},
|
||||
'scope': {
|
||||
'project': {
|
||||
'id': self.fake_project['id']
|
||||
}
|
||||
},
|
||||
'user': {
|
||||
'id': self.fake_user['id']
|
||||
}
|
||||
})
|
||||
self.group_project_assignment = obj_to_dict({
|
||||
'role': {
|
||||
'id': self.fake_role['id']
|
||||
},
|
||||
'scope': {
|
||||
'project': {
|
||||
'id': self.fake_project['id']
|
||||
}
|
||||
},
|
||||
'group': {
|
||||
'id': self.fake_group['id']
|
||||
}
|
||||
})
|
||||
self.user_domain_assignment = obj_to_dict({
|
||||
'role': {
|
||||
'id': self.fake_role['id']
|
||||
},
|
||||
'scope': {
|
||||
'domain': {
|
||||
'id': self.fake_domain['id']
|
||||
}
|
||||
},
|
||||
'user': {
|
||||
'id': self.fake_user['id']
|
||||
}
|
||||
})
|
||||
self.group_domain_assignment = obj_to_dict({
|
||||
'role': {
|
||||
'id': self.fake_role['id']
|
||||
},
|
||||
'scope': {
|
||||
'domain': {
|
||||
'id': self.fake_domain['id']
|
||||
}
|
||||
},
|
||||
'group': {
|
||||
'id': self.fake_group['id']
|
||||
}
|
||||
})
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_v2(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.roles.roles_for_user.return_value = []
|
||||
mock_keystone.roles.add_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project_v2(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.return_value = []
|
||||
mock_keystone.roles.add_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['id'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['id'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project_v2_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.return_value = [self.fake_role]
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_project_assignment]
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['id'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_group_project(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_group_project_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.group_project_assignment]
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_domain(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_domain_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_domain_assignment]
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_group_domain(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_group_domain_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.group_domain_assignment]
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertFalse(self.cloud.grant_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_v2(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.roles.roles_for_user.return_value = [self.fake_role]
|
||||
mock_keystone.roles.remove_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project_v2(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.return_value = []
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['id'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['id'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project_v2_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.return_value = [self.fake_role]
|
||||
mock_keystone.roles.remove_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_project_assignment]
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['id'],
|
||||
user=self.fake_user['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_group_project(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_group_project_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.group_project_assignment]
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
project=self.fake_project['id']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
project=self.fake_project['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_domain(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_domain_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_domain_assignment]
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_group_domain(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = []
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertFalse(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_group_domain_exists(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.group_domain_assignment]
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['id']))
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
group=self.fake_group['id'],
|
||||
domain=self.fake_domain['id']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_no_role(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = []
|
||||
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Role {0} not found'.format(self.fake_role['name'])
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_no_role(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = []
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Role {0} not found'.format(self.fake_role['name'])
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'],
|
||||
group=self.fake_group['name'],
|
||||
domain=self.fake_domain['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_no_user_or_group_specified(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a user or a group'
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_no_user_or_group_specified(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a user or a group'
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_no_user_or_group(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = []
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a user or a group'
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_no_user_or_group(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = []
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a user or a group'
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_both_user_and_group(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Specify either a group or a user, not both'
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
group=self.fake_group['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_both_user_and_group(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.groups.list.return_value = [self.fake_group]
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Specify either a group or a user, not both'
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
group=self.fake_group['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_both_project_and_domain(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
fake_user2 = fakes.FakeUser('12345',
|
||||
'test@nobody.org',
|
||||
'test',
|
||||
domain_id='default')
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user, fake_user2]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_both_project_and_domain(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
fake_user2 = fakes.FakeUser('12345',
|
||||
'test@nobody.org',
|
||||
'test',
|
||||
domain_id='default')
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user, fake_user2]
|
||||
mock_keystone.projects.list.return_value = [self.fake_project]
|
||||
mock_keystone.domains.get.return_value = self.fake_domain
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_project_assignment]
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id'],
|
||||
domain=self.fake_domain['name']))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_no_project_or_domain(self, mock_keystone, mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.projects.list.return_value = []
|
||||
mock_keystone.domains.get.return_value = None
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a domain or project'
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_no_project_or_domain(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.projects.list.return_value = []
|
||||
mock_keystone.domains.get.return_value = None
|
||||
mock_keystone.role_assignments.list.return_value = \
|
||||
[self.user_project_assignment]
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Must specify either a domain or project'
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'])
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_bad_domain_exception(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.side_effect = Exception('test')
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Failed to get domain baddomain \(Inner Exception: test\)'
|
||||
):
|
||||
self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain='baddomain')
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_bad_domain_exception(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '3'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.domains.get.side_effect = Exception('test')
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudException,
|
||||
'Failed to get domain baddomain \(Inner Exception: test\)'
|
||||
):
|
||||
self.cloud.revoke_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
domain='baddomain')
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project_v2_wait(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.side_effect = [
|
||||
[], [], [self.fake_role]]
|
||||
mock_keystone.roles.add_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.grant_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id'],
|
||||
wait=True))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_grant_role_user_project_v2_wait_exception(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.side_effect = [
|
||||
[], [], [self.fake_role]]
|
||||
mock_keystone.roles.add_user_role.return_value = self.fake_role
|
||||
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudTimeout,
|
||||
'Timeout waiting for role to be granted'
|
||||
):
|
||||
self.assertTrue(self.cloud.grant_role(
|
||||
self.fake_role['name'], user=self.fake_user['name'],
|
||||
project=self.fake_project['id'], wait=True, timeout=1))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project_v2_wait(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.side_effect = [
|
||||
[self.fake_role], [self.fake_role],
|
||||
[]]
|
||||
mock_keystone.roles.remove_user_role.return_value = self.fake_role
|
||||
self.assertTrue(self.cloud.revoke_role(self.fake_role['name'],
|
||||
user=self.fake_user['name'],
|
||||
project=self.fake_project['id'],
|
||||
wait=True))
|
||||
|
||||
@patch.object(occ.cloud_config.CloudConfig, 'get_api_version')
|
||||
@patch.object(OperatorCloud, 'keystone_client')
|
||||
def test_revoke_role_user_project_v2_wait_exception(self,
|
||||
mock_keystone,
|
||||
mock_api_version):
|
||||
mock_api_version.return_value = '2.0'
|
||||
mock_keystone.roles.list.return_value = [self.fake_role]
|
||||
mock_keystone.tenants.list.return_value = [self.fake_project]
|
||||
mock_keystone.users.list.return_value = [self.fake_user]
|
||||
mock_keystone.roles.roles_for_user.side_effect = [
|
||||
[self.fake_role], [self.fake_role],
|
||||
[]]
|
||||
mock_keystone.roles.remove_user_role.return_value = self.fake_role
|
||||
with testtools.ExpectedException(
|
||||
OpenStackCloudTimeout,
|
||||
'Timeout waiting for role to be revoked'
|
||||
):
|
||||
self.assertTrue(self.cloud.revoke_role(
|
||||
self.fake_role['name'], user=self.fake_user['name'],
|
||||
project=self.fake_project['id'], wait=True, timeout=1))
|
Loading…
x
Reference in New Issue
Block a user