Implement project personas for volume actions

The unit tests added for the volume actions policies required a
couple of updates to other unit tests:
- A request context was added to test_volume_revert_update_status_failed
- Several VolumeTransferTestCase tests were updated to use a context
  when mocking volume notifications. This is necessary to prevent
  other tests running in parallel that generate their own notifications
  from affecting the VolumeTransferTestCase tests.

Change-Id: Iba63f8d88f2fc8283e9f6b5c4b03f2045865f1af
This commit is contained in:
Alan Bishop 2021-09-01 11:15:59 -07:00 committed by Brian Rosmaita
parent e76746f805
commit 639224f903
5 changed files with 934 additions and 139 deletions

View File

@ -40,37 +40,100 @@ ROLL_DETACHING_POLICY = "volume_extension:volume_actions:roll_detaching"
TERMINATE_POLICY = "volume_extension:volume_actions:terminate_connection" TERMINATE_POLICY = "volume_extension:volume_actions:terminate_connection"
INITIALIZE_POLICY = "volume_extension:volume_actions:initialize_connection" INITIALIZE_POLICY = "volume_extension:volume_actions:initialize_connection"
deprecated_extend_policy = base.CinderDeprecatedRule(
name=EXTEND_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_extend_attached_policy = base.CinderDeprecatedRule(
name=EXTEND_ATTACHED_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_revert_policy = base.CinderDeprecatedRule(
name=REVERT_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_retype_policy = base.CinderDeprecatedRule(
name=RETYPE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_update_only_policy = base.CinderDeprecatedRule(
name=UPDATE_READONLY_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_upload_image_policy = base.CinderDeprecatedRule(
name=UPLOAD_IMAGE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_initialize_policy = base.CinderDeprecatedRule(
name=INITIALIZE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_terminate_policy = base.CinderDeprecatedRule(
name=TERMINATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_roll_detaching_policy = base.CinderDeprecatedRule(
name=ROLL_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_reserve_policy = base.CinderDeprecatedRule(
name=RESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_unreserve_policy = base.CinderDeprecatedRule(
name=UNRESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_begin_detaching_policy = base.CinderDeprecatedRule(
name=BEGIN_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_attach_policy = base.CinderDeprecatedRule(
name=ATTACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_detach_policy = base.CinderDeprecatedRule(
name=DETACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER
)
volume_action_policies = [ volume_action_policies = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=EXTEND_POLICY, name=EXTEND_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Extend a volume.", description="Extend a volume.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-extend)' 'path': '/volumes/{volume_id}/action (os-extend)'
} }
]), ],
deprecated_rule=deprecated_extend_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=EXTEND_ATTACHED_POLICY, name=EXTEND_ATTACHED_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Extend a attached volume.", description="Extend a attached volume.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-extend)' 'path': '/volumes/{volume_id}/action (os-extend)'
} }
]), ],
deprecated_rule=deprecated_extend_attached_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=REVERT_POLICY, name=REVERT_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Revert a volume to a snapshot.", description="Revert a volume to a snapshot.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (revert)' 'path': '/volumes/{volume_id}/action (revert)'
} }
]), ],
deprecated_rule=deprecated_revert_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=RESET_STATUS, name=RESET_STATUS,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -80,27 +143,32 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-reset_status)' 'path': '/volumes/{volume_id}/action (os-reset_status)'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=RETYPE_POLICY, name=RETYPE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Retype a volume.", description="Retype a volume.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-retype)' 'path': '/volumes/{volume_id}/action (os-retype)'
} }
]), ],
deprecated_rule=deprecated_retype_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=UPDATE_READONLY_POLICY, name=UPDATE_READONLY_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Update a volume's readonly flag.", description="Update a volume's readonly flag.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-update_readonly_flag)' 'path': '/volumes/{volume_id}/action (os-update_readonly_flag)'
} }
]), ],
deprecated_rule=deprecated_update_only_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=FORCE_DELETE_POLICY, name=FORCE_DELETE_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -110,7 +178,8 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-force_delete)' 'path': '/volumes/{volume_id}/action (os-force_delete)'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=UPLOAD_PUBLIC_POLICY, name=UPLOAD_PUBLIC_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -120,17 +189,20 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-volume_upload_image)' 'path': '/volumes/{volume_id}/action (os-volume_upload_image)'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=UPLOAD_IMAGE_POLICY, name=UPLOAD_IMAGE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Upload a volume to image.", description="Upload a volume to image.",
operations=[ operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-volume_upload_image)' 'path': '/volumes/{volume_id}/action (os-volume_upload_image)'
} }
]), ],
deprecated_rule=deprecated_upload_image_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=FORCE_DETACH_POLICY, name=FORCE_DETACH_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -140,7 +212,8 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-force_detach)' 'path': '/volumes/{volume_id}/action (os-force_detach)'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=MIGRATE_POLICY, name=MIGRATE_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -150,7 +223,8 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': '/volumes/{volume_id}/action (os-migrate_volume)' 'path': '/volumes/{volume_id}/action (os-migrate_volume)'
} }
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=MIGRATE_COMPLETE_POLICY, name=MIGRATE_COMPLETE_POLICY,
check_str=base.RULE_ADMIN_API, check_str=base.RULE_ADMIN_API,
@ -159,79 +233,96 @@ volume_action_policies = [
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-migrate_volume_completion)'} '/volumes/{volume_id}/action (os-migrate_volume_completion)'}
]), ],
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=INITIALIZE_POLICY, name=INITIALIZE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Initialize volume attachment.", description="Initialize volume attachment.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-initialize_connection)'} '/volumes/{volume_id}/action (os-initialize_connection)'}
]), ],
deprecated_rule=deprecated_initialize_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=TERMINATE_POLICY, name=TERMINATE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Terminate volume attachment.", description="Terminate volume attachment.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-terminate_connection)'} '/volumes/{volume_id}/action (os-terminate_connection)'}
]), ],
deprecated_rule=deprecated_terminate_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=ROLL_DETACHING_POLICY, name=ROLL_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Roll back volume status to 'in-use'.", description="Roll back volume status to 'in-use'.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-roll_detaching)'} '/volumes/{volume_id}/action (os-roll_detaching)'}
]), ],
deprecated_rule=deprecated_roll_detaching_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=RESERVE_POLICY, name=RESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Mark volume as reserved.", description="Mark volume as reserved.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-reserve)'} '/volumes/{volume_id}/action (os-reserve)'}
]), ],
deprecated_rule=deprecated_reserve_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=UNRESERVE_POLICY, name=UNRESERVE_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Unmark volume as reserved.", description="Unmark volume as reserved.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-unreserve)'} '/volumes/{volume_id}/action (os-unreserve)'}
]), ],
deprecated_rule=deprecated_unreserve_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=BEGIN_DETACHING_POLICY, name=BEGIN_DETACHING_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Begin detach volumes.", description="Begin detach volumes.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-begin_detaching)'} '/volumes/{volume_id}/action (os-begin_detaching)'}
]), ],
deprecated_rule=deprecated_begin_detaching_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=ATTACH_POLICY, name=ATTACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Add attachment metadata.", description="Add attachment metadata.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-attach)'} '/volumes/{volume_id}/action (os-attach)'}
]), ],
deprecated_rule=deprecated_attach_policy,
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=DETACH_POLICY, name=DETACH_POLICY,
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER,
description="Clear attachment metadata.", description="Clear attachment metadata.",
operations=[{ operations=[{
'method': 'POST', 'method': 'POST',
'path': 'path':
'/volumes/{volume_id}/action (os-detach)'} '/volumes/{volume_id}/action (os-detach)'}
]), ],
deprecated_rule=deprecated_detach_policy,
),
] ]

