Xena project personas for volume type access API

Implements the 3 Xena personas (no system scope).  Also introduces
a new policy 'volume_extension:volume_type_access:get_all_for_type'
that governs the GET /types/{type_id}/os-volume-type-access call.

Also removes an overriding policy definition from policy.yaml in
cinder/tests/unit/ that doesn't appear to be needed.

Co-authored-by: Lance Bragstad <lbragstad@gmail.com>
Co-authored-by: Brian Rosmaita <rosmaita.fossdev@gmail.com>

Change-Id: I6db266315efa02f08dfc0c069514f2ff9b8c551e
This commit is contained in:
Lance Bragstad 2020-10-28 16:35:08 +00:00 committed by Brian Rosmaita
parent 7c556df249
commit 5fc7df24a5
4 changed files with 287 additions and 15 deletions

View File

@ -40,7 +40,7 @@ class VolumeTypeAccessController(object):
def index(self, req, type_id): def index(self, req, type_id):
context = req.environ['cinder.context'] context = req.environ['cinder.context']
context.authorize(policy.TYPE_ACCESS_POLICY) context.authorize(policy.TYPE_ACCESS_WHO_POLICY)
# Not found exception will be handled at the wsgi level # Not found exception will be handled at the wsgi level
vol_type = volume_types.get_volume_type( vol_type = volume_types.get_volume_type(
@ -77,6 +77,7 @@ class VolumeTypeActionController(wsgi.Controller):
vol_type = req.cached_resource_by_id(type_id, name='types') vol_type = req.cached_resource_by_id(type_id, name='types')
self._extend_vol_type(vol_type_rval, vol_type) self._extend_vol_type(vol_type_rval, vol_type)
# TODO: remove this, there is no /types/detail call for this to extend
@wsgi.extends @wsgi.extends
def detail(self, req, resp_obj): def detail(self, req, resp_obj):
context = req.environ['cinder.context'] context = req.environ['cinder.context']

View File

@ -22,21 +22,37 @@ ADD_PROJECT_POLICY = "volume_extension:volume_type_access:addProjectAccess"
REMOVE_PROJECT_POLICY = \ REMOVE_PROJECT_POLICY = \
"volume_extension:volume_type_access:removeProjectAccess" "volume_extension:volume_type_access:removeProjectAccess"
TYPE_ACCESS_POLICY = "volume_extension:volume_type_access" TYPE_ACCESS_POLICY = "volume_extension:volume_type_access"
TYPE_ACCESS_WHO_POLICY = "volume_extension:volume_type_access:get_all_for_type"
deprecated_volume_type_access = base.CinderDeprecatedRule(
name=TYPE_ACCESS_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_type_access_who_policy = base.CinderDeprecatedRule(
name=TYPE_ACCESS_WHO_POLICY,
# TODO: revise check_str and dep_reason in Yoga
check_str=TYPE_ACCESS_POLICY,
deprecated_reason=(
f"Reason: '{TYPE_ACCESS_WHO_POLICY}' is a new policy that protects "
f"an API call formerly governed by '{TYPE_ACCESS_POLICY}', but which "
'has been separated for finer-grained policy control.'),
)
volume_access_policies = [ volume_access_policies = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=TYPE_ACCESS_POLICY, name=TYPE_ACCESS_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Volume type access related APIs.", description=(
"Adds the boolean field 'os-volume-type-access:is_public' to "
'the responses for these API calls. The ability to make these '
'calls is governed by other policies.'),
operations=[ operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/types' 'path': '/types'
}, },
{
'method': 'GET',
'path': '/types/detail'
},
{ {
'method': 'GET', 'method': 'GET',
'path': '/types/{type_id}' 'path': '/types/{type_id}'
@ -45,7 +61,9 @@ volume_access_policies = [
'method': 'POST', 'method': 'POST',
'path': '/types' 'path': '/types'
} }
]), ],
deprecated_rule=deprecated_volume_type_access,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=ADD_PROJECT_POLICY, name=ADD_PROJECT_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -66,6 +84,20 @@ volume_access_policies = [
'path': '/types/{type_id}/action (removeProjectAccess)' 'path': '/types/{type_id}/action (removeProjectAccess)'
} }
]), ]),
policy.DocumentedRuleDefault(
name=TYPE_ACCESS_WHO_POLICY,
check_str=base.RULE_ADMIN_API,
description=(
'List private volume type access detail, that is, list the '
'projects that have access to this volume type.'),
operations=[
{
'method': 'GET',
'path': '/types/{type_id}/os-volume-type-access'
}
],
deprecated_rule=deprecated_type_access_who_policy,
),
] ]

View File

