Merge "Implement assignment of Domain Roles to Group"

This commit is contained in:
Jenkins 2013-08-29 06:36:14 +00:00 committed by Gerrit Code Review
commit 6ad0ae0d27
4 changed files with 370 additions and 9 deletions

View File

@ -21,3 +21,4 @@ DOMAINS_INDEX_URL = 'horizon:admin:domains:index'
DOMAINS_INDEX_VIEW_TEMPLATE = 'admin/domains/index.html'
DOMAINS_CREATE_URL = 'horizon:admin:domains:create'
DOMAINS_UPDATE_URL = 'horizon:admin:domains:update'
DOMAIN_GROUP_MEMBER_SLUG = "update_group_members"

View File

@ -17,6 +17,8 @@
import logging
from django.conf import settings # noqa
from django.core.urlresolvers import reverse # noqa
from django.utils.http import urlencode # noqa
from django.utils.translation import ugettext_lazy as _ # noqa
from keystoneclient import exceptions
@ -32,6 +34,19 @@ from openstack_dashboard.dashboards.admin.domains import constants
LOG = logging.getLogger(__name__)
class ViewGroupsLink(tables.LinkAction):
name = "groups"
verbose_name = _("Modify Groups")
url = "horizon:admin:domains:update"
classes = ("ajax-modal", "btn-edit")
def get_link_url(self, domain):
step = 'update_group_members'
base_url = reverse(self.url, args=[domain.id])
param = urlencode({"step": step})
return "?".join([base_url, param])
class CreateDomainLink(tables.LinkAction):
name = "create"
verbose_name = _("Create Domain")
@ -152,6 +167,7 @@ class DomainsTable(tables.DataTable):
class Meta:
name = "domains"
verbose_name = _("Domains")
row_actions = (SetDomainContext, EditDomainLink, DeleteDomainsAction)
row_actions = (SetDomainContext, ViewGroupsLink, EditDomainLink,
DeleteDomainsAction)
table_actions = (DomainFilterAction, CreateDomainLink,
DeleteDomainsAction, UnsetDomainContext)

View File