View File

@ -1032,6 +1032,7 @@ class VolumeApiTest(test.TestCase):
req.headers = mv.get_mv_header(mv.VOLUME_REVERT) req.headers = mv.get_mv_header(mv.VOLUME_REVERT)
req.api_version_request = mv.get_api_version( req.api_version_request = mv.get_api_version(
mv.VOLUME_REVERT) mv.VOLUME_REVERT)
req.environ['cinder.context'] = self.ctxt
# update volume's status failed # update volume's status failed
mock_update.side_effect = [False, True] mock_update.side_effect = [False, True]

View File

@ -14,20 +14,706 @@
from http import HTTPStatus from http import HTTPStatus
from unittest import mock from unittest import mock
import ddt
from cinder.api.contrib import admin_actions
from cinder.api.contrib import volume_actions
from cinder.api import extensions
from cinder.api import microversions as mv from cinder.api import microversions as mv
from cinder.api.v3 import volumes
from cinder import exception
from cinder.objects import fields
from cinder.policies import volume_actions as policy
from cinder.policies import volumes as volume_policy
from cinder.tests.unit.api import fakes as fake_api
from cinder.tests.unit import fake_constants from cinder.tests.unit import fake_constants
from cinder.tests.unit.policies import base
from cinder.tests.unit.policies import test_base from cinder.tests.unit.policies import test_base
from cinder.tests.unit import utils as test_utils
from cinder.volume import api as volume_api from cinder.volume import api as volume_api
from cinder.volume import manager as volume_manager
@ddt.ddt
class VolumeActionsPolicyTest(base.BasePolicyTest):
authorized_users = [
'legacy_admin',
'legacy_owner',
'system_admin',
'project_admin',
'project_member',
'project_reader',
'project_foo',
]
unauthorized_users = [
'system_member',
'system_reader',
'system_foo',
'other_project_member',
'other_project_reader',
]
authorized_admins = [
'legacy_admin',
'system_admin',
'project_admin',
]
unauthorized_admins = [
'legacy_owner',
'system_member',
'system_reader',
'system_foo',
'project_member',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
# Basic policy test is without enforcing scope (which cinder doesn't
# yet support) and deprecated rules enabled.
def setUp(self, enforce_scope=False, enforce_new_defaults=False,
*args, **kwargs):
super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs)
self.ext_mgr = extensions.ExtensionManager()
self.controller = volume_actions.VolumeActionsController(self.ext_mgr)
self.admin_controller = admin_actions.VolumeAdminController(
self.ext_mgr)
self.volume_controller = volumes.VolumeController(self.ext_mgr)
self.manager = volume_manager.VolumeManager()
self.manager.driver = mock.MagicMock()
self.manager.driver.initialize_connection = mock.MagicMock()
self.manager.driver.initialize_connection.side_effect = (
self._initialize_connection)
self.api_path = '/v3/%s/volumes' % (self.project_id)
self.api_version = mv.BASE_VERSION
def _initialize_connection(self, volume, connector):
return {'data': connector}
def _create_volume(self, attached=False, **kwargs):
vol_type = test_utils.create_volume_type(self.project_admin_context,
name='fake_vol_type',
testcase_instance=self)
volume = test_utils.create_volume(self.project_member_context,
volume_type_id=vol_type.id,
testcase_instance=self, **kwargs)
if attached:
volume = test_utils.attach_volume(self.project_member_context,
volume.id,
fake_constants.INSTANCE_ID,
'fake_host',
'fake_mountpoint')
return volume
@ddt.data(*base.all_users)
def test_extend_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.EXTEND_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
# DB validations will throw VolumeNotFound for some contexts
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_extend_attached_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.EXTEND_ATTACHED_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_EXTEND_INUSE)
req.method = 'POST'
body = {
"os-extend": {
"new_size": 3
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._extend, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_revert_policy(self, user_id):
volume = self._create_volume()
snap = test_utils.create_snapshot(
self.project_member_context,
volume.id,
status=fields.SnapshotStatus.AVAILABLE,
testcase_instance=self)
rule_name = policy.REVERT_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.VOLUME_REVERT)
req.method = 'POST'
body = {
"revert": {
"snapshot_id": snap.id
}
}
# Relax the volume:GET_POLICY in order to get past that check.
self.policy.set_rules({volume_policy.GET_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.volume_controller.revert, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reset_policy(self, user_id):
volume = self._create_volume(attached=True)
rule_name = policy.RESET_STATUS
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reset_status": {
"status": "available",
"attach_status": "detached",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._reset_status, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_retype_policy(self, user_id):
volume = self._create_volume()
test_utils.create_volume_type(self.project_admin_context,
name='another_vol_type',
testcase_instance=self)
rule_name = policy.RETYPE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-retype": {
"new_type": "another_vol_type",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._retype, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_update_readonly_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.UPDATE_READONLY_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-update_readonly_flag": {
"readonly": True
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_readonly_update, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_force_delete_policy(self, user_id):
volume = self._create_volume()
rule_name = policy.FORCE_DELETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_delete": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_delete, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_force_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.FORCE_DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-force_detach": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._force_detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_image_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_IMAGE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.copy_volume_to_image')
@mock.patch('cinder.image.glance.GlanceImageService.create')
def test_upload_public_policy(self, user_id,
mock_image_create,
mock_copy_volume_to_image):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def copy_volume_to_image(ctxt, volume, image_meta):
return self.manager.copy_volume_to_image(ctxt, volume.id,
image_meta)
mock_copy_volume_to_image.side_effect = copy_volume_to_image
volume = self._create_volume(status='available')
rule_name = policy.UPLOAD_PUBLIC_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=mv.UPLOAD_IMAGE_PARAMS)
req.method = 'POST'
body = {
"os-volume_upload_image": {
"image_name": "test",
"visibility": "public",
}
}
# Relax the UPLOAD_IMAGE_POLICY in order to get past that check.
self.policy.set_rules({policy.UPLOAD_IMAGE_POLICY: ""},
overwrite=False)
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.controller._volume_upload_image, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.objects.Service.get_by_id')
def test_migrate_policy(self, user_id, mock_get_service_by_id):
volume = self._create_volume()
rule_name = policy.MIGRATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume": {
"host": "node1@lvm"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_admins,
self.unauthorized_admins,
unauthorized_exceptions,
rule_name,
self.admin_controller._migrate_volume, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_migrate_complete_policy(self, user_id):
volume = self._create_volume()
# Can't use self._create_volume() because it would fail when
# trying to create the volume type a second time.
new_volume = test_utils.create_volume(self.project_member_context,
testcase_instance=self)
rule_name = policy.MIGRATE_COMPLETE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-migrate_volume_completion": {
"new_volume": new_volume.id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(
user_id, self.authorized_admins, self.unauthorized_admins,
unauthorized_exceptions, rule_name,
self.admin_controller._migrate_volume_completion, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.attach_volume')
def test_attach_policy(self, user_id, mock_attach_volume):
def attach_volume(context, volume, instance_uuid, host_name,
mountpoint, mode):
return self.manager.attach_volume(context, volume.id,
instance_uuid, host_name,
mountpoint, mode)
mock_attach_volume.side_effect = attach_volume
volume = self._create_volume(status='available')
rule_name = policy.ATTACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-attach": {
"instance_uuid": fake_constants.INSTANCE_ID,
"mountpoint": "/dev/vdc"
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._attach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.detach_volume')
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_detach_policy(self, user_id,
mock_terminate_connection,
mock_detach_volume):
# Redirect the RPC calls directly to the volume manager.
# The volume manager needs the volume.id, not the volume.
def detach_volume(ctxt, volume, connector, force=False):
return self.manager.detach_volume(ctxt, volume.id,
attachment_id=None,
volume=None)
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force)
mock_detach_volume.side_effect = detach_volume
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume(attached=True)
rule_name = policy.DETACH_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-detach": {
"attachment_id": volume.volume_attachment[0].id
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._detach, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_begin_detaching_policy(self, user_id):
volume = self._create_volume(status='in-use', attach_status='attached')
rule_name = policy.BEGIN_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-begin_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._begin_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_reserve_policy(self, user_id):
volume = self._create_volume(status='available')
rule_name = policy.RESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-reserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._reserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_unreserve_policy(self, user_id):
volume = self._create_volume(status='reserved')
rule_name = policy.UNRESERVE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-unreserve": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._unreserve, req,
id=volume.id, body=body)
@ddt.data(*base.all_users)
def test_roll_detaching_policy(self, user_id):
volume = self._create_volume(status='detaching')
rule_name = policy.ROLL_DETACHING_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-roll_detaching": {}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name, self.controller._roll_detaching,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.initialize_connection')
def test_initialize_policy(self, user_id, mock_initialize_connection):
def initialize_connection(*args):
return self.manager.initialize_connection(*args)
mock_initialize_connection.side_effect = initialize_connection
volume = self._create_volume()
rule_name = policy.INITIALIZE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-initialize_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._initialize_connection,
req, id=volume.id, body=body)
@ddt.data(*base.all_users)
@mock.patch('cinder.volume.rpcapi.VolumeAPI.terminate_connection')
def test_terminate_policy(self, user_id, mock_terminate_connection):
def terminate_connection(ctxt, volume, connector, force=False):
return self.manager.terminate_connection(ctxt, volume.id,
connector, force=False)
mock_terminate_connection.side_effect = terminate_connection
volume = self._create_volume()
rule_name = policy.TERMINATE_POLICY
url = '%s/%s/action' % (self.api_path, volume.id)
req = fake_api.HTTPRequest.blank(url, version=self.api_version)
req.method = 'POST'
body = {
"os-terminate_connection": {
"connector": {
"platform": "x86_64",
"host": "node2",
"do_local_attach": False,
"ip": "192.168.13.101",
"os_type": "linux2",
"multipath": False,
"initiator": "iqn.1994-05.com.redhat:d16cbb5d31e5"
}
}
}
unauthorized_exceptions = [
exception.VolumeNotFound,
]
self.common_policy_check(user_id, self.authorized_users,
self.unauthorized_users,
unauthorized_exceptions,
rule_name,
self.controller._terminate_connection,
req, id=volume.id, body=body)
class VolumeActionsPolicySecureRbacTest(VolumeActionsPolicyTest):
authorized_users = [
'legacy_admin',
'system_admin',
'project_admin',
'project_member',
]
unauthorized_users = [
'legacy_owner',
'system_member',
'system_foo',
'project_reader',
'project_foo',
'other_project_member',
'other_project_reader',
]
def setUp(self, *args, **kwargs):
# Test secure RBAC by disabling deprecated policy rules (scope
# is still not enabled).
super().setUp(enforce_scope=False, enforce_new_defaults=True,
*args, **kwargs)
# TODO(yikun): The below policy test cases should be added:
# * REVERT_POLICY
# * RESET_STATUS
# * FORCE_DETACH_POLICY
# * UPLOAD_PUBLIC_POLICY
# * UPLOAD_IMAGE_POLICY
# * MIGRATE_POLICY
# * MIGRATE_COMPLETE_POLICY
class VolumeProtectionTests(test_base.CinderPolicyTests): class VolumeProtectionTests(test_base.CinderPolicyTests):
def test_admin_can_extend_volume(self): def test_admin_can_extend_volume(self):
admin_context = self.admin_context admin_context = self.admin_context

