Add infrastructure for testing new RBAC policies

A new BasePolicyTest class (heavily borrowed from nova) provides
machinery for verifying policies against a list of contexts for which
an action should be authorized, and then again against a list
of contexts for which the action should be unauthorized. The set of
contexts map to the OpenStack RBAC personas.

Cinder's context.authorize() function is updated to ensure the correct
project_id is used when authenticating against a target object.

A unit test utility function "attach_volume" is fixed to correctly
use an elevated context when creating an attachment DB entry.

Change-Id: Ic8d9d3608ad4eb2c3188d04d59605f3b5a768e30
This commit is contained in:
Alan Bishop 2021-08-31 15:41:17 -07:00
parent 7fb4f272cd
commit 81e0da35dc
6 changed files with 316 additions and 3 deletions

View File

@ -224,6 +224,19 @@ class RequestContext(context.RequestContext):
# Turn object into dict so target.update can work # Turn object into dict so target.update can work
target.update( target.update(
target_obj.obj_to_primitive()['versioned_object.data'] or {}) target_obj.obj_to_primitive()['versioned_object.data'] or {})
# Ensure 'project_id' and 'user_id' attributes are captured.
# Some objects (e.g. attachments) have a project_id attribute
# that isn't present in the dict. The try/except wrappers avoid
# lazy-load issues when the attribute doesn't exist.
try:
target['project_id'] = target_obj.project_id
except Exception:
pass
try:
target['user_id'] = target_obj.user_id
except Exception:
pass
else: else:
target.update(target_obj or {}) target.update(target_obj or {})

View File

@ -41,7 +41,8 @@ def reset():
_ENFORCER = None _ENFORCER = None
def init(use_conf=True): def init(use_conf=True,
suppress_deprecation_warnings=False):
"""Init an Enforcer class. """Init an Enforcer class.
:param use_conf: Whether to load rules from config file. :param use_conf: Whether to load rules from config file.
@ -53,6 +54,7 @@ def init(use_conf=True):
CONF, CONF,
use_conf=use_conf, use_conf=use_conf,
fallback_to_json_file=False) fallback_to_json_file=False)
_ENFORCER.suppress_deprecation_warnings = suppress_deprecation_warnings
register_rules(_ENFORCER) register_rules(_ENFORCER)
_ENFORCER.load_rules() _ENFORCER.load_rules()

View File

@ -21,8 +21,13 @@ import os
import warnings import warnings
import fixtures import fixtures
from oslo_config import cfg
from oslo_policy import policy as oslo_policy
from oslo_privsep import daemon as privsep_daemon from oslo_privsep import daemon as privsep_daemon
import cinder.policy
CONF = cfg.CONF
_TRUE_VALUES = ('True', 'true', '1', 'yes') _TRUE_VALUES = ('True', 'true', '1', 'yes')
@ -158,3 +163,23 @@ class PrivsepNoHelperFixture(fixtures.Fixture):
self.useFixture(fixtures.MonkeyPatch( self.useFixture(fixtures.MonkeyPatch(
'oslo_privsep.daemon.RootwrapClientChannel', 'oslo_privsep.daemon.RootwrapClientChannel',
UnHelperfulClientChannel)) UnHelperfulClientChannel))
class PolicyFixture(fixtures.Fixture):
"""Load the live policy for tests.
A base policy fixture that starts with the assumption that you'd
like to load and enforce the shipped default policy in tests.
"""
def setUp(self):
super().setUp()
cinder.policy.reset()
# Suppress deprecation warnings for unit tests.
cinder.policy.init(suppress_deprecation_warnings=True)
self.addCleanup(cinder.policy.reset)
def set_rules(self, rules, overwrite=True):
policy = cinder.policy._ENFORCER
policy.set_rules(oslo_policy.Rules.from_dict(rules),
overwrite=overwrite)

View File

