diff --git a/etc/glance-api.conf b/etc/glance-api.conf index 33dd439add..665d69b3f7 100644 --- a/etc/glance-api.conf +++ b/etc/glance-api.conf @@ -107,16 +107,21 @@ workers = 1 # (string value). This setting needs to be the same for both # glance-scrubber and glance-api. #lock_path= -# + # Property Protections config file -# This file contains the rules for property protections and the roles +# This file contains the rules for property protections and the roles/policies # associated with it. # If this config value is not specified, by default, property protections # won't be enforced. -# If a value is specified and the file is not found, then an -# HTTPInternalServerError will be thrown. +# If a value is specified and the file is not found, then the glance-api +# service will not start. #property_protection_file = +# Specify whether 'roles' or 'policies' are used in the +# property_protection_file. +# The default value for property_protection_rule_format is 'roles'. +#property_protection_rule_format = roles + # Set a system wide quota for every user. This value is the total number # of bytes that a user can use across all storage systems. A value of # 0 means unlimited. diff --git a/etc/property-protections-policies.conf.sample b/etc/property-protections-policies.conf.sample new file mode 100644 index 0000000000..38f611e5e8 --- /dev/null +++ b/etc/property-protections-policies.conf.sample @@ -0,0 +1,34 @@ +# property-protections-policies.conf.sample +# +# This file is an example config file for when +# property_protection_rule_format=policies is enabled. +# +# Specify regular expression for which properties will be protected in [] +# For each section, specify CRUD permissions. You may refer to policies defined +# in policy.json. +# The property rules will be applied in the order specified. Once +# a match is found the remaining property rules will not be applied. +# +# WARNING: +# * If the reg ex specified below does not compile, then +# the glance-api service fails to start. (Guide for reg ex python compiler +# used: +# http://docs.python.org/2/library/re.html#regular-expression-syntax) +# * If an operation(create, read, update, delete) is not specified or misspelt +# then the glance-api service fails to start. +# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY! +# +# NOTE: Only one policy can be specified per action. If multiple policies are +# specified, then the glance-api service fails to start. + +[^x_.*] +create = default +read = default +update = default +delete = default + +[.*] +create = context_is_admin +read = context_is_admin +update = context_is_admin +delete = context_is_admin diff --git a/etc/property-protections-roles.conf.sample b/etc/property-protections-roles.conf.sample new file mode 100644 index 0000000000..634b5820cc --- /dev/null +++ b/etc/property-protections-roles.conf.sample @@ -0,0 +1,32 @@ +# property-protections-roles.conf.sample +# +# This file is an example config file for when +# property_protection_rule_format=roles is enabled. +# +# Specify regular expression for which properties will be protected in [] +# For each section, specify CRUD permissions. +# The property rules will be applied in the order specified. Once +# a match is found the remaining property rules will not be applied. +# +# WARNING: +# * If the reg ex specified below does not compile, then +# glance-api service will not start. (Guide for reg ex python compiler used: +# http://docs.python.org/2/library/re.html#regular-expression-syntax) +# * If an operation(create, read, update, delete) is not specified or misspelt +# then the glance-api service will not start. +# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY! +# +# NOTE: Multiple roles can be specified for a given operation. These roles must +# be comma separated. + +[^x_.*] +create = admin,member +read = admin,member +update = admin,member +delete = admin,member + +[.*] +create = admin +read = admin +update = admin +delete = admin diff --git a/etc/property-protections.conf.sample b/etc/property-protections.conf.sample deleted file mode 100644 index f1df25927a..0000000000 --- a/etc/property-protections.conf.sample +++ /dev/null @@ -1,25 +0,0 @@ -# property-protections.conf.sample -# Specify regular expression for which properties will be protected in [] -# For each section, specify CRUD permissions. You may refer to roles defined -# in policy.json -# The property rules will be applied in the order specified below. Once -# a match is found the remaining property rules will not be traversed through. -# WARNING: -# * If the reg ex specified below does not compile, then -# HTTPInternalServerErrors will be thrown. (Guide for reg ex python compiler used: -# http://docs.python.org/2/library/re.html#regular-expression-syntax) -# * If an operation(create, read, update, delete) is not specified or misspelt -# then that operation for the given regex is disabled for all roles. -# So, remember, with GREAT POWER comes GREAT RESPONSIBILITY! - -[^x_.*] -create = admin,member -read = admin,member -update = admin,member -delete = admin,member - -[.*] -create = admin -read = admin -update = admin -delete = admin diff --git a/glance/api/policy.py b/glance/api/policy.py index 468226bb55..c7bc86ad43 100644 --- a/glance/api/policy.py +++ b/glance/api/policy.py @@ -55,12 +55,21 @@ class Enforcer(object): self.policy_path = self._find_policy_file() self.policy_file_mtime = None self.policy_file_contents = None + self.load_rules() def set_rules(self, rules): """Create a new Rules object based on the provided dict of rules""" rules_obj = policy.Rules(rules, self.default_rule) policy.set_rules(rules_obj) + def add_rules(self, rules): + """Add new rules to the Rules object""" + if policy._rules: + rules_obj = policy.Rules(rules) + policy._rules.update(rules_obj) + else: + self.set_rules(rules) + def load_rules(self): """Set the rules found in the json file on disk""" if self.policy_path: @@ -112,8 +121,6 @@ class Enforcer(object): :raises: `glance.common.exception.Forbidden` :returns: A non-False value if access is allowed. """ - self.load_rules() - credentials = { 'roles': context.roles, 'user': context.user, diff --git a/glance/api/property_protections.py b/glance/api/property_protections.py index f10bc7536a..ce3bd8258b 100644 --- a/glance/api/property_protections.py +++ b/glance/api/property_protections.py @@ -23,7 +23,6 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory): def __init__(self, image_factory, context, property_rules): self.image_factory = image_factory self.context = context - self.roles = self.context.roles self.property_rules = property_rules kwargs = {'context': self.context, 'property_rules': self.property_rules} @@ -38,7 +37,7 @@ class ProtectedImageFactoryProxy(glance.domain.proxy.ImageFactory): extra_properties = {} for key in extra_props.keys(): if self.property_rules.check_property_rules(key, 'create', - self.roles): + self.context): extra_properties[key] = extra_props[key] else: raise exception.ReservedProperty(property=key) @@ -72,11 +71,10 @@ class ProtectedImageProxy(glance.domain.proxy.Image): def __init__(self, image, context, property_rules): self.image = image self.context = context - self.roles = self.context.roles self.property_rules = property_rules self.image.extra_properties = ExtraPropertiesProxy( - self.roles, + self.context, self.image.extra_properties, self.property_rules) super(ProtectedImageProxy, self).__init__(self.image) @@ -84,18 +82,18 @@ class ProtectedImageProxy(glance.domain.proxy.Image): class ExtraPropertiesProxy(glance.domain.ExtraProperties): - def __init__(self, roles, extra_props, property_rules): - self.roles = roles + def __init__(self, context, extra_props, property_rules): + self.context = context self.property_rules = property_rules extra_properties = {} for key in extra_props.keys(): if self.property_rules.check_property_rules(key, 'read', - self.roles): + self.context): extra_properties[key] = extra_props[key] super(ExtraPropertiesProxy, self).__init__(extra_properties) def __getitem__(self, key): - if self.property_rules.check_property_rules(key, 'read', self.roles): + if self.property_rules.check_property_rules(key, 'read', self.context): return dict.__getitem__(self, key) else: raise KeyError @@ -108,13 +106,13 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties): try: if self.__getitem__(key): if self.property_rules.check_property_rules(key, 'update', - self.roles): + self.context): return dict.__setitem__(self, key, value) else: raise exception.ReservedProperty(property=key) except KeyError: if self.property_rules.check_property_rules(key, 'create', - self.roles): + self.context): return dict.__setitem__(self, key, value) else: raise exception.ReservedProperty(property=key) @@ -124,7 +122,7 @@ class ExtraPropertiesProxy(glance.domain.ExtraProperties): raise KeyError if self.property_rules.check_property_rules(key, 'delete', - self.roles): + self.context): return dict.__delitem__(self, key) else: raise exception.ReservedProperty(property=key) diff --git a/glance/api/v1/images.py b/glance/api/v1/images.py index aa7995c9d9..96062ab1fe 100644 --- a/glance/api/v1/images.py +++ b/glance/api/v1/images.py @@ -137,7 +137,7 @@ class Controller(controller.BaseController): self.policy = policy.Enforcer() self.pool = eventlet.GreenPool(size=1024) if property_utils.is_property_protection_enabled(): - self.prop_enforcer = property_utils.PropertyRules() + self.prop_enforcer = property_utils.PropertyRules(self.policy) else: self.prop_enforcer = None @@ -160,7 +160,7 @@ class Controller(controller.BaseController): if property_utils.is_property_protection_enabled(): for key in create_props: if (self.prop_enforcer.check_property_rules( - key, 'create', req.context.roles) is False): + key, 'create', req.context) is False): msg = _("Property '%s' is protected" % key) LOG.debug(msg) raise HTTPForbidden(explanation=msg, @@ -177,7 +177,7 @@ class Controller(controller.BaseController): if property_utils.is_property_protection_enabled(): for key in image_meta['properties'].keys(): if (self.prop_enforcer.check_property_rules( - key, 'read', req.context.roles) is False): + key, 'read', req.context) is False): image_meta['properties'].pop(key) def _enforce_update_protected_props(self, update_props, image_meta, @@ -200,9 +200,9 @@ class Controller(controller.BaseController): if property_utils.is_property_protection_enabled(): for key in update_props: has_read = self.prop_enforcer.check_property_rules( - key, 'read', req.context.roles) + key, 'read', req.context) if ((self.prop_enforcer.check_property_rules( - key, 'update', req.context.roles) is False and + key, 'update', req.context) is False and image_meta['properties'][key] != orig_meta['properties'][key]) or not has_read): msg = _("Property '%s' is protected" % key) @@ -232,13 +232,13 @@ class Controller(controller.BaseController): if property_utils.is_property_protection_enabled(): for key in delete_props: if (self.prop_enforcer.check_property_rules( - key, 'read', req.context.roles) is False): + key, 'read', req.context) is False): # NOTE(bourke): if read protected, re-add to image_meta to # prevent deletion image_meta['properties'][key] = \ orig_meta['properties'][key] elif (self.prop_enforcer.check_property_rules( - key, 'delete', req.context.roles) is False): + key, 'delete', req.context) is False): msg = _("Property '%s' is protected" % key) LOG.debug(msg) raise HTTPForbidden(explanation=msg, diff --git a/glance/common/exception.py b/glance/common/exception.py index 051500f12d..b8ebced883 100644 --- a/glance/common/exception.py +++ b/glance/common/exception.py @@ -136,6 +136,10 @@ class InvalidSortKey(Invalid): message = _("Sort key supplied was not valid.") +class InvalidPropertyProtectionConfiguration(Invalid): + message = _("Invalid configuration in property protection file.") + + class InvalidFilterRangeValue(Invalid): message = _("Unable to filter using the specified range.") diff --git a/glance/common/property_utils.py b/glance/common/property_utils.py index 4e13b8feb3..b03c553cf7 100644 --- a/glance/common/property_utils.py +++ b/glance/common/property_utils.py @@ -20,8 +20,11 @@ import re from oslo.config import cfg import webob.exc +import glance.api.policy +from glance.common import exception from glance.common.ordereddict import OrderedDict from glance.openstack.common import log as logging +from glance.openstack.common import policy # NOTE(bourke): The default dict_type is collections.OrderedDict in py27, but # we must set manually for compatibility with py26 @@ -32,6 +35,10 @@ property_opts = [ cfg.StrOpt('property_protection_file', default=None, help=_('The location of the property protection file.')), + cfg.StrOpt('property_protection_rule_format', + default='roles', + help=_('This config value indicates whether "roles" or ' + '"policies" are used in the property protection file.')), ] CONF = cfg.CONF @@ -46,8 +53,13 @@ def is_property_protection_enabled(): class PropertyRules(object): - def __init__(self): + def __init__(self, policy_enforcer=None): self.rules = [] + self.prop_exp_mapping = {} + self.policies = [] + self.policy_enforcer = policy_enforcer or glance.api.policy.Enforcer() + self.prop_prot_rule_format = CONF.property_protection_rule_format + self.prop_prot_rule_format = self.prop_prot_rule_format.lower() self._load_rules() def _load_rules(self): @@ -58,7 +70,14 @@ class PropertyRules(object): msg = (_("Couldn't find property protection file %s:%s.") % (CONF.property_protection_file, e)) LOG.error(msg) - raise webob.exc.HTTPInternalServerError(explanation=msg) + raise exception.InvalidPropertyProtectionConfiguration() + + if self.prop_prot_rule_format not in ['policies', 'roles']: + msg = _("Invalid value '%s' for 'property_protection_rule_format'" + ". The permitted values are 'roles' and 'policies'" % + self.prop_prot_rule_format) + LOG.error(msg) + raise exception.InvalidPropertyProtectionConfiguration() operations = ['create', 'read', 'update', 'delete'] properties = CONFIG.sections() @@ -67,10 +86,25 @@ class PropertyRules(object): compiled_rule = self._compile_rule(property_exp) for operation in operations: - roles = CONFIG.get(property_exp, operation) - if roles: - roles = [role.strip() for role in roles.split(',')] - property_dict[operation] = roles + permissions = CONFIG.get(property_exp, operation) + if permissions: + if self.prop_prot_rule_format == 'policies': + if ',' in permissions: + msg = _("Multiple policies '%s' not allowed for a" + " given operation. Policies can be " + "combined in the policy file" % + permissions) + LOG.error(msg) + raise exception.\ + InvalidPropertyProtectionConfiguration() + self.prop_exp_mapping[compiled_rule] = property_exp + self._add_policy_rules(property_exp, operation, + permissions) + permissions = [permissions] + else: + permissions = [permission.strip() for permission in + permissions.split(',')] + property_dict[operation] = permissions else: property_dict[operation] = [] msg = _(('Property protection on operation %s for rule ' @@ -88,9 +122,34 @@ class PropertyRules(object): msg = (_("Encountered a malformed property protection rule %s:%s.") % (rule, e)) LOG.error(msg) - raise webob.exc.HTTPInternalServerError(explanation=msg) + raise exception.InvalidPropertyProtectionConfiguration() - def check_property_rules(self, property_name, action, roles): + def _add_policy_rules(self, property_exp, action, rule): + """ Add policy rules to the policy enforcer. + For example, if the file listed as property_protection_file has: + [prop_a] + create = glance_creator + then the corresponding policy rule would be: + "prop_a:create": "rule:glance_creator" + where glance_creator is defined in policy.json. For example: + "glance:creator": "role:admin or role:glance_create_user" + """ + rule = "rule:%s" % rule + rule_name = "%s:%s" % (property_exp, action) + rule_dict = {} + rule_dict[rule_name] = policy.parse_rule(rule) + self.policy_enforcer.add_rules(rule_dict) + + def _check_policy(self, property_exp, action, context): + try: + target = ":".join([property_exp, action]) + self.policy_enforcer.enforce(context, target, {}) + except exception.Forbidden: + return False + return True + + def check_property_rules(self, property_name, action, context): + roles = context.roles if not self.rules: return True @@ -99,6 +158,12 @@ class PropertyRules(object): for rule_exp, rule in self.rules: if rule_exp.search(str(property_name)): - if set(roles).intersection(set(rule.get(action))): - return True + rule_roles = rule.get(action) + if rule_roles: + if self.prop_prot_rule_format == 'policies': + prop_exp_key = self.prop_exp_mapping[rule_exp] + return self._check_policy(prop_exp_key, action, + context) + if set(roles).intersection(set(rule_roles)): + return True return False diff --git a/glance/gateway.py b/glance/gateway.py index 3225cf8480..f550c1ec91 100644 --- a/glance/gateway.py +++ b/glance/gateway.py @@ -44,7 +44,7 @@ class Gateway(object): notifier_image_factory = glance.notifier.ImageFactoryProxy( policy_image_factory, context, self.notifier) if property_utils.is_property_protection_enabled(): - property_rules = property_utils.PropertyRules() + property_rules = property_utils.PropertyRules(self.policy) protected_image_factory = property_protections.\ ProtectedImageFactoryProxy(notifier_image_factory, context, property_rules) @@ -74,7 +74,7 @@ class Gateway(object): notifier_image_repo = glance.notifier.ImageRepoProxy( policy_image_repo, context, self.notifier) if property_utils.is_property_protection_enabled(): - property_rules = property_utils.PropertyRules() + property_rules = property_utils.PropertyRules(self.policy) protected_image_repo = property_protections.\ ProtectedImageRepoProxy(notifier_image_repo, context, property_rules) diff --git a/glance/tests/etc/policy.json b/glance/tests/etc/policy.json index 8845d3498f..7ca97311cb 100644 --- a/glance/tests/etc/policy.json +++ b/glance/tests/etc/policy.json @@ -1,4 +1,5 @@ { "context_is_admin": "role:admin", - "default": "" + "default": "", + "glance_creator": "role:admin or role:spl_role" } diff --git a/glance/tests/etc/property-protections-policies.conf b/glance/tests/etc/property-protections-policies.conf new file mode 100644 index 0000000000..505f36473c --- /dev/null +++ b/glance/tests/etc/property-protections-policies.conf @@ -0,0 +1,17 @@ +[spl_creator_policy] +create = glance_creator +read = glance_creator +update = context_is_admin +delete = context_is_admin + +[spl_default_policy] +create = context_is_admin +read = default +update = context_is_admin +delete = context_is_admin + +[.*] +create = context_is_admin +read = context_is_admin +update = context_is_admin +delete = context_is_admin diff --git a/glance/tests/functional/__init__.py b/glance/tests/functional/__init__.py index 3f285ae9af..b730e41f98 100644 --- a/glance/tests/functional/__init__.py +++ b/glance/tests/functional/__init__.py @@ -310,6 +310,7 @@ class ApiServer(Server): self.image_cache_driver = 'sqlite' self.policy_file = policy_file self.policy_default_rule = 'default' + self.property_protection_rule_format = 'roles' self.needs_database = True default_sql_connection = 'sqlite:////%s/tests.sqlite' % self.test_dir @@ -370,6 +371,7 @@ enable_v2_api = %(enable_v2_api)s lock_path = %(lock_path)s enable_v2_api= %(enable_v2_api)s property_protection_file = %(property_protection_file)s +property_protection_rule_format = %(property_protection_rule_format)s [paste_deploy] flavor = %(deployment_flavor)s """ @@ -575,8 +577,12 @@ class FunctionalTest(test_utils.BaseTestCase): self.copy_data_file('schema-image.json', conf_dir) self.copy_data_file('policy.json', conf_dir) self.copy_data_file('property-protections.conf', conf_dir) - self.property_file = os.path.join(conf_dir, - 'property-protections.conf') + self.copy_data_file('property-protections-policies.conf', conf_dir) + self.property_file_roles = os.path.join(conf_dir, + 'property-protections.conf') + property_policies = 'property-protections-policies.conf' + self.property_file_policies = os.path.join(conf_dir, + property_policies) self.policy_file = os.path.join(conf_dir, 'policy.json') self.api_server = ApiServer(self.test_dir, diff --git a/glance/tests/functional/v2/test_images.py b/glance/tests/functional/v2/test_images.py index 4c54bc3ce6..269c82cc82 100644 --- a/glance/tests/functional/v2/test_images.py +++ b/glance/tests/functional/v2/test_images.py @@ -368,9 +368,9 @@ class TestImages(functional.FunctionalTest): self.stop_servers() - def test_property_protections(self): + def test_property_protections_with_roles(self): # Enable property protection - self.api_server.property_protection_file = self.property_file + self.api_server.property_protection_file = self.property_file_roles self.start_servers(**self.__dict__.copy()) # Image list should be empty @@ -427,6 +427,7 @@ class TestImages(functional.FunctionalTest): data = json.dumps({'name': 'image-1', 'disk_format': 'aki', 'container_format': 'aki', 'spl_create_prop': 'create_bar', + 'spl_create_prop_policy': 'create_policy_bar', 'spl_read_prop': 'read_bar', 'spl_update_prop': 'update_bar', 'spl_delete_prop': 'delete_bar'}) @@ -495,6 +496,155 @@ class TestImages(functional.FunctionalTest): self.stop_servers() + def test_property_protections_with_policies(self): + # Enable property protection + self.api_server.property_protection_file = self.property_file_policies + self.api_server.property_protection_rule_format = 'policies' + self.start_servers(**self.__dict__.copy()) + + # Image list should be empty + path = self._url('/v2/images') + response = requests.get(path, headers=self._headers()) + self.assertEqual(200, response.status_code) + images = json.loads(response.text)['images'] + self.assertEqual(0, len(images)) + + ## Create an image for role member with extra props + # Raises 403 since user is not allowed to set 'foo' + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'member'}) + data = json.dumps({'name': 'image-1', 'foo': 'bar', + 'disk_format': 'aki', 'container_format': 'aki', + 'x_owner_foo': 'o_s_bar'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(403, response.status_code) + + ## Create an image for role member without 'foo' + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'member'}) + data = json.dumps({'name': 'image-1', 'disk_format': 'aki', + 'container_format': 'aki'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + + # Returned image entity + image = json.loads(response.text) + image_id = image['id'] + expected_image = { + 'status': 'queued', + 'name': 'image-1', + 'tags': [], + 'visibility': 'private', + 'self': '/v2/images/%s' % image_id, + 'protected': False, + 'file': '/v2/images/%s/file' % image_id, + 'min_disk': 0, + 'min_ram': 0, + 'schema': '/v2/schemas/image', + } + for key, value in expected_image.items(): + self.assertEqual(image[key], value, key) + + # Create an image for role spl_role with extra props + path = self._url('/v2/images') + headers = self._headers({'content-type': 'application/json', + 'X-Roles': 'spl_role, admin'}) + data = json.dumps({'name': 'image-1', + 'disk_format': 'aki', 'container_format': 'aki', + 'spl_creator_policy': 'creator_bar', + 'spl_default_policy': 'default_bar'}) + response = requests.post(path, headers=headers, data=data) + self.assertEqual(201, response.status_code) + image = json.loads(response.text) + image_id = image['id'] + self.assertEqual('creator_bar', image['spl_creator_policy']) + self.assertEqual('default_bar', image['spl_default_policy']) + + # Attempt to replace a property which is permitted + path = self._url('/v2/images/%s' % image_id) + media_type = 'application/openstack-images-v2.1-json-patch' + headers = self._headers({'content-type': media_type, + 'X-Roles': 'admin'}) + data = json.dumps([ + {'op': 'replace', 'path': '/spl_creator_policy', 'value': 'r'}, + ]) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(200, response.status_code, response.text) + + # Returned image entity should reflect the changes + image = json.loads(response.text) + + # 'spl_creator_policy' has update permission for admin + # hence the value has changed + self.assertEqual('r', image['spl_creator_policy']) + + # Attempt to replace a property which is forbidden + path = self._url('/v2/images/%s' % image_id) + media_type = 'application/openstack-images-v2.1-json-patch' + headers = self._headers({'content-type': media_type, + 'X-Roles': 'spl_role'}) + data = json.dumps([ + {'op': 'replace', 'path': '/spl_creator_policy', 'value': 'z'}, + ]) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(403, response.status_code, response.text) + + # Attempt to read properties + path = self._url('/v2/images/%s' % image_id) + headers = self._headers({'content-type': media_type, + 'X-Roles': 'random_role'}) + response = requests.get(path, headers=headers) + self.assertEqual(200, response.status_code) + + image = json.loads(response.text) + # 'random_role' is allowed read 'spl_default_policy'. + self.assertEqual(image['spl_default_policy'], 'default_bar') + # 'random_role' is forbidden to read 'spl_creator_policy'. + self.assertFalse('spl_creator_policy' in image) + + # Attempt to add and remove properties which are permitted + path = self._url('/v2/images/%s' % image_id) + media_type = 'application/openstack-images-v2.1-json-patch' + headers = self._headers({'content-type': media_type, + 'X-Roles': 'admin'}) + data = json.dumps([ + {'op': 'remove', 'path': '/spl_creator_policy'}, + ]) + response = requests.patch(path, headers=headers, data=data) + self.assertEqual(200, response.status_code, response.text) + + # Returned image entity should reflect the changes + image = json.loads(response.text) + + # 'spl_creator_policy' has delete permission for admin + # hence the value has been deleted + self.assertFalse('spl_creator_policy' in image) + + # Attempt to read a property that is permitted + path = self._url('/v2/images/%s' % image_id) + headers = self._headers({'content-type': media_type, + 'X-Roles': 'random_role'}) + response = requests.get(path, headers=headers) + self.assertEqual(200, response.status_code) + + # Returned image entity should reflect the changes + image = json.loads(response.text) + self.assertEqual(image['spl_default_policy'], 'default_bar') + + # Image Deletion should work + path = self._url('/v2/images/%s' % image_id) + response = requests.delete(path, headers=self._headers()) + self.assertEqual(204, response.status_code) + + # This image should be no longer be directly accessible + path = self._url('/v2/images/%s' % image_id) + response = requests.get(path, headers=self._headers()) + self.assertEqual(404, response.status_code) + + self.stop_servers() + def test_tag_lifecycle(self): # Create an image with a tag - duplicate should be ignored path = self._url('/v2/images') diff --git a/glance/tests/unit/api/test_property_protections.py b/glance/tests/unit/api/test_property_protections.py index f05fbd0bed..d39ff3ece4 100644 --- a/glance/tests/unit/api/test_property_protections.py +++ b/glance/tests/unit/api/test_property_protections.py @@ -15,6 +15,7 @@ # License for the specific language governing permissions and limitations # under the License. +from glance.api import policy from glance.api import property_protections from glance.common import exception from glance.common import property_utils @@ -48,7 +49,8 @@ class TestProtectedImageRepoProxy(utils.BaseTestCase): def setUp(self): super(TestProtectedImageRepoProxy, self).setUp() self.set_property_protections() - self.property_rules = property_utils.PropertyRules() + self.policy = policy.Enforcer() + self.property_rules = property_utils.PropertyRules(self.policy) self.image_factory = glance.domain.ImageFactory() extra_props = {'spl_create_prop': 'c', 'spl_read_prop': 'r', @@ -101,7 +103,8 @@ class TestProtectedImageProxy(utils.BaseTestCase): def setUp(self): super(TestProtectedImageProxy, self).setUp() self.set_property_protections() - self.property_rules = property_utils.PropertyRules() + self.policy = policy.Enforcer() + self.property_rules = property_utils.PropertyRules(self.policy) class ImageStub(object): def __init__(self, extra_prop): @@ -123,92 +126,93 @@ class TestExtraPropertiesProxy(utils.BaseTestCase): def setUp(self): super(TestExtraPropertiesProxy, self).setUp() self.set_property_protections() - self.property_rules = property_utils.PropertyRules() + self.policy = policy.Enforcer() + self.property_rules = property_utils.PropertyRules(self.policy) def test_read_extra_property_as_admin_role(self): extra_properties = {'foo': 'bar', 'ping': 'pong'} - roles = ['admin'] + context = glance.context.RequestContext(roles=['admin']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) test_result = extra_prop_proxy['foo'] self.assertEqual(test_result, 'bar') def test_read_extra_property_as_unpermitted_role(self): extra_properties = {'foo': 'bar', 'ping': 'pong'} - roles = ['unpermitted_role'] + context = glance.context.RequestContext(roles=['unpermitted_role']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo') def test_update_extra_property_as_permitted_role_after_read(self): extra_properties = {'foo': 'bar', 'ping': 'pong'} - roles = ['admin'] + context = glance.context.RequestContext(roles=['admin']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) extra_prop_proxy['foo'] = 'par' self.assertEqual(extra_prop_proxy['foo'], 'par') def test_update_extra_property_as_unpermitted_role_after_read(self): extra_properties = {'spl_read_prop': 'bar'} - roles = ['spl_role'] + context = glance.context.RequestContext(roles=['spl_role']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) self.assertRaises(exception.ReservedProperty, extra_prop_proxy.__setitem__, 'spl_read_prop', 'par') - def test_update_reserved_extra_property_as_permitted_role(self): + def test_update_reserved_extra_property(self): extra_properties = {'spl_create_prop': 'bar'} - roles = ['spl_role'] + context = glance.context.RequestContext(roles=['spl_role']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) self.assertRaises(exception.ReservedProperty, extra_prop_proxy.__setitem__, 'spl_create_prop', 'par') - def test_create_extra_property_as_permitted_role_after_read(self): + def test_create_extra_property_admin(self): extra_properties = {} - roles = ['admin'] + context = glance.context.RequestContext(roles=['admin']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) extra_prop_proxy['boo'] = 'doo' self.assertEqual(extra_prop_proxy['boo'], 'doo') - def test_create_reserved_extra_property_as_permitted_role(self): + def test_create_reserved_extra_property(self): extra_properties = {} - roles = ['spl_role'] + context = glance.context.RequestContext(roles=['spl_role']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) self.assertRaises(exception.ReservedProperty, extra_prop_proxy.__setitem__, 'boo', 'doo') def test_delete_extra_property_as_admin_role(self): extra_properties = {'foo': 'bar'} - roles = ['admin'] + context = glance.context.RequestContext(roles=['admin']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) del extra_prop_proxy['foo'] self.assertRaises(KeyError, extra_prop_proxy.__getitem__, 'foo') def test_delete_nonexistant_extra_property_as_admin_role(self): extra_properties = {} - roles = ['admin'] + context = glance.context.RequestContext(roles=['admin']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) self.assertRaises(KeyError, extra_prop_proxy.__delitem__, 'foo') - def test_delete_reserved_extra_property_as_permitted_role(self): + def test_delete_reserved_extra_property(self): extra_properties = {'spl_read_prop': 'r'} - roles = ['spl_role'] + context = glance.context.RequestContext(roles=['spl_role']) extra_prop_proxy = property_protections.ExtraPropertiesProxy( - roles, extra_properties, self.property_rules) + context, extra_properties, self.property_rules) # Ensure property has been created and can be read self.assertEqual(extra_prop_proxy['spl_read_prop'], 'r') self.assertRaises(exception.ReservedProperty, extra_prop_proxy.__delitem__, 'spl_read_prop') - def test_delete_nonexistant_extra_property_as_permitted_role(self): + def test_delete_nonexistant_extra_property(self): extra_properties = {} roles = ['spl_role'] extra_prop_proxy = property_protections.ExtraPropertiesProxy( @@ -221,7 +225,8 @@ class TestProtectedImageFactoryProxy(utils.BaseTestCase): def setUp(self): super(TestProtectedImageFactoryProxy, self).setUp() self.set_property_protections() - self.property_rules = property_utils.PropertyRules() + self.policy = policy.Enforcer() + self.property_rules = property_utils.PropertyRules(self.policy) self.factory = glance.domain.ImageFactory() def test_create_image_no_extra_prop(self): diff --git a/glance/tests/unit/common/test_property_utils.py b/glance/tests/unit/common/test_property_utils.py index 88e723ffdf..024f766553 100644 --- a/glance/tests/unit/common/test_property_utils.py +++ b/glance/tests/unit/common/test_property_utils.py @@ -15,8 +15,11 @@ import webob.exc +from glance.api import policy +from glance.common import exception from glance.common import property_utils -from glance.tests import utils +import glance.context +from glance.tests.unit import base CONFIG_SECTIONS = [ '^x_owner_.*', @@ -30,16 +33,20 @@ CONFIG_SECTIONS = [ ] -class TestPropertyRules(utils.BaseTestCase): +def create_context(policy, roles=[]): + return glance.context.RequestContext(roles=roles, + policy_enforcer=policy) + + +class TestPropertyRulesWithRoles(base.IsolatedUnitTest): def setUp(self): - super(TestPropertyRules, self).setUp() + super(TestPropertyRulesWithRoles, self).setUp() self.set_property_protections() + self.policy = policy.Enforcer() def tearDown(self): - for section in property_utils.CONFIG.sections(): - property_utils.CONFIG.remove_section(section) - super(TestPropertyRules, self).tearDown() + super(TestPropertyRulesWithRoles, self).tearDown() def test_is_property_protections_enabled_true(self): self.config(property_protection_file="property-protections.conf") @@ -51,7 +58,7 @@ class TestPropertyRules(utils.BaseTestCase): def test_property_protection_file_doesnt_exist(self): self.config(property_protection_file='fake-file.conf') - self.assertRaises(webob.exc.HTTPInternalServerError, + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, property_utils.PropertyRules) def test_property_protection_with_malformed_rule(self): @@ -60,7 +67,7 @@ class TestPropertyRules(utils.BaseTestCase): 'update': ['fake-role'], 'delete': ['fake-role']}} self.set_property_protection_rules(malformed_rules) - self.assertRaises(webob.exc.HTTPInternalServerError, + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, property_utils.PropertyRules) def test_property_protection_with_missing_operation(self): @@ -68,7 +75,7 @@ class TestPropertyRules(utils.BaseTestCase): 'update': ['fake-role'], 'delete': ['fake-role']}} self.set_property_protection_rules(rules_with_missing_operation) - self.assertRaises(webob.exc.HTTPInternalServerError, + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, property_utils.PropertyRules) def test_property_protection_with_misspelt_operation(self): @@ -77,7 +84,7 @@ class TestPropertyRules(utils.BaseTestCase): 'update': ['fake-role'], 'delete': ['fake-role']}} self.set_property_protection_rules(rules_with_misspelt_operation) - self.assertRaises(webob.exc.HTTPInternalServerError, + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, property_utils.PropertyRules) def test_property_protection_with_whitespace(self): @@ -92,81 +99,85 @@ class TestPropertyRules(utils.BaseTestCase): self.set_property_protection_rules(rules_whitespace) self.rules_checker = property_utils.PropertyRules() self.assertTrue(self.rules_checker.check_property_rules('test_prop_1', - 'read', ['member'])) + 'read', create_context(self.policy, ['member']))) self.assertTrue(self.rules_checker.check_property_rules('test_prop_1', - 'read', ['fake-role'])) + 'read', create_context(self.policy, ['fake-role']))) def test_check_property_rules_invalid_action(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertFalse(self.rules_checker.check_property_rules('test_prop', - 'hall', ['admin'])) + 'hall', create_context(self.policy, ['admin']))) def test_check_property_rules_read_permitted_admin_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules('test_prop', - 'read', ['admin'])) + 'read', create_context(self.policy, ['admin']))) def test_check_property_rules_read_permitted_specific_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules( - 'x_owner_prop', 'read', ['member'])) + 'x_owner_prop', 'read', + create_context(self.policy, ['member']))) def test_check_property_rules_read_unpermitted_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertFalse(self.rules_checker.check_property_rules('test_prop', - 'read', ['member'])) + 'read', create_context(self.policy, ['member']))) def test_check_property_rules_create_permitted_admin_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules('test_prop', - 'create', ['admin'])) + 'create', create_context(self.policy, ['admin']))) def test_check_property_rules_create_permitted_specific_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules( - 'x_owner_prop', 'create', ['member'])) + 'x_owner_prop', 'create', + create_context(self.policy, ['member']))) def test_check_property_rules_create_unpermitted_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertFalse(self.rules_checker.check_property_rules('test_prop', - 'create', ['member'])) + 'create', create_context(self.policy, ['member']))) def test_check_property_rules_update_permitted_admin_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules('test_prop', - 'update', ['admin'])) + 'update', create_context(self.policy, ['admin']))) def test_check_property_rules_update_permitted_specific_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules( - 'x_owner_prop', 'update', ['member'])) + 'x_owner_prop', 'update', + create_context(self.policy, ['member']))) def test_check_property_rules_update_unpermitted_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertFalse(self.rules_checker.check_property_rules('test_prop', - 'update', ['member'])) + 'update', create_context(self.policy, ['member']))) def test_check_property_rules_delete_permitted_admin_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules('test_prop', - 'delete', ['admin'])) + 'delete', create_context(self.policy, ['admin']))) def test_check_property_rules_delete_permitted_specific_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertTrue(self.rules_checker.check_property_rules( - 'x_owner_prop', 'delete', ['member'])) + 'x_owner_prop', 'delete', + create_context(self.policy, ['member']))) def test_check_property_rules_delete_unpermitted_role(self): - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertFalse(self.rules_checker.check_property_rules('test_prop', - 'delete', ['member'])) + 'delete', create_context(self.policy, ['member']))) def test_property_config_loaded_in_order(self): """ Verify the order of loaded config sections matches that from the configuration file """ - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) self.assertEqual(property_utils.CONFIG.sections(), CONFIG_SECTIONS) def test_property_rules_loaded_in_order(self): @@ -174,7 +185,77 @@ class TestPropertyRules(utils.BaseTestCase): Verify rules are iterable in the same order as read from the config file """ - self.rules_checker = property_utils.PropertyRules() + self.rules_checker = property_utils.PropertyRules(self.policy) for i in xrange(len(property_utils.CONFIG.sections())): self.assertEqual(property_utils.CONFIG.sections()[i], self.rules_checker.rules[i][0].pattern) + + +class TestPropertyRulesWithPolicies(base.IsolatedUnitTest): + + def setUp(self): + super(TestPropertyRulesWithPolicies, self).setUp() + self.set_property_protections(use_policies=True) + self.policy = policy.Enforcer() + self.rules_checker = property_utils.PropertyRules(self.policy) + + def tearDown(self): + super(TestPropertyRulesWithPolicies, self).tearDown() + + def test_check_property_rules_create_permitted_specific_policy(self): + self.assertTrue(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'create', + create_context(self.policy, ['spl_role']))) + + def test_check_property_rules_create_unpermitted_policy(self): + self.assertFalse(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'create', + create_context(self.policy, ['fake-role']))) + + def test_check_property_rules_read_permitted_specific_policy(self): + self.assertTrue(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'read', + create_context(self.policy, ['spl_role']))) + + def test_check_property_rules_read_unpermitted_policy(self): + self.assertFalse(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'read', + create_context(self.policy, ['fake-role']))) + + def test_check_property_rules_update_permitted_specific_policy(self): + self.assertTrue(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'update', + create_context(self.policy, ['admin']))) + + def test_check_property_rules_update_unpermitted_policy(self): + self.assertFalse(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'update', + create_context(self.policy, ['fake-role']))) + + def test_check_property_rules_delete_permitted_specific_policy(self): + self.assertTrue(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'delete', + create_context(self.policy, ['admin']))) + + def test_check_property_rules_delete_unpermitted_policy(self): + self.assertFalse(self.rules_checker.check_property_rules( + 'spl_creator_policy', 'delete', + create_context(self.policy, ['fake-role']))) + + def test_property_protection_with_malformed_rule(self): + malformed_rules = {'^[0-9)': {'create': ['fake-policy'], + 'read': ['fake-policy'], + 'update': ['fake-policy'], + 'delete': ['fake-policy']}} + self.set_property_protection_rules(malformed_rules) + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, + property_utils.PropertyRules) + + def test_property_protection_with_multiple_policies(self): + malformed_rules = {'^x_.*': {'create': ['fake-policy, another_pol'], + 'read': ['fake-policy'], + 'update': ['fake-policy'], + 'delete': ['fake-policy']}} + self.set_property_protection_rules(malformed_rules) + self.assertRaises(exception.InvalidPropertyProtectionConfiguration, + property_utils.PropertyRules) diff --git a/glance/tests/unit/v1/test_api.py b/glance/tests/unit/v1/test_api.py index 7e66f373db..fa44d4ba8d 100644 --- a/glance/tests/unit/v1/test_api.py +++ b/glance/tests/unit/v1/test_api.py @@ -2487,7 +2487,7 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): def test_prop_protection_with_create_and_permitted_role(self): """ - As admin role, create and image and verify permitted role 'member' can + As admin role, create an image and verify permitted role 'member' can create a protected property """ image_id = self._create_admin_image() @@ -2501,6 +2501,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): res_body = json.loads(output.body)['image'] self.assertEqual(res_body['properties']['x_owner_foo'], 'bar') + def test_prop_protection_with_permitted_policy_config(self): + """ + As admin role, create an image and verify permitted role 'member' can + create a protected property + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image() + another_request = unit_test_utils.get_fake_request( + path='/images/%s' % image_id, method='PUT') + headers = {'x-auth-token': 'user:tenant:admin', + 'x-image-meta-property-spl_create_prop_policy': 'bar'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + self.assertEqual(output.status_int, 200) + res_body = json.loads(output.body)['image'] + self.assertEqual(res_body['properties']['spl_create_prop_policy'], + 'bar') + def test_prop_protection_with_create_and_unpermitted_role(self): """ As admin role, create an image and verify unpermitted role @@ -2605,6 +2624,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): res_body = json.loads(output.body)['images'][0] self.assertEqual(res_body['properties']['x_owner_foo'], 'bar') + def test_prop_protection_with_detail_and_permitted_policy(self): + """ + As admin role, create an image with a protected property, and verify + permitted role 'member' can read that protected property via + /images/detail + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image( + {'x-image-meta-property-x_owner_foo': 'bar'}) + another_request = unit_test_utils.get_fake_request( + method='GET', path='/images/detail') + headers = {'x-auth-token': 'user:tenant:member'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + self.assertEqual(output.status_int, 200) + res_body = json.loads(output.body)['images'][0] + self.assertEqual(res_body['properties']['x_owner_foo'], 'bar') + def test_prop_protection_with_detail_and_unpermitted_role(self): """ As admin role, create an image with a protected property, and verify @@ -2624,6 +2662,26 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): self.assertNotIn('x-image-meta-property-x_owner_foo', res_body['properties']) + def test_prop_protection_with_detail_and_unpermitted_policy(self): + """ + As admin role, create an image with a protected property, and verify + permitted role 'fake_role' can *not* read that protected property via + /images/detail + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image( + {'x-image-meta-property-x_owner_foo': 'bar'}) + another_request = unit_test_utils.get_fake_request( + method='GET', path='/images/detail') + headers = {'x-auth-token': 'user:tenant:fake_role'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + self.assertEqual(output.status_int, 200) + res_body = json.loads(output.body)['images'][0] + self.assertNotIn('x-image-meta-property-x_owner_foo', + res_body['properties']) + def test_prop_protection_with_update_and_permitted_role(self): """ As admin role, create an image with protected property, and verify @@ -2641,6 +2699,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): res_body = json.loads(output.body)['image'] self.assertEqual(res_body['properties']['x_owner_foo'], 'baz') + def test_prop_protection_with_update_and_permitted_policy(self): + """ + As admin role, create an image with protected property, and verify + permitted role 'admin' can update that protected property + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image( + {'x-image-meta-property-spl_default_policy': 'bar'}) + another_request = unit_test_utils.get_fake_request( + path='/images/%s' % image_id, method='PUT') + headers = {'x-auth-token': 'user:tenant:admin', + 'x-image-meta-property-spl_default_policy': 'baz'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + res_body = json.loads(output.body)['image'] + self.assertEqual(res_body['properties']['spl_default_policy'], 'baz') + def test_prop_protection_with_update_and_unpermitted_role(self): """ As admin role, create an image with protected property, and verify @@ -2659,6 +2735,25 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): self.assertIn("Property '%s' is protected" % "x_owner_foo", output.body) + def test_prop_protection_with_update_and_unpermitted_policy(self): + """ + As admin role, create an image with protected property, and verify + unpermitted role 'fake_role' can *not* update that protected property + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image( + {'x-image-meta-property-x_owner_foo': 'bar'}) + another_request = unit_test_utils.get_fake_request( + path='/images/%s' % image_id, method='PUT') + headers = {'x-auth-token': 'user:tenant:fake_role', + 'x-image-meta-property-x_owner_foo': 'baz'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + self.assertEquals(output.status_int, webob.exc.HTTPForbidden.code) + self.assertIn("Property '%s' is protected" % + "x_owner_foo", output.body) + def test_prop_protection_update_without_read(self): """ Test protected property cannot be updated without read permission @@ -2711,6 +2806,24 @@ class TestAPIProtectedProps(base.IsolatedUnitTest): res_body = json.loads(output.body)['image'] self.assertEqual(res_body['properties'], {}) + def test_prop_protection_with_delete_and_permitted_policy(self): + """ + As admin role, create an image with protected property, and verify + permitted role 'member' can can delete that protected property + """ + self.set_property_protections(use_policies=True) + image_id = self._create_admin_image( + {'x-image-meta-property-x_owner_foo': 'bar'}) + another_request = unit_test_utils.get_fake_request( + path='/images/%s' % image_id, method='PUT') + headers = {'x-auth-token': 'user:tenant:member', + 'X-Glance-Registry-Purge-Props': 'True'} + for k, v in headers.iteritems(): + another_request.headers[k] = v + output = another_request.get_response(self.api) + res_body = json.loads(output.body)['image'] + self.assertEqual(res_body['properties'], {}) + def test_prop_protection_with_delete_and_unpermitted_read(self): """ Test protected property cannot be deleted without read permission diff --git a/glance/tests/unit/v2/test_images_resource.py b/glance/tests/unit/v2/test_images_resource.py index 4acb6103a2..1ddcb10b24 100644 --- a/glance/tests/unit/v2/test_images_resource.py +++ b/glance/tests/unit/v2/test_images_resource.py @@ -105,7 +105,7 @@ def _db_image_member_fixture(image_id, member_id, **kwargs): return obj -class TestImagesController(test_utils.BaseTestCase): +class TestImagesController(base.IsolatedUnitTest): def setUp(self): super(TestImagesController, self).setUp() @@ -607,6 +607,11 @@ class TestImagesController(test_utils.BaseTestCase): self.controller.update, request, UUID1, changes) def test_prop_protection_with_create_and_permitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -621,7 +626,66 @@ class TestImagesController(test_utils.BaseTestCase): created_image.image_id, changes) self.assertEqual(output.extra_properties['x_owner_foo'], 'bar') + def test_prop_protection_with_update_and_permitted_policy(self): + self.set_property_protections(use_policies=True) + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) + request = unit_test_utils.get_fake_request(roles=['spl_role']) + image = {'name': 'image-1'} + extra_props = {'spl_creator_policy': 'bar'} + created_image = self.controller.create(request, image=image, + extra_properties=extra_props, + tags=[]) + self.assertEqual(created_image.extra_properties['spl_creator_policy'], + 'bar') + + another_request = unit_test_utils.get_fake_request(roles=['spl_role']) + changes = [ + {'op': 'replace', 'path': ['spl_creator_policy'], 'value': 'par'}, + ] + self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, + another_request, created_image.image_id, changes) + another_request = unit_test_utils.get_fake_request(roles=['admin']) + output = self.controller.update(another_request, + created_image.image_id, changes) + self.assertEqual(output.extra_properties['spl_creator_policy'], + 'par') + + def test_prop_protection_with_create_with_patch_and_policy(self): + self.set_property_protections(use_policies=True) + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) + request = unit_test_utils.get_fake_request(roles=['spl_role', 'admin']) + image = {'name': 'image-1'} + extra_props = {'spl_default_policy': 'bar'} + created_image = self.controller.create(request, image=image, + extra_properties=extra_props, + tags=[]) + another_request = unit_test_utils.get_fake_request(roles=['fake_role']) + changes = [ + {'op': 'add', 'path': ['spl_creator_policy'], 'value': 'bar'}, + ] + self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, + another_request, created_image.image_id, changes) + + another_request = unit_test_utils.get_fake_request(roles=['spl_role']) + output = self.controller.update(another_request, + created_image.image_id, changes) + self.assertEqual(output.extra_properties['spl_creator_policy'], + 'bar') + def test_prop_protection_with_create_and_unpermitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -638,6 +702,11 @@ class TestImagesController(test_utils.BaseTestCase): created_image.image_id, changes) def test_prop_protection_with_show_and_permitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -650,6 +719,11 @@ class TestImagesController(test_utils.BaseTestCase): self.assertEqual(output.extra_properties['x_owner_foo'], 'bar') def test_prop_protection_with_show_and_unpermitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['member']) image = {'name': 'image-1'} @@ -663,6 +737,11 @@ class TestImagesController(test_utils.BaseTestCase): 'x_owner_foo') def test_prop_protection_with_update_and_permitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -679,6 +758,11 @@ class TestImagesController(test_utils.BaseTestCase): self.assertEqual(output.extra_properties['x_owner_foo'], 'baz') def test_prop_protection_with_update_and_unpermitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -694,6 +778,11 @@ class TestImagesController(test_utils.BaseTestCase): request, UUID1, changes) def test_prop_protection_with_delete_and_permitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} @@ -711,6 +800,11 @@ class TestImagesController(test_utils.BaseTestCase): 'x_owner_foo') def test_prop_protection_with_delete_and_unpermitted_role(self): + enforcer = glance.api.policy.Enforcer() + self.controller = glance.api.v2.images.ImagesController(self.db, + enforcer, + self.notifier, + self.store) self.set_property_protections() request = unit_test_utils.get_fake_request(roles=['admin']) image = {'name': 'image-1'} diff --git a/glance/tests/utils.py b/glance/tests/utils.py index e86afa967e..d1356ddafb 100644 --- a/glance/tests/utils.py +++ b/glance/tests/utils.py @@ -65,11 +65,19 @@ class BaseTestCase(testtools.TestCase): self.stubs.SmartUnsetAll() super(BaseTestCase, self).tearDown() - def set_property_protections(self): - self.property_file = self._copy_data_file('property-protections.conf', - self.test_dir) + def set_property_protections(self, use_policies=False): + self.unset_property_protections() + conf_file = "property-protections.conf" + if use_policies: + conf_file = "property-protections-policies.conf" + self.config(property_protection_rule_format="policies") + self.property_file = self._copy_data_file(conf_file, self.test_dir) self.config(property_protection_file=self.property_file) + def unset_property_protections(self): + for section in property_utils.CONFIG.sections(): + property_utils.CONFIG.remove_section(section) + def _copy_data_file(self, file_name, dst_dir): src_file_name = os.path.join('glance/tests/etc', file_name) shutil.copy(src_file_name, dst_dir)