Merge "Remove dead code of auth and policy layers"

This commit is contained in:
Zuul 2022-06-23 15:56:18 +00:00 committed by Gerrit Code Review
commit 61619c8f00
24 changed files with 172 additions and 3300 deletions

@ -1,925 +0,0 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo_config import cfg
from oslo_log import log as logging
from glance.common import exception
import glance.domain.proxy
from glance.i18n import _
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def is_image_mutable(context, image):
"""Return True if the image is mutable in this context."""
if context.is_admin:
return True
if image.owner is None or context.owner is None:
return False
return image.owner == context.owner
def proxy_image(context, image, image_repo):
if is_image_mutable(context, image):
return ImageProxy(image, context)
else:
return ImmutableImageProxy(image, context)
def is_member_mutable(context, member):
"""Return True if the image is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return member.member_id == context.owner
def proxy_member(context, member):
if is_member_mutable(context, member):
return member
else:
return ImmutableMemberProxy(member)
def is_task_mutable(context, task):
"""Return True if the task is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return task.owner == context.owner
def is_task_stub_mutable(context, task_stub):
"""Return True if the task stub is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return task_stub.owner == context.owner
def proxy_task(context, task):
if is_task_mutable(context, task):
return task
else:
return ImmutableTaskProxy(task)
def proxy_task_stub(context, task_stub):
if is_task_stub_mutable(context, task_stub):
return task_stub
else:
return ImmutableTaskStubProxy(task_stub)
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context):
self.context = context
self.image_repo = image_repo
proxy_kwargs = {'context': self.context}
super(ImageRepoProxy, self).__init__(image_repo,
item_proxy_class=ImageProxy,
item_proxy_kwargs=proxy_kwargs)
def get(self, image_id):
image = self.image_repo.get(image_id)
return proxy_image(self.context, image, self.image_repo)
def list(self, *args, **kwargs):
images = self.image_repo.list(*args, **kwargs)
return [proxy_image(self.context, i, self.image_repo) for i in images]
def _validate_image_accepts_members(visibility):
if visibility != 'shared':
message = _("Only shared images have members.")
raise exception.Forbidden(message)
class ImageMemberRepoProxy(glance.domain.proxy.MemberRepo):
def __init__(self, member_repo, image, context):
self.member_repo = member_repo
self.image = image
self.context = context
proxy_kwargs = {'context': self.context}
super(ImageMemberRepoProxy, self).__init__(
image,
member_repo,
member_proxy_class=ImageMemberProxy,
member_proxy_kwargs=proxy_kwargs)
_validate_image_accepts_members(self.image.visibility)
def get(self, member_id):
if (self.context.is_admin or
self.context.owner in (self.image.owner, member_id)):
member = self.member_repo.get(member_id)
return proxy_member(self.context, member)
else:
message = _("You cannot get image member for %s")
raise exception.Forbidden(message % member_id)
def list(self, *args, **kwargs):
members = self.member_repo.list(*args, **kwargs)
if (self.context.is_admin or
self.context.owner == self.image.owner):
return [proxy_member(self.context, m) for m in members]
for member in members:
if member.member_id == self.context.owner:
return [proxy_member(self.context, member)]
message = _("You cannot get image member for %s")
raise exception.Forbidden(message % self.image.image_id)
def remove(self, image_member):
if (self.image.owner == self.context.owner or
self.context.is_admin):
self.member_repo.remove(image_member)
else:
message = _("You cannot delete image member for %s")
raise exception.Forbidden(message
% self.image.image_id)
def add(self, image_member):
if (self.image.owner == self.context.owner or
self.context.is_admin):
self.member_repo.add(image_member)
else:
message = _("You cannot add image member for %s")
raise exception.Forbidden(message
% self.image.image_id)
def save(self, image_member, from_state=None):
if (self.context.is_admin or
self.context.owner == image_member.member_id):
self.member_repo.save(image_member, from_state=from_state)
else:
message = _("You cannot update image member %s")
raise exception.Forbidden(message % image_member.member_id)
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, image_factory, context):
self.image_factory = image_factory
self.context = context
kwargs = {'context': self.context}
super(ImageFactoryProxy, self).__init__(image_factory,
proxy_class=ImageProxy,
proxy_kwargs=kwargs)
def new_image(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create images "
"owned by '%s'.")
raise exception.Forbidden(message % owner)
return super(ImageFactoryProxy, self).new_image(owner=owner, **kwargs)
class ImageMemberFactoryProxy(glance.domain.proxy.ImageMembershipFactory):
def __init__(self, image_member_factory, context):
self.image_member_factory = image_member_factory
self.context = context
kwargs = {'context': self.context}
super(ImageMemberFactoryProxy, self).__init__(
image_member_factory,
proxy_class=ImageMemberProxy,
proxy_kwargs=kwargs)
def new_image_member(self, image, member_id):
owner = image.owner
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create image members "
"for the image.")
raise exception.Forbidden(message)
_validate_image_accepts_members(image.visibility)
return self.image_member_factory.new_image_member(image, member_id)
def _immutable_attr(target, attr, proxy=None):
def get_attr(self):
value = getattr(getattr(self, target), attr)
if proxy is not None:
value = proxy(value)
return value
def forbidden(self, *args, **kwargs):
resource = getattr(self, 'resource_name', 'resource')
message = _("You are not permitted to modify '%(attr)s' on this "
"%(resource)s.")
raise exception.Forbidden(message % {'attr': attr,
'resource': resource})
return property(get_attr, forbidden, forbidden)
class ImmutableLocations(list):
def forbidden(self, *args, **kwargs):
message = _("You are not permitted to modify locations "
"for this image.")
raise exception.Forbidden(message)
def __deepcopy__(self, memo):
return ImmutableLocations(copy.deepcopy(list(self), memo))
append = forbidden
extend = forbidden
insert = forbidden
pop = forbidden
remove = forbidden
reverse = forbidden
sort = forbidden
__delitem__ = forbidden
__delslice__ = forbidden
__iadd__ = forbidden
__imul__ = forbidden
__setitem__ = forbidden
__setslice__ = forbidden
class ImmutableProperties(dict):
def forbidden_key(self, key, *args, **kwargs):
message = _("You are not permitted to modify '%s' on this image.")
raise exception.Forbidden(message % key)
def forbidden(self, *args, **kwargs):
message = _("You are not permitted to modify this image.")
raise exception.Forbidden(message)
__delitem__ = forbidden_key
__setitem__ = forbidden_key
pop = forbidden
popitem = forbidden
setdefault = forbidden
update = forbidden
class ImmutableTags(set):
def forbidden(self, *args, **kwargs):
message = _("You are not permitted to modify tags on this image.")
raise exception.Forbidden(message)
add = forbidden
clear = forbidden
difference_update = forbidden
intersection_update = forbidden
pop = forbidden
remove = forbidden
symmetric_difference_update = forbidden
update = forbidden
class ImmutableImageProxy(object):
def __init__(self, base, context):
self.base = base
self.context = context
self.resource_name = 'image'
name = _immutable_attr('base', 'name')
image_id = _immutable_attr('base', 'image_id')
status = _immutable_attr('base', 'status')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
visibility = _immutable_attr('base', 'visibility')
min_disk = _immutable_attr('base', 'min_disk')
min_ram = _immutable_attr('base', 'min_ram')
protected = _immutable_attr('base', 'protected')
os_hash_algo = _immutable_attr('base', 'os_hash_algo')
os_hash_value = _immutable_attr('base', 'os_hash_value')
os_hidden = _immutable_attr('base', 'os_hidden')
locations = _immutable_attr('base', 'locations', proxy=ImmutableLocations)
checksum = _immutable_attr('base', 'checksum')
owner = _immutable_attr('base', 'owner')
disk_format = _immutable_attr('base', 'disk_format')
container_format = _immutable_attr('base', 'container_format')
size = _immutable_attr('base', 'size')
virtual_size = _immutable_attr('base', 'virtual_size')
extra_properties = _immutable_attr('base', 'extra_properties',
proxy=ImmutableProperties)
member = _immutable_attr('base', 'member')
tags = _immutable_attr('base', 'tags', proxy=ImmutableTags)
def delete(self):
message = _("You are not permitted to delete this image.")
raise exception.Forbidden(message)
def get_data(self, *args, **kwargs):
return self.base.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
message = _("You are not permitted to upload data for this image.")
raise exception.Forbidden(message)
def deactivate(self, *args, **kwargs):
message = _("You are not permitted to deactivate this image.")
raise exception.Forbidden(message)
def reactivate(self, *args, **kwargs):
message = _("You are not permitted to reactivate this image.")
raise exception.Forbidden(message)
class ImmutableMemberProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'image member'
id = _immutable_attr('base', 'id')
image_id = _immutable_attr('base', 'image_id')
member_id = _immutable_attr('base', 'member_id')
status = _immutable_attr('base', 'status')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
class ImmutableTaskProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'task'
task_id = _immutable_attr('base', 'task_id')
type = _immutable_attr('base', 'type')
status = _immutable_attr('base', 'status')
owner = _immutable_attr('base', 'owner')
expires_at = _immutable_attr('base', 'expires_at')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
input = _immutable_attr('base', 'input')
message = _immutable_attr('base', 'message')
result = _immutable_attr('base', 'result')
def run(self, executor):
self.base.run(executor)
def begin_processing(self):
message = _("You are not permitted to set status on this task.")
raise exception.Forbidden(message)
def succeed(self, result):
message = _("You are not permitted to set status on this task.")
raise exception.Forbidden(message)
def fail(self, message):
message = _("You are not permitted to set status on this task.")
raise exception.Forbidden(message)
class ImmutableTaskStubProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'task stub'
task_id = _immutable_attr('base', 'task_id')
type = _immutable_attr('base', 'type')
status = _immutable_attr('base', 'status')
owner = _immutable_attr('base', 'owner')
expires_at = _immutable_attr('base', 'expires_at')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context):
self.image = image
self.context = context
super(ImageProxy, self).__init__(image)
class ImageMemberProxy(glance.domain.proxy.ImageMember):
def __init__(self, image_member, context):
self.image_member = image_member
self.context = context
super(ImageMemberProxy, self).__init__(image_member)
class TaskProxy(glance.domain.proxy.Task):
def __init__(self, task):
self.task = task
super(TaskProxy, self).__init__(task)
class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
def __init__(self, task_factory, context):
self.task_factory = task_factory
self.context = context
super(TaskFactoryProxy, self).__init__(
task_factory,
task_proxy_class=TaskProxy)
def new_task(self, **kwargs):
owner = kwargs.get('owner', self.context.owner)
# NOTE(nikhil): Unlike Images, Tasks are expected to have owner.
# We currently do not allow even admins to set the owner to None.
if owner is not None and (owner == self.context.owner
or self.context.is_admin):
return super(TaskFactoryProxy, self).new_task(**kwargs)
else:
message = _("You are not permitted to create this task with "
"owner as: %s")
raise exception.Forbidden(message % owner)
class TaskRepoProxy(glance.domain.proxy.TaskRepo):
def __init__(self, task_repo, context):
self.task_repo = task_repo
self.context = context
super(TaskRepoProxy, self).__init__(task_repo)
def get(self, task_id):
task = self.task_repo.get(task_id)
return proxy_task(self.context, task)
class TaskStubRepoProxy(glance.domain.proxy.TaskStubRepo):
def __init__(self, task_stub_repo, context):
self.task_stub_repo = task_stub_repo
self.context = context
super(TaskStubRepoProxy, self).__init__(task_stub_repo)
def list(self, *args, **kwargs):
task_stubs = self.task_stub_repo.list(*args, **kwargs)
return [proxy_task_stub(self.context, t) for t in task_stubs]
# Metadef Namespace classes
def is_namespace_mutable(context, namespace):
"""Return True if the namespace is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return namespace.owner == context.owner
def proxy_namespace(context, namespace):
if is_namespace_mutable(context, namespace):
return namespace
else:
return ImmutableMetadefNamespaceProxy(namespace)
class ImmutableMetadefNamespaceProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'namespace'
namespace_id = _immutable_attr('base', 'namespace_id')
namespace = _immutable_attr('base', 'namespace')
display_name = _immutable_attr('base', 'display_name')
description = _immutable_attr('base', 'description')
owner = _immutable_attr('base', 'owner')
visibility = _immutable_attr('base', 'visibility')
protected = _immutable_attr('base', 'protected')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
def delete(self):
message = _("You are not permitted to delete this namespace.")
raise exception.Forbidden(message)
def save(self):
message = _("You are not permitted to update this namespace.")
raise exception.Forbidden(message)
class MetadefNamespaceProxy(glance.domain.proxy.MetadefNamespace):
def __init__(self, namespace):
self.namespace_input = namespace
super(MetadefNamespaceProxy, self).__init__(namespace)
class MetadefNamespaceFactoryProxy(
glance.domain.proxy.MetadefNamespaceFactory):
def __init__(self, meta_namespace_factory, context):
self.meta_namespace_factory = meta_namespace_factory
self.context = context
super(MetadefNamespaceFactoryProxy, self).__init__(
meta_namespace_factory,
meta_namespace_proxy_class=MetadefNamespaceProxy)
def new_namespace(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create namespace "
"owned by '%s'")
raise exception.Forbidden(message % (owner))
return super(MetadefNamespaceFactoryProxy, self).new_namespace(
owner=owner, **kwargs)
class MetadefNamespaceRepoProxy(glance.domain.proxy.MetadefNamespaceRepo):
def __init__(self, namespace_repo, context):
self.namespace_repo = namespace_repo
self.context = context
super(MetadefNamespaceRepoProxy, self).__init__(namespace_repo)
def get(self, namespace):
namespace_obj = self.namespace_repo.get(namespace)
return proxy_namespace(self.context, namespace_obj)
def list(self, *args, **kwargs):
namespaces = self.namespace_repo.list(*args, **kwargs)
return [proxy_namespace(self.context, namespace) for
namespace in namespaces]
# Metadef Object classes
def is_object_mutable(context, object):
"""Return True if the object is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return object.namespace.owner == context.owner
def proxy_object(context, object):
if is_object_mutable(context, object):
return object
else:
return ImmutableMetadefObjectProxy(object)
class ImmutableMetadefObjectProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'object'
object_id = _immutable_attr('base', 'object_id')
name = _immutable_attr('base', 'name')
required = _immutable_attr('base', 'required')
description = _immutable_attr('base', 'description')
properties = _immutable_attr('base', 'properties')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
def delete(self):
message = _("You are not permitted to delete this object.")
raise exception.Forbidden(message)
def save(self):
message = _("You are not permitted to update this object.")
raise exception.Forbidden(message)
class MetadefObjectProxy(glance.domain.proxy.MetadefObject):
def __init__(self, meta_object):
self.meta_object = meta_object
super(MetadefObjectProxy, self).__init__(meta_object)
class MetadefObjectFactoryProxy(glance.domain.proxy.MetadefObjectFactory):
def __init__(self, meta_object_factory, context):
self.meta_object_factory = meta_object_factory
self.context = context
super(MetadefObjectFactoryProxy, self).__init__(
meta_object_factory,
meta_object_proxy_class=MetadefObjectProxy)
def new_object(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create object "
"owned by '%s'")
raise exception.Forbidden(message % (owner))
return super(MetadefObjectFactoryProxy, self).new_object(**kwargs)
class MetadefObjectRepoProxy(glance.domain.proxy.MetadefObjectRepo):
def __init__(self, object_repo, context):
self.object_repo = object_repo
self.context = context
super(MetadefObjectRepoProxy, self).__init__(object_repo)
def get(self, namespace, object_name):
meta_object = self.object_repo.get(namespace, object_name)
return proxy_object(self.context, meta_object)
def list(self, *args, **kwargs):
objects = self.object_repo.list(*args, **kwargs)
return [proxy_object(self.context, meta_object) for
meta_object in objects]
# Metadef ResourceType classes
def is_meta_resource_type_mutable(context, meta_resource_type):
"""Return True if the meta_resource_type is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
# (lakshmiS): resource type can exist without an association with
# namespace and resource type cannot be created/update/deleted directly(
# they have to be associated/de-associated from namespace)
if meta_resource_type.namespace:
return meta_resource_type.namespace.owner == context.owner
else:
return False
def proxy_meta_resource_type(context, meta_resource_type):
if is_meta_resource_type_mutable(context, meta_resource_type):
return meta_resource_type
else:
return ImmutableMetadefResourceTypeProxy(meta_resource_type)
class ImmutableMetadefResourceTypeProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'meta_resource_type'
namespace = _immutable_attr('base', 'namespace')
name = _immutable_attr('base', 'name')
prefix = _immutable_attr('base', 'prefix')
properties_target = _immutable_attr('base', 'properties_target')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
def delete(self):
message = _("You are not permitted to delete this meta_resource_type.")
raise exception.Forbidden(message)
class MetadefResourceTypeProxy(glance.domain.proxy.MetadefResourceType):
def __init__(self, meta_resource_type):
self.meta_resource_type = meta_resource_type
super(MetadefResourceTypeProxy, self).__init__(meta_resource_type)
class MetadefResourceTypeFactoryProxy(
glance.domain.proxy.MetadefResourceTypeFactory):
def __init__(self, resource_type_factory, context):
self.meta_resource_type_factory = resource_type_factory
self.context = context
super(MetadefResourceTypeFactoryProxy, self).__init__(
resource_type_factory,
resource_type_proxy_class=MetadefResourceTypeProxy)
def new_resource_type(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create resource_type "
"owned by '%s'")
raise exception.Forbidden(message % (owner))
return super(MetadefResourceTypeFactoryProxy, self).new_resource_type(
**kwargs)
class MetadefResourceTypeRepoProxy(
glance.domain.proxy.MetadefResourceTypeRepo):
def __init__(self, meta_resource_type_repo, context):
self.meta_resource_type_repo = meta_resource_type_repo
self.context = context
super(MetadefResourceTypeRepoProxy, self).__init__(
meta_resource_type_repo)
def list(self, *args, **kwargs):
meta_resource_types = self.meta_resource_type_repo.list(
*args, **kwargs)
return [proxy_meta_resource_type(self.context, meta_resource_type) for
meta_resource_type in meta_resource_types]
def get(self, *args, **kwargs):
meta_resource_type = self.meta_resource_type_repo.get(*args, **kwargs)
return proxy_meta_resource_type(self.context, meta_resource_type)
# Metadef namespace properties classes
def is_namespace_property_mutable(context, namespace_property):
"""Return True if the object is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return namespace_property.namespace.owner == context.owner
def proxy_namespace_property(context, namespace_property):
if is_namespace_property_mutable(context, namespace_property):
return namespace_property
else:
return ImmutableMetadefPropertyProxy(namespace_property)
class ImmutableMetadefPropertyProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'namespace_property'
property_id = _immutable_attr('base', 'property_id')
name = _immutable_attr('base', 'name')
schema = _immutable_attr('base', 'schema')
def delete(self):
message = _("You are not permitted to delete this property.")
raise exception.Forbidden(message)
def save(self):
message = _("You are not permitted to update this property.")
raise exception.Forbidden(message)
class MetadefPropertyProxy(glance.domain.proxy.MetadefProperty):
def __init__(self, namespace_property):
self.meta_object = namespace_property
super(MetadefPropertyProxy, self).__init__(namespace_property)
class MetadefPropertyFactoryProxy(glance.domain.proxy.MetadefPropertyFactory):
def __init__(self, namespace_property_factory, context):
self.meta_object_factory = namespace_property_factory
self.context = context
super(MetadefPropertyFactoryProxy, self).__init__(
namespace_property_factory,
property_proxy_class=MetadefPropertyProxy)
def new_namespace_property(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None or owner != self.context.owner:
message = _("You are not permitted to create property "
"owned by '%s'")
raise exception.Forbidden(message % (owner))
return super(MetadefPropertyFactoryProxy, self).new_namespace_property(
**kwargs)
class MetadefPropertyRepoProxy(glance.domain.proxy.MetadefPropertyRepo):
def __init__(self, namespace_property_repo, context):
self.namespace_property_repo = namespace_property_repo
self.context = context
super(MetadefPropertyRepoProxy, self).__init__(namespace_property_repo)
def get(self, namespace, object_name):
namespace_property = self.namespace_property_repo.get(namespace,
object_name)
return proxy_namespace_property(self.context, namespace_property)
def list(self, *args, **kwargs):
namespace_properties = self.namespace_property_repo.list(
*args, **kwargs)
return [proxy_namespace_property(self.context, namespace_property) for
namespace_property in namespace_properties]
# Metadef Tag classes
def is_tag_mutable(context, tag):
"""Return True if the tag is mutable in this context."""
if context.is_admin:
return True
if context.owner is None:
return False
return tag.namespace.owner == context.owner
def proxy_tag(context, tag):
if is_tag_mutable(context, tag):
return tag
else:
return ImmutableMetadefTagProxy(tag)
class ImmutableMetadefTagProxy(object):
def __init__(self, base):
self.base = base
self.resource_name = 'tag'
tag_id = _immutable_attr('base', 'tag_id')
name = _immutable_attr('base', 'name')
created_at = _immutable_attr('base', 'created_at')
updated_at = _immutable_attr('base', 'updated_at')
def delete(self):
message = _("You are not permitted to delete this tag.")
raise exception.Forbidden(message)
def save(self):
message = _("You are not permitted to update this tag.")
raise exception.Forbidden(message)
class MetadefTagProxy(glance.domain.proxy.MetadefTag):
pass
class MetadefTagFactoryProxy(glance.domain.proxy.MetadefTagFactory):
def __init__(self, meta_tag_factory, context):
self.meta_tag_factory = meta_tag_factory
self.context = context
super(MetadefTagFactoryProxy, self).__init__(
meta_tag_factory,
meta_tag_proxy_class=MetadefTagProxy)
def new_tag(self, **kwargs):
owner = kwargs.pop('owner', self.context.owner)
if not self.context.is_admin:
if owner is None:
message = _("Owner must be specified to create a tag.")
raise exception.Forbidden(message)
elif owner != self.context.owner:
message = _("You are not permitted to create a tag"
" in the namespace owned by '%s'")
raise exception.Forbidden(message % (owner))
return super(MetadefTagFactoryProxy, self).new_tag(**kwargs)
class MetadefTagRepoProxy(glance.domain.proxy.MetadefTagRepo):
def __init__(self, tag_repo, context):
self.tag_repo = tag_repo
self.context = context
super(MetadefTagRepoProxy, self).__init__(tag_repo)
def get(self, namespace, tag_name):
meta_tag = self.tag_repo.get(namespace, tag_name)
return proxy_tag(self.context, meta_tag)
def list(self, *args, **kwargs):
tags = self.tag_repo.list(*args, **kwargs)
return [proxy_tag(self.context, meta_tag) for
meta_tag in tags]

@ -17,7 +17,6 @@
"""Policy Engine For Glance"""
from collections import abc
import copy
from oslo_config import cfg
from oslo_log import log as logging
@ -25,8 +24,8 @@ from oslo_policy import opts
from oslo_policy import policy
from glance.common import exception
import glance.domain.proxy
from glance.i18n import _, _LW
from glance.domain import proxy
from glance.i18n import _LW
from glance import policies
@ -127,87 +126,6 @@ def get_enforcer():
return _ENFORCER
class ImageRepoProxy(glance.domain.proxy.Repo):
def __init__(self, image_repo, context, policy):
self.context = context
self.policy = policy
self.image_repo = image_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(ImageRepoProxy, self).__init__(image_repo,
item_proxy_class=ImageProxy,
item_proxy_kwargs=proxy_kwargs)
def get(self, image_id):
image = super(ImageRepoProxy, self).get(image_id)
target = self._build_image_target(image)
try:
self.policy.enforce(self.context, 'get_image', target)
except exception.Forbidden:
# NOTE (abhishekk): Returning 404 Not Found as the
# image is outside of this user's project
msg = _("No image found with ID %s") % image_id
raise exception.NotFound(msg)
return image
def _build_image_target(self, image):
"""Build a policy enforcement target for an image.
:param image: An instance of `glance.domain.Image`
:returns: a dictionary representing the image for policy enforcement
"""
target = dict(ImageTarget(image))
target['project_id'] = image.owner
# We do this so that members of the image can pass the policy for
# getting an image shared with their project. An alternative would be
# to update the image owner, or project_id, to match the member ID,
# tricking the policy enforcer into thinking image members are actually
# image owners. But, that feels less clear without understanding the
# code that makes that assumption, especially for operators reading
# check strings. Using this approach forces the check_str to be more
# descriptive.
members = self.image_repo.db_api.image_member_find(
self.context, image_id=image.image_id)
# FIXME(lbragstad): Remove this if statement if/when oslo.policy
# supports lists of target attributes via substitution, allowing us to
# do something like:
#
# target['member_ids'] = set(m['member'] for m in members)
for member in members:
if member['member'] == self.context.project_id:
target['member_id'] = member['member']
break
return target
def list(self, *args, **kwargs):
# FIXME(lbragstad): This is a hack to get policy to pass because we
# don't have a reasonable target to use for all images. We set the
# target project_id to the context project_id, which effectively
# ensures the context project_id matches itself in policy enforcement.
#
# A more accurate and cleaner way to implement this, and filtering,
# would be to query all images from the database, build a target for
# each image, and then iterate over each image and call policy
# enforcement. If the user passes policy enforcement, append the image
# to the list of filtered images. If not, the image should be removed
# from the list of images returned to the user.
target = {'project_id': self.context.project_id}
self.policy.enforce(self.context, 'get_images', target)
return super(ImageRepoProxy, self).list(*args, **kwargs)
def save(self, image, from_state=None):
target = dict(image.target)
self.policy.enforce(self.context, 'modify_image', target)
return super(ImageRepoProxy, self).save(image, from_state=from_state)
def add(self, image):
target = dict(image.target)
self.policy.enforce(self.context, 'add_image', target)
return super(ImageRepoProxy, self).add(image)
def _enforce_image_visibility(policy, context, visibility, target):
if visibility == 'public':
policy.enforce(context, 'publicize_image', target)
@ -215,292 +133,6 @@ def _enforce_image_visibility(policy, context, visibility, target):
policy.enforce(context, 'communitize_image', target)
class ImageProxy(glance.domain.proxy.Image):
def __init__(self, image, context, policy):
self.image = image
self.target = ImageTarget(image)
self.context = context
self.policy = policy
super(ImageProxy, self).__init__(image)
@property
def visibility(self):
return self.image.visibility
@visibility.setter
def visibility(self, value):
target = dict(self.target)
_enforce_image_visibility(self.policy, self.context, value, target)
self.image.visibility = value
@property
def locations(self):
return ImageLocationsProxy(self.image.locations,
self.context, self.policy)
@locations.setter
def locations(self, value):
if not isinstance(value, (list, ImageLocationsProxy)):
raise exception.Invalid(_('Invalid locations: %s') % value)
self.policy.enforce(self.context, 'set_image_location', self.target)
new_locations = list(value)
if (set([loc['url'] for loc in self.image.locations]) -
set([loc['url'] for loc in new_locations])):
self.policy.enforce(self.context, 'delete_image_location',
self.target)
self.image.locations = new_locations
def delete(self):
target = dict(self.target)
self.policy.enforce(self.context, 'delete_image', target)
return self.image.delete()
def deactivate(self):
LOG.debug('Attempting deactivate')
target = dict(ImageTarget(self.image))
self.policy.enforce(self.context, 'deactivate', target=target)
LOG.debug('Deactivate allowed, continue')
self.image.deactivate()
def reactivate(self):
LOG.debug('Attempting reactivate')
target = dict(ImageTarget(self.image))
self.policy.enforce(self.context, 'reactivate', target=target)
LOG.debug('Reactivate allowed, continue')
self.image.reactivate()
def get_data(self, *args, **kwargs):
target = dict(ImageTarget(self.image))
self.policy.enforce(self.context, 'download_image', target)
return self.image.get_data(*args, **kwargs)
def set_data(self, *args, **kwargs):
return self.image.set_data(*args, **kwargs)
class ImageMemberProxy(glance.domain.proxy.ImageMember):
def __init__(self, image_member, context, policy):
super(ImageMemberProxy, self).__init__(image_member)
self.image_member = image_member
self.context = context
self.policy = policy
class ImageFactoryProxy(glance.domain.proxy.ImageFactory):
def __init__(self, image_factory, context, policy):
self.image_factory = image_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(ImageFactoryProxy, self).__init__(image_factory,
proxy_class=ImageProxy,
proxy_kwargs=proxy_kwargs)
def new_image(self, **kwargs):
# If we reversed the order of this method and did the policy
# enforcement on the way out instead of before we build the image
# target reference, we could use the actual image as a target instead
# of building a faux target with one attribute.
target = {}
target['project_id'] = kwargs.get('owner', None)
_enforce_image_visibility(self.policy, self.context,
kwargs.get('visibility'), target)
return super(ImageFactoryProxy, self).new_image(**kwargs)
class ImageMemberFactoryProxy(glance.domain.proxy.ImageMembershipFactory):
def __init__(self, member_factory, context, policy):
super(ImageMemberFactoryProxy, self).__init__(
member_factory,
proxy_class=ImageMemberProxy,
proxy_kwargs={'context': context, 'policy': policy})
class ImageMemberRepoProxy(glance.domain.proxy.Repo):
def __init__(self, member_repo, image, context, policy):
self.member_repo = member_repo
self.image = image
self.target = ImageTarget(image)
self.context = context
self.policy = policy
def add(self, member):
target = dict(self.target)
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, 'add_member', target)
self.member_repo.add(member)
def get(self, member_id):
# NOTE(lbragstad): We set the project_id of the target to be the
# project_id of the context object, which is effectively a no-op
# because we're checking the context.project_id matches the
# context.project_id. This is a bandaid to allow project-members and
# project-readers to view shared images owned by their project, or to
# view images shared with them by another project. Glance's database
# layer filters the images by ownership and membership, based on the
# context object and administrative checks. If or when that code is
# pulled into a higher level and if we have a list of members for an
# image, we can write a more accurate target.
target = dict(self.target)
# We can't set the project_id as the image project_id because that
# wouldn't allow image members to pass the policy check. We need the
# list of image members to build an accurate target.
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, 'get_member', target)
return self.member_repo.get(member_id)
def save(self, member, from_state=None):
target = dict(self.target)
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, 'modify_member', target)
self.member_repo.save(member, from_state=from_state)
def list(self, *args, **kwargs):
target = dict(self.target)
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, 'get_members', target)
return self.member_repo.list(*args, **kwargs)
def remove(self, member):
target = dict(self.target)
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, 'delete_member', target)
self.member_repo.remove(member)
class ImageLocationsProxy(object):
__hash__ = None
def __init__(self, locations, context, policy):
self.locations = locations
self.context = context
self.policy = policy
def __copy__(self):
return type(self)(self.locations, self.context, self.policy)
def __deepcopy__(self, memo):
# NOTE(zhiyan): Only copy location entries, others can be reused.
return type(self)(copy.deepcopy(self.locations, memo),
self.context, self.policy)
def _get_checker(action, func_name):
def _checker(self, *args, **kwargs):
target = {}
if self.context.project_id:
target['project_id'] = self.context.project_id
self.policy.enforce(self.context, action, target)
method = getattr(self.locations, func_name)
return method(*args, **kwargs)
return _checker
count = _get_checker('get_image_location', 'count')
index = _get_checker('get_image_location', 'index')
__getitem__ = _get_checker('get_image_location', '__getitem__')
__contains__ = _get_checker('get_image_location', '__contains__')
__len__ = _get_checker('get_image_location', '__len__')
__cast = _get_checker('get_image_location', '__cast')
__cmp__ = _get_checker('get_image_location', '__cmp__')
__iter__ = _get_checker('get_image_location', '__iter__')
append = _get_checker('set_image_location', 'append')
extend = _get_checker('set_image_location', 'extend')
insert = _get_checker('set_image_location', 'insert')
reverse = _get_checker('set_image_location', 'reverse')
__iadd__ = _get_checker('set_image_location', '__iadd__')
__setitem__ = _get_checker('set_image_location', '__setitem__')
pop = _get_checker('delete_image_location', 'pop')
remove = _get_checker('delete_image_location', 'remove')
__delitem__ = _get_checker('delete_image_location', '__delitem__')
__delslice__ = _get_checker('delete_image_location', '__delslice__')
del _get_checker
class TaskProxy(glance.domain.proxy.Task):
def __init__(self, task, context, policy):
self.task = task
self.context = context
self.policy = policy
super(TaskProxy, self).__init__(task)
class TaskStubProxy(glance.domain.proxy.TaskStub):
def __init__(self, task_stub, context, policy):
self.task_stub = task_stub
self.context = context
self.policy = policy
super(TaskStubProxy, self).__init__(task_stub)
class TaskRepoProxy(glance.domain.proxy.TaskRepo):
def __init__(self, task_repo, context, task_policy):
self.context = context
self.policy = task_policy
self.task_repo = task_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(TaskRepoProxy,
self).__init__(task_repo,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs)
# TODO(lbragstad): Move this to the tasks api itself
def get(self, task_id):
self.policy.enforce(self.context, 'get_task', {})
return super(TaskRepoProxy, self).get(task_id)
# TODO(lbragstad): Move this to the tasks api itself
def add(self, task):
self.policy.enforce(self.context, 'add_task', {})
super(TaskRepoProxy, self).add(task)
# TODO(lbragstad): Remove this after Xena
def save(self, task):
self.policy.enforce(self.context, 'modify_task', {})
super(TaskRepoProxy, self).save(task)
class TaskStubRepoProxy(glance.domain.proxy.TaskStubRepo):
def __init__(self, task_stub_repo, context, task_policy):
self.context = context
self.policy = task_policy
self.task_stub_repo = task_stub_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(TaskStubRepoProxy,
self).__init__(task_stub_repo,
task_stub_proxy_class=TaskStubProxy,
task_stub_proxy_kwargs=proxy_kwargs)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_tasks', {})
return super(TaskStubRepoProxy, self).list(*args, **kwargs)
class TaskFactoryProxy(glance.domain.proxy.TaskFactory):
def __init__(self, task_factory, context, policy):
self.task_factory = task_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(TaskFactoryProxy, self).__init__(
task_factory,
task_proxy_class=TaskProxy,
task_proxy_kwargs=proxy_kwargs)
class ImageTarget(abc.Mapping):
SENTINEL = object()
@ -510,7 +142,7 @@ class ImageTarget(abc.Mapping):
:param target: Object being targeted
"""
self.target = target
self._target_keys = [k for k in dir(ImageProxy)
self._target_keys = [k for k in dir(proxy.Image)
if not k.startswith('__')
# NOTE(lbragstad): The locations attributes is an
# instance of ImageLocationsProxy, which isn't
@ -521,7 +153,7 @@ class ImageTarget(abc.Mapping):
# information. Omitting for not until that is a
# necessity.
if not k == 'locations'
if not callable(getattr(ImageProxy, k))]
if not callable(getattr(proxy.Image, k))]
def __getitem__(self, key):
"""Return the value of 'key' from the target.
@ -568,317 +200,3 @@ class ImageTarget(abc.Mapping):
}
return transforms.get(key, key)
# Metadef Namespace classes
class MetadefNamespaceProxy(glance.domain.proxy.MetadefNamespace):
def __init__(self, namespace, context, policy):
self.namespace_input = namespace
self.context = context
self.policy = policy
super(MetadefNamespaceProxy, self).__init__(namespace)
def delete(self):
self.policy.enforce(self.context, 'delete_metadef_namespace', {})
return super(MetadefNamespaceProxy, self).delete()
class MetadefNamespaceRepoProxy(glance.domain.proxy.MetadefNamespaceRepo):
def __init__(self, namespace_repo, context, namespace_policy):
self.context = context
self.policy = namespace_policy
self.namespace_repo = namespace_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefNamespaceRepoProxy,
self).__init__(namespace_repo,
namespace_proxy_class=MetadefNamespaceProxy,
namespace_proxy_kwargs=proxy_kwargs)
def get(self, namespace):
self.policy.enforce(self.context, 'get_metadef_namespace', {})
return super(MetadefNamespaceRepoProxy, self).get(namespace)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_metadef_namespaces', {})
return super(MetadefNamespaceRepoProxy, self).list(*args, **kwargs)
def save(self, namespace):
self.policy.enforce(self.context, 'modify_metadef_namespace', {})
return super(MetadefNamespaceRepoProxy, self).save(namespace)
def add(self, namespace):
self.policy.enforce(self.context, 'add_metadef_namespace', {})
return super(MetadefNamespaceRepoProxy, self).add(namespace)
def remove(self, namespace):
self.policy.enforce(self.context, 'delete_metadef_namespace', {})
return super(MetadefNamespaceRepoProxy, self).remove(namespace)
def remove_tags(self, namespace):
self.policy.enforce(self.context, 'delete_metadef_tags', {})
return super(MetadefNamespaceRepoProxy, self).remove_tags(namespace)
class MetadefNamespaceFactoryProxy(
glance.domain.proxy.MetadefNamespaceFactory):
def __init__(self, meta_namespace_factory, context, policy):
self.meta_namespace_factory = meta_namespace_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefNamespaceFactoryProxy, self).__init__(
meta_namespace_factory,
meta_namespace_proxy_class=MetadefNamespaceProxy,
meta_namespace_proxy_kwargs=proxy_kwargs)
# Metadef Object classes
class MetadefObjectProxy(glance.domain.proxy.MetadefObject):
def __init__(self, meta_object, context, policy):
self.meta_object = meta_object
self.context = context
self.policy = policy
super(MetadefObjectProxy, self).__init__(meta_object)
def delete(self):
self.policy.enforce(self.context, 'delete_metadef_object', {})
return super(MetadefObjectProxy, self).delete()
class MetadefObjectRepoProxy(glance.domain.proxy.MetadefObjectRepo):
def __init__(self, object_repo, context, object_policy):
self.context = context
self.policy = object_policy
self.object_repo = object_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefObjectRepoProxy,
self).__init__(object_repo,
object_proxy_class=MetadefObjectProxy,
object_proxy_kwargs=proxy_kwargs)
def get(self, namespace, object_name):
self.policy.enforce(self.context, 'get_metadef_object', {})
return super(MetadefObjectRepoProxy, self).get(namespace, object_name)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_metadef_objects', {})
return super(MetadefObjectRepoProxy, self).list(*args, **kwargs)
def save(self, meta_object):
self.policy.enforce(self.context, 'modify_metadef_object', {})
return super(MetadefObjectRepoProxy, self).save(meta_object)
def add(self, meta_object):
self.policy.enforce(self.context, 'add_metadef_object', {})
return super(MetadefObjectRepoProxy, self).add(meta_object)
def remove(self, meta_object):
self.policy.enforce(self.context, 'delete_metadef_object', {})
return super(MetadefObjectRepoProxy, self).remove(meta_object)
class MetadefObjectFactoryProxy(glance.domain.proxy.MetadefObjectFactory):
def __init__(self, meta_object_factory, context, policy):
self.meta_object_factory = meta_object_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefObjectFactoryProxy, self).__init__(
meta_object_factory,
meta_object_proxy_class=MetadefObjectProxy,
meta_object_proxy_kwargs=proxy_kwargs)
# Metadef ResourceType classes
class MetadefResourceTypeProxy(glance.domain.proxy.MetadefResourceType):
def __init__(self, meta_resource_type, context, policy):
self.meta_resource_type = meta_resource_type
self.context = context
self.policy = policy
super(MetadefResourceTypeProxy, self).__init__(meta_resource_type)
def delete(self):
self.policy.enforce(self.context,
'remove_metadef_resource_type_association', {})
return super(MetadefResourceTypeProxy, self).delete()
class MetadefResourceTypeRepoProxy(
glance.domain.proxy.MetadefResourceTypeRepo):
def __init__(self, resource_type_repo, context, resource_type_policy):
self.context = context
self.policy = resource_type_policy
self.resource_type_repo = resource_type_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefResourceTypeRepoProxy, self).__init__(
resource_type_repo,
resource_type_proxy_class=MetadefResourceTypeProxy,
resource_type_proxy_kwargs=proxy_kwargs)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'list_metadef_resource_types', {})
return super(MetadefResourceTypeRepoProxy, self).list(*args, **kwargs)
def get(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_metadef_resource_type', {})
return super(MetadefResourceTypeRepoProxy, self).get(*args, **kwargs)
def add(self, resource_type):
self.policy.enforce(self.context,
'add_metadef_resource_type_association', {})
return super(MetadefResourceTypeRepoProxy, self).add(resource_type)
def remove(self, *args, **kwargs):
self.policy.enforce(self.context,
'remove_metadef_resource_type_association', {})
return super(MetadefResourceTypeRepoProxy,
self).remove(*args, **kwargs)
class MetadefResourceTypeFactoryProxy(
glance.domain.proxy.MetadefResourceTypeFactory):
def __init__(self, resource_type_factory, context, policy):
self.resource_type_factory = resource_type_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefResourceTypeFactoryProxy, self).__init__(
resource_type_factory,
resource_type_proxy_class=MetadefResourceTypeProxy,
resource_type_proxy_kwargs=proxy_kwargs)
# Metadef namespace properties classes
class MetadefPropertyProxy(glance.domain.proxy.MetadefProperty):
def __init__(self, namespace_property, context, policy):
self.namespace_property = namespace_property
self.context = context
self.policy = policy
super(MetadefPropertyProxy, self).__init__(namespace_property)
def delete(self):
self.policy.enforce(self.context, 'remove_metadef_property', {})
return super(MetadefPropertyProxy, self).delete()
class MetadefPropertyRepoProxy(glance.domain.proxy.MetadefPropertyRepo):
def __init__(self, property_repo, context, object_policy):
self.context = context
self.policy = object_policy
self.property_repo = property_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefPropertyRepoProxy, self).__init__(
property_repo,
property_proxy_class=MetadefPropertyProxy,
property_proxy_kwargs=proxy_kwargs)
def get(self, namespace, property_name):
self.policy.enforce(self.context, 'get_metadef_property', {})
return super(MetadefPropertyRepoProxy, self).get(namespace,
property_name)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_metadef_properties', {})
return super(MetadefPropertyRepoProxy, self).list(
*args, **kwargs)
def save(self, namespace_property):
self.policy.enforce(self.context, 'modify_metadef_property', {})
return super(MetadefPropertyRepoProxy, self).save(
namespace_property)
def add(self, namespace_property):
self.policy.enforce(self.context, 'add_metadef_property', {})
return super(MetadefPropertyRepoProxy, self).add(
namespace_property)
def remove(self, *args, **kwargs):
self.policy.enforce(self.context, 'remove_metadef_property', {})
return super(MetadefPropertyRepoProxy, self).remove(*args, **kwargs)
class MetadefPropertyFactoryProxy(glance.domain.proxy.MetadefPropertyFactory):
def __init__(self, namespace_property_factory, context, policy):
self.namespace_property_factory = namespace_property_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefPropertyFactoryProxy, self).__init__(
namespace_property_factory,
property_proxy_class=MetadefPropertyProxy,
property_proxy_kwargs=proxy_kwargs)
# Metadef Tag classes
class MetadefTagProxy(glance.domain.proxy.MetadefTag):
def __init__(self, meta_tag, context, policy):
self.context = context
self.policy = policy
super(MetadefTagProxy, self).__init__(meta_tag)
def delete(self):
self.policy.enforce(self.context, 'delete_metadef_tag', {})
return super(MetadefTagProxy, self).delete()
class MetadefTagRepoProxy(glance.domain.proxy.MetadefTagRepo):
def __init__(self, tag_repo, context, tag_policy):
self.context = context
self.policy = tag_policy
self.tag_repo = tag_repo
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefTagRepoProxy,
self).__init__(tag_repo,
tag_proxy_class=MetadefTagProxy,
tag_proxy_kwargs=proxy_kwargs)
def get(self, namespace, tag_name):
self.policy.enforce(self.context, 'get_metadef_tag', {})
return super(MetadefTagRepoProxy, self).get(namespace, tag_name)
def list(self, *args, **kwargs):
self.policy.enforce(self.context, 'get_metadef_tags', {})
return super(MetadefTagRepoProxy, self).list(*args, **kwargs)
def save(self, meta_tag):
self.policy.enforce(self.context, 'modify_metadef_tag', {})
return super(MetadefTagRepoProxy, self).save(meta_tag)
def add(self, meta_tag):
self.policy.enforce(self.context, 'add_metadef_tag', {})
return super(MetadefTagRepoProxy, self).add(meta_tag)
def add_tags(self, meta_tags):
self.policy.enforce(self.context, 'add_metadef_tags', {})
return super(MetadefTagRepoProxy, self).add_tags(meta_tags)
def remove(self, meta_tag):
self.policy.enforce(self.context, 'delete_metadef_tag', {})
return super(MetadefTagRepoProxy, self).remove(meta_tag)
class MetadefTagFactoryProxy(glance.domain.proxy.MetadefTagFactory):
def __init__(self, meta_tag_factory, context, policy):
self.meta_tag_factory = meta_tag_factory
self.context = context
self.policy = policy
proxy_kwargs = {'context': self.context, 'policy': self.policy}
super(MetadefTagFactoryProxy, self).__init__(
meta_tag_factory,
meta_tag_proxy_class=MetadefTagProxy,
meta_tag_proxy_kwargs=proxy_kwargs)

@ -145,8 +145,7 @@ class CacheController(object):
Removes the image from cache or queue.
"""
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
except exception.NotFound:
@ -214,8 +213,7 @@ class CacheController(object):
the image is in the registry here. That is done by the
prefetcher...
"""
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
except exception.NotFound:

@ -45,8 +45,7 @@ class ImageActionsController(object):
@utils.mutating
def deactivate(self, req, image_id):
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
# FIXME(danms): This will still enforce the get_image policy
# which we don't want
@ -75,8 +74,7 @@ class ImageActionsController(object):
@utils.mutating
def reactivate(self, req, image_id):
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
# FIXME(danms): This will still enforce the get_image policy
# which we don't want

@ -124,8 +124,7 @@ class ImageDataController(object):
request=req,
content_type='text/plain')
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
image = None
refresher = None
cxt = req.context
@ -311,8 +310,7 @@ class ImageDataController(object):
raise webob.exc.HTTPRequestEntityTooLarge(explanation=str(e),
request=req)
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
# NOTE(abhishekk): stage API call does not have its own policy but
# it requires get_image access, this is the right place to check
# whether user has access to image or not
@ -440,8 +438,7 @@ class ImageDataController(object):
self._restore(image_repo, image)
def download(self, req, image_id):
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
if image.status == 'deactivated' and not req.context.is_admin:

@ -52,8 +52,7 @@ class ImageMembersController(object):
def _get_member_repo(self, req, image):
try:
return self.gateway.get_member_repo(image, req.context,
authorization_layer=False)
return self.gateway.get_member_repo(image, req.context)
except exception.Forbidden as e:
msg = (_("Error fetching members of image %(image_id)s: "
"%(inner_msg)s") % {"image_id": image.image_id,
@ -62,8 +61,7 @@ class ImageMembersController(object):
raise webob.exc.HTTPForbidden(explanation=msg)
def _lookup_image(self, req, image_id):
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
return image_repo.get(image_id)
except exception.NotFound:
@ -160,7 +158,7 @@ class ImageMembersController(object):
enforcer=self.policy).add_member()
image_member_factory = self.gateway.get_image_member_factory(
req.context, authorization_layer=False)
req.context)
new_member = image_member_factory.new_image_member(image,
member_id)
member_repo.add(new_member)

@ -46,8 +46,7 @@ class Controller(object):
@utils.mutating
def update(self, req, image_id, tag_value):
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
api_policy.ImageAPIPolicy(req.context, image,
@ -76,8 +75,7 @@ class Controller(object):
@utils.mutating
def delete(self, req, image_id, tag_value):
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
api_policy.ImageAPIPolicy(req.context, image,

@ -90,10 +90,8 @@ class ImagesController(object):
@utils.mutating
def create(self, req, image, extra_properties, tags):
image_factory = self.gateway.get_image_factory(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_factory = self.gateway.get_image_factory(req.context)
image_repo = self.gateway.get_repo(req.context)
try:
if 'owner' not in image:
image['owner'] = req.context.project_id
@ -166,10 +164,8 @@ class ImagesController(object):
def _enforce_import_lock(self, req, image):
admin_context = req.context.elevated()
admin_image_repo = self.gateway.get_repo(
admin_context, authorization_layer=False)
admin_task_repo = self.gateway.get_task_repo(
admin_context, authorization_layer=False)
admin_image_repo = self.gateway.get_repo(admin_context)
admin_task_repo = self.gateway.get_task_repo(admin_context)
other_task = image.extra_properties['os_glance_import_task']
expiry = datetime.timedelta(minutes=60)
@ -313,12 +309,9 @@ class ImagesController(object):
@utils.mutating
def import_image(self, req, image_id, body):
ctxt = req.context
image_repo = self.gateway.get_repo(ctxt,
authorization_layer=False)
task_factory = self.gateway.get_task_factory(
ctxt, authorization_layer=False)
task_repo = self.gateway.get_task_repo(
ctxt, authorization_layer=False)
image_repo = self.gateway.get_repo(ctxt)
task_factory = self.gateway.get_task_factory(ctxt)
task_repo = self.gateway.get_task_repo(ctxt)
import_method = body.get('method').get('name')
uri = body.get('method').get('uri')
all_stores_must_succeed = body.get('all_stores_must_succeed', True)
@ -439,7 +432,7 @@ class ImagesController(object):
admin_context = None
executor_factory = self.gateway.get_task_executor_factory(
ctxt, admin_context=admin_context, authorization_layer=False)
ctxt, admin_context=admin_context)
if (import_method == 'web-download' and
not utils.validate_import_uri(uri)):
@ -528,8 +521,7 @@ class ImagesController(object):
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
# NOTE(danms): This is just a "do you have permission to
# list images" check. Each image is checked against
@ -568,8 +560,7 @@ class ImagesController(object):
return result
def show(self, req, image_id):
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
api_policy.ImageAPIPolicy(req.context, image,
@ -581,8 +572,7 @@ class ImagesController(object):
raise webob.exc.HTTPUnauthorized(explanation=e.msg)
def get_task_info(self, req, image_id):
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
# NOTE (abhishekk): Just to check image is valid
@ -600,8 +590,7 @@ class ImagesController(object):
@utils.mutating
def update(self, req, image_id, changes):
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
api_pol = api_policy.ImageAPIPolicy(req.context, image,
@ -734,8 +723,7 @@ class ImagesController(object):
store_id)
raise webob.exc.HTTPConflict(explanation=msg)
image_repo = self.gateway.get_repo(
req.context, authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)
except exception.NotAuthenticated as e:
@ -837,14 +825,11 @@ class ImagesController(object):
# continue on the local worker to match the user's
# expectations. If the image is already deleted, the caller
# will catch this NotFound like normal.
return self.gateway.get_repo(
req.context,
authorization_layer=False).get(image.image_id)
return self.gateway.get_repo(req.context).get(image.image_id)
@utils.mutating
def delete(self, req, image_id):
image_repo = self.gateway.get_repo(req.context,
authorization_layer=False)
image_repo = self.gateway.get_repo(req.context)
try:
image = image_repo.get(image_id)

@ -61,8 +61,7 @@ class NamespaceController(object):
def index(self, req, marker=None, limit=None, sort_key='created_at',
sort_dir='desc', filters=None):
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
policy_check = api_policy.MetadefAPIPolicy(
req.context,
@ -92,8 +91,7 @@ class NamespaceController(object):
'get_metadef_namespace')]
rs_repo = (
self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False))
self.gateway.get_metadef_resource_type_repo(req.context))
for db_namespace in ns_list:
# Get resource type associations
filters = dict()
@ -137,9 +135,8 @@ class NamespaceController(object):
namespace_created = False
# Create Namespace
ns_factory = self.gateway.get_metadef_namespace_factory(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
# NOTE(abhishekk): Here we are going to check if user is authorized
# to create namespace, resource_types, objects, properties etc.
@ -170,9 +167,9 @@ class NamespaceController(object):
# Create Resource Types
if namespace.resource_type_associations:
rs_factory = (self.gateway.get_metadef_resource_type_factory(
req.context, authorization_layer=False))
req.context))
rs_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
for resource_type in namespace.resource_type_associations:
new_resource = rs_factory.new_resource_type(
namespace=namespace.namespace,
@ -182,9 +179,9 @@ class NamespaceController(object):
# Create Objects
if namespace.objects:
object_factory = self.gateway.get_metadef_object_factory(
req.context, authorization_layer=False)
req.context)
object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
req.context)
for metadata_object in namespace.objects:
new_meta_object = object_factory.new_object(
namespace=namespace.namespace,
@ -194,9 +191,9 @@ class NamespaceController(object):
# Create Tags
if namespace.tags:
tag_factory = self.gateway.get_metadef_tag_factory(
req.context, authorization_layer=False)
req.context)
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
req.context)
for metadata_tag in namespace.tags:
new_meta_tag = tag_factory.new_tag(
namespace=namespace.namespace,
@ -206,9 +203,9 @@ class NamespaceController(object):
# Create Namespace Properties
if namespace.properties:
prop_factory = (self.gateway.get_metadef_property_factory(
req.context, authorization_layer=False))
req.context))
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
req.context)
for (name, value) in namespace.properties.items():
new_property_type = (
prop_factory.new_namespace_property(
@ -267,7 +264,7 @@ class NamespaceController(object):
try:
# Get namespace
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
req.context)
try:
namespace_obj = ns_repo.get(namespace)
policy_check = api_policy.MetadefAPIPolicy(
@ -298,8 +295,7 @@ class NamespaceController(object):
ns_filters['namespace'] = namespace
# Get objects
object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
object_repo = self.gateway.get_metadef_object_repo(req.context)
db_metaobject_list = object_repo.list(filters=ns_filters)
object_list = [MetadefObject.to_wsme_model(
db_metaobject,
@ -310,7 +306,7 @@ class NamespaceController(object):
# Get resource type associations
rs_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
db_resource_type_list = rs_repo.list(filters=ns_filters)
resource_type_list = [ResourceTypeAssociation.to_wsme_model(
resource_type) for resource_type in db_resource_type_list]
@ -319,8 +315,7 @@ class NamespaceController(object):
resource_type_list)
# Get properties
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
db_properties = prop_repo.list(filters=ns_filters)
property_list = Namespace.to_model_properties(db_properties)
if property_list:
@ -331,8 +326,7 @@ class NamespaceController(object):
namespace_detail, filters['resource_type'])
# Get tags
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
db_metatag_list = tag_repo.list(filters=ns_filters)
tag_list = [MetadefTag(**{'name': db_metatag.name})
for db_metatag in db_metatag_list]
@ -348,8 +342,7 @@ class NamespaceController(object):
return namespace_detail
def update(self, req, user_ns, namespace):
namespace_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
namespace_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
ns_obj = namespace_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -397,8 +390,7 @@ class NamespaceController(object):
self.ns_schema_link)
def delete(self, req, namespace):
namespace_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
namespace_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = namespace_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -424,8 +416,7 @@ class NamespaceController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
def delete_objects(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -453,8 +444,7 @@ class NamespaceController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
def delete_tags(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -486,8 +476,7 @@ class NamespaceController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
def delete_properties(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):

@ -49,13 +49,10 @@ class MetadefObjectsController(object):
self.obj_schema_link = '/v2/schemas/metadefs/object'
def create(self, req, metadata_object, namespace):
object_factory = self.gateway.get_metadef_object_factory(
req.context, authorization_layer=False)
object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
object_factory = self.gateway.get_metadef_object_factory(req.context)
object_repo = self.gateway.get_metadef_object_repo(req.context)
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
# NOTE(abhishekk): Verifying that namespace is visible
# to user
@ -99,8 +96,7 @@ class MetadefObjectsController(object):
def index(self, req, namespace, marker=None, limit=None,
sort_key='created_at', sort_dir='desc', filters=None):
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except exception.Forbidden:
@ -119,8 +115,7 @@ class MetadefObjectsController(object):
filters = filters or dict()
filters['namespace'] = namespace
object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
object_repo = self.gateway.get_metadef_object_repo(req.context)
db_metaobject_list = object_repo.list(
marker=marker, limit=limit, sort_key=sort_key,
@ -146,11 +141,9 @@ class MetadefObjectsController(object):
return metadef_objects
def show(self, req, namespace, object_name):
meta_object_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
meta_object_repo = self.gateway.get_metadef_object_repo(req.context)
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except exception.Forbidden:
@ -181,11 +174,9 @@ class MetadefObjectsController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
def update(self, req, metadata_object, namespace, object_name):
meta_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
meta_repo = self.gateway.get_metadef_object_repo(req.context)
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
# NOTE(abhishekk): Verifying that namespace is visible
# to user
@ -233,11 +224,9 @@ class MetadefObjectsController(object):
self.obj_schema_link)
def delete(self, req, namespace, object_name):
meta_repo = self.gateway.get_metadef_object_repo(
req.context, authorization_layer=False)
meta_repo = self.gateway.get_metadef_object_repo(req.context)
try:
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
# NOTE(abhishekk): Verifying that namespace is visible
# to user

@ -63,8 +63,7 @@ class NamespacePropertiesController(object):
return property_type
def index(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -84,8 +83,7 @@ class NamespacePropertiesController(object):
filters = dict()
filters['namespace'] = namespace
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
db_properties = prop_repo.list(filters=filters)
property_list = Namespace.to_model_properties(db_properties)
namespace_properties = PropertyTypes()
@ -99,8 +97,7 @@ class NamespacePropertiesController(object):
return namespace_properties
def show(self, req, namespace, property_name, filters=None):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -124,7 +121,7 @@ class NamespacePropertiesController(object):
api_pol.get_metadef_resource_type()
rs_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
db_resource_type = rs_repo.get(filters['resource_type'],
namespace)
prefix = db_resource_type.prefix
@ -138,8 +135,7 @@ class NamespacePropertiesController(object):
'prefix': prefix})
raise exception.NotFound(msg)
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
db_property = prop_repo.get(namespace, property_name)
property = self._to_model(db_property)
except exception.Forbidden as e:
@ -151,12 +147,9 @@ class NamespacePropertiesController(object):
return property
def create(self, req, namespace, property_type):
prop_factory = self.gateway.get_metadef_property_factory(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
prop_factory = self.gateway.get_metadef_property_factory(req.context)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -192,10 +185,8 @@ class NamespacePropertiesController(object):
return self._to_model(new_property_type)
def update(self, req, namespace, property_name, property_type):
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -233,10 +224,8 @@ class NamespacePropertiesController(object):
return self._to_model(updated_property_type)
def delete(self, req, namespace, property_name):
prop_repo = self.gateway.get_metadef_property_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
prop_repo = self.gateway.get_metadef_property_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):

@ -50,7 +50,7 @@ class ResourceTypeController(object):
try:
filters = {'namespace': None}
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
# NOTE(abhishekk): Here we are just checking if user is
# authorized to view/list metadef resource types or not.
# Also there is no relation between list_metadef_resource_types
@ -75,8 +75,7 @@ class ResourceTypeController(object):
return resource_types
def show(self, req, namespace):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -97,7 +96,7 @@ class ResourceTypeController(object):
filters = {'namespace': namespace}
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
db_type_list = rs_type_repo.list(filters=filters)
rs_type_list = [
@ -120,11 +119,11 @@ class ResourceTypeController(object):
def create(self, req, resource_type, namespace):
rs_type_factory = self.gateway.get_metadef_resource_type_factory(
req.context, authorization_layer=False)
req.context)
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -158,9 +157,9 @@ class ResourceTypeController(object):
def delete(self, req, namespace, resource_type):
rs_type_repo = self.gateway.get_metadef_resource_type_repo(
req.context, authorization_layer=False)
req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):

@ -50,12 +50,9 @@ class TagsController(object):
self.tag_schema_link = '/v2/schemas/metadefs/tag'
def create(self, req, namespace, tag_name):
tag_factory = self.gateway.get_metadef_tag_factory(
req.context, authorization_layer=False)
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
tag_factory = self.gateway.get_metadef_tag_factory(req.context)
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -98,12 +95,9 @@ class TagsController(object):
return MetadefTag.to_wsme_model(new_meta_tag)
def create_tags(self, req, metadata_tags, namespace):
tag_factory = self.gateway.get_metadef_tag_factory(
req.context, authorization_layer=False)
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
tag_factory = self.gateway.get_metadef_tag_factory(req.context)
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -146,8 +140,7 @@ class TagsController(object):
def index(self, req, namespace, marker=None, limit=None,
sort_key='created_at', sort_dir='desc', filters=None):
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -168,8 +161,7 @@ class TagsController(object):
filters = filters or dict()
filters['namespace'] = namespace
tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
tag_repo = self.gateway.get_metadef_tag_repo(req.context)
if marker:
metadef_tag = tag_repo.get(namespace, marker)
marker = metadef_tag.tag_id
@ -193,10 +185,8 @@ class TagsController(object):
return metadef_tags
def show(self, req, namespace, tag_name):
meta_tag_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
meta_tag_repo = self.gateway.get_metadef_tag_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -224,10 +214,8 @@ class TagsController(object):
raise webob.exc.HTTPNotFound(explanation=e.msg)
def update(self, req, metadata_tag, namespace, tag_name):
meta_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
meta_repo = self.gateway.get_metadef_tag_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):
@ -266,10 +254,8 @@ class TagsController(object):
return MetadefTag.to_wsme_model(updated_metadata_tag)
def delete(self, req, namespace, tag_name):
meta_repo = self.gateway.get_metadef_tag_repo(
req.context, authorization_layer=False)
ns_repo = self.gateway.get_metadef_namespace_repo(
req.context, authorization_layer=False)
meta_repo = self.gateway.get_metadef_tag_repo(req.context)
ns_repo = self.gateway.get_metadef_namespace_repo(req.context)
try:
namespace_obj = ns_repo.get(namespace)
except (exception.Forbidden, exception.NotFound):

@ -70,12 +70,9 @@ class TasksController(object):
# NOTE(rosmaita): access to this call is enforced in the deserializer
ctxt = req.context
task_factory = self.gateway.get_task_factory(
ctxt, authorization_layer=False)
executor_factory = self.gateway.get_task_executor_factory(
ctxt, authorization_layer=False)
task_repo = self.gateway.get_task_repo(ctxt,
authorization_layer=False)
task_factory = self.gateway.get_task_factory(ctxt)
executor_factory = self.gateway.get_task_executor_factory(ctxt)
task_repo = self.gateway.get_task_repo(ctxt)
try:
new_task = task_factory.new_task(
task_type=task['type'],
@ -109,8 +106,7 @@ class TasksController(object):
limit = CONF.limit_param_default
limit = min(CONF.api_limit_max, limit)
task_repo = self.gateway.get_task_stub_repo(
req.context, authorization_layer=False)
task_repo = self.gateway.get_task_stub_repo(req.context)
try:
tasks = task_repo.list(marker, limit, sort_key,
sort_dir, filters)
@ -130,8 +126,7 @@ class TasksController(object):
def get(self, req, task_id):
_enforce_access_policy(self.policy, req)
try:
task_repo = self.gateway.get_task_repo(
req.context, authorization_layer=False)
task_repo = self.gateway.get_task_repo(req.context)
task = task_repo.get(task_id)
except exception.NotFound as e:
msg = (_LW("Failed to find task %(task_id)s. Reason: %(reason)s")

@ -15,7 +15,6 @@
# under the License.
import glance_store
from glance.api import authorization
from glance.api import policy
from glance.api import property_protections
from glance.common import property_utils
@ -36,53 +35,34 @@ class Gateway(object):
self.notifier = notifier or glance.notifier.Notifier()
self.policy = policy_enforcer or policy.Enforcer()
def get_image_factory(self, context, authorization_layer=True):
def get_image_factory(self, context):
factory = glance.domain.ImageFactory()
factory = glance.location.ImageFactoryProxy(
factory, context, self.store_api, self.store_utils)
factory = glance.quota.ImageFactoryProxy(
factory, context, self.db_api, self.store_utils)
if authorization_layer:
factory = policy.ImageFactoryProxy(factory, context, self.policy)
factory = glance.notifier.ImageFactoryProxy(
factory, context, self.notifier)
if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules(self.policy)
factory = property_protections.ProtectedImageFactoryProxy(
factory, context, property_rules)
if authorization_layer:
factory = authorization.ImageFactoryProxy(
factory, context)
return factory
def get_image_member_factory(self, context, authorization_layer=True):
def get_image_member_factory(self, context):
factory = glance.domain.ImageMemberFactory()
factory = glance.quota.ImageMemberFactoryProxy(
factory, context, self.db_api, self.store_utils)
if authorization_layer:
factory = policy.ImageMemberFactoryProxy(
factory, context, self.policy)
if authorization_layer:
factory = authorization.ImageMemberFactoryProxy(
factory, context)
return factory
def get_repo(self, context, authorization_layer=True):
def get_repo(self, context):
"""Get the layered ImageRepo model.
This is where we construct the "the onion" by layering
ImageRepo models on top of each other, starting with the DB at
the bottom.
NB: Code that has implemented policy checks fully above this
layer should pass authorization_layer=False to ensure that no
conflicts with old checks happen. Legacy code should continue
passing True until legacy checks are no longer needed.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An ImageRepo-like object
"""
@ -91,82 +71,49 @@ class Gateway(object):
repo, context, self.store_api, self.store_utils)
repo = glance.quota.ImageRepoProxy(
repo, context, self.db_api, self.store_utils)
if authorization_layer:
repo = policy.ImageRepoProxy(repo, context, self.policy)
repo = glance.notifier.ImageRepoProxy(
repo, context, self.notifier)
if property_utils.is_property_protection_enabled():
property_rules = property_utils.PropertyRules(self.policy)
repo = property_protections.ProtectedImageRepoProxy(
repo, context, property_rules)
if authorization_layer:
repo = authorization.ImageRepoProxy(repo, context)
return repo
def get_member_repo(self, image, context, authorization_layer=True):
def get_member_repo(self, image, context):
repo = glance.db.ImageMemberRepo(
context, self.db_api, image)
repo = glance.location.ImageMemberRepoProxy(
repo, image, context, self.store_api)
if authorization_layer:
repo = policy.ImageMemberRepoProxy(
repo, image, context, self.policy)
repo = glance.notifier.ImageMemberRepoProxy(
repo, image, context, self.notifier)
if authorization_layer:
repo = authorization.ImageMemberRepoProxy(
repo, image, context)
return repo
def get_task_factory(self, context, authorization_layer=True):
def get_task_factory(self, context):
factory = glance.domain.TaskFactory()
if authorization_layer:
factory = policy.TaskFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.TaskFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.TaskFactoryProxy(
factory, context)
return factory
def get_task_repo(self, context, authorization_layer=True):
def get_task_repo(self, context):
repo = glance.db.TaskRepo(context, self.db_api)
if authorization_layer:
repo = policy.TaskRepoProxy(
repo, context, self.policy)
repo = glance.notifier.TaskRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.TaskRepoProxy(
repo, context)
return repo
def get_task_stub_repo(self, context, authorization_layer=True):
def get_task_stub_repo(self, context):
repo = glance.db.TaskRepo(context, self.db_api)
if authorization_layer:
repo = policy.TaskStubRepoProxy(
repo, context, self.policy)
repo = glance.notifier.TaskStubRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.TaskStubRepoProxy(
repo, context)
return repo
def get_task_executor_factory(self, context, admin_context=None,
authorization_layer=True):
task_repo = self.get_task_repo(
context, authorization_layer=authorization_layer)
image_repo = self.get_repo(context,
authorization_layer=authorization_layer)
image_factory = self.get_image_factory(
context, authorization_layer=authorization_layer)
def get_task_executor_factory(self, context, admin_context=None):
task_repo = self.get_task_repo(context)
image_repo = self.get_repo(context)
image_factory = self.get_image_factory(context)
if admin_context:
admin_repo = self.get_repo(admin_context,
authorization_layer=authorization_layer)
admin_repo = self.get_repo(admin_context)
else:
admin_repo = None
return glance.domain.TaskExecutorFactory(task_repo,
@ -174,20 +121,13 @@ class Gateway(object):
image_factory,
admin_repo=admin_repo)
def get_metadef_namespace_factory(self, context,
authorization_layer=True):
def get_metadef_namespace_factory(self, context):
factory = glance.domain.MetadefNamespaceFactory()
if authorization_layer:
factory = policy.MetadefNamespaceFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefNamespaceFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefNamespaceFactoryProxy(
factory, context)
return factory
def get_metadef_namespace_repo(self, context, authorization_layer=True):
def get_metadef_namespace_repo(self, context):
"""Get the layered NamespaceRepo model.
This is where we construct the "the onion" by layering
@ -195,36 +135,20 @@ class Gateway(object):
the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An NamespaceRepo-like object
"""
repo = glance.db.MetadefNamespaceRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefNamespaceRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefNamespaceRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefNamespaceRepoProxy(
repo, context)
return repo
def get_metadef_object_factory(self, context,
authorization_layer=True):
def get_metadef_object_factory(self, context):
factory = glance.domain.MetadefObjectFactory()
if authorization_layer:
factory = policy.MetadefObjectFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefObjectFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefObjectFactoryProxy(
factory, context)
return factory
def get_metadef_object_repo(self, context, authorization_layer=True):
def get_metadef_object_repo(self, context):
"""Get the layered MetadefObjectRepo model.
This is where we construct the "the onion" by layering
@ -232,37 +156,20 @@ class Gateway(object):
the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefObjectRepo-like object
"""
repo = glance.db.MetadefObjectRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefObjectRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefObjectRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefObjectRepoProxy(
repo, context)
return repo
def get_metadef_resource_type_factory(self, context,
authorization_layer=True):
def get_metadef_resource_type_factory(self, context):
factory = glance.domain.MetadefResourceTypeFactory()
if authorization_layer:
factory = policy.MetadefResourceTypeFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefResourceTypeFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefResourceTypeFactoryProxy(
factory, context)
return factory
def get_metadef_resource_type_repo(self, context,
authorization_layer=True):
def get_metadef_resource_type_repo(self, context):
"""Get the layered MetadefResourceTypeRepo model.
This is where we construct the "the onion" by layering
@ -270,37 +177,21 @@ class Gateway(object):
the DB at the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefResourceTypeRepo-like object
"""
repo = glance.db.MetadefResourceTypeRepo(
context, self.db_api)
if authorization_layer:
repo = policy.MetadefResourceTypeRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefResourceTypeRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefResourceTypeRepoProxy(
repo, context)
return repo
def get_metadef_property_factory(self, context,
authorization_layer=True):
def get_metadef_property_factory(self, context):
factory = glance.domain.MetadefPropertyFactory()
if authorization_layer:
factory = policy.MetadefPropertyFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefPropertyFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefPropertyFactoryProxy(
factory, context)
return factory
def get_metadef_property_repo(self, context, authorization_layer=True):
def get_metadef_property_repo(self, context):
"""Get the layered MetadefPropertyRepo model.
This is where we construct the "the onion" by layering
@ -308,36 +199,20 @@ class Gateway(object):
the DB at the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefPropertyRepo-like object
"""
repo = glance.db.MetadefPropertyRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefPropertyRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefPropertyRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefPropertyRepoProxy(
repo, context)
return repo
def get_metadef_tag_factory(self, context,
authorization_layer=True):
def get_metadef_tag_factory(self, context):
factory = glance.domain.MetadefTagFactory()
if authorization_layer:
factory = policy.MetadefTagFactoryProxy(
factory, context, self.policy)
factory = glance.notifier.MetadefTagFactoryProxy(
factory, context, self.notifier)
if authorization_layer:
factory = authorization.MetadefTagFactoryProxy(
factory, context)
return factory
def get_metadef_tag_repo(self, context, authorization_layer=True):
def get_metadef_tag_repo(self, context):
"""Get the layered MetadefTagRepo model.
This is where we construct the "the onion" by layering
@ -345,18 +220,9 @@ class Gateway(object):
the DB at the bottom.
:param context: The RequestContext
:param authorization_layer: Controls whether or not we add the legacy
glance.authorization and glance.policy
layers.
:returns: An MetadefTagRepo-like object
"""
repo = glance.db.MetadefTagRepo(context, self.db_api)
if authorization_layer:
repo = policy.MetadefTagRepoProxy(
repo, context, self.policy)
repo = glance.notifier.MetadefTagRepoProxy(
repo, context, self.notifier)
if authorization_layer:
repo = authorization.MetadefTagRepoProxy(
repo, context)
return repo

@ -44,7 +44,7 @@ class Prefetcher(base.CacheApp):
ctx = context.RequestContext(is_admin=True, show_deleted=True,
roles=['admin'])
try:
image_repo = self.gateway.get_repo(ctx, authorization_layer=False)
image_repo = self.gateway.get_repo(ctx)
image = image_repo.get(image_id)
except exception.NotFound:
LOG.warning(_LW("Image '%s' not found"), image_id)

@ -19,12 +19,8 @@ import http.client as http
from oslo_serialization import jsonutils
import webob
from glance.api import authorization
from glance.common import auth
from glance.common import exception
from glance.common import timeutils
import glance.domain
from glance.tests.unit import utils as unittest_utils
from glance.tests import utils
@ -602,517 +598,3 @@ class TestEndpoints(utils.BaseTestCase):
service_type='object-store',
endpoint_region='foo',
endpoint_type='internalURL')
class TestImageMutability(utils.BaseTestCase):
def setUp(self):
super(TestImageMutability, self).setUp()
self.image_factory = glance.domain.ImageFactory()
def _is_mutable(self, tenant, owner, is_admin=False):
context = glance.context.RequestContext(tenant=tenant,
is_admin=is_admin)
image = self.image_factory.new_image(owner=owner)
return authorization.is_image_mutable(context, image)
def test_admin_everything_mutable(self):
self.assertTrue(self._is_mutable(None, None, is_admin=True))
self.assertTrue(self._is_mutable(None, TENANT1, is_admin=True))
self.assertTrue(self._is_mutable(TENANT1, None, is_admin=True))
self.assertTrue(self._is_mutable(TENANT1, TENANT1, is_admin=True))
self.assertTrue(self._is_mutable(TENANT1, TENANT2, is_admin=True))
def test_no_tenant_nothing_mutable(self):
self.assertFalse(self._is_mutable(None, None))
self.assertFalse(self._is_mutable(None, TENANT1))
def test_regular_user(self):
self.assertFalse(self._is_mutable(TENANT1, None))
self.assertFalse(self._is_mutable(TENANT1, TENANT2))
self.assertTrue(self._is_mutable(TENANT1, TENANT1))
class TestImmutableImage(utils.BaseTestCase):
def setUp(self):
super(TestImmutableImage, self).setUp()
image_factory = glance.domain.ImageFactory()
self.context = glance.context.RequestContext(tenant=TENANT1)
image = image_factory.new_image(
image_id=UUID1,
name='Marvin',
owner=TENANT1,
disk_format='raw',
container_format='bare',
extra_properties={'foo': 'bar'},
tags=['ping', 'pong'],
)
self.image = authorization.ImmutableImageProxy(image, self.context)
def _test_change(self, attr, value):
self.assertRaises(exception.Forbidden,
setattr, self.image, attr, value)
self.assertRaises(exception.Forbidden,
delattr, self.image, attr)
def test_change_id(self):
self._test_change('image_id', UUID2)
def test_change_name(self):
self._test_change('name', 'Freddie')
def test_change_owner(self):
self._test_change('owner', TENANT2)
def test_change_min_disk(self):
self._test_change('min_disk', 100)
def test_change_min_ram(self):
self._test_change('min_ram', 1024)
def test_change_disk_format(self):
self._test_change('disk_format', 'vhd')
def test_change_container_format(self):
self._test_change('container_format', 'ova')
def test_change_visibility(self):
self._test_change('visibility', 'public')
def test_change_status(self):
self._test_change('status', 'active')
def test_change_created_at(self):
self._test_change('created_at', timeutils.utcnow())
def test_change_updated_at(self):
self._test_change('updated_at', timeutils.utcnow())
def test_change_locations(self):
self._test_change('locations', ['http://a/b/c'])
self.assertRaises(exception.Forbidden,
self.image.locations.append, 'http://a/b/c')
self.assertRaises(exception.Forbidden,
self.image.locations.extend, ['http://a/b/c'])
self.assertRaises(exception.Forbidden,
self.image.locations.insert, 'foo')
self.assertRaises(exception.Forbidden,
self.image.locations.pop)
self.assertRaises(exception.Forbidden,
self.image.locations.remove, 'foo')
self.assertRaises(exception.Forbidden,
self.image.locations.reverse)
self.assertRaises(exception.Forbidden,
self.image.locations.sort)
self.assertRaises(exception.Forbidden,
self.image.locations.__delitem__, 0)
self.assertRaises(exception.Forbidden,
self.image.locations.__delslice__, 0, 2)
self.assertRaises(exception.Forbidden,
self.image.locations.__setitem__, 0, 'foo')
self.assertRaises(exception.Forbidden,
self.image.locations.__setslice__,
0, 2, ['foo', 'bar'])
self.assertRaises(exception.Forbidden,
self.image.locations.__iadd__, 'foo')
self.assertRaises(exception.Forbidden,
self.image.locations.__imul__, 2)
def test_change_size(self):
self._test_change('size', 32)
def test_change_tags(self):
self.assertRaises(exception.Forbidden,
delattr, self.image, 'tags')
self.assertRaises(exception.Forbidden,
setattr, self.image, 'tags', ['king', 'kong'])
self.assertRaises(exception.Forbidden, self.image.tags.pop)
self.assertRaises(exception.Forbidden, self.image.tags.clear)
self.assertRaises(exception.Forbidden, self.image.tags.add, 'king')
self.assertRaises(exception.Forbidden, self.image.tags.remove, 'ping')
self.assertRaises(exception.Forbidden,
self.image.tags.update, set(['king', 'kong']))
self.assertRaises(exception.Forbidden,
self.image.tags.intersection_update, set([]))
self.assertRaises(exception.Forbidden,
self.image.tags.difference_update, set([]))
self.assertRaises(exception.Forbidden,
self.image.tags.symmetric_difference_update,
set([]))
def test_change_properties(self):
self.assertRaises(exception.Forbidden,
delattr, self.image, 'extra_properties')
self.assertRaises(exception.Forbidden,
setattr, self.image, 'extra_properties', {})
self.assertRaises(exception.Forbidden,
self.image.extra_properties.__delitem__, 'foo')
self.assertRaises(exception.Forbidden,
self.image.extra_properties.__setitem__, 'foo', 'b')
self.assertRaises(exception.Forbidden,
self.image.extra_properties.__setitem__, 'z', 'j')
self.assertRaises(exception.Forbidden,
self.image.extra_properties.pop)
self.assertRaises(exception.Forbidden,
self.image.extra_properties.popitem)
self.assertRaises(exception.Forbidden,
self.image.extra_properties.setdefault, 'p', 'j')
self.assertRaises(exception.Forbidden,
self.image.extra_properties.update, {})
def test_delete(self):
self.assertRaises(exception.Forbidden, self.image.delete)
def test_set_data(self):
self.assertRaises(exception.Forbidden,
self.image.set_data, 'blah', 4)
def test_deactivate_image(self):
self.assertRaises(exception.Forbidden, self.image.deactivate)
def test_reactivate_image(self):
self.assertRaises(exception.Forbidden, self.image.reactivate)
def test_get_data(self):
class FakeImage(object):
def get_data(self):
return 'tiddlywinks'
image = glance.api.authorization.ImmutableImageProxy(
FakeImage(), self.context)
self.assertEqual('tiddlywinks', image.get_data())
class TestImageFactoryProxy(utils.BaseTestCase):
def setUp(self):
super(TestImageFactoryProxy, self).setUp()
factory = glance.domain.ImageFactory()
self.context = glance.context.RequestContext(tenant=TENANT1)
self.image_factory = authorization.ImageFactoryProxy(factory,
self.context)
def test_default_owner_is_set(self):
image = self.image_factory.new_image()
self.assertEqual(TENANT1, image.owner)
def test_wrong_owner_cannot_be_set(self):
self.assertRaises(exception.Forbidden,
self.image_factory.new_image, owner=TENANT2)
def test_cannot_set_owner_to_none(self):
self.assertRaises(exception.Forbidden,
self.image_factory.new_image, owner=None)
def test_admin_can_set_any_owner(self):
self.context.is_admin = True
image = self.image_factory.new_image(owner=TENANT2)
self.assertEqual(TENANT2, image.owner)
def test_admin_can_set_owner_to_none(self):
self.context.is_admin = True
image = self.image_factory.new_image(owner=None)
self.assertIsNone(image.owner)
def test_admin_still_gets_default_tenant(self):
self.context.is_admin = True
image = self.image_factory.new_image()
self.assertEqual(TENANT1, image.owner)
class TestImageRepoProxy(utils.BaseTestCase):
class ImageRepoStub(object):
def __init__(self, fixtures):
self.fixtures = fixtures
def get(self, image_id):
for f in self.fixtures:
if f.image_id == image_id:
return f
else:
raise ValueError(image_id)
def list(self, *args, **kwargs):
return self.fixtures
def setUp(self):
super(TestImageRepoProxy, self).setUp()
image_factory = glance.domain.ImageFactory()
self.fixtures = [
image_factory.new_image(owner=TENANT1),
image_factory.new_image(owner=TENANT2, visibility='public'),
image_factory.new_image(owner=TENANT2),
]
self.context = glance.context.RequestContext(tenant=TENANT1)
image_repo = self.ImageRepoStub(self.fixtures)
self.image_repo = authorization.ImageRepoProxy(image_repo,
self.context)
def test_get_mutable_image(self):
image = self.image_repo.get(self.fixtures[0].image_id)
self.assertEqual(image.image_id, self.fixtures[0].image_id)
def test_get_immutable_image(self):
image = self.image_repo.get(self.fixtures[1].image_id)
self.assertRaises(exception.Forbidden,
setattr, image, 'name', 'Vince')
def test_list(self):
images = self.image_repo.list()
self.assertEqual(images[0].image_id, self.fixtures[0].image_id)
self.assertRaises(exception.Forbidden,
setattr, images[1], 'name', 'Wally')
self.assertRaises(exception.Forbidden,
setattr, images[2], 'name', 'Calvin')
class TestImmutableTask(utils.BaseTestCase):
def setUp(self):
super(TestImmutableTask, self).setUp()
task_factory = glance.domain.TaskFactory()
self.context = glance.context.RequestContext(tenant=TENANT2)
task_type = 'import'
image_id = 'fake_image_id'
user_id = 'fake_user'
request_id = 'fake_request_id'
owner = TENANT2
task = task_factory.new_task(task_type, owner, image_id,
user_id, request_id)
self.task = authorization.ImmutableTaskProxy(task)
def _test_change(self, attr, value):
self.assertRaises(
exception.Forbidden,
setattr,
self.task,
attr,
value
)
self.assertRaises(
exception.Forbidden,
delattr,
self.task,
attr
)
def test_change_id(self):
self._test_change('task_id', UUID2)
def test_change_type(self):
self._test_change('type', 'fake')
def test_change_status(self):
self._test_change('status', 'success')
def test_change_owner(self):
self._test_change('owner', 'fake')
def test_change_expires_at(self):
self._test_change('expires_at', 'fake')
def test_change_created_at(self):
self._test_change('created_at', 'fake')
def test_change_updated_at(self):
self._test_change('updated_at', 'fake')
def test_begin_processing(self):
self.assertRaises(
exception.Forbidden,
self.task.begin_processing
)
def test_succeed(self):
self.assertRaises(
exception.Forbidden,
self.task.succeed,
'result'
)
def test_fail(self):
self.assertRaises(
exception.Forbidden,
self.task.fail,
'message'
)
class TestImmutableTaskStub(utils.BaseTestCase):
def setUp(self):
super(TestImmutableTaskStub, self).setUp()
task_factory = glance.domain.TaskFactory()
self.context = glance.context.RequestContext(tenant=TENANT2)
task_type = 'import'
image_id = 'fake_image_id'
user_id = 'fake_user'
request_id = 'fake_request_id'
owner = TENANT2
task = task_factory.new_task(task_type, owner, image_id,
user_id, request_id)
self.task = authorization.ImmutableTaskStubProxy(task)
def _test_change(self, attr, value):
self.assertRaises(
exception.Forbidden,
setattr,
self.task,
attr,
value
)
self.assertRaises(
exception.Forbidden,
delattr,
self.task,
attr
)
def test_change_id(self):
self._test_change('task_id', UUID2)
def test_change_type(self):
self._test_change('type', 'fake')
def test_change_status(self):
self._test_change('status', 'success')
def test_change_owner(self):
self._test_change('owner', 'fake')
def test_change_expires_at(self):
self._test_change('expires_at', 'fake')
def test_change_created_at(self):
self._test_change('created_at', 'fake')
def test_change_updated_at(self):
self._test_change('updated_at', 'fake')
class TestTaskFactoryProxy(utils.BaseTestCase):
def setUp(self):
super(TestTaskFactoryProxy, self).setUp()
factory = glance.domain.TaskFactory()
self.context = glance.context.RequestContext(tenant=TENANT1)
self.context_owner_is_none = glance.context.RequestContext()
self.task_factory = authorization.TaskFactoryProxy(
factory,
self.context
)
self.task_type = 'import'
self.task_input = '{"loc": "fake"}'
self.owner = 'foo'
self.image_id = 'fake_image_id'
self.user_id = 'fake_user'
self.request_id = 'fake_request_id'
self.request1 = unittest_utils.get_fake_request(tenant=TENANT1)
self.request2 = unittest_utils.get_fake_request(tenant=TENANT2)
def test_task_create_default_owner(self):
owner = self.request1.context.owner
task = self.task_factory.new_task(task_type=self.task_type,
owner=owner, image_id=self.image_id,
user_id=self.user_id,
request_id=self.request_id)
self.assertEqual(TENANT1, task.owner)
def test_task_create_wrong_owner(self):
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=self.owner, image_id=self.image_id,
user_id=self.user_id,
request_id=self.request_id)
def test_task_create_owner_as_None(self):
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=None, image_id=self.image_id,
user_id=self.user_id,
request_id=self.request_id)
def test_task_create_admin_context_owner_as_None(self):
self.context.is_admin = True
self.assertRaises(exception.Forbidden,
self.task_factory.new_task,
task_type=self.task_type,
task_input=self.task_input,
owner=None, image_id=self.image_id,
user_id=self.user_id,
request_id=self.request_id)
class TestTaskRepoProxy(utils.BaseTestCase):
class TaskRepoStub(object):
def __init__(self, fixtures):
self.fixtures = fixtures
def get(self, task_id):
for f in self.fixtures:
if f.task_id == task_id:
return f
else:
raise ValueError(task_id)
class TaskStubRepoStub(object):
def __init__(self, fixtures):
self.fixtures = fixtures
def list(self, *args, **kwargs):
return self.fixtures
def setUp(self):
super(TestTaskRepoProxy, self).setUp()
task_factory = glance.domain.TaskFactory()
task_type = 'import'
image_id = 'fake_image_id'
user_id = 'fake_user'
request_id = 'fake_request_id'
owner = None
self.fixtures = [
task_factory.new_task(task_type, owner, image_id,
user_id, request_id),
task_factory.new_task(task_type, owner, image_id,
user_id, request_id),
task_factory.new_task(task_type, owner, image_id,
user_id, request_id),
]
self.context = glance.context.RequestContext(tenant=TENANT1)
task_repo = self.TaskRepoStub(self.fixtures)
task_stub_repo = self.TaskStubRepoStub(self.fixtures)
self.task_repo = authorization.TaskRepoProxy(
task_repo,
self.context
)
self.task_stub_repo = authorization.TaskStubRepoProxy(
task_stub_repo,
self.context
)
def test_get_mutable_task(self):
task = self.task_repo.get(self.fixtures[0].task_id)
self.assertEqual(task.task_id, self.fixtures[0].task_id)
def test_get_immutable_task(self):
task_id = self.fixtures[1].task_id
task = self.task_repo.get(task_id)
self.assertRaises(exception.Forbidden,
setattr, task, 'input', 'foo')
def test_list(self):
tasks = self.task_stub_repo.list()
self.assertEqual(tasks[0].task_id, self.fixtures[0].task_id)
self.assertRaises(exception.Forbidden,
setattr,
tasks[1],
'owner',
'foo')
self.assertRaises(exception.Forbidden,
setattr,
tasks[2],
'owner',
'foo')

@ -15,7 +15,6 @@
from unittest import mock
from glance.api import authorization
from glance.api import property_protections
from glance import context
from glance import gateway
@ -38,12 +37,9 @@ class TestGateway(test_utils.BaseTestCase):
@mock.patch.object(self.gateway, 'get_image_factory')
def _test(mock_gif, mock_gr, mock_gtr):
self.gateway.get_task_executor_factory(self.context)
mock_gtr.assert_called_once_with(
self.context, authorization_layer=True)
mock_gr.assert_called_once_with(
self.context, authorization_layer=True)
mock_gif.assert_called_once_with(
self.context, authorization_layer=True)
mock_gtr.assert_called_once_with(self.context)
mock_gr.assert_called_once_with(self.context)
mock_gif.assert_called_once_with(self.context)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock_gr.return_value,
@ -63,15 +59,12 @@ class TestGateway(test_utils.BaseTestCase):
self.gateway.get_task_executor_factory(
self.context,
admin_context=mock.sentinel.admin_context)
mock_gtr.assert_called_once_with(
self.context, authorization_layer=True)
mock_gtr.assert_called_once_with(self.context)
mock_gr.assert_has_calls([
mock.call(self.context, authorization_layer=True),
mock.call(mock.sentinel.admin_context,
authorization_layer=True),
mock.call(self.context),
mock.call(mock.sentinel.admin_context)
])
mock_gif.assert_called_once_with(
self.context, authorization_layer=True)
mock_gif.assert_called_once_with(self.context)
mock_factory.assert_called_once_with(
mock_gtr.return_value,
mock.sentinel.image_repo,
@ -80,50 +73,28 @@ class TestGateway(test_utils.BaseTestCase):
_test()
@mock.patch('glance.api.policy.ImageRepoProxy')
def test_get_repo(self, mock_proxy):
def test_get_repo(self):
repo = self.gateway.get_repo(self.context)
self.assertIsInstance(repo, authorization.ImageRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.ImageRepoProxy')
def test_get_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_repo(self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.ImageRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.common.property_utils.PropertyRules._load_rules')
def test_get_repo_without_auth_with_pp(self, mock_load):
def test_get_repo_with_pp(self, mock_load):
self.config(property_protection_file='foo')
repo = self.gateway.get_repo(self.context, authorization_layer=False)
repo = self.gateway.get_repo(self.context)
self.assertIsInstance(repo,
property_protections.ProtectedImageRepoProxy)
def test_get_image_factory(self):
factory = self.gateway.get_image_factory(self.context)
self.assertIsInstance(factory, authorization.ImageFactoryProxy)
def test_get_image_factory_without_auth(self):
factory = self.gateway.get_image_factory(self.context,
authorization_layer=False)
self.assertIsInstance(factory, notifier.ImageFactoryProxy)
@mock.patch('glance.common.property_utils.PropertyRules._load_rules')
def test_get_image_factory_without_auth_with_pp(self, mock_load):
def test_get_image_factory_with_pp(self, mock_load):
self.config(property_protection_file='foo')
factory = self.gateway.get_image_factory(self.context,
authorization_layer=False)
factory = self.gateway.get_image_factory(self.context)
self.assertIsInstance(factory,
property_protections.ProtectedImageFactoryProxy)
@mock.patch('glance.api.policy.ImageFactoryProxy')
def test_get_image_factory_policy_layer(self, mock_pif):
self.gateway.get_image_factory(self.context, authorization_layer=False)
mock_pif.assert_not_called()
self.gateway.get_image_factory(self.context)
self.assertTrue(mock_pif.called)
def test_get_repo_member_property(self):
"""Test that the image.member property is propagated all the way from
the DB to the top of the gateway repo stack.
@ -145,238 +116,62 @@ class TestGateway(test_utils.BaseTestCase):
# We are a member, so member is our tenant id
self.assertEqual(unit_test_utils.TENANT2, image.member)
@mock.patch('glance.api.policy.MetadefNamespaceRepoProxy')
def test_get_namespace_repo(self, mock_proxy):
def test_get_namespace_repo(self):
repo = self.gateway.get_metadef_namespace_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefNamespaceRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefNamespaceRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_factory(self, mock_proxy):
def test_get_namespace_factory(self):
repo = self.gateway.get_metadef_namespace_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefNamespaceFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefNamespaceFactoryProxy')
def test_get_namespace_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_namespace_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefNamespaceFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefObjectRepoProxy')
def test_get_object_repo(self, mock_proxy):
def test_get_object_repo(self):
repo = self.gateway.get_metadef_object_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefObjectRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefObjectRepoProxy')
def test_get_object_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_object_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefObjectRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefObjectFactoryProxy')
def test_get_object_factory(self, mock_proxy):
def test_get_object_factory(self):
repo = self.gateway.get_metadef_object_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefObjectFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefObjectFactoryProxy')
def test_get_object_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_object_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefObjectFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefResourceTypeRepoProxy')
def test_get_resourcetype_repo(self, mock_proxy):
def test_get_resourcetype_repo(self):
repo = self.gateway.get_metadef_resource_type_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefResourceTypeRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefResourceTypeRepoProxy')
def test_get_resourcetype_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefResourceTypeRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefResourceTypeFactoryProxy')
def test_get_resource_type_factory(self, mock_proxy):
def test_get_resource_type_factory(self):
repo = self.gateway.get_metadef_resource_type_factory(self.context)
self.assertIsInstance(repo,
authorization.MetadefResourceTypeFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefResourceTypeFactoryProxy')
def test_get_resource_type_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_resource_type_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefResourceTypeFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefPropertyRepoProxy')
def test_get_property_repo(self, mock_proxy):
def test_get_property_repo(self):
repo = self.gateway.get_metadef_property_repo(self.context)
self.assertIsInstance(repo,
authorization.MetadefPropertyRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefPropertyRepoProxy')
def test_get_property_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_property_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefPropertyRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefPropertyFactoryProxy')
def test_get_property_factory(self, mock_proxy):
def test_get_property_factory(self):
repo = self.gateway.get_metadef_property_factory(self.context)
self.assertIsInstance(repo, authorization.MetadefPropertyFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefPropertyFactoryProxy')
def test_get_property_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_property_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefPropertyFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefTagRepoProxy')
def test_get_tag_repo(self, mock_proxy):
def test_get_tag_repo(self):
repo = self.gateway.get_metadef_tag_repo(self.context)
self.assertIsInstance(repo, authorization.MetadefTagRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefTagRepoProxy')
def test_get_tag_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_tag_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefTagRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.MetadefTagFactoryProxy')
def test_get_tag_factory(self, mock_proxy):
def test_get_tag_factory(self):
repo = self.gateway.get_metadef_tag_factory(self.context)
self.assertIsInstance(repo, authorization.MetadefTagFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.MetadefTagFactoryProxy')
def test_get_tag_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_metadef_tag_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.MetadefTagFactoryProxy)
@mock.patch('glance.api.policy.ImageMemberRepoProxy')
def test_get_member_repo(self, mock_proxy):
with mock.patch.object(
authorization, '_validate_image_accepts_members'):
repo = self.gateway.get_member_repo(
mock.Mock(), self.context)
self.assertIsInstance(repo, authorization.ImageMemberRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.ANY,
mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.ImageMemberRepoProxy')
def test_get_member_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_member_repo(
mock.sentinel.image, self.context, authorization_layer=False)
def test_get_member_repo(self):
repo = self.gateway.get_member_repo(mock.sentinel.image, self.context)
self.assertIsInstance(repo, notifier.ImageMemberRepoProxy)
@mock.patch('glance.api.policy.ImageMemberFactoryProxy')
def test_get_member_factory(self, mock_proxy):
def test_get_member_factory(self):
repo = self.gateway.get_image_member_factory(self.context)
self.assertIsInstance(repo, authorization.ImageMemberFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.ImageMemberFactoryProxy')
def test_get_member_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_image_member_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, quota.ImageMemberFactoryProxy)
@mock.patch('glance.api.policy.TaskRepoProxy')
def test_get_task_repo(self, mock_proxy):
def test_get_task_repo(self):
repo = self.gateway.get_task_repo(self.context)
self.assertIsInstance(repo, authorization.TaskRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.TaskRepoProxy')
def test_get_task_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_task_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.TaskRepoProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.TaskFactoryProxy')
def test_get_task_factory(self, mock_proxy):
def test_get_task_factory(self):
repo = self.gateway.get_task_factory(self.context)
self.assertIsInstance(repo, authorization.TaskFactoryProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.TaskFactoryProxy')
def test_get_task_factory_without_auth(self, mock_proxy):
repo = self.gateway.get_task_factory(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.TaskFactoryProxy)
mock_proxy.assert_not_called()
@mock.patch('glance.api.policy.ImageRepoProxy')
@mock.patch('glance.api.policy.TaskRepoProxy')
def test_get_task_executor_factory_with_auth(self, mock_task_proxy,
mock_image_proxy):
self.gateway.get_task_executor_factory(self.context)
mock_task_proxy.assert_called_once_with(mock.ANY,
mock.sentinel.context,
mock.ANY)
mock_image_proxy.assert_called_once_with(mock.ANY,
mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.ImageRepoProxy')
@mock.patch('glance.api.policy.TaskRepoProxy')
def test_get_task_executor_factory_without_auth(self, mock_task_proxy,
mock_image_proxy):
self.gateway.get_task_executor_factory(self.context,
authorization_layer=False)
mock_task_proxy.assert_not_called()
mock_image_proxy.assert_not_called()
@mock.patch('glance.api.policy.TaskStubRepoProxy')
def test_get_task_stub_repo(self, mock_proxy):
def test_get_task_stub_repo(self):
repo = self.gateway.get_task_stub_repo(self.context)
self.assertIsInstance(repo, authorization.TaskStubRepoProxy)
mock_proxy.assert_called_once_with(mock.ANY, mock.sentinel.context,
mock.ANY)
@mock.patch('glance.api.policy.TaskStubRepoProxy')
def test_get_task_stub_repo_without_auth(self, mock_proxy):
repo = self.gateway.get_task_stub_repo(
self.context, authorization_layer=False)
self.assertIsInstance(repo, notifier.TaskStubRepoProxy)
mock_proxy.assert_not_called()

@ -533,9 +533,8 @@ class TestImageCacheSqlite(test_utils.BaseTestCase,
ctx = context.RequestContext(is_admin=True, roles=['admin'])
gateway = glance_gateway.Gateway()
image_factory = gateway.get_image_factory(ctx,
authorization_layer=False)
image_repo = gateway.get_repo(ctx, authorization_layer=False)
image_factory = gateway.get_image_factory(ctx)
image_repo = gateway.get_repo(ctx)
fetcher = prefetcher.Prefetcher()
# Create an image with no values set and queue it
@ -642,5 +641,4 @@ class TestImagePrefetcher(test_utils.BaseTestCase):
with mock.patch.object(self.prefetcher.gateway,
'get_repo') as mock_get:
self.prefetcher.fetch_image_into_cache('fake-image-id')
mock_get.assert_called_once_with(mock.ANY,
authorization_layer=False)
mock_get.assert_called_once_with(mock.ANY)

@ -27,8 +27,6 @@ from glance.common import exception
import glance.context
from glance.policies import base as base_policy
from glance.tests.unit import base
import glance.tests.unit.utils as unit_test_utils
from glance.tests import utils as test_utils
UUID1 = 'c80a1a6c-bd1f-41c5-90ee-81afedb1d58d'
@ -457,579 +455,6 @@ class TestPolicyEnforcerNoFile(base.IsolatedUnitTest):
enforcer.enforce(admin_context, 'manage_image_cache', {})
class TestImagePolicy(test_utils.BaseTestCase):
def setUp(self):
self.image_stub = ImageStub(UUID1)
self.image_repo_stub = ImageRepoStub()
self.image_factory_stub = ImageFactoryStub()
self.policy = mock.Mock()
self.policy.enforce = mock.Mock()
self.context = mock.Mock()
super(TestImagePolicy, self).setUp()
def test_publicize_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'public')
self.assertEqual('private', image.visibility)
self.policy.enforce.assert_called_once_with(
self.context, "publicize_image", expected_target)
def test_publicize_image_allowed(self):
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
image.visibility = 'public'
self.assertEqual('public', image.visibility)
self.policy.enforce.assert_called_once_with(
self.context, "publicize_image", expected_target)
def test_communitize_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden,
setattr, image, 'visibility', 'community')
self.assertEqual('private', image.visibility)
self.policy.enforce.assert_called_once_with(
self.context, "communitize_image", expected_target)
def test_communitize_image_allowed(self):
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
image.visibility = 'community'
self.assertEqual('community', image.visibility)
self.policy.enforce.assert_called_once_with(
self.context, "communitize_image", expected_target)
def test_delete_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image.delete)
self.assertEqual('active', image.status)
self.policy.enforce.assert_called_once_with(
self.context, "delete_image", expected_target)
def test_delete_image_allowed(self):
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
image.delete()
self.assertEqual('deleted', image.status)
self.policy.enforce.assert_called_once_with(
self.context, "delete_image", expected_target)
def test_get_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_target = IterableMock()
with mock.patch.object(glance.api.policy, 'ImageTarget') as target:
target.return_value = image_target
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
self.assertRaises(exception.NotFound, image_repo.get, UUID1)
expected_target = {'project_id': 'tenant1'}
self.policy.enforce.assert_called_once_with(
self.context, "get_image", expected_target)
def test_get_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
image = image_repo.get(UUID1)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertIsInstance(image, glance.api.policy.ImageProxy)
self.policy.enforce.assert_called_once_with(
self.context, "get_image", expected_target)
def test_get_images_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_repo.list)
expected_target = {'project_id': self.context.project_id}
self.policy.enforce.assert_called_once_with(
self.context, "get_images", expected_target)
def test_get_images_allowed(self):
expected_target = {'project_id': self.context.project_id}
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
images = image_repo.list()
for i, image in enumerate(images):
self.assertIsInstance(image, glance.api.policy.ImageProxy)
self.assertEqual('image_from_list_%d' % i, image.image)
self.policy.enforce.assert_called_once_with(
self.context, "get_images", expected_target)
def test_modify_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image_repo.save, image)
self.policy.enforce.assert_called_once_with(
self.context, "modify_image", expected_target)
def test_modify_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
image_repo.save(image)
self.policy.enforce.assert_called_once_with(
self.context, "modify_image", expected_target)
def test_add_image_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.assertRaises(exception.Forbidden, image_repo.add, image)
self.policy.enforce.assert_called_once_with(
self.context, "add_image", expected_target)
def test_add_image_allowed(self):
image_repo = glance.api.policy.ImageRepoProxy(
self.image_repo_stub, self.context, self.policy)
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
image_repo.add(image)
self.policy.enforce.assert_called_once_with(
self.context, "add_image", expected_target)
def test_new_image_visibility_public_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_factory.new_image,
visibility='public', owner='tenant1')
expected_target = {'project_id': 'tenant1'}
self.policy.enforce.assert_called_once_with(
self.context, "publicize_image", expected_target)
def test_new_image_visibility_public_allowed(self):
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, self.context, self.policy)
image_factory.new_image(visibility='public', owner='tenant1')
expected_target = {'project_id': 'tenant1'}
self.policy.enforce.assert_called_once_with(
self.context, "publicize_image", expected_target)
def test_new_image_visibility_community_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, self.context, self.policy)
self.assertRaises(exception.Forbidden, image_factory.new_image,
visibility='community', owner='tenant1')
expected_target = {'project_id': 'tenant1'}
self.policy.enforce.assert_called_once_with(
self.context, "communitize_image", expected_target)
def test_new_image_visibility_community_allowed(self):
image_factory = glance.api.policy.ImageFactoryProxy(
self.image_factory_stub, self.context, self.policy)
image_factory.new_image(visibility='community', owner='tenant1')
expected_target = {'project_id': 'tenant1'}
self.policy.enforce.assert_called_once_with(self.context,
"communitize_image",
expected_target)
def test_image_get_data_policy_enforced_with_target(self):
extra_properties = {
'test_key': 'test_4321'
}
image_stub = ImageStub(UUID1, extra_properties=extra_properties)
image = glance.api.policy.ImageProxy(
image_stub, self.context, self.policy)
expected_target = dict(image.target)
expected_target['project_id'] = image.owner
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, image.get_data)
self.policy.enforce.assert_called_once_with(
self.context, "download_image", expected_target)
class TestMemberPolicy(test_utils.BaseTestCase):
def setUp(self):
self.policy = mock.Mock()
self.policy.enforce = mock.Mock()
self.image_stub = ImageStub(UUID1)
self.context = mock.Mock()
image = glance.api.policy.ImageProxy(
self.image_stub, self.context, self.policy)
self.member_repo = glance.api.policy.ImageMemberRepoProxy(
MemberRepoStub(), image, self.context, self.policy)
self.target = dict(self.member_repo.target)
self.target['project_id'] = self.context.project_id
super(TestMemberPolicy, self).setUp()
def test_add_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.add, '')
self.policy.enforce.assert_called_once_with(
self.context, "add_member", self.target)
def test_add_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.add(image_member)
self.assertEqual('member_repo_add', image_member.output)
self.policy.enforce.assert_called_once_with(
self.context, "add_member", self.target)
def test_get_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.get, '')
self.policy.enforce.assert_called_once_with(
self.context, "get_member", self.target)
def test_get_member_allowed(self):
output = self.member_repo.get('')
self.assertEqual('member_repo_get', output)
self.policy.enforce.assert_called_once_with(
self.context, "get_member", self.target)
def test_modify_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.save, '')
self.policy.enforce.assert_called_once_with(
self.context, "modify_member", self.target)
def test_modify_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.save(image_member)
self.assertEqual('member_repo_save', image_member.output)
self.policy.enforce.assert_called_once_with(
self.context, "modify_member", self.target)
def test_get_members_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.list, '')
self.policy.enforce.assert_called_once_with(
self.context, "get_members", self.target)
def test_get_members_allowed(self):
output = self.member_repo.list('')
self.assertEqual('member_repo_list', output)
self.policy.enforce.assert_called_once_with(
self.context, "get_members", self.target)
def test_delete_member_not_allowed(self):
self.policy.enforce.side_effect = exception.Forbidden
self.assertRaises(exception.Forbidden, self.member_repo.remove, '')
self.policy.enforce.assert_called_once_with(
self.context, "delete_member", self.target)
def test_delete_member_allowed(self):
image_member = ImageMembershipStub()
self.member_repo.remove(image_member)
self.assertEqual('member_repo_remove', image_member.output)
self.policy.enforce.assert_called_once_with(
self.context, "delete_member", self.target)
class TestTaskPolicy(test_utils.BaseTestCase):
def setUp(self):
self.task_stub = TaskStub(UUID1)
self.task_repo_stub = TaskRepoStub()
self.task_factory_stub = TaskFactoryStub()
self.policy = unit_test_utils.FakePolicyEnforcer()
super(TestTaskPolicy, self).setUp()
def test_get_task_not_allowed(self):
rules = {"get_task": False}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskRepoProxy(
self.task_repo_stub,
{},
self.policy
)
self.assertRaises(exception.Forbidden,
task_repo.get,
UUID1)
def test_get_task_allowed(self):
rules = {"get_task": True}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskRepoProxy(
self.task_repo_stub,
{},
self.policy
)
task = task_repo.get(UUID1)
self.assertIsInstance(task, glance.api.policy.TaskProxy)
self.assertEqual('task_from_get', task.task)
def test_get_tasks_not_allowed(self):
rules = {"get_tasks": False}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskStubRepoProxy(
self.task_repo_stub,
{},
self.policy
)
self.assertRaises(exception.Forbidden, task_repo.list)
def test_get_tasks_allowed(self):
rules = {"get_task": True}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskStubRepoProxy(
self.task_repo_stub,
{},
self.policy
)
tasks = task_repo.list()
for i, task in enumerate(tasks):
self.assertIsInstance(task, glance.api.policy.TaskStubProxy)
self.assertEqual('task_from_list_%d' % i, task.task_stub)
def test_add_task_not_allowed(self):
rules = {"add_task": False}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskRepoProxy(
self.task_repo_stub,
{},
self.policy
)
task = glance.api.policy.TaskProxy(self.task_stub, {}, self.policy)
self.assertRaises(exception.Forbidden, task_repo.add, task)
def test_add_task_allowed(self):
rules = {"add_task": True}
self.policy.set_rules(rules)
task_repo = glance.api.policy.TaskRepoProxy(
self.task_repo_stub,
{},
self.policy
)
task = glance.api.policy.TaskProxy(self.task_stub, {}, self.policy)
task_repo.add(task)
class TestMetadefPolicy(test_utils.BaseTestCase):
def setUp(self):
self.fakens = mock.Mock()
self.fakeobj = mock.Mock()
self.fakert = mock.Mock()
self.fakeprop = mock.Mock()
self.faketag = mock.Mock()
self.policy = unit_test_utils.FakePolicyEnforcer()
super(TestMetadefPolicy, self).setUp()
def test_md_namespace_not_allowed(self):
rules = {'get_metadef_namespace': False,
'get_metadef_namespaces': False,
'modify_metadef_namespace': False,
'add_metadef_namespace': False,
'delete_metadef_namespace': False}
self.policy.set_rules(rules)
mdns_repo = glance.api.policy.MetadefNamespaceRepoProxy(
MdNamespaceRepoStub(), {}, self.policy)
self.assertRaises(exception.Forbidden, mdns_repo.add, self.fakens)
self.assertRaises(exception.Forbidden, mdns_repo.get, self.fakens)
self.assertRaises(exception.Forbidden, mdns_repo.list)
self.assertRaises(exception.Forbidden, mdns_repo.remove, self.fakens)
self.assertRaises(exception.Forbidden, mdns_repo.save, self.fakens)
def test_md_namespace_allowed(self):
rules = {'get_metadef_namespace': True,
'get_metadef_namespaces': True,
'modify_metadef_namespace': True,
'add_metadef_namespace': True,
'delete_metadef_namespace': True}
self.policy.set_rules(rules)
mdns_repo = glance.api.policy.MetadefNamespaceRepoProxy(
MdNamespaceRepoStub(), {}, self.policy)
self.assertEqual(None, mdns_repo.add(self.fakens))
self.assertEqual('mdns_get',
mdns_repo.get(self.fakens).namespace_input)
self.assertEqual(['mdns_list'],
[ns.namespace_input for ns in mdns_repo.list()])
self.assertEqual('mdns_save',
mdns_repo.save(self.fakens).namespace_input)
self.assertEqual('mdns_remove',
mdns_repo.remove(self.fakens).namespace_input)
def test_md_object_not_allowed(self):
rules = {'get_metadef_object': False,
'get_metadef_objects': False,
'modify_metadef_object': False,
'add_metadef_object': False,
'delete_metadef_object': False}
self.policy.set_rules(rules)
mdobj_repo = glance.api.policy.MetadefObjectRepoProxy(
MdObjectRepoStub(), {}, self.policy)
self.assertRaises(exception.Forbidden, mdobj_repo.add, self.fakeobj)
self.assertRaises(exception.Forbidden, mdobj_repo.get, self.fakens,
self.fakeobj)
self.assertRaises(exception.Forbidden, mdobj_repo.list)
self.assertRaises(exception.Forbidden, mdobj_repo.remove, self.fakeobj)
self.assertRaises(exception.Forbidden, mdobj_repo.save, self.fakeobj)
def test_md_object_allowed(self):
rules = {'get_metadef_object': True,
'get_metadef_objects': True,
'modify_metadef_object': True,
'add_metadef_object': True,
'delete_metadef_object': True}
self.policy.set_rules(rules)
mdobj_repo = glance.api.policy.MetadefObjectRepoProxy(
MdObjectRepoStub(), {}, self.policy)
self.assertEqual(None, mdobj_repo.add(self.fakeobj))
self.assertEqual('mdobj_get',
mdobj_repo.get(self.fakens, 'fakeobj').meta_object)
self.assertEqual(['mdobj_list'],
[obj.meta_object for obj in mdobj_repo.list()])
self.assertEqual('mdobj_save',
mdobj_repo.save(self.fakeobj).meta_object)
self.assertEqual('mdobj_remove',
mdobj_repo.remove(self.fakeobj).meta_object)
def test_md_resource_type_not_allowed(self):
rules = {'get_metadef_resource_type': False,
'list_metadef_resource_types': False,
'add_metadef_resource_type_association': False,
'remove_metadef_resource_type_association': False}
self.policy.set_rules(rules)
mdrt_repo = glance.api.policy.MetadefResourceTypeRepoProxy(
MdResourceTypeRepoStub(), {}, self.policy)
self.assertRaises(exception.Forbidden, mdrt_repo.add, self.fakert)
self.assertRaises(exception.Forbidden, mdrt_repo.get, self.fakert)
self.assertRaises(exception.Forbidden, mdrt_repo.list)
self.assertRaises(exception.Forbidden, mdrt_repo.remove, self.fakert)
def test_md_resource_type_allowed(self):
rules = {'get_metadef_resource_type': True,
'list_metadef_resource_types': True,
'add_metadef_resource_type_association': True,
'remove_metadef_resource_type_association': True}
self.policy.set_rules(rules)
mdrt_repo = glance.api.policy.MetadefResourceTypeRepoProxy(
MdResourceTypeRepoStub(), {}, self.policy)
self.assertEqual(None, mdrt_repo.add(self.fakert))
self.assertEqual(
'mdrt_get', mdrt_repo.get(self.fakens,
'fakert').meta_resource_type)
self.assertEqual(['mdrt_list'],
[rt.meta_resource_type for rt in mdrt_repo.list()])
self.assertEqual('mdrt_remove',
mdrt_repo.remove(self.fakert).meta_resource_type)
def test_md_property_not_allowed(self):
rules = {'get_metadef_property': False,
'get_metadef_properties': False,
'modify_metadef_property': False,
'add_metadef_property': False,
'remove_metadef_property': False}
self.policy.set_rules(rules)
mdprop_repo = glance.api.policy.MetadefPropertyRepoProxy(
MdPropertyRepoStub(), {}, self.policy)
self.assertRaises(exception.Forbidden, mdprop_repo.add, self.fakeprop)
self.assertRaises(exception.Forbidden, mdprop_repo.get, self.fakens,
self.fakeprop)
self.assertRaises(exception.Forbidden, mdprop_repo.list)
self.assertRaises(exception.Forbidden, mdprop_repo.remove,
self.fakeprop)
self.assertRaises(exception.Forbidden, mdprop_repo.save, self.fakeprop)
def test_md_property_allowed(self):
rules = {'get_metadef_property': True,
'get_metadef_properties': True,
'modify_metadef_property': True,
'add_metadef_property': True,
'remove_metadef_property': True}
self.policy.set_rules(rules)
mdprop_repo = glance.api.policy.MetadefPropertyRepoProxy(
MdPropertyRepoStub(), {}, self.policy)
self.assertEqual(None, mdprop_repo.add(self.fakeprop))
self.assertEqual(
'mdprop_get', mdprop_repo.get(self.fakens,
'fakeprop').namespace_property)
self.assertEqual(['mdprop_list'],
[prop.namespace_property for prop
in mdprop_repo.list()])
self.assertEqual('mdprop_save',
mdprop_repo.save(self.fakeprop).namespace_property)
self.assertEqual('mdprop_remove',
mdprop_repo.remove(self.fakeprop).namespace_property)
def test_md_tag_not_allowed(self):
rules = {'get_metadef_tag': False,
'get_metadef_tags': False,
'modify_metadef_tag': False,
'add_metadef_tag': False,
'add_metadef_tags': False,
'delete_metadef_tag': False,
'delete_metadef_tags': False}
self.policy.set_rules(rules)
mdtag_repo = glance.api.policy.MetadefTagRepoProxy(
MdTagRepoStub(), {}, self.policy)
mdns_repo = glance.api.policy.MetadefNamespaceRepoProxy(
MdNamespaceRepoStub(), {}, self.policy)
self.assertRaises(exception.Forbidden, mdtag_repo.add, self.faketag)
self.assertRaises(exception.Forbidden, mdtag_repo.add_tags,
[self.faketag])
self.assertRaises(exception.Forbidden, mdtag_repo.get, self.fakens,
self.faketag)
self.assertRaises(exception.Forbidden, mdtag_repo.list)
self.assertRaises(exception.Forbidden, mdtag_repo.remove, self.faketag)
self.assertRaises(exception.Forbidden, mdns_repo.remove_tags,
self.fakens)
self.assertRaises(exception.Forbidden, mdtag_repo.save, self.faketag)
def test_md_tag_allowed(self):
rules = {'get_metadef_tag': True,
'get_metadef_tags': True,
'modify_metadef_tag': True,
'add_metadef_tag': True,
'add_metadef_tags': True,
'delete_metadef_tag': True,
'delete_metadef_tags': True}
self.policy.set_rules(rules)
mdtag_repo = glance.api.policy.MetadefTagRepoProxy(
MdTagRepoStub(), {}, self.policy)
mdns_repo = glance.api.policy.MetadefNamespaceRepoProxy(
MdNamespaceRepoStub(), {}, self.policy)
self.assertEqual(None, mdtag_repo.add(self.faketag))
self.assertEqual(None, mdtag_repo.add_tags([self.faketag]))
self.assertEqual('mdtag_get',
mdtag_repo.get(self.fakens, 'faketag').base)
self.assertEqual(['mdtag_list'],
[tag.base for tag in mdtag_repo.list()])
self.assertEqual('mdtag_save',
mdtag_repo.save(self.faketag).base)
self.assertEqual('mdtag_remove',
mdtag_repo.remove(self.faketag).base)
self.assertEqual('mdtags_remove',
mdns_repo.remove_tags(self.fakens).base)
class TestContextPolicyEnforcer(base.IsolatedUnitTest):
def _do_test_policy_influence_context_admin(self,

@ -109,7 +109,7 @@ class FakeGateway(object):
self.policy = policy
self.repo = repo
def get_repo(self, context, authorization_layer=True):
def get_repo(self, context):
return self.repo
@ -852,7 +852,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
response, image)
def test_download_failure_with_valid_range(self):
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as mock_get_data:
mock_get_data.side_effect = glance_store.NotFound(image="image")
request = wsgi.Request.blank('/')
@ -914,7 +914,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
download_failures_ContentRange('bytes 4-8/3')
def test_download_failure_with_valid_content_range(self):
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as mock_get_data:
mock_get_data.side_effect = glance_store.NotFound(image="image")
request = wsgi.Request.blank('/')
@ -949,7 +949,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
def get_data(*args, **kwargs):
raise exception.Forbidden()
self.mock_object(glance.api.policy.ImageProxy,
self.mock_object(glance.domain.proxy.Image,
'get_data',
get_data)
request = wsgi.Request.blank('/')
@ -968,7 +968,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
Make sure that serializer returns 204 no content error in case of
image data is not available at specified location.
"""
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as mock_get_data:
mock_get_data.side_effect = glance_store.NotFound(image="image")
@ -983,7 +983,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
def test_download_service_unavailable(self):
"""Test image download returns HTTPServiceUnavailable."""
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as mock_get_data:
mock_get_data.side_effect = glance_store.RemoteServiceUnavailable()
@ -1002,7 +1002,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
Make sure that serializer returns 400 bad request error in case of
getting images from this store is not supported at specified location.
"""
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as mock_get_data:
mock_get_data.side_effect = glance_store.StoreGetNotSupported()
@ -1022,7 +1022,7 @@ class TestImageDataSerializer(test_utils.BaseTestCase):
getting randomly images from this store is not supported at
specified location.
"""
with mock.patch.object(glance.api.policy.ImageProxy,
with mock.patch.object(glance.domain.proxy.Image,
'get_data') as m_get_data:
err = glance_store.StoreRandomGetNotSupported(offset=0,
chunk_size=0)

@ -66,7 +66,7 @@ class TestImageTagsController(base.IsolatedUnitTest):
image_repo = image_data_tests.FakeImageRepo()
image_repo.get = fake_get
def get_fake_repo(self, authorization_layer=False):
def get_fake_repo(self):
return image_repo
self.controller.gateway.get_repo = get_fake_repo

@ -3430,8 +3430,7 @@ class TestImagesController(base.IsolatedUnitTest):
# Make sure we passed an admin context to our task executor factory
mock_tef.assert_called_once_with(
request.context,
admin_context=mock_elevated.return_value,
authorization_layer=False)
admin_context=mock_elevated.return_value)
expected_input = {'image_id': UUID4,
'import_req': mock.ANY,

@ -443,14 +443,11 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
notifier=self.notifier,
policy_enforcer=self.policy)
req = unit_test_utils.get_fake_request(roles=['admin'])
ns_factory = fake_gateway.get_metadef_namespace_factory(
req.context)
ns_repo = fake_gateway.get_metadef_namespace_repo(req.context)
namespace = namespaces.Namespace()
namespace.namespace = 'FakeNamespace'
new_namespace = ns_factory.new_namespace(**namespace.to_dict())
ns_repo.add(new_namespace)
namespace = self.namespace_controller.create(req, namespace)
ns_repo = fake_gateway.get_metadef_namespace_repo(req.context)
self.namespace_controller._cleanup_namespace(ns_repo, namespace, True)
mock_log.debug.assert_called_with(
@ -458,22 +455,18 @@ class TestMetadefsControllers(base.IsolatedUnitTest):
{'namespace': namespace.namespace})
@mock.patch('glance.api.v2.metadef_namespaces.LOG')
@mock.patch('glance.api.authorization.MetadefNamespaceRepoProxy.remove')
@mock.patch('glance.notifier.MetadefNamespaceRepoProxy.remove')
def test_cleanup_namespace_exception(self, mock_remove, mock_log):
mock_remove.side_effect = Exception(u'Mock remove was called')
fake_gateway = glance.gateway.Gateway(db_api=self.db,
notifier=self.notifier,
policy_enforcer=self.policy)
req = unit_test_utils.get_fake_request(roles=['admin'])
ns_factory = fake_gateway.get_metadef_namespace_factory(
req.context)
ns_repo = fake_gateway.get_metadef_namespace_repo(req.context)
namespace = namespaces.Namespace()
namespace.namespace = 'FakeNamespace'
new_namespace = ns_factory.new_namespace(**namespace.to_dict())
ns_repo.add(new_namespace)
namespace = self.namespace_controller.create(req, namespace)
ns_repo = fake_gateway.get_metadef_namespace_repo(req.context)
self.namespace_controller._cleanup_namespace(ns_repo, namespace, True)
called_msg = 'Failed to delete namespace %(namespace)s.' \