Merge "Pass a real image target to the policy enforcer"

This commit is contained in:
Jenkins 2015-03-19 18:07:43 +00:00 committed by Gerrit Code Review
commit 8608ab1399
4 changed files with 446 additions and 67 deletions
doc/source
glance
api
tests

@ -116,6 +116,57 @@ To limit an action to a particular role or roles, you list the roles like so ::
The above would add a rule that only allowed users that had roles of either
"admin" or "superuser" to delete an image.
Writing Rules
-------------
Role checks are going to continue to work exactly as they already do. If the
role defined in the check is one that the user holds, then that will pass,
e.g., ``role:admin``.
To write a generic rule, you need to know that there are three values provided
by Glance that can be used in a rule on the left side of the colon (``:``).
Those values are the current user's credentials in the form of:
- role
- tenant
- owner
The left side of the colon can also contain any value that Python can
understand, e.g.,:
- ``True``
- ``False``
- ``"a string"``
- &c.
Using ``tenant`` and ``owner`` will only work with images. Consider the
following rule::
tenant:%(owner)s
This will use the ``tenant`` value of the currently authenticated user. It
will also use ``owner`` from the image it is acting upon. If those two
values are equivalent the check will pass. All attributes on an image (as well
as extra image properties) are available for use on the right side of the
colon. The most useful are the following:
- ``owner``
- ``protected``
- ``is_public``
Therefore, you could construct a set of rules like the following::
{
"not_protected": "False:%(protected)s",
"is_owner": "tenant:%(owner)s",
"is_owner_or_admin": "rule:is_owner or role:admin",
"not_protected_and_is_owner": "rule:not_protected and rule:is_owner",
"get_image": "rule:is_owner_or_admin",
"delete_image": "rule:not_protected_and_is_owner",
"add_member": "rule:not_protected_and_is_owner"
}
Examples
--------
@ -124,7 +175,7 @@ Example 1. (The default policy configuration)
::
{
"default": []
"default": ""
}
Note that an empty JSON list means that all methods of the
@ -135,8 +186,8 @@ Example 2. Disallow modification calls to non-admins
::
{
"default": [],
"add_image": ["role:admin"],
"modify_image": ["role:admin"],
"delete_image": ["role:admin"]
"default": "",
"add_image": "role:admin",
"modify_image": "role:admin",
"delete_image": "role:admin"
}