@ -0,0 +1,246 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from copy import deepcopy
import ddt
from cinder.api.contrib import volume_type_access as vta
from cinder.api import microversions as mv
from cinder import objects
from cinder.policies import volume_access as vta_policies
from cinder.tests.unit.api.contrib import test_volume_type_access as vta_test
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit.policies import base
IS_PUBLIC_FIELD = 'os-volume-type-access:is_public'
# the original uses a class var and admin context
class FakeRequest(vta_test.FakeRequest):
def __init__(self, context):
self.environ = {"cinder.context": context}
FAKE_RESP_OBJ = {
'volume_type': {'id': fake.VOLUME_TYPE_ID},
'volume_types': [
{'id': fake.VOLUME_TYPE_ID},
{'id': fake.VOLUME_TYPE3_ID}
]}
# need an instance var so this will work with ddt
class FakeResponse(vta_test.FakeResponse):
def __init__(self):
self.obj = deepcopy(FAKE_RESP_OBJ)
@ddt.ddt
class VolumeTypeAccessFieldPolicyTest(base.BasePolicyTest):
# NOTE: We are testing directly against the extension controller.
# Its call to context.authorize doesn't provide a target, so
# "is_admin" or "project_id:%(project_id)s" always matches.
authorized_users = [
'legacy_admin',
'project_admin',
'system_admin',
'legacy_owner',
'project_member',
'project_reader',
'project_foo',
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
# note: authorize is called with fatal=False, so everyone is a winner!
everyone = authorized_users
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.controller = vta.VolumeTypeActionController()
self.rule_name = vta_policies.TYPE_ACCESS_POLICY
self.api_version = mv.BASE_VERSION
self.api_path = f'/v3/{self.project_id}/types'
@ddt.data(*base.all_users)
def test_type_access_policy_types_list(self, user_id):
unauthorized_exceptions = None
req = FakeRequest(self.create_context(user_id))
resp = FakeResponse()
self.common_policy_check(user_id,
self.everyone,
[],
unauthorized_exceptions,
self.rule_name,
self.controller.index,
req,
resp)
# this is where the real check happens
if user_id in self.authorized_users:
for vol_type in resp.obj['volume_types']:
self.assertIn(IS_PUBLIC_FIELD, vol_type)
else:
for vol_type in resp.obj['volume_types']:
self.assertNotIn(IS_PUBLIC_FIELD, vol_type)
@ddt.data(*base.all_users)
def test_type_access_policy_type_show(self, user_id):
unauthorized_exceptions = None
req = FakeRequest(self.create_context(user_id))
resp = FakeResponse()
self.common_policy_check(user_id,
self.everyone,
[],
unauthorized_exceptions,
self.rule_name,
self.controller.show,
req,
resp,
fake.VOLUME_TYPE_ID)
if user_id in self.authorized_users:
self.assertIn(IS_PUBLIC_FIELD, resp.obj['volume_type'])
else:
self.assertNotIn(IS_PUBLIC_FIELD, resp.obj['volume_type'])
@ddt.data(*base.all_users)
def test_type_access_policy_type_create(self, user_id):
unauthorized_exceptions = None
req = FakeRequest(self.create_context(user_id))
resp = FakeResponse()
body = None
self.common_policy_check(user_id,
self.everyone,
[],
unauthorized_exceptions,
self.rule_name,
self.controller.create,
req,
body,
resp)
if user_id in self.authorized_users:
self.assertIn(IS_PUBLIC_FIELD, resp.obj['volume_type'])
else:
self.assertNotIn(IS_PUBLIC_FIELD, resp.obj['volume_type'])
class VolumeTypeAccessFieldPolicySecureRbacTest(
VolumeTypeAccessFieldPolicyTest):
# Remember that we are testing directly against the extension controller,
# so while the below may seem over-permissive, in real life there is
# a more selective check that happens first.
authorized_users = [
'legacy_admin',
'project_admin',
'system_admin',
'project_member',
'system_member',
'other_project_member',
]
# this will be anyone without the 'admin' or 'member' role
unauthorized_users = [
'legacy_owner',
'project_foo',
'project_reader',
'system_reader',
'system_foo',
'other_project_reader',
]
everyone = authorized_users + unauthorized_users
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)
@ddt.ddt
class VolumeTypeAccessListProjectsPolicyTest(base.BasePolicyTest):
authorized_users = [
'legacy_admin',
'project_admin',
'system_admin',
]
unauthorized_users = [
'legacy_owner',
'project_member',
'project_reader',
'project_foo',
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.controller = vta.VolumeTypeAccessController()
self.volume_type = objects.VolumeType(
self.project_admin_context,
name='private_volume_type',
is_public=False,
description='volume type for srbac testing',
extra_specs=None,
projects=[self.project_id, self.project_id_other])
self.volume_type.create()
self.addCleanup(self.volume_type.destroy)
self.api_version = mv.BASE_VERSION
self.api_path = (f'/v3/{self.project_id}/types/'
f'{self.volume_type.id}/os-volume-type-access')
@ddt.data(*base.all_users)
def test_type_access_who_policy(self, user_id):
"""Test policy for listing projects with access to a volume type."""
rule_name = vta_policies.TYPE_ACCESS_WHO_POLICY
unauthorized_exceptions = None
req = fake_api.HTTPRequest.blank(self.api_path)
self.common_policy_check(user_id,
self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller.index,
req,
self.volume_type.id)
class VolumeTypeAccessListProjectsPolicySecureRbacTest(
VolumeTypeAccessListProjectsPolicyTest):
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)

View File

@ -139,13 +139,6 @@
# DELETE /types # DELETE /types
"volume_extension:types_manage": "" "volume_extension:types_manage": ""
# Volume type access related APIs.
# GET /types
# GET /types/detail
# GET /types/{type_id}
# POST /types
"volume_extension:volume_type_access": ""
# Revert a volume to a snapshot. # Revert a volume to a snapshot.
# POST /volumes/{volume_id}/action (revert) # POST /volumes/{volume_id}/action (revert)
"volume:revert_to_snapshot": "" "volume:revert_to_snapshot": ""