Implement project personas for backups API

Co-Authored-by: Alan Bishop <abishop@redhat.com>

Change-Id: I7a71f796af13ab8ada12bfd8407da04fe9ba28a2
This commit is contained in:
Lance Bragstad 2021-08-31 15:47:37 -07:00 committed by Alan Bishop
parent 3493493703
commit 64ba8e08a4
3 changed files with 484 additions and 32 deletions

View File

@ -28,10 +28,37 @@ IMPORT_POLICY = 'backup:backup-import'
EXPORT_POLICY = 'backup:export-import' EXPORT_POLICY = 'backup:export-import'
BACKUP_ATTRIBUTES_POLICY = 'backup:backup_project_attribute' BACKUP_ATTRIBUTES_POLICY = 'backup:backup_project_attribute'
deprecated_get_all_policy = base.CinderDeprecatedRule(
name=GET_ALL_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
)
deprecated_get_policy = base.CinderDeprecatedRule(
name=GET_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER,
)
deprecated_create_policy = base.CinderDeprecatedRule(
name=CREATE_POLICY,
check_str=""
)
deprecated_update_policy = base.CinderDeprecatedRule(
name=UPDATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_delete_policy = base.CinderDeprecatedRule(
name=DELETE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_restore_policy = base.CinderDeprecatedRule(
name=RESTORE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
backups_policies = [ backups_policies = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=GET_ALL_POLICY, name=GET_ALL_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_READER_OR_PROJECT_READER,
description="List backups.", description="List backups.",
operations=[ operations=[
{ {
@ -42,7 +69,9 @@ backups_policies = [
'method': 'GET', 'method': 'GET',
'path': '/backups/detail' 'path': '/backups/detail'
} }
]), ],
deprecated_rule=deprecated_get_all_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=BACKUP_ATTRIBUTES_POLICY, name=BACKUP_ATTRIBUTES_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -56,57 +85,68 @@ backups_policies = [
'method': 'GET', 'method': 'GET',
'path': '/backups/detail' 'path': '/backups/detail'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=CREATE_POLICY, name=CREATE_POLICY,
check_str="", check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Create backup.", description="Create backup.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/backups' 'path': '/backups'
} }
]), ],
deprecated_rule=deprecated_create_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=GET_POLICY, name=GET_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_READER_OR_PROJECT_READER,
description="Show backup.", description="Show backup.",
operations=[ operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/backups/{backup_id}' 'path': '/backups/{backup_id}'
} }
]), ],
deprecated_rule=deprecated_get_policy
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=UPDATE_POLICY, name=UPDATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Update backup.", description="Update backup.",
operations=[ operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/backups/{backup_id}' 'path': '/backups/{backup_id}'
} }
]), ],
deprecated_rule=deprecated_update_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=DELETE_POLICY, name=DELETE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Delete backup.", description="Delete backup.",
operations=[ operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/backups/{backup_id}' 'path': '/backups/{backup_id}'
} }
]), ],
deprecated_rule=deprecated_delete_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=RESTORE_POLICY, name=RESTORE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Restore backup.", description="Restore backup.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/backups/{backup_id}/restore' 'path': '/backups/{backup_id}/restore'
} }
]), ],
deprecated_rule=deprecated_restore_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=IMPORT_POLICY, name=IMPORT_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -116,7 +156,8 @@ backups_policies = [
'method': 'POST', 'method': 'POST',
'path': '/backups/{backup_id}/import_record' 'path': '/backups/{backup_id}/import_record'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=EXPORT_POLICY, name=EXPORT_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -126,7 +167,8 @@ backups_policies = [
'method': 'POST', 'method': 'POST',
'path': '/backups/{backup_id}/export_record' 'path': '/backups/{backup_id}/export_record'
} }
]), ],
),
] ]

View File