@ -111,19 +111,25 @@ class ImageRepoProxy(glance.domain.proxy.Repo):
item_proxy_kwargs=proxy_kwargs)
def get(self, image_id):
try:
image = super(ImageRepoProxy, self).get(image_id)
except exception.NotFound:
self.policy.enforce(self.context, 'get_image', {})
return super(ImageRepoProxy, self).get(image_id)
raise
else:
self.policy.enforce(self.context, 'get_image', ImageTarget(image))
return image
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_images', {})
return super(ImageRepoProxy, self).list(*args, **kwargs)
def save(self, image, from_state=None):
self.policy.enforce(self.context, 'modify_image', {})
self.policy.enforce(self.context, 'modify_image', image.target)
return super(ImageRepoProxy, self).save(image, from_state=from_state)
def add(self, image):
self.policy.enforce(self.context, 'add_image', {})
self.policy.enforce(self.context, 'add_image', image.target)
return super(ImageRepoProxy, self).add(image)
@ -131,6 +137,7 @@ class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, policy):
self.image = image
self.target = ImageTarget(image)
self.context = context
self.policy = policy
super(ImageProxy, self).__init__(image)
@ -142,7 +149,7 @@ class ImageProxy(glance.domain.proxy.Image):
@visibility.setter
def visibility(self, value):
if value == 'public':
self.policy.enforce(self.context, 'publicize_image', {})
self.policy.enforce(self.context, 'publicize_image', self.target)
self.image.visibility = value
@property
@ -154,15 +161,16 @@ class ImageProxy(glance.domain.proxy.Image):
def locations(self, value):
if not isinstance(value, (list, ImageLocationsProxy)):
raise exception.Invalid(_('Invalid locations: %s') % value)
self.policy.enforce(self.context, 'set_image_location', {})
self.policy.enforce(self.context, 'set_image_location', self.target)
new_locations = list(value)
if (set([loc['url'] for loc in self.image.locations]) -
set([loc['url'] for loc in new_locations])):
self.policy.enforce(self.context, 'delete_image_location', {})
self.policy.enforce(self.context, 'delete_image_location',
self.target)
self.image.locations = new_locations
def delete(self):
self.policy.enforce(self.context, 'delete_image', {})
self.policy.enforce(self.context, 'delete_image', self.target)
return self.image.delete()
def deactivate(self):
@ -180,13 +188,11 @@ class ImageProxy(glance.domain.proxy.Image):
self.image.reactivate()
def get_data(self, *args, **kwargs):
target = ImageTarget(self.image)
self.policy.enforce(self.context, 'download_image',
target=target)
self.policy.enforce(self.context, 'download_image', self.target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
self.policy.enforce(self.context, 'upload_image', {})
self.policy.enforce(self.context, 'upload_image', self.target)
return self.image.set_data(*args, **kwargs)
def get_member_repo(self, **kwargs):
@ -224,27 +230,28 @@ class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, member_repo, context, policy):
self.member_repo = member_repo
self.target = ImageTarget(self.member_repo.image)
self.context = context
self.policy = policy
def add(self, member):
self.policy.enforce(self.context, 'add_member', {})
self.policy.enforce(self.context, 'add_member', self.target)
self.member_repo.add(member)
def get(self, member_id):
self.policy.enforce(self.context, 'get_member', {})
self.policy.enforce(self.context, 'get_member', self.target)
return self.member_repo.get(member_id)
def save(self, member, from_state=None):
self.policy.enforce(self.context, 'modify_member', {})
self.policy.enforce(self.context, 'modify_member', self.target)
self.member_repo.save(member, from_state=from_state)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_members', {})
self.policy.enforce(self.context, 'get_members', self.target)
return self.member_repo.list(*args, **kwargs)
def remove(self, member):
self.policy.enforce(self.context, 'delete_member', {})
self.policy.enforce(self.context, 'delete_member', self.target)
self.member_repo.remove(member)
@ -370,31 +377,38 @@ class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
class ImageTarget(object):
SENTINEL = object()
def __init__(self, image):
"""
Initialize the object
def __init__(self, target):
"""Initialize the object
:param image: Image object
:param target: Object being targetted
"""
self.image = image
self.target = target
def __getitem__(self, key):
"""
Returns the value of 'key' from the image if image has that attribute
else tries to retrieve value from the extra_properties of image.
"""Return the value of 'key' from the target.
If the target has the attribute 'key', return it.
:param key: value to retrieve
"""
# Need to change the key 'id' to 'image_id' as Image object has
# attribute as 'image_id' in case of V2.
key = self.key_transforms(key)
value = getattr(self.target, key, self.SENTINEL)
if value is self.SENTINEL:
extra_properties = getattr(self.target, 'extra_properties', None)
if extra_properties is not None:
value = extra_properties[key]
else:
value = None
return value
def key_transforms(self, key):
if key == 'id':
key = 'image_id'
if hasattr(self.image, key):
return getattr(self.image, key)
else:
return self.image.extra_properties[key]
return key
# Metadef Namespace classes