@ -33,6 +33,7 @@ from openstack_dashboard.dashboards.admin.domains import workflows
DOMAINS_INDEX_URL = reverse(constants.DOMAINS_INDEX_URL)
DOMAIN_CREATE_URL = reverse(constants.DOMAINS_CREATE_URL)
DOMAIN_UPDATE_URL = reverse(constants.DOMAINS_UPDATE_URL, args=[1])
GROUP_ROLE_PREFIX = constants.DOMAIN_GROUP_MEMBER_SLUG + "_role_"
class DomainsViewTests(test.BaseAdminViewTests):
@ -178,11 +179,42 @@ class UpdateDomainWorkflowTests(test.BaseAdminViewTests):
domain_info = self._get_domain_info(domain)
return domain_info
@test.create_stubs({api.keystone: ('domain_get', )})
def _get_all_groups(self, domain_id):
if not domain_id:
groups = self.groups.list()
else:
groups = [group for group in self.groups.list()
if group.domain_id == domain_id]
return groups
def _get_domain_groups(self, domain_id):
# all domain groups have role assignments
return self._get_all_groups(domain_id)
@test.create_stubs({api.keystone: ('domain_get',
'get_default_role',
'role_list',
'group_list',
'roles_for_group')})
def test_update_domain_get(self):
default_role = self.roles.first()
domain = self.domains.get(id="1")
groups = self._get_all_groups(domain.id)
roles = self.roles.list()
api.keystone.domain_get(IsA(http.HttpRequest), '1').AndReturn(domain)
api.keystone.get_default_role(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(default_role)
api.keystone.role_list(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(roles)
api.keystone.group_list(IsA(http.HttpRequest), domain=domain.id) \
.AndReturn(groups)
for group in groups:
api.keystone.roles_for_group(IsA(http.HttpRequest),
group=group.id,
domain=domain.id) \
.AndReturn(roles)
self.mox.ReplayAll()
@ -194,29 +226,174 @@ class UpdateDomainWorkflowTests(test.BaseAdminViewTests):
self.assertEqual(res.context['workflow'].name,
workflows.UpdateDomain.name)
step = workflow.get_step("update_domain")
self.assertEqual(step.action.initial['name'], domain.name)
self.assertEqual(step.action.initial['description'],
domain.description)
self.assertQuerysetEqual(workflow.steps,
['<UpdateDomainInfo: update_domain>', ])
['<UpdateDomainInfo: update_domain>',
'<UpdateDomainGroups: update_group_members>'])
@test.create_stubs({api.keystone: ('domain_get',
'domain_update')})
'domain_update',
'get_default_role',
'role_list',
'group_list',
'roles_for_group',
'remove_group_role',
'add_group_role',)})
def test_update_domain_post(self):
default_role = self.roles.first()
domain = self.domains.get(id="1")
test_description = 'updated description'
groups = self._get_all_groups(domain.id)
domain_groups = self._get_domain_groups(domain.id)
roles = self.roles.list()
api.keystone.domain_get(IsA(http.HttpRequest), '1').AndReturn(domain)
api.keystone.get_default_role(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(default_role)
api.keystone.role_list(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(roles)
api.keystone.group_list(IsA(http.HttpRequest), domain=domain.id) \
.AndReturn(groups)
for group in groups:
api.keystone.roles_for_group(IsA(http.HttpRequest),
group=group.id,
domain=domain.id) \
.AndReturn(roles)
workflow_data = self._get_workflow_data(domain)
# update some fields
workflow_data['description'] = test_description
# Group assignment form data
workflow_data[GROUP_ROLE_PREFIX + "1"] = ['3'] # admin role
workflow_data[GROUP_ROLE_PREFIX + "2"] = ['2'] # member role
# handle
api.keystone.domain_update(IsA(http.HttpRequest),
description=test_description,
domain_id=domain.id,
enabled=domain.enabled,
name=domain.name).AndReturn(None)
self.mox.ReplayAll()
# Group assignments
api.keystone.group_list(IsA(http.HttpRequest),
domain=domain.id).AndReturn(domain_groups)
workflow_data = self._get_workflow_data(domain)
workflow_data['description'] = test_description
# admin group - try to remove all roles on current domain
api.keystone.roles_for_group(IsA(http.HttpRequest),
group='1',
domain=domain.id) \
.AndReturn(roles)
for role in roles:
api.keystone.remove_group_role(IsA(http.HttpRequest),
role=role.id,
group='1',
domain=domain.id)
# member group 1 - has role 1, will remove it
api.keystone.roles_for_group(IsA(http.HttpRequest),
group='2',
domain=domain.id) \
.AndReturn((roles[0],))
# remove role 1
api.keystone.remove_group_role(IsA(http.HttpRequest),
role='1',
group='2',
domain=domain.id)
# add role 2
api.keystone.add_group_role(IsA(http.HttpRequest),
role='2',
group='2',
domain=domain.id)
# member group 3 - has role 2
api.keystone.roles_for_group(IsA(http.HttpRequest),
group='3',
domain=domain.id) \
.AndReturn((roles[1],))
# remove role 2
api.keystone.remove_group_role(IsA(http.HttpRequest),
role='2',
group='3',
domain=domain.id)
# add role 1
api.keystone.add_group_role(IsA(http.HttpRequest),
role='1',
group='3',
domain=domain.id)
self.mox.ReplayAll()
res = self.client.post(DOMAIN_UPDATE_URL, workflow_data)
self.assertNoFormErrors(res)
self.assertMessageCount(success=1)
self.assertRedirectsNoFollow(res, DOMAINS_INDEX_URL)
@test.create_stubs({api.keystone: ('domain_get',)})
def test_update_domain_get_error(self):
domain = self.domains.get(id="1")
api.keystone.domain_get(IsA(http.HttpRequest), domain.id) \
.AndRaise(self.exceptions.keystone)
self.mox.ReplayAll()
res = self.client.get(DOMAIN_UPDATE_URL)
self.assertRedirectsNoFollow(res, DOMAINS_INDEX_URL)
@test.create_stubs({api.keystone: ('domain_get',
'domain_update',
'get_default_role',
'role_list',
'group_list',
'roles_for_group')})
def test_update_domain_post_error(self):
default_role = self.roles.first()
domain = self.domains.get(id="1")
test_description = 'updated description'
groups = self._get_all_groups(domain.id)
roles = self.roles.list()
api.keystone.domain_get(IsA(http.HttpRequest), '1').AndReturn(domain)
api.keystone.get_default_role(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(default_role)
api.keystone.role_list(IsA(http.HttpRequest)) \
.MultipleTimes().AndReturn(roles)
api.keystone.group_list(IsA(http.HttpRequest), domain=domain.id) \
.AndReturn(groups)
for group in groups:
api.keystone.roles_for_group(IsA(http.HttpRequest),
group=group.id,
domain=domain.id) \
.AndReturn(roles)
workflow_data = self._get_workflow_data(domain)
# update some fields
workflow_data['description'] = test_description
# Group assignment form data
workflow_data[GROUP_ROLE_PREFIX + "1"] = ['3'] # admin role
workflow_data[GROUP_ROLE_PREFIX + "2"] = ['2'] # member role
# handle
api.keystone.domain_update(IsA(http.HttpRequest),
description=test_description,
domain_id=domain.id,
enabled=domain.enabled,
name=domain.name) \
.AndRaise(self.exceptions.keystone)
self.mox.ReplayAll()
res = self.client.post(DOMAIN_UPDATE_URL, workflow_data)
self.assertNoFormErrors(res)
self.assertMessageCount(error=1)
self.assertRedirectsNoFollow(res, DOMAINS_INDEX_URL)

View File

@ -16,6 +16,8 @@
import logging
from django.conf import settings # noqa
from django.core.urlresolvers import reverse # noqa
from django.utils.translation import ugettext_lazy as _ # noqa
from horizon import exceptions
@ -54,6 +56,102 @@ class CreateDomainInfo(workflows.Step):
"enabled")
class UpdateDomainGroupsAction(workflows.MembershipAction):
def __init__(self, request, *args, **kwargs):
super(UpdateDomainGroupsAction, self).__init__(request,
*args,
**kwargs)
err_msg = _('Unable to retrieve group list. Please try again later.')
domain_id = ''
if 'domain_id' in args[0]:
domain_id = args[0]['domain_id']
# Get the default role
try:
default_role = api.keystone.get_default_role(self.request)
# Default role is necessary to add members to a domain
if default_role is None:
default = getattr(settings,
"OPENSTACK_KEYSTONE_DEFAULT_ROLE", None)
msg = _('Could not find default role "%s" in Keystone') % \
default
raise exceptions.NotFound(msg)
except Exception:
exceptions.handle(self.request,
err_msg,
redirect=reverse(constants.DOMAINS_INDEX_URL))
default_role_name = self.get_default_role_field_name()
self.fields[default_role_name] = forms.CharField(required=False)
self.fields[default_role_name].initial = default_role.id
# Get list of available groups
all_groups = []
try:
all_groups = api.keystone.group_list(request,
domain=domain_id)
except Exception:
exceptions.handle(request, err_msg)
groups_list = [(group.id, group.name) for group in all_groups]
# Get list of roles
role_list = []
try:
role_list = api.keystone.role_list(request)
except Exception:
exceptions.handle(request,
err_msg,
redirect=reverse(constants.DOMAINS_INDEX_URL))
for role in role_list:
field_name = self.get_member_field_name(role.id)
label = _(role.name)
self.fields[field_name] = forms.MultipleChoiceField(required=False,
label=label)
self.fields[field_name].choices = groups_list
self.fields[field_name].initial = []
# Figure out groups & roles
if domain_id:
for group in all_groups:
try:
roles = api.keystone.roles_for_group(self.request,
group=group.id,
domain=domain_id)
except Exception:
exceptions.handle(request,
err_msg,
redirect=reverse(
constants.DOMAINS_INDEX_URL))
for role in roles:
field_name = self.get_member_field_name(role.id)
self.fields[field_name].initial.append(group.id)
class Meta:
name = _("Domain Groups")
slug = constants.DOMAIN_GROUP_MEMBER_SLUG
class UpdateDomainGroups(workflows.UpdateMembersStep):
action_class = UpdateDomainGroupsAction
available_list_title = _("All Groups")
members_list_title = _("Domain Groups")
no_available_text = _("No groups found.")
no_members_text = _("No groups.")
def contribute(self, data, context):
if data:
try:
roles = api.keystone.role_list(self.workflow.request)
except Exception:
exceptions.handle(self.workflow.request,
_('Unable to retrieve role list.'))
post = self.workflow.request.POST
for role in roles:
field = self.get_member_field_name(role.id)
context[field] = post.getlist(field)
return context
class CreateDomain(workflows.Workflow):
slug = "create_domain"
name = _("Create Domain")
@ -78,7 +176,6 @@ class CreateDomain(workflows.Workflow):
except Exception:
exceptions.handle(request, ignore=True)
return False
return True
@ -105,7 +202,8 @@ class UpdateDomain(workflows.Workflow):
success_message = _('Modified domain "%s".')
failure_message = _('Unable to modify domain "%s".')
success_url = constants.DOMAINS_INDEX_URL
default_steps = (UpdateDomainInfo, )
default_steps = (UpdateDomainInfo,
UpdateDomainGroups)
def format_status_message(self, message):
return message % self.context.get('name', 'unknown domain')
@ -123,4 +221,73 @@ class UpdateDomain(workflows.Workflow):
except Exception:
exceptions.handle(request, ignore=True)
return False
# update domain groups
groups_to_modify = 0
member_step = self.get_step(constants.DOMAIN_GROUP_MEMBER_SLUG)
try:
# Get our role options
available_roles = api.keystone.role_list(request)
# Get the groups currently associated with this domain so we
# can diff against it.
domain_groups = api.keystone.group_list(request,
domain=domain_id)
groups_to_modify = len(domain_groups)
for group in domain_groups:
# Check if there have been any changes in the roles of
# Existing domain members.
current_roles = api.keystone.roles_for_group(
self.request,
group=group.id,
domain=domain_id)
current_role_ids = [role.id for role in current_roles]
for role in available_roles:
# Check if the group is in the list of groups with
# this role.
field_name = member_step.get_member_field_name(role.id)
if group.id in data[field_name]:
# Add it if necessary
if role.id not in current_role_ids:
# group role has changed
api.keystone.add_group_role(
request,
role=role.id,
group=group.id,
domain=domain_id)
else:
# Group role is unchanged, so remove it from
# the remaining roles list to avoid removing it
# later
index = current_role_ids.index(role.id)
current_role_ids.pop(index)
# Revoke any removed roles.
for id_to_delete in current_role_ids:
api.keystone.remove_group_role(request,
role=id_to_delete,
group=group.id,
domain=domain_id)
groups_to_modify -= 1
# Grant new roles on the domain.
for role in available_roles:
field_name = member_step.get_member_field_name(role.id)
# Count how many groups may be added for error handling.
groups_to_modify += len(data[field_name])
for role in available_roles:
groups_added = 0
field_name = member_step.get_member_field_name(role.id)
for group_id in data[field_name]:
if not filter(lambda x: group_id == x.id, domain_groups):
api.keystone.add_group_role(request,
role=role.id,
group=group_id,
domain=domain_id)
groups_added += 1
groups_to_modify -= groups_added
except Exception:
exceptions.handle(request, _('Failed to modify %s domain groups.'
% groups_to_modify))
return True
return True