@ -0,0 +1,190 @@
# Copyright 2021 Red Hat, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from oslo_utils.fixture import uuidsentinel as uuids
from cinder import context as cinder_context
from cinder import exception
from cinder.tests import fixtures
from cinder.tests.unit import test
LOG = logging.getLogger(__name__)
# The list of users, with characterstics/persona implied by the name,
# are declared statically for use as DDT data.
all_users = [
'legacy_admin',
'legacy_owner',
'system_admin',
# NOTE: Xena does not support these system scoped personae. They need
# to be tested in Yoga when support is added for system scope.
# 'system_member',
# 'system_reader',
# 'system_foo',
'project_admin',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
class BasePolicyTest(test.TestCase):
def setUp(self, enforce_scope, enforce_new_defaults, *args, **kwargs):
super().setUp(*args, **kwargs)
self.enforce_scope = enforce_scope
self.enforce_new_defaults = enforce_new_defaults
self.override_config('enforce_scope',
enforce_scope, 'oslo_policy')
self.override_config('enforce_new_defaults',
enforce_new_defaults, 'oslo_policy')
self.policy = self.useFixture(fixtures.PolicyFixture())
self.admin_project_id = uuids.admin_project_id
self.project_id = uuids.project_id
self.project_id_other = uuids.project_id_other
self.context_details = {
'legacy_admin': dict(
project_id=self.admin_project_id,
roles=['admin', 'member', 'reader'],
),
'legacy_owner': dict(
project_id=self.project_id,
roles=[],
),
'system_admin': dict(
roles=['admin', 'member', 'reader'],
# NOTE: The system_admin in Xena is project scoped, and will
# change in Yoga when support is added for system scope.
project_id=self.admin_project_id,
# system_scope='all',
),
'project_admin': dict(
project_id=self.project_id,
roles=['admin', 'member', 'reader'],
),
'project_member': dict(
project_id=self.project_id,
roles=['member', 'reader'],
),
'project_reader': dict(
project_id=self.project_id,
roles=['reader'],
),
'project_foo': dict(
project_id=self.project_id,
roles=['foo'],
),
'other_project_member': dict(
project_id=self.project_id_other,
roles=['member', 'reader'],
),
'other_project_reader': dict(
project_id=self.project_id_other,
roles=['reader'],
),
}
# These context objects are useful for subclasses to create test
# resources (e.g. volumes). Subclasses may create additional
# contexts as needed.
self.project_admin_context = self.create_context('project_admin')
self.project_member_context = self.create_context('project_member')
def is_authorized(self, user_id, authorized_users, unauthorized_users):
if user_id in authorized_users:
return True
elif user_id in unauthorized_users:
return False
else:
msg = ('"%s" must be either an authorized or unauthorized user.'
% (user_id))
raise exception.CinderException(message=msg)
def create_context(self, user_id):
try:
details = self.context_details[user_id]
except KeyError:
msg = ('No context details defined for user_id "%s".' % (user_id))
raise exception.CinderException(message=msg)
return cinder_context.RequestContext(user_id=user_id, **details)
def common_policy_check(self, user_id, authorized_users,
unauthorized_users, unauthorized_exceptions,
rule_name, func, req, *args, **kwargs):
req.environ['cinder.context'] = self.create_context(user_id)
fatal = kwargs.pop('fatal', True)
def ensure_raises(req, *args, **kwargs):
try:
func(req, *args, **kwargs)
except exception.NotAuthorized as exc:
# In case of multi-policy APIs, PolicyNotAuthorized can be
# raised from either of the policy so checking the error
# message, which includes the rule name, can mismatch. Tests
# verifying the multi policy can pass rule_name as None to
# skip the error message assert.
if (isinstance(exc, exception.PolicyNotAuthorized) and
rule_name is not None):
self.assertEqual(
"Policy doesn't allow %s to be performed." %
rule_name, exc.args[0])
except Exception as exc:
self.assertIn(type(exc), unauthorized_exceptions)
else:
msg = ('"%s" was authorized for "%s" policy when it should '
'be unauthorized.' % (user_id, rule_name))
raise exception.CinderException(message=msg)
return None
if self.is_authorized(user_id, authorized_users, unauthorized_users):
# Verify the context having allowed scope and roles pass
# the policy check.
LOG.info('Testing authorized "%s"', user_id) # noqa: ignore=C309
response = func(req, *args, **kwargs)
else:
# Verify the context not having allowed scope or roles fail
# the policy check.
LOG.info('Testing unauthorized "%s"', user_id) # noqa: ignore=C309
if not fatal:
try:
response = func(req, *args, **kwargs)
# We need to ignore the PolicyNotAuthorized
# exception here so that we can add the correct response
# in unauthorize_response for the case of fatal=False.
# This handle the case of multi policy checks where tests
# are verifying the second policy via the response of
# fatal-False and ignoring the response checks where the
# first policy itself fail to pass (even test override the
# first policy to allow for everyone but still, scope
# checks can leads to PolicyNotAuthorized error).
# For example: flavor extra specs policy for GET flavor
# API. In that case, flavor extra spec policy is checked
# after the GET flavor policy. So any context failing on
# GET flavor will raise the PolicyNotAuthorized and for
# that case we do not have any way to verify the flavor
# extra specs so skip that context to check in test.
except exception.PolicyNotAuthorized:
pass
else:
response = ensure_raises(req, *args, **kwargs)
return response

View File

@ -16,8 +16,11 @@
from unittest import mock from unittest import mock
import ddt import ddt
from oslo_policy import policy as oslo_policy
from cinder import context from cinder import context
from cinder.objects import base as objects_base
from cinder import policy
from cinder.tests.unit import test from cinder.tests.unit import test
@ -129,3 +132,83 @@ class ContextTestCase(test.TestCase):
'222', '222',
roles=roles) roles=roles)
self.assertEqual(roles, ctxt.roles) self.assertEqual(roles, ctxt.roles)
@ddt.ddt
class ContextAuthorizeTestCase(test.TestCase):
def setUp(self):
super(ContextAuthorizeTestCase, self).setUp()
rules = [
oslo_policy.RuleDefault("test:something",
"project_id:%(project_id)s"),
]
policy.reset()
policy.init()
# before a policy rule can be used, its default has to be registered.
policy._ENFORCER.register_defaults(rules)
self.context = context.RequestContext(user_id='me',
project_id='my_project')
self.addCleanup(policy.reset)
def _dict_target_obj(project_id):
return {
'user_id': 'me',
'project_id': project_id,
}
def _real_target_obj(project_id):
target_obj = objects_base.CinderObject()
target_obj.user_id = 'me'
target_obj.project_id = project_id
return target_obj
@ddt.data(
{
# PASS: target inherits 'my_project' from target_obj dict
'target': None,
'target_obj': _dict_target_obj('my_project'),
'expected': True,
},
{
# FAIL: target inherits 'other_project' from target_obj dict
'target': None,
'target_obj': _dict_target_obj('other_project'),
'expected': False,
},
{
# PASS: target inherits 'my_project' from target_obj object
'target': None,
'target_obj': _real_target_obj('my_project'),
'expected': True,
},
{
# FAIL: target inherits 'other_project' from target_obj object
'target': None,
'target_obj': _real_target_obj('other_project'),
'expected': False,
},
{
# PASS: target specifies 'my_project'
'target': {'project_id': 'my_project'},
'target_obj': None,
'expected': True,
},
{
# FAIL: target specifies 'other_project'
'target': {'project_id': 'other_project'},
'target_obj': None,
'expected': False,
},
{
# PASS: target inherits 'my_project' from the context
'target': None,
'target_obj': None,
'expected': True,
},
)
@ddt.unpack
def test_authorize(self, target, target_obj, expected):
result = self.context.authorize("test:something",
target, target_obj, fatal=False)
self.assertEqual(result, expected)

View File

@ -115,9 +115,9 @@ def attach_volume(ctxt, volume_id, instance_uuid, attached_host,
values['mountpoint'] = mountpoint values['mountpoint'] = mountpoint
values['attach_time'] = now values['attach_time'] = now
attachment = db.volume_attach(ctxt, values) attachment = db.volume_attach(ctxt.elevated(), values)
volume, updated_values = db.volume_attached( volume, updated_values = db.volume_attached(
ctxt, attachment['id'], instance_uuid, ctxt.elevated(), attachment['id'], instance_uuid,
attached_host, mountpoint, mode) attached_host, mountpoint, mode)
return volume return volume