@ -811,6 +811,290 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_image_modification_works_for_owning_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "tenant:%(owner)s",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers['content-type'] = media_type
del headers['X-Roles']
data = jsonutils.dumps([
{'op': 'replace', 'path': '/name', 'value': 'new-name'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code)
self.stop_servers()
def test_image_modification_fails_on_mismatched_tenant_ids(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "'A-Fake-Tenant-Id':%(owner)s",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted"
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers['content-type'] = media_type
del headers['X-Roles']
data = jsonutils.dumps([
{'op': 'replace', 'path': '/name', 'value': 'new-name'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_member_additions_works_for_owning_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "tenant:%(owner)s",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin'})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
# Get the image's members resource
path = self._url('/v2/images/%s/members' % image_id)
body = jsonutils.dumps({'member': TENANT3})
del headers['X-Roles']
response = requests.post(path, headers=headers, data=body)
self.assertEqual(200, response.status_code)
self.stop_servers()
def test_image_additions_works_only_for_specific_tenant_id(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "'{0}':%(owner)s".format(TENANT1),
"get_image": "",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
headers['X-Tenant-Id'] = TENANT2
response = requests.post(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_owning_tenant_id_can_retrieve_image_information(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Remove the admin role
del headers['X-Roles']
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
# Can retrieve the image as TENANT1
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Can retrieve the image's members as TENANT1
path = self._url('/v2/images/%s/members' % image_id)
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
headers['X-Tenant-Id'] = TENANT2
response = requests.get(path, headers=headers)
self.assertEqual(403, response.status_code)
self.stop_servers()
def test_owning_tenant_can_publicize_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"publicize_iamge": "tenant:%(owner)s",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({
'Content-Type': 'application/openstack-images-v2.1-json-patch',
'X-Tenant-Id': TENANT1,
})
doc = [{'op': 'replace', 'path': '/visibility', 'value': 'public'}]
data = jsonutils.dumps(doc)
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code)
def test_owning_tenant_can_delete_image(self):
rules = {
"context_is_admin": "role:admin",
"default": "",
"add_image": "",
"publicize_iamge": "tenant:%(owner)s",
"get_image": "tenant:%(owner)s",
"modify_image": "",
"upload_image": "",
"get_image_location": "",
"delete_image": "",
"restricted":
"not ('aki':%(container_format)s and role:_member_)",
"download_image": "role:admin or rule:restricted",
"add_member": "",
}
self.set_policy_rules(rules)
self.start_servers(**self.__dict__.copy())
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'admin', 'X-Tenant-Id': TENANT1})
data = jsonutils.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Get the image's ID
image = jsonutils.loads(response.text)
image_id = image['id']
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=headers)
self.assertEqual(204, response.status_code)
def test_image_size_cap(self):
self.api_server.image_size_cap = 128
self.start_servers(**self.__dict__.copy())

@ -72,6 +72,8 @@ class ImageFactoryStub(object):
class MemberRepoStub(object):
image = None
def add(self, image_member):
image_member.output = 'member_repo_add'
@ -207,41 +209,53 @@ class TestImagePolicy(test_utils.BaseTestCase):
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'public')
self.assertEqual('private', image.visibility)
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
self.policy.enforce.assert_called_once_with({}, "publicize_image",
image.target)
def test_publicize_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image.visibility = 'public'
self.assertEqual('public', image.visibility)
self.policy.enforce.assert_called_once_with({}, "publicize_image", {})
self.policy.enforce.assert_called_once_with({}, "publicize_image",
image.target)
def test_delete_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.delete)
self.assertEqual('active', image.status)
self.policy.enforce.assert_called_once_with({}, "delete_image", {})
self.policy.enforce.assert_called_once_with({}, "delete_image",
image.target)
def test_delete_image_allowed(self):
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image.delete()
self.assertEqual('deleted', image.status)
self.policy.enforce.assert_called_once_with({}, "delete_image", {})
self.policy.enforce.assert_called_once_with({}, "delete_image",
image.target)
def test_get_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_target = mock.Mock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.get, UUID1)
self.policy.enforce.assert_called_once_with({}, "get_image", {})
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
def test_get_image_allowed(self):
image_target = mock.Mock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
output = image_repo.get(UUID1)
self.assertIsInstance(output, glance.api.policy.ImageProxy)
self.assertEqual('image_from_get', output.image)
self.policy.enforce.assert_called_once_with({}, "get_image", {})
self.policy.enforce.assert_called_once_with({}, "get_image",
image_target)
def test_get_images_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -265,14 +279,16 @@ class TestImagePolicy(test_utils.BaseTestCase):
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.save, image)
self.policy.enforce.assert_called_once_with({}, "modify_image", {})
self.policy.enforce.assert_called_once_with({}, "modify_image",
image.target)
def test_modify_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image_repo.save(image)
self.policy.enforce.assert_called_once_with({}, "modify_image", {})
self.policy.enforce.assert_called_once_with({}, "modify_image",
image.target)
def test_add_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -280,14 +296,16 @@ class TestImagePolicy(test_utils.BaseTestCase):
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image_repo.add, image)
self.policy.enforce.assert_called_once_with({}, "add_image", {})
self.policy.enforce.assert_called_once_with({}, "add_image",
image.target)
def test_add_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(self.image_repo_stub,
{}, self.policy)
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
image_repo.add(image)
self.policy.enforce.assert_called_once_with({}, "add_image", {})
self.policy.enforce.assert_called_once_with({}, "add_image",
image.target)
def test_new_image_visibility(self):
self.policy.enforce.side_effect = exception.Forbidden
@ -308,20 +326,21 @@ class TestImagePolicy(test_utils.BaseTestCase):
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
with mock.patch('glance.api.policy.ImageTarget'):
image = glance.api.policy.ImageProxy(image_stub, {}, self.policy)
target = image.target
self.policy.enforce.side_effect = exception.Forbidden
glance.api.policy.ImageTarget = mock.Mock()
target = glance.api.policy.ImageTarget(image)
self.assertRaises(exception.Forbidden, image.get_data)
self.policy.enforce.assert_called_once_with({}, "download_image",
target=target)
target)
def test_image_set_data(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, image.set_data)
self.policy.enforce.assert_called_once_with({}, "upload_image", {})
self.policy.enforce.assert_called_once_with({}, "upload_image",
image.target)
class TestMemberPolicy(test_utils.BaseTestCase):
@ -330,60 +349,71 @@ class TestMemberPolicy(test_utils.BaseTestCase):
self.policy.enforce = mock.Mock()
self.member_repo = glance.api.policy.ImageMemberRepoProxy(
MemberRepoStub(), {}, self.policy)
self.target = self.member_repo.target
super(TestMemberPolicy, self).setUp()
def test_add_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.add, '')
self.policy.enforce.assert_called_once_with({}, "add_member", {})
self.policy.enforce.assert_called_once_with({}, "add_member",
self.target)
def test_add_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.add(image_member)
self.assertEqual('member_repo_add', image_member.output)
self.policy.enforce.assert_called_once_with({}, "add_member", {})
self.policy.enforce.assert_called_once_with({}, "add_member",
self.target)
def test_get_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.get, '')
self.policy.enforce.assert_called_once_with({}, "get_member", {})
self.policy.enforce.assert_called_once_with({}, "get_member",
self.target)
def test_get_member_allowed(self):
output = self.member_repo.get('')
self.assertEqual('member_repo_get', output)
self.policy.enforce.assert_called_once_with({}, "get_member", {})
self.policy.enforce.assert_called_once_with({}, "get_member",
self.target)
def test_modify_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.save, '')
self.policy.enforce.assert_called_once_with({}, "modify_member", {})
self.policy.enforce.assert_called_once_with({}, "modify_member",
self.target)
def test_modify_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.save(image_member)
self.assertEqual('member_repo_save', image_member.output)
self.policy.enforce.assert_called_once_with({}, "modify_member", {})
self.policy.enforce.assert_called_once_with({}, "modify_member",
self.target)
def test_get_members_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.list, '')
self.policy.enforce.assert_called_once_with({}, "get_members", {})
self.policy.enforce.assert_called_once_with({}, "get_members",
self.target)
def test_get_members_allowed(self):
output = self.member_repo.list('')
self.assertEqual('member_repo_list', output)
self.policy.enforce.assert_called_once_with({}, "get_members", {})
self.policy.enforce.assert_called_once_with({}, "get_members",
self.target)
def test_delete_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.remove, '')
self.policy.enforce.assert_called_once_with({}, "delete_member", {})
self.policy.enforce.assert_called_once_with({}, "delete_member",
self.target)
def test_delete_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.remove(image_member)
self.assertEqual('member_repo_remove', image_member.output)
self.policy.enforce.assert_called_once_with({}, "delete_member", {})
self.policy.enforce.assert_called_once_with({}, "delete_member",
self.target)
class TestTaskPolicy(test_utils.BaseTestCase):