Merge "Using policies for protected properties"

This commit is contained in:
Jenkins 2013-10-17 17:42:40 +00:00 committed by Gerrit Code Review
commit 56b225fb6f
19 changed files with 735 additions and 140 deletions

View File

@ -107,16 +107,21 @@ workers = 1
# (string value). This setting needs to be the same for both
# glance-scrubber and glance-api.
#lock_path=<None>
#
# Property Protections config file
# This file contains the rules for property protections and the roles
# This file contains the rules for property protections and the roles/policies
# associated with it.
# If this config value is not specified, by default, property protections
# won't be enforced.
# If a value is specified and the file is not found, then an
# HTTPInternalServerError will be thrown.
# If a value is specified and the file is not found, then the glance-api
# service will not start.
#property_protection_file =
# Specify whether 'roles' or 'policies' are used in the
# property_protection_file.
# The default value for property_protection_rule_format is 'roles'.
#property_protection_rule_format = roles
# Set a system wide quota for every user. This value is the total number
# of bytes that a user can use across all storage systems. A value of
# 0 means unlimited.

View File

@ -0,0 +1,34 @@
# property-protections-policies.conf.sample
#
# This file is an example config file for when
# property_protection_rule_format=policies is enabled.
#
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions. You may refer to policies defined
# in policy.json.
# The property rules will be applied in the order specified. Once
# a match is found the remaining property rules will not be applied.
#
# WARNING:
# * If the reg ex specified below does not compile, then
# the glance-api service fails to start. (Guide for reg ex python compiler
# used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then the glance-api service fails to start.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
#
# NOTE: Only one policy can be specified per action. If multiple policies are
# specified, then the glance-api service fails to start.
[^x_.*]
create = default
read = default
update = default
delete = default
[.*]
create = context_is_admin
read = context_is_admin
update = context_is_admin
delete = context_is_admin

View File