View File

@ -118,14 +118,6 @@
# DELETE /types # DELETE /types
"volume_extension:types_manage": "" "volume_extension:types_manage": ""
# Revert a volume to a snapshot.
# POST /volumes/{volume_id}/action (revert)
"volume:revert_to_snapshot": ""
# Upload a volume to image.
# POST /volumes/{volume_id}/action (os-volume_upload_image)
"volume_extension:volume_actions:upload_image": ""
# List type extra specs. # List type extra specs.
# GET /types/{type_id}/extra_specs # GET /types/{type_id}/extra_specs
"volume_extension:types_extra_specs:index": "" "volume_extension:types_extra_specs:index": ""

View File

@ -43,26 +43,31 @@ class VolumeTransferTestCase(test.TestCase):
project_id=fake.PROJECT_ID) project_id=fake.PROJECT_ID)
self.updated_at = timeutils.utcnow() self.updated_at = timeutils.utcnow()
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_volume_create_delete(self):
def test_transfer_volume_create_delete(self, mock_notify):
tx_api = transfer_api.API() tx_api = transfer_api.API()
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at) volume = utils.create_volume(self.ctxt, updated_at=self.updated_at)
response = tx_api.create(self.ctxt, volume.id, 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
response = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id) volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('awaiting-transfer', volume['status'], self.assertEqual('awaiting-transfer', volume['status'],
'Unexpected state') 'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
tx_api.delete(self.ctxt, response['id']) with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
tx_api.delete(self.ctxt, response['id'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.delete.start"),
mock.call(self.ctxt, mock.ANY, "transfer.delete.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id) volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('available', volume['status'], 'Unexpected state') self.assertEqual('available', volume['status'], 'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.delete.start"),
mock.call(self.ctxt, mock.ANY, "transfer.delete.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(4, mock_notify.call_count)
def test_transfer_invalid_volume(self): def test_transfer_invalid_volume(self):
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -84,8 +89,7 @@ class VolumeTransferTestCase(test.TestCase):
tx_api.create, tx_api.create,
self.ctxt, volume.id, 'Description') self.ctxt, volume.id, 'Description')
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_invalid_authkey(self):
def test_transfer_accept_invalid_authkey(self, mock_notify):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -103,39 +107,42 @@ class VolumeTransferTestCase(test.TestCase):
tx_api.accept, tx_api.accept,
self.ctxt, transfer['id'], 'wrong') self.ctxt, transfer['id'], 'wrong')
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_invalid_volume(self):
def test_transfer_accept_invalid_volume(self, mock_notify):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at, volume = utils.create_volume(self.ctxt, updated_at=self.updated_at,
volume_type_id=self.vt['id']) volume_type_id=self.vt['id'])
transfer = tx_api.create(self.ctxt, volume.id, 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id) volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual('awaiting-transfer', volume['status'], self.assertEqual('awaiting-transfer', volume['status'],
'Unexpected state') 'Unexpected state')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume.status = 'wrong' volume.status = 'wrong'
volume.save() volume.save()
self.assertRaises(exception.InvalidVolume, with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
tx_api.accept, ) as mock_notify:
self.ctxt, transfer['id'], transfer['auth_key']) self.assertRaises(exception.InvalidVolume,
tx_api.accept,
self.ctxt, transfer['id'], transfer['auth_key'])
# Because the InvalidVolume exception is raised in tx_api, so
# there is only transfer.accept.start called and missing
# transfer.accept.end.
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start")]
mock_notify.assert_has_calls(calls)
self.assertEqual(1, mock_notify.call_count)
volume.status = 'awaiting-transfer' volume.status = 'awaiting-transfer'
volume.save() volume.save()
# Because the InvalidVolume exception is raised in tx_api, so there is def test_transfer_accept_volume_in_consistencygroup(self):
# only transfer.accept.start called and missing transfer.accept.end.
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start")]
mock_notify.assert_has_calls(calls)
self.assertEqual(3, mock_notify.call_count)
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage')
def test_transfer_accept_volume_in_consistencygroup(self, mock_notify):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -153,8 +160,7 @@ class VolumeTransferTestCase(test.TestCase):
@mock.patch.object(QUOTAS, "limit_check") @mock.patch.object(QUOTAS, "limit_check")
@mock.patch.object(QUOTAS, "reserve") @mock.patch.object(QUOTAS, "reserve")
@mock.patch.object(QUOTAS, "add_volume_type_opts") @mock.patch.object(QUOTAS, "add_volume_type_opts")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept(self, mock_quota_voltype,
def test_transfer_accept(self, mock_notify, mock_quota_voltype,
mock_quota_reserve, mock_quota_limit): mock_quota_reserve, mock_quota_limit):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
@ -162,13 +168,26 @@ class VolumeTransferTestCase(test.TestCase):
volume = utils.create_volume(self.ctxt, volume = utils.create_volume(self.ctxt,
volume_type_id=fake.VOLUME_TYPE_ID, volume_type_id=fake.VOLUME_TYPE_ID,
updated_at=self.updated_at) updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume.id, 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
self.ctxt.user_id = fake.USER2_ID self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID self.ctxt.project_id = fake.PROJECT2_ID
response = tx_api.accept(self.ctxt, with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
transfer['id'], ) as mock_notify:
transfer['auth_key']) response = tx_api.accept(self.ctxt,
transfer['id'],
transfer['auth_key'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id) volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual(fake.PROJECT2_ID, volume.project_id) self.assertEqual(fake.PROJECT2_ID, volume.project_id)
self.assertEqual(fake.USER2_ID, volume.user_id) self.assertEqual(fake.USER2_ID, volume.user_id)
@ -178,13 +197,6 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(response['id'], transfer['id'], self.assertEqual(response['id'], transfer['id'],
'Unexpected transfer id in response.') 'Unexpected transfer id in response.')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create(),
# and twice at accept().
self.assertEqual(4, mock_notify.call_count)
# Check QUOTAS reservation calls # Check QUOTAS reservation calls
# QUOTAS.add_volume_type_opts # QUOTAS.add_volume_type_opts
reserve_opt = {'volumes': 1, 'gigabytes': 1} reserve_opt = {'volumes': 1, 'gigabytes': 1}
@ -207,8 +219,7 @@ class VolumeTransferTestCase(test.TestCase):
@mock.patch.object(QUOTAS, "reserve") @mock.patch.object(QUOTAS, "reserve")
@mock.patch.object(QUOTAS, "add_volume_type_opts") @mock.patch.object(QUOTAS, "add_volume_type_opts")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_over_quota(self, mock_quota_voltype,
def test_transfer_accept_over_quota(self, mock_notify, mock_quota_voltype,
mock_quota_reserve): mock_quota_reserve):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
@ -216,7 +227,11 @@ class VolumeTransferTestCase(test.TestCase):
volume = utils.create_volume(self.ctxt, volume = utils.create_volume(self.ctxt,
volume_type_id=fake.VOLUME_TYPE_ID, volume_type_id=fake.VOLUME_TYPE_ID,
updated_at=self.updated_at) updated_at=self.updated_at)
transfer = tx_api.create(self.ctxt, volume.id, 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
self.assertEqual(2, mock_notify.call_count)
fake_overs = ['volumes_lvmdriver-3'] fake_overs = ['volumes_lvmdriver-3']
fake_quotas = {'gigabytes_lvmdriver-3': 1, fake_quotas = {'gigabytes_lvmdriver-3': 1,
'volumes_lvmdriver-3': 10} 'volumes_lvmdriver-3': 10}
@ -230,19 +245,19 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt.user_id = fake.USER2_ID self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID self.ctxt.project_id = fake.PROJECT2_ID
self.assertRaises(exception.VolumeLimitExceeded, with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
tx_api.accept, ) as mock_notify:
self.ctxt, self.assertRaises(exception.VolumeLimitExceeded,
transfer['id'], tx_api.accept,
transfer['auth_key']) self.ctxt,
# notification of transfer.accept is sent only after quota check transfer['id'],
# passes transfer['auth_key'])
self.assertEqual(2, mock_notify.call_count) # notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(0, mock_notify.call_count)
@mock.patch.object(QUOTAS, "limit_check") @mock.patch.object(QUOTAS, "limit_check")
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_over_quota_check_limit(self, mock_quota_limit):
def test_transfer_accept_over_quota_check_limit(self, mock_notify,
mock_quota_limit):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -261,14 +276,16 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt.user_id = fake.USER2_ID self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID self.ctxt.project_id = fake.PROJECT2_ID
self.assertRaises(exception.VolumeSizeExceedsLimit, with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
tx_api.accept, ) as mock_notify:
self.ctxt, self.assertRaises(exception.VolumeSizeExceedsLimit,
transfer['id'], tx_api.accept,
transfer['auth_key']) self.ctxt,
# notification of transfer.accept is sent only after quota check transfer['id'],
# passes transfer['auth_key'])
self.assertEqual(2, mock_notify.call_count) # notification of transfer.accept is sent only after quota check
# passes
self.assertEqual(0, mock_notify.call_count)
def test_transfer_get(self): def test_transfer_get(self):
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -323,20 +340,22 @@ class VolumeTransferTestCase(test.TestCase):
sort_keys=mock.sentinel.sort_keys) sort_keys=mock.sentinel.sort_keys)
self.assertEqual(get_all_by_project.return_value, res) self.assertEqual(get_all_by_project.return_value, res)
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_delete_transfer_with_deleted_volume(self):
def test_delete_transfer_with_deleted_volume(self, mock_notify):
# create a volume # create a volume
volume = utils.create_volume(self.ctxt, updated_at=self.updated_at) volume = utils.create_volume(self.ctxt, updated_at=self.updated_at)
# create a transfer # create a transfer
tx_api = transfer_api.API() tx_api = transfer_api.API()
transfer = tx_api.create(self.ctxt, volume['id'], 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
t = tx_api.get(self.ctxt, transfer['id']) ) as mock_notify:
transfer = tx_api.create(self.ctxt, volume['id'], 'Description')
t = tx_api.get(self.ctxt, transfer['id'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
self.assertEqual(t['id'], transfer['id'], 'Unexpected transfer id') self.assertEqual(t['id'], transfer['id'], 'Unexpected transfer id')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
self.assertEqual(2, mock_notify.call_count)
# force delete volume # force delete volume
volume.destroy() volume.destroy()
# Make sure transfer has been deleted. # Make sure transfer has been deleted.
@ -345,8 +364,7 @@ class VolumeTransferTestCase(test.TestCase):
self.ctxt, self.ctxt,
transfer['id']) transfer['id'])
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_with_snapshots(self):
def test_transfer_accept_with_snapshots(self, mock_notify):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()
@ -356,7 +374,14 @@ class VolumeTransferTestCase(test.TestCase):
utils.create_volume_type(self.ctxt.elevated(), utils.create_volume_type(self.ctxt.elevated(),
id=fake.VOLUME_TYPE_ID, name="test_type") id=fake.VOLUME_TYPE_ID, name="test_type")
utils.create_snapshot(self.ctxt, volume.id, status='available') utils.create_snapshot(self.ctxt, volume.id, status='available')
transfer = tx_api.create(self.ctxt, volume.id, 'Description') with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
transfer = tx_api.create(self.ctxt, volume.id, 'Description')
calls = [mock.call(self.ctxt, mock.ANY, "transfer.create.start"),
mock.call(self.ctxt, mock.ANY, "transfer.create.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create().
self.assertEqual(2, mock_notify.call_count)
# Get volume and snapshot quota before accept # Get volume and snapshot quota before accept
self.ctxt.user_id = fake.USER2_ID self.ctxt.user_id = fake.USER2_ID
@ -366,18 +391,19 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(0, usages.get('volumes', {}).get('in_use', 0)) self.assertEqual(0, usages.get('volumes', {}).get('in_use', 0))
self.assertEqual(0, usages.get('snapshots', {}).get('in_use', 0)) self.assertEqual(0, usages.get('snapshots', {}).get('in_use', 0))
tx_api.accept(self.ctxt, transfer['id'], transfer['auth_key']) with mock.patch('cinder.volume.volume_utils.notify_about_volume_usage'
) as mock_notify:
tx_api.accept(self.ctxt, transfer['id'], transfer['auth_key'])
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at accept().
self.assertEqual(2, mock_notify.call_count)
volume = objects.Volume.get_by_id(self.ctxt, volume.id) volume = objects.Volume.get_by_id(self.ctxt, volume.id)
self.assertEqual(fake.PROJECT2_ID, volume.project_id) self.assertEqual(fake.PROJECT2_ID, volume.project_id)
self.assertEqual(fake.USER2_ID, volume.user_id) self.assertEqual(fake.USER2_ID, volume.user_id)
calls = [mock.call(self.ctxt, mock.ANY, "transfer.accept.start"),
mock.call(self.ctxt, mock.ANY, "transfer.accept.end")]
mock_notify.assert_has_calls(calls)
# The notify_about_volume_usage is called twice at create(),
# and twice at accept().
self.assertEqual(4, mock_notify.call_count)
# Get volume and snapshot quota after accept # Get volume and snapshot quota after accept
self.ctxt.user_id = fake.USER2_ID self.ctxt.user_id = fake.USER2_ID
self.ctxt.project_id = fake.PROJECT2_ID self.ctxt.project_id = fake.PROJECT2_ID
@ -386,8 +412,7 @@ class VolumeTransferTestCase(test.TestCase):
self.assertEqual(1, usages.get('volumes', {}).get('in_use', 0)) self.assertEqual(1, usages.get('volumes', {}).get('in_use', 0))
self.assertEqual(1, usages.get('snapshots', {}).get('in_use', 0)) self.assertEqual(1, usages.get('snapshots', {}).get('in_use', 0))
@mock.patch('cinder.volume.volume_utils.notify_about_volume_usage') def test_transfer_accept_with_snapshots_invalid(self):
def test_transfer_accept_with_snapshots_invalid(self, mock_notify):
svc = self.start_service('volume', host='test_host') svc = self.start_service('volume', host='test_host')
self.addCleanup(svc.stop) self.addCleanup(svc.stop)
tx_api = transfer_api.API() tx_api = transfer_api.API()