@ -0,0 +1,427 @@
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from cinder.api import microversions as mv
from cinder.api.v3 import backups
from cinder import exception
from cinder.objects import fields
from cinder.policies import backups as backups_policies
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit.policies import base
from cinder.tests.unit import utils as test_utils
@ddt.ddt
class BackupsPolicyTest(base.BasePolicyTest):
authorized_readers = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_readers = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_members = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_members = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_admins = [
'legacy_admin',
'system_admin',
'project_admin',
]
unauthorized_admins = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
def setUp(self, enforce_scope=False, enforce_new_defaults=False, *args,
**kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.override_config('backup_use_same_host', True)
self.controller = backups.BackupsController()
self.api_path = '/v3/%s/backups' % (self.project_id)
self.api_version = mv.BASE_VERSION
def _create_backup(self):
backup = test_utils.create_backup(self.project_member_context,
status=fields.BackupStatus.AVAILABLE,
size=1)
self.addCleanup(backup.destroy)
return backup
@ddt.data(*base.all_users)
def test_get_all_backups_policy(self, user_id):
self._create_backup()
rule_name = backups_policies.GET_ALL_POLICY
url = self.api_path
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
# Generally, any logged in user can list all backups.
authorized_users = [user_id]
unauthorized_users = []
# The exception is when deprecated rules are disabled, in which case
# roles are enforced. Users without the 'reader' role should be
# blocked.
if self.enforce_new_defaults:
context = self.create_context(user_id)
if 'reader' not in context.roles:
authorized_users = []
unauthorized_users = [user_id]
response = self.common_policy_check(user_id, authorized_users,
unauthorized_users, [],
rule_name,
self.controller.index, req)
# For some users, even if they're authorized, the list of backups
# will be empty if they are not in the backup's project.
empty_response_users = [
*self.unauthorized_readers,
# legacy_admin and system_admin do not have a project_id, and
# so the list of backups returned will be empty.
'legacy_admin',
'system_admin',
]
backups = response['backups'] if response else []
backup_count = 0 if user_id in empty_response_users else 1
self.assertEqual(backup_count, len(backups))
@ddt.data(*base.all_users)
def test_get_backup_policy(self, user_id):
backup_id = self._create_backup().id
rule_name = backups_policies.GET_POLICY
url = '%s/%s' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
unauthorized_exceptions = [
exception.BackupNotFound,
]
self.common_policy_check(user_id, self.authorized_readers,
self.unauthorized_readers,
unauthorized_exceptions,
rule_name, self.controller.show, req,
id=backup_id)
@ddt.data(*base.all_users)
def test_create_backup_policy(self, user_id):
volume = test_utils.create_volume(self.project_member_context,
testcase_instance=self)
rule_name = backups_policies.CREATE_POLICY
url = self.api_path
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"backup": {
"container": None,
"description": None,
"name": "backup001",
"volume_id": volume.id,
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name, self.controller.create, req,
body=body)
@ddt.data(*base.all_users)
def test_update_backup_policy(self, user_id):
backup_id = self._create_backup().id
rule_name = backups_policies.UPDATE_POLICY
url = '%s/%s' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url, version=mv.BACKUP_UPDATE)
req.method = 'PUT'
body = {
"backup": {
"name": "backup666",
}
}
# Relax the GET_POLICY in order to get past that check.
self.policy.set_rules({backups_policies.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.BackupNotFound,
]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name, self.controller.update, req,
id=backup_id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.backup.api.API._is_backup_service_enabled',
return_value=True)
def test_delete_backup_policy(self, user_id, mock_backup_service_enabled):
backup_id = self._create_backup().id
rule_name = backups_policies.DELETE_POLICY
url = '%s/%s' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'DELETE'
# Relax the GET_POLICY in order to get past that check.
self.policy.set_rules({backups_policies.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.BackupNotFound,
]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name, self.controller.delete, req,
id=backup_id)
@ddt.data(*base.all_users)
@mock.patch('cinder.backup.api.API._is_backup_service_enabled',
return_value=True)
@mock.patch('cinder.backup.rpcapi.BackupAPI.restore_backup')
def test_restore_backup_policy(self, user_id,
mock_backup_restore,
mock_backup_service_enabled):
backup_id = self._create_backup().id
volume = test_utils.create_volume(self.project_member_context,
testcase_instance=self)
rule_name = backups_policies.RESTORE_POLICY
url = '%s/%s' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"restore": {
"volume_id": volume.id
}
}
# Relax the GET_POLICY in order to get past that check.
self.policy.set_rules({backups_policies.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.BackupNotFound,
]
self.common_policy_check(user_id, self.authorized_members,
self.unauthorized_members,
unauthorized_exceptions,
rule_name, self.controller.restore, req,
id=backup_id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.backup.api.API._list_backup_hosts')
@mock.patch('cinder.backup.api.API._get_import_backup')
@mock.patch('cinder.backup.rpcapi.BackupAPI.import_record')
def test_import_backup_policy(self, user_id,
mock_import_record,
mock_get_import_backup,
mock_list_backup_hosts):
def _list_backup_hosts(*args):
return ['backup-host']
def _get_import_backup(*args):
return self._create_backup()
mock_list_backup_hosts.side_effect = _list_backup_hosts
mock_get_import_backup.side_effect = _get_import_backup
rule_name = backups_policies.IMPORT_POLICY
url = '%s/import_record' % (self.api_path)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'PUT'
body = {
"backup-record": {
"backup_service": "backup-host",
"backup_url": "eyJzdGF0"
}
}
unauthorized_exceptions = []
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name, self.controller.import_record, req,
body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.backup.api.API._get_available_backup_service_host',
return_value='backup-host')
@mock.patch('cinder.backup.rpcapi.BackupAPI.export_record',
return_value={
"backup_service": "backup-host",
"backup_url": "eyJzdGF0"
})
def test_export_backup_policy(self, user_id,
mock_export_record,
mock_get_backup_service_host):
backup_id = self._create_backup().id
rule_name = backups_policies.EXPORT_POLICY
url = '%s/%s/export_record' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
# Relax the GET_POLICY in order to get past that check.
self.policy.set_rules({backups_policies.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.BackupNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name, self.controller.export_record, req,
id=backup_id)
@ddt.data(*base.all_users)
def test_backup_attributes_policy(self, user_id):
backup_id = self._create_backup().id
# Although we're testing the BACKUP_ATTRIBUTES_POLICY, unauthorized
# readers will (correctly) fail on the GET_POLICY. For authorized
# readers, later we'll test the response to verify the
# BACKUP_ATTRIBUTES_POLICY is properly enforced.
rule_name = backups_policies.GET_POLICY
url = '%s/%s' % (self.api_path, backup_id)
req = fake_api.HTTPRequest.blank(url,
version=mv.BACKUP_PROJECT_USER_ID)
unauthorized_exceptions = [
exception.BackupNotFound,
]
response = self.common_policy_check(user_id, self.authorized_readers,
self.unauthorized_readers,
unauthorized_exceptions,
rule_name, self.controller.show,
req, id=backup_id)
if user_id in self.authorized_readers:
# Check whether the backup record includes a user_id. Only
# authorized_admins should see one.
backup_user_id = response['backup'].get('user_id', None)
if user_id in self.authorized_admins:
self.assertIsNotNone(backup_user_id)
else:
self.assertIsNone(backup_user_id)
class BackupsPolicySecureRbacTest(BackupsPolicyTest):
authorized_readers = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
'project_reader',
]
unauthorized_readers = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_foo',
'other_project_member',
'other_project_reader',
]
authorized_members = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
]
unauthorized_members = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
# NOTE(Xena): The authorized_admins and unauthorized_admins are the same
# as the BackupsPolicyTest's. This is because in Xena the "admin only"
# rules are the legacy RULE_ADMIN_API. This will change in Yoga, when
# RULE_ADMIN_API will be deprecated in favor of the SYSTEM_ADMIN rule that
# is scope based.
authorized_admins = [
'legacy_admin',
'system_admin',
'project_admin',
]
unauthorized_admins = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)

View File

@ -19,23 +19,6 @@
# DELETE /snapshots/{snapshot_id}/metadata/{key} # DELETE /snapshots/{snapshot_id}/metadata/{key}
"volume:delete_snapshot_metadata": "" "volume:delete_snapshot_metadata": ""
# List backups.
# GET /backups
# GET /backups/detail
"backup:get_all": ""
# Show backup.
# GET /backups/{backup_id}
"backup:get": ""
# Delete backup.
# DELETE /backups/{backup_id}
"backup:delete": ""
# Restore backup.
# POST /backups/{backup_id}/restore
"backup:restore": ""
# List group snapshots. # List group snapshots.
# GET /group_snapshots # GET /group_snapshots
# GET /group_snapshots/detail # GET /group_snapshots/detail