@ -0,0 +1,32 @@
# property-protections-roles.conf.sample
#
# This file is an example config file for when
# property_protection_rule_format=roles is enabled.
#
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions.
# The property rules will be applied in the order specified. Once
# a match is found the remaining property rules will not be applied.
#
# WARNING:
# * If the reg ex specified below does not compile, then
# glance-api service will not start. (Guide for reg ex python compiler used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then the glance-api service will not start.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
#
# NOTE: Multiple roles can be specified for a given operation. These roles must
# be comma separated.
[^x_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -1,25 +0,0 @@
# property-protections.conf.sample
# Specify regular expression for which properties will be protected in []
# For each section, specify CRUD permissions. You may refer to roles defined
# in policy.json
# The property rules will be applied in the order specified below. Once
# a match is found the remaining property rules will not be traversed through.
# WARNING:
# * If the reg ex specified below does not compile, then
# HTTPInternalServerErrors will be thrown. (Guide for reg ex python compiler used:
# http://docs.python.org/2/library/re.html#regular-expression-syntax)
# * If an operation(create, read, update, delete) is not specified or misspelt
# then that operation for the given regex is disabled for all roles.
# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY!
[^x_.*]
create = admin,member
read = admin,member
update = admin,member
delete = admin,member
[.*]
create = admin
read = admin
update = admin
delete = admin

View File

@ -55,12 +55,21 @@ class Enforcer(object):
self.policy_path = self._find_policy_file()
self.policy_file_mtime = None
self.policy_file_contents = None
self.load_rules()
def set_rules(self, rules):
"""Create a new Rules object based on the provided dict of rules"""
rules_obj = policy.Rules(rules, self.default_rule)
policy.set_rules(rules_obj)
def add_rules(self, rules):
"""Add new rules to the Rules object"""
if policy._rules:
rules_obj = policy.Rules(rules)
policy._rules.update(rules_obj)
else:
self.set_rules(rules)
def load_rules(self):
"""Set the rules found in the json file on disk"""
if self.policy_path:
@ -112,8 +121,6 @@ class Enforcer(object):
:raises: `glance.common.exception.Forbidden`
:returns: A non-False value if access is allowed.
"""
self.load_rules()
credentials = {
'roles': context.roles,
'user': context.user,

View File

@ -23,7 +23,6 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, image_factory, context, property_rules):
self.image_factory = image_factory
self.context = context
self.roles = self.context.roles
self.property_rules = property_rules
kwargs = {'context': self.context,
'property_rules': self.property_rules}
@ -38,7 +37,7 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory):
extra_properties = {}
for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'create',
self.roles):
self.context):
extra_properties[key] = extra_props[key]
else:
raise exception.ReservedProperty(property=key)
@ -72,11 +71,10 @@ class ProtectedImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, property_rules):
self.image = image
self.context = context
self.roles = self.context.roles
self.property_rules = property_rules
self.image.extra_properties = ExtraPropertiesProxy(
self.roles,
self.context,
self.image.extra_properties,
self.property_rules)
super(ProtectedImageProxy, self).__init__(self.image)
@ -84,18 +82,18 @@ class ProtectedImageProxy(glance.domain.proxy.Image):
class ExtraPropertiesProxy(glance.domain.ExtraProperties):
def __init__(self, roles, extra_props, property_rules):
self.roles = roles
def __init__(self, context, extra_props, property_rules):
self.context = context
self.property_rules = property_rules
extra_properties = {}
for key in extra_props.keys():
if self.property_rules.check_property_rules(key, 'read',
self.roles):
self.context):
extra_properties[key] = extra_props[key]
super(ExtraPropertiesProxy, self).__init__(extra_properties)
def __getitem__(self, key):
if self.property_rules.check_property_rules(key, 'read', self.roles):
if self.property_rules.check_property_rules(key, 'read', self.context):
return dict.__getitem__(self, key)
else:
raise KeyError
@ -108,13 +106,13 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties):
try:
if self.__getitem__(key):
if self.property_rules.check_property_rules(key, 'update',
self.roles):
self.context):
return dict.__setitem__(self, key, value)
else:
raise exception.ReservedProperty(property=key)
except KeyError:
if self.property_rules.check_property_rules(key, 'create',
self.roles):
self.context):
return dict.__setitem__(self, key, value)
else:
raise exception.ReservedProperty(property=key)
@ -124,7 +122,7 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties):
raise KeyError
if self.property_rules.check_property_rules(key, 'delete',
self.roles):
self.context):
return dict.__delitem__(self, key)
else:
raise exception.ReservedProperty(property=key)

View File

@ -137,7 +137,7 @@ class Controller(controller.BaseController):
self.policy = policy.Enforcer()
self.pool = eventlet.GreenPool(size=1024)
if property_utils.is_property_protection_enabled():
self.prop_enforcer = property_utils.PropertyRules()
self.prop_enforcer = property_utils.PropertyRules(self.policy)
else:
self.prop_enforcer = None
@ -160,7 +160,7 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled():
for key in create_props:
if (self.prop_enforcer.check_property_rules(
key, 'create', req.context.roles) is False):
key, 'create', req.context) is False):
msg = _("Property '%s' is protected" % key)
LOG.debug(msg)
raise HTTPForbidden(explanation=msg,
@ -177,7 +177,7 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled():
for key in image_meta['properties'].keys():
if (self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles) is False):
key, 'read', req.context) is False):
image_meta['properties'].pop(key)
def _enforce_update_protected_props(self, update_props, image_meta,
@ -200,9 +200,9 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled():
for key in update_props:
has_read = self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles)
key, 'read', req.context)
if ((self.prop_enforcer.check_property_rules(
key, 'update', req.context.roles) is False and
key, 'update', req.context) is False and
image_meta['properties'][key] !=
orig_meta['properties'][key]) or not has_read):
msg = _("Property '%s' is protected" % key)
@ -232,13 +232,13 @@ class Controller(controller.BaseController):
if property_utils.is_property_protection_enabled():
for key in delete_props:
if (self.prop_enforcer.check_property_rules(
key, 'read', req.context.roles) is False):
key, 'read', req.context) is False):
# NOTE(bourke): if read protected, re-add to image_meta to
# prevent deletion
image_meta['properties'][key] = \
orig_meta['properties'][key]
elif (self.prop_enforcer.check_property_rules(
key, 'delete', req.context.roles) is False):
key, 'delete', req.context) is False):
msg = _("Property '%s' is protected" % key)
LOG.debug(msg)
raise HTTPForbidden(explanation=msg,

View File

@ -136,6 +136,10 @@ class InvalidSortKey(Invalid):
message = _("Sort key supplied was not valid.")
class InvalidPropertyProtectionConfiguration(Invalid):
message = _("Invalid configuration in property protection file.")
class InvalidFilterRangeValue(Invalid):
message = _("Unable to filter using the specified range.")

View File

@ -20,8 +20,11 @@ import re
from oslo.config import cfg
import webob.exc
import glance.api.policy
from glance.common import exception
from glance.common.ordereddict import OrderedDict
from glance.openstack.common import log as logging
from glance.openstack.common import policy
# NOTE(bourke): The default dict_type is collections.OrderedDict in py27, but
# we must set manually for compatibility with py26
@ -32,6 +35,10 @@ property_opts = [
cfg.StrOpt('property_protection_file',
default=None,
help=_('The location of the property protection file.')),
cfg.StrOpt('property_protection_rule_format',
default='roles',
help=_('This config value indicates whether "roles" or '
'"policies" are used in the property protection file.')),
]
CONF = cfg.CONF
@ -46,8 +53,13 @@ def is_property_protection_enabled():
class PropertyRules(object):
def __init__(self):
def __init__(self, policy_enforcer=None):
self.rules = []
self.prop_exp_mapping = {}
self.policies = []
self.policy_enforcer = policy_enforcer or glance.api.policy.Enforcer()
self.prop_prot_rule_format = CONF.property_protection_rule_format
self.prop_prot_rule_format = self.prop_prot_rule_format.lower()
self._load_rules()
def _load_rules(self):
@ -58,7 +70,14 @@ class PropertyRules(object):
msg = (_("Couldn't find property protection file %s:%s.") %
(CONF.property_protection_file, e))
LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg)
raise exception.InvalidPropertyProtectionConfiguration()
if self.prop_prot_rule_format not in ['policies', 'roles']:
msg = _("Invalid value '%s' for 'property_protection_rule_format'"
". The permitted values are 'roles' and 'policies'" %
self.prop_prot_rule_format)
LOG.error(msg)
raise exception.InvalidPropertyProtectionConfiguration()
operations = ['create', 'read', 'update', 'delete']
properties = CONFIG.sections()
@ -67,10 +86,25 @@ class PropertyRules(object):
compiled_rule = self._compile_rule(property_exp)
for operation in operations:
roles = CONFIG.get(property_exp, operation)
if roles:
roles = [role.strip() for role in roles.split(',')]
property_dict[operation] = roles
permissions = CONFIG.get(property_exp, operation)
if permissions:
if self.prop_prot_rule_format == 'policies':
if ',' in permissions:
msg = _("Multiple policies '%s' not allowed for a"
" given operation. Policies can be "
"combined in the policy file" %
permissions)
LOG.error(msg)
raise exception.\
InvalidPropertyProtectionConfiguration()
self.prop_exp_mapping[compiled_rule] = property_exp
self._add_policy_rules(property_exp, operation,
permissions)
permissions = [permissions]
else:
permissions = [permission.strip() for permission in
permissions.split(',')]
property_dict[operation] = permissions
else:
property_dict[operation] = []
msg = _(('Property protection on operation %s for rule '
@ -88,9 +122,34 @@ class PropertyRules(object):
msg = (_("Encountered a malformed property protection rule %s:%s.")
% (rule, e))
LOG.error(msg)
raise webob.exc.HTTPInternalServerError(explanation=msg)
raise exception.InvalidPropertyProtectionConfiguration()
def check_property_rules(self, property_name, action, roles):
def _add_policy_rules(self, property_exp, action, rule):
""" Add policy rules to the policy enforcer.
For example, if the file listed as property_protection_file has:
[prop_a]
create = glance_creator
then the corresponding policy rule would be:
"prop_a:create": "rule:glance_creator"
where glance_creator is defined in policy.json. For example:
"glance:creator": "role:admin or role:glance_create_user"
"""
rule = "rule:%s" % rule
rule_name = "%s:%s" % (property_exp, action)
rule_dict = {}
rule_dict[rule_name] = policy.parse_rule(rule)
self.policy_enforcer.add_rules(rule_dict)
def _check_policy(self, property_exp, action, context):
try:
target = ":".join([property_exp, action])
self.policy_enforcer.enforce(context, target, {})
except exception.Forbidden:
return False
return True
def check_property_rules(self, property_name, action, context):
roles = context.roles
if not self.rules:
return True
@ -99,6 +158,12 @@ class PropertyRules(object):
for rule_exp, rule in self.rules:
if rule_exp.search(str(property_name)):
if set(roles).intersection(set(rule.get(action))):
return True
rule_roles = rule.get(action)
if rule_roles:
if self.prop_prot_rule_format == 'policies':
prop_exp_key = self.prop_exp_mapping[rule_exp]
return self._check_policy(prop_exp_key, action,
context)
if set(roles).intersection(set(rule_roles)):
return True
return False

View File

@ -44,7 +44,7 @@ class Gateway(object):
notifier_image_factory = glance.notifier.ImageFactoryProxy(
policy_image_factory, context, self.notifier)
if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules()
property_rules = property_utils.PropertyRules(self.policy)
protected_image_factory = property_protections.\
ProtectedImageFactoryProxy(notifier_image_factory, context,
property_rules)
@ -74,7 +74,7 @@ class Gateway(object):
notifier_image_repo = glance.notifier.ImageRepoProxy(
policy_image_repo, context, self.notifier)
if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules()
property_rules = property_utils.PropertyRules(self.policy)
protected_image_repo = property_protections.\
ProtectedImageRepoProxy(notifier_image_repo, context,
property_rules)

View File

@ -1,4 +1,5 @@
{
"context_is_admin": "role:admin",
"default": ""
"default": "",
"glance_creator": "role:admin or role:spl_role"
}

View File

@ -0,0 +1,17 @@
[spl_creator_policy]
create = glance_creator
read = glance_creator
update = context_is_admin
delete = context_is_admin
[spl_default_policy]
create = context_is_admin
read = default
update = context_is_admin
delete = context_is_admin
[.*]
create = context_is_admin
read = context_is_admin
update = context_is_admin
delete = context_is_admin

View File

@ -310,6 +310,7 @@ class ApiServer(Server):
self.image_cache_driver = 'sqlite'
self.policy_file = policy_file
self.policy_default_rule = 'default'
self.property_protection_rule_format = 'roles'
self.needs_database = True
default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir
@ -370,6 +371,7 @@ enable_v2_api = %(enable_v2_api)s
lock_path = %(lock_path)s
enable_v2_api= %(enable_v2_api)s
property_protection_file = %(property_protection_file)s
property_protection_rule_format = %(property_protection_rule_format)s
[paste_deploy]
flavor = %(deployment_flavor)s
"""
@ -575,8 +577,12 @@ class FunctionalTest(test_utils.BaseTestCase):
self.copy_data_file('schema-image.json', conf_dir)
self.copy_data_file('policy.json', conf_dir)
self.copy_data_file('property-protections.conf', conf_dir)
self.property_file = os.path.join(conf_dir,
'property-protections.conf')
self.copy_data_file('property-protections-policies.conf', conf_dir)
self.property_file_roles = os.path.join(conf_dir,
'property-protections.conf')
property_policies = 'property-protections-policies.conf'
self.property_file_policies = os.path.join(conf_dir,
property_policies)
self.policy_file = os.path.join(conf_dir, 'policy.json')
self.api_server = ApiServer(self.test_dir,

View File

@ -368,9 +368,9 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_property_protections(self):
def test_property_protections_with_roles(self):
# Enable property protection
self.api_server.property_protection_file = self.property_file
self.api_server.property_protection_file = self.property_file_roles
self.start_servers(**self.__dict__.copy())
# Image list should be empty
@ -427,6 +427,7 @@ class TestImages(functional.FunctionalTest):
data = json.dumps({'name': 'image-1',
'disk_format': 'aki', 'container_format': 'aki',
'spl_create_prop': 'create_bar',
'spl_create_prop_policy': 'create_policy_bar',
'spl_read_prop': 'read_bar',
'spl_update_prop': 'update_bar',
'spl_delete_prop': 'delete_bar'})
@ -495,6 +496,155 @@ class TestImages(functional.FunctionalTest):
self.stop_servers()
def test_property_protections_with_policies(self):
# Enable property protection
self.api_server.property_protection_file = self.property_file_policies
self.api_server.property_protection_rule_format = 'policies'
self.start_servers(**self.__dict__.copy())
# Image list should be empty
path = self._url('/v2/images')
response = requests.get(path, headers=self._headers())
self.assertEqual(200, response.status_code)
images = json.loads(response.text)['images']
self.assertEqual(0, len(images))
## Create an image for role member with extra props
# Raises 403 since user is not allowed to set 'foo'
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = json.dumps({'name': 'image-1', 'foo': 'bar',
'disk_format': 'aki', 'container_format': 'aki',
'x_owner_foo': 'o_s_bar'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(403, response.status_code)
## Create an image for role member without 'foo'
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'member'})
data = json.dumps({'name': 'image-1', 'disk_format': 'aki',
'container_format': 'aki'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
# Returned image entity
image = json.loads(response.text)
image_id = image['id']
expected_image = {
'status': 'queued',
'name': 'image-1',
'tags': [],
'visibility': 'private',
'self': '/v2/images/%s' % image_id,
'protected': False,
'file': '/v2/images/%s/file' % image_id,
'min_disk': 0,
'min_ram': 0,
'schema': '/v2/schemas/image',
}
for key, value in expected_image.items():
self.assertEqual(image[key], value, key)
# Create an image for role spl_role with extra props
path = self._url('/v2/images')
headers = self._headers({'content-type': 'application/json',
'X-Roles': 'spl_role, admin'})
data = json.dumps({'name': 'image-1',
'disk_format': 'aki', 'container_format': 'aki',
'spl_creator_policy': 'creator_bar',
'spl_default_policy': 'default_bar'})
response = requests.post(path, headers=headers, data=data)
self.assertEqual(201, response.status_code)
image = json.loads(response.text)
image_id = image['id']
self.assertEqual('creator_bar', image['spl_creator_policy'])
self.assertEqual('default_bar', image['spl_default_policy'])
# Attempt to replace a property which is permitted
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'admin'})
data = json.dumps([
{'op': 'replace', 'path': '/spl_creator_policy', 'value': 'r'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code, response.text)
# Returned image entity should reflect the changes
image = json.loads(response.text)
# 'spl_creator_policy' has update permission for admin
# hence the value has changed
self.assertEqual('r', image['spl_creator_policy'])
# Attempt to replace a property which is forbidden
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'spl_role'})
data = json.dumps([
{'op': 'replace', 'path': '/spl_creator_policy', 'value': 'z'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(403, response.status_code, response.text)
# Attempt to read properties
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({'content-type': media_type,
'X-Roles': 'random_role'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
image = json.loads(response.text)
# 'random_role' is allowed read 'spl_default_policy'.
self.assertEqual(image['spl_default_policy'], 'default_bar')
# 'random_role' is forbidden to read 'spl_creator_policy'.
self.assertFalse('spl_creator_policy' in image)
# Attempt to add and remove properties which are permitted
path = self._url('/v2/images/%s' % image_id)
media_type = 'application/openstack-images-v2.1-json-patch'
headers = self._headers({'content-type': media_type,
'X-Roles': 'admin'})
data = json.dumps([
{'op': 'remove', 'path': '/spl_creator_policy'},
])
response = requests.patch(path, headers=headers, data=data)
self.assertEqual(200, response.status_code, response.text)
# Returned image entity should reflect the changes
image = json.loads(response.text)
# 'spl_creator_policy' has delete permission for admin
# hence the value has been deleted
self.assertFalse('spl_creator_policy' in image)
# Attempt to read a property that is permitted
path = self._url('/v2/images/%s' % image_id)
headers = self._headers({'content-type': media_type,
'X-Roles': 'random_role'})
response = requests.get(path, headers=headers)
self.assertEqual(200, response.status_code)
# Returned image entity should reflect the changes
image = json.loads(response.text)
self.assertEqual(image['spl_default_policy'], 'default_bar')
# Image Deletion should work
path = self._url('/v2/images/%s' % image_id)
response = requests.delete(path, headers=self._headers())
self.assertEqual(204, response.status_code)
# This image should be no longer be directly accessible
path = self._url('/v2/images/%s' % image_id)
response = requests.get(path, headers=self._headers())
self.assertEqual(404, response.status_code)
self.stop_servers()
def test_tag_lifecycle(self):
# Create an image with a tag - duplicate should be ignored
path = self._url('/v2/images')

View File

@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from glance.api import policy
from glance.api import property_protections
from glance.common import exception
from glance.common import property_utils
@ -48,7 +49,8 @@ class TestProtectedImageRepoProxy(utils.BaseTestCase):
def setUp(self):
super(TestProtectedImageRepoProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
self.image_factory = glance.domain.ImageFactory()
extra_props = {'spl_create_prop': 'c',
'spl_read_prop': 'r',
@ -101,7 +103,8 @@ class TestProtectedImageProxy(utils.BaseTestCase):
def setUp(self):
super(TestProtectedImageProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
class ImageStub(object):
def __init__(self, extra_prop):
@ -123,92 +126,93 @@ class TestExtraPropertiesProxy(utils.BaseTestCase):
def setUp(self):
super(TestExtraPropertiesProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
def test_read_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin']
context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
test_result = extra_prop_proxy['foo']
self.assertEqual(test_result, 'bar')
def test_read_extra_property_as_unpermitted_role(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['unpermitted_role']
context = glance.context.RequestContext(roles=['unpermitted_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_update_extra_property_as_permitted_role_after_read(self):
extra_properties = {'foo': 'bar', 'ping': 'pong'}
roles = ['admin']
context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
extra_prop_proxy['foo'] = 'par'
self.assertEqual(extra_prop_proxy['foo'], 'par')
def test_update_extra_property_as_unpermitted_role_after_read(self):
extra_properties = {'spl_read_prop': 'bar'}
roles = ['spl_role']
context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__,
'spl_read_prop', 'par')
def test_update_reserved_extra_property_as_permitted_role(self):
def test_update_reserved_extra_property(self):
extra_properties = {'spl_create_prop': 'bar'}
roles = ['spl_role']
context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'spl_create_prop',
'par')
def test_create_extra_property_as_permitted_role_after_read(self):
def test_create_extra_property_admin(self):
extra_properties = {}
roles = ['admin']
context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
extra_prop_proxy['boo'] = 'doo'
self.assertEqual(extra_prop_proxy['boo'], 'doo')
def test_create_reserved_extra_property_as_permitted_role(self):
def test_create_reserved_extra_property(self):
extra_properties = {}
roles = ['spl_role']
context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__setitem__, 'boo',
'doo')
def test_delete_extra_property_as_admin_role(self):
extra_properties = {'foo': 'bar'}
roles = ['admin']
context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
del extra_prop_proxy['foo']
self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo')
def test_delete_nonexistant_extra_property_as_admin_role(self):
extra_properties = {}
roles = ['admin']
context = glance.context.RequestContext(roles=['admin'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
self.assertRaises(KeyError, extra_prop_proxy.__delitem__, 'foo')
def test_delete_reserved_extra_property_as_permitted_role(self):
def test_delete_reserved_extra_property(self):
extra_properties = {'spl_read_prop': 'r'}
roles = ['spl_role']
context = glance.context.RequestContext(roles=['spl_role'])
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
roles, extra_properties, self.property_rules)
context, extra_properties, self.property_rules)
# Ensure property has been created and can be read
self.assertEqual(extra_prop_proxy['spl_read_prop'], 'r')
self.assertRaises(exception.ReservedProperty,
extra_prop_proxy.__delitem__, 'spl_read_prop')
def test_delete_nonexistant_extra_property_as_permitted_role(self):
def test_delete_nonexistant_extra_property(self):
extra_properties = {}
roles = ['spl_role']
extra_prop_proxy = property_protections.ExtraPropertiesProxy(
@ -221,7 +225,8 @@ class TestProtectedImageFactoryProxy(utils.BaseTestCase):
def setUp(self):
super(TestProtectedImageFactoryProxy, self).setUp()
self.set_property_protections()
self.property_rules = property_utils.PropertyRules()
self.policy = policy.Enforcer()
self.property_rules = property_utils.PropertyRules(self.policy)
self.factory = glance.domain.ImageFactory()
def test_create_image_no_extra_prop(self):

View File

@ -15,8 +15,11 @@
import webob.exc
from glance.api import policy
from glance.common import exception
from glance.common import property_utils
from glance.tests import utils
import glance.context
from glance.tests.unit import base
CONFIG_SECTIONS = [
'^x_owner_.*',
@ -30,16 +33,20 @@ CONFIG_SECTIONS = [
]
class TestPropertyRules(utils.BaseTestCase):
def create_context(policy, roles=[]):
return glance.context.RequestContext(roles=roles,
policy_enforcer=policy)
class TestPropertyRulesWithRoles(base.IsolatedUnitTest):
def setUp(self):
super(TestPropertyRules, self).setUp()
super(TestPropertyRulesWithRoles, self).setUp()
self.set_property_protections()
self.policy = policy.Enforcer()
def tearDown(self):
for section in property_utils.CONFIG.sections():
property_utils.CONFIG.remove_section(section)
super(TestPropertyRules, self).tearDown()
super(TestPropertyRulesWithRoles, self).tearDown()
def test_is_property_protections_enabled_true(self):
self.config(property_protection_file="property-protections.conf")
@ -51,7 +58,7 @@ class TestPropertyRules(utils.BaseTestCase):
def test_property_protection_file_doesnt_exist(self):
self.config(property_protection_file='fake-file.conf')
self.assertRaises(webob.exc.HTTPInternalServerError,
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_malformed_rule(self):
@ -60,7 +67,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(webob.exc.HTTPInternalServerError,
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_missing_operation(self):
@ -68,7 +75,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_missing_operation)
self.assertRaises(webob.exc.HTTPInternalServerError,
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_misspelt_operation(self):
@ -77,7 +84,7 @@ class TestPropertyRules(utils.BaseTestCase):
'update': ['fake-role'],
'delete': ['fake-role']}}
self.set_property_protection_rules(rules_with_misspelt_operation)
self.assertRaises(webob.exc.HTTPInternalServerError,
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_whitespace(self):
@ -92,81 +99,85 @@ class TestPropertyRules(utils.BaseTestCase):
self.set_property_protection_rules(rules_whitespace)
self.rules_checker = property_utils.PropertyRules()
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['member']))
'read', create_context(self.policy, ['member'])))
self.assertTrue(self.rules_checker.check_property_rules('test_prop_1',
'read', ['fake-role']))
'read', create_context(self.policy, ['fake-role'])))
def test_check_property_rules_invalid_action(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'hall', ['admin']))
'hall', create_context(self.policy, ['admin'])))
def test_check_property_rules_read_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'read', ['admin']))
'read', create_context(self.policy, ['admin'])))
def test_check_property_rules_read_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'read', ['member']))
'x_owner_prop', 'read',
create_context(self.policy, ['member'])))
def test_check_property_rules_read_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'read', ['member']))
'read', create_context(self.policy, ['member'])))
def test_check_property_rules_create_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'create', ['admin']))
'create', create_context(self.policy, ['admin'])))
def test_check_property_rules_create_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'create', ['member']))
'x_owner_prop', 'create',
create_context(self.policy, ['member'])))
def test_check_property_rules_create_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'create', ['member']))
'create', create_context(self.policy, ['member'])))
def test_check_property_rules_update_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'update', ['admin']))
'update', create_context(self.policy, ['admin'])))
def test_check_property_rules_update_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'update', ['member']))
'x_owner_prop', 'update',
create_context(self.policy, ['member'])))
def test_check_property_rules_update_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'update', ['member']))
'update', create_context(self.policy, ['member'])))
def test_check_property_rules_delete_permitted_admin_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules('test_prop',
'delete', ['admin']))
'delete', create_context(self.policy, ['admin'])))
def test_check_property_rules_delete_permitted_specific_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertTrue(self.rules_checker.check_property_rules(
'x_owner_prop', 'delete', ['member']))
'x_owner_prop', 'delete',
create_context(self.policy, ['member'])))
def test_check_property_rules_delete_unpermitted_role(self):
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertFalse(self.rules_checker.check_property_rules('test_prop',
'delete', ['member']))
'delete', create_context(self.policy, ['member'])))
def test_property_config_loaded_in_order(self):
"""
Verify the order of loaded config sections matches that from the
configuration file
"""
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
self.assertEqual(property_utils.CONFIG.sections(), CONFIG_SECTIONS)
def test_property_rules_loaded_in_order(self):
@ -174,7 +185,77 @@ class TestPropertyRules(utils.BaseTestCase):
Verify rules are iterable in the same order as read from the config
file
"""
self.rules_checker = property_utils.PropertyRules()
self.rules_checker = property_utils.PropertyRules(self.policy)
for i in xrange(len(property_utils.CONFIG.sections())):
self.assertEqual(property_utils.CONFIG.sections()[i],
self.rules_checker.rules[i][0].pattern)
class TestPropertyRulesWithPolicies(base.IsolatedUnitTest):
def setUp(self):
super(TestPropertyRulesWithPolicies, self).setUp()
self.set_property_protections(use_policies=True)
self.policy = policy.Enforcer()
self.rules_checker = property_utils.PropertyRules(self.policy)
def tearDown(self):
super(TestPropertyRulesWithPolicies, self).tearDown()
def test_check_property_rules_create_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'create',
create_context(self.policy, ['spl_role'])))
def test_check_property_rules_create_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'create',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_read_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'read',
create_context(self.policy, ['spl_role'])))
def test_check_property_rules_read_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'read',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_update_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'update',
create_context(self.policy, ['admin'])))
def test_check_property_rules_update_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'update',
create_context(self.policy, ['fake-role'])))
def test_check_property_rules_delete_permitted_specific_policy(self):
self.assertTrue(self.rules_checker.check_property_rules(
'spl_creator_policy', 'delete',
create_context(self.policy, ['admin'])))
def test_check_property_rules_delete_unpermitted_policy(self):
self.assertFalse(self.rules_checker.check_property_rules(
'spl_creator_policy', 'delete',
create_context(self.policy, ['fake-role'])))
def test_property_protection_with_malformed_rule(self):
malformed_rules = {'^[0-9)': {'create': ['fake-policy'],
'read': ['fake-policy'],
'update': ['fake-policy'],
'delete': ['fake-policy']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)
def test_property_protection_with_multiple_policies(self):
malformed_rules = {'^x_.*': {'create': ['fake-policy, another_pol'],
'read': ['fake-policy'],
'update': ['fake-policy'],
'delete': ['fake-policy']}}
self.set_property_protection_rules(malformed_rules)
self.assertRaises(exception.InvalidPropertyProtectionConfiguration,
property_utils.PropertyRules)

View File

@ -2487,7 +2487,7 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
def test_prop_protection_with_create_and_permitted_role(self):
"""
As admin role, create and image and verify permitted role 'member' can
As admin role, create an image and verify permitted role 'member' can
create a protected property
"""
image_id = self._create_admin_image()
@ -2501,6 +2501,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_permitted_policy_config(self):
"""
As admin role, create an image and verify permitted role 'member' can
create a protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image()
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:admin',
'x-image-meta-property-spl_create_prop_policy': 'bar'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['spl_create_prop_policy'],
'bar')
def test_prop_protection_with_create_and_unpermitted_role(self):
"""
As admin role, create an image and verify unpermitted role
@ -2605,6 +2624,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['images'][0]
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_detail_and_permitted_policy(self):
"""
As admin role, create an image with a protected property, and verify
permitted role 'member' can read that protected property via
/images/detail
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
method='GET', path='/images/detail')
headers = {'x-auth-token': 'user:tenant:member'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['images'][0]
self.assertEqual(res_body['properties']['x_owner_foo'], 'bar')
def test_prop_protection_with_detail_and_unpermitted_role(self):
"""
As admin role, create an image with a protected property, and verify
@ -2624,6 +2662,26 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
self.assertNotIn('x-image-meta-property-x_owner_foo',
res_body['properties'])
def test_prop_protection_with_detail_and_unpermitted_policy(self):
"""
As admin role, create an image with a protected property, and verify
permitted role 'fake_role' can *not* read that protected property via
/images/detail
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
method='GET', path='/images/detail')
headers = {'x-auth-token': 'user:tenant:fake_role'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEqual(output.status_int, 200)
res_body = json.loads(output.body)['images'][0]
self.assertNotIn('x-image-meta-property-x_owner_foo',
res_body['properties'])
def test_prop_protection_with_update_and_permitted_role(self):
"""
As admin role, create an image with protected property, and verify
@ -2641,6 +2699,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['x_owner_foo'], 'baz')
def test_prop_protection_with_update_and_permitted_policy(self):
"""
As admin role, create an image with protected property, and verify
permitted role 'admin' can update that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-spl_default_policy': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:admin',
'x-image-meta-property-spl_default_policy': 'baz'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties']['spl_default_policy'], 'baz')
def test_prop_protection_with_update_and_unpermitted_role(self):
"""
As admin role, create an image with protected property, and verify
@ -2659,6 +2735,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
self.assertIn("Property '%s' is protected" %
"x_owner_foo", output.body)
def test_prop_protection_with_update_and_unpermitted_policy(self):
"""
As admin role, create an image with protected property, and verify
unpermitted role 'fake_role' can *not* update that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:fake_role',
'x-image-meta-property-x_owner_foo': 'baz'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
self.assertEquals(output.status_int, webob.exc.HTTPForbidden.code)
self.assertIn("Property '%s' is protected" %
"x_owner_foo", output.body)
def test_prop_protection_update_without_read(self):
"""
Test protected property cannot be updated without read permission
@ -2711,6 +2806,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest):
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties'], {})
def test_prop_protection_with_delete_and_permitted_policy(self):
"""
As admin role, create an image with protected property, and verify
permitted role 'member' can can delete that protected property
"""
self.set_property_protections(use_policies=True)
image_id = self._create_admin_image(
{'x-image-meta-property-x_owner_foo': 'bar'})
another_request = unit_test_utils.get_fake_request(
path='/images/%s' % image_id, method='PUT')
headers = {'x-auth-token': 'user:tenant:member',
'X-Glance-Registry-Purge-Props': 'True'}
for k, v in headers.iteritems():
another_request.headers[k] = v
output = another_request.get_response(self.api)
res_body = json.loads(output.body)['image']
self.assertEqual(res_body['properties'], {})
def test_prop_protection_with_delete_and_unpermitted_read(self):
"""
Test protected property cannot be deleted without read permission

View File

@ -105,7 +105,7 @@ def _db_image_member_fixture(image_id, member_id, **kwargs):
return obj
class TestImagesController(test_utils.BaseTestCase):
class TestImagesController(base.IsolatedUnitTest):
def setUp(self):
super(TestImagesController, self).setUp()
@ -607,6 +607,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.controller.update, request, UUID1, changes)
def test_prop_protection_with_create_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -621,7 +626,66 @@ class TestImagesController(test_utils.BaseTestCase):
created_image.image_id, changes)
self.assertEqual(output.extra_properties['x_owner_foo'], 'bar')
def test_prop_protection_with_update_and_permitted_policy(self):
self.set_property_protections(use_policies=True)
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
request = unit_test_utils.get_fake_request(roles=['spl_role'])
image = {'name': 'image-1'}
extra_props = {'spl_creator_policy': 'bar'}
created_image = self.controller.create(request, image=image,
extra_properties=extra_props,
tags=[])
self.assertEqual(created_image.extra_properties['spl_creator_policy'],
'bar')
another_request = unit_test_utils.get_fake_request(roles=['spl_role'])
changes = [
{'op': 'replace', 'path': ['spl_creator_policy'], 'value': 'par'},
]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
another_request, created_image.image_id, changes)
another_request = unit_test_utils.get_fake_request(roles=['admin'])
output = self.controller.update(another_request,
created_image.image_id, changes)
self.assertEqual(output.extra_properties['spl_creator_policy'],
'par')
def test_prop_protection_with_create_with_patch_and_policy(self):
self.set_property_protections(use_policies=True)
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
request = unit_test_utils.get_fake_request(roles=['spl_role', 'admin'])
image = {'name': 'image-1'}
extra_props = {'spl_default_policy': 'bar'}
created_image = self.controller.create(request, image=image,
extra_properties=extra_props,
tags=[])
another_request = unit_test_utils.get_fake_request(roles=['fake_role'])
changes = [
{'op': 'add', 'path': ['spl_creator_policy'], 'value': 'bar'},
]
self.assertRaises(webob.exc.HTTPForbidden, self.controller.update,
another_request, created_image.image_id, changes)
another_request = unit_test_utils.get_fake_request(roles=['spl_role'])
output = self.controller.update(another_request,
created_image.image_id, changes)
self.assertEqual(output.extra_properties['spl_creator_policy'],
'bar')
def test_prop_protection_with_create_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -638,6 +702,11 @@ class TestImagesController(test_utils.BaseTestCase):
created_image.image_id, changes)
def test_prop_protection_with_show_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -650,6 +719,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.assertEqual(output.extra_properties['x_owner_foo'], 'bar')
def test_prop_protection_with_show_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['member'])
image = {'name': 'image-1'}
@ -663,6 +737,11 @@ class TestImagesController(test_utils.BaseTestCase):
'x_owner_foo')
def test_prop_protection_with_update_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -679,6 +758,11 @@ class TestImagesController(test_utils.BaseTestCase):
self.assertEqual(output.extra_properties['x_owner_foo'], 'baz')
def test_prop_protection_with_update_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -694,6 +778,11 @@ class TestImagesController(test_utils.BaseTestCase):
request, UUID1, changes)
def test_prop_protection_with_delete_and_permitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}
@ -711,6 +800,11 @@ class TestImagesController(test_utils.BaseTestCase):
'x_owner_foo')
def test_prop_protection_with_delete_and_unpermitted_role(self):
enforcer = glance.api.policy.Enforcer()
self.controller = glance.api.v2.images.ImagesController(self.db,
enforcer,
self.notifier,
self.store)
self.set_property_protections()
request = unit_test_utils.get_fake_request(roles=['admin'])
image = {'name': 'image-1'}

View File

@ -65,11 +65,19 @@ class BaseTestCase(testtools.TestCase):
self.stubs.SmartUnsetAll()
super(BaseTestCase, self).tearDown()
def set_property_protections(self):
self.property_file = self._copy_data_file('property-protections.conf',
self.test_dir)
def set_property_protections(self, use_policies=False):
self.unset_property_protections()
conf_file = "property-protections.conf"
if use_policies:
conf_file = "property-protections-policies.conf"
self.config(property_protection_rule_format="policies")
self.property_file = self._copy_data_file(conf_file, self.test_dir)
self.config(property_protection_file=self.property_file)
def unset_property_protections(self):
for section in property_utils.CONFIG.sections():
property_utils.CONFIG.remove_section(section)
def _copy_data_file(self, file_name, dst_dir):
src_file_name = os.path.join('glance/tests/etc', file_name)
shutil.copy(src_file_name, dst_dir)