Respond negatively to tenant detachment of enforced QoS policies

Currently when the tenant attempts to detach an enforced QoS policy
for a port or network set by admin, the attempt fails but the API
feedback indicates that it was successful.  This change will
fix the API response so the failure is accurately signalled
to the tenant.

Co-Authored-By: litong01 <litong01@us.ibm.com>
Co-Authored-By: gong yong sheng <gong.yongsheng@99cloud.net>
Co-Authored-By: Nate Johnston <Nate_Johnston@cable.comcast.com>
Co-Authored-By: Margaret Frances <margaret_frances@cable.comcast.com>

Change-Id: I977feecc6cce378abc1e6092afbaf9f2681b2ec6
Closes-bug: #1486607
This commit is contained in:
Tong Li 2015-09-15 17:23:42 -04:00 committed by Nate Johnston
parent a4985d16f6
commit 4ec6932c93
3 changed files with 111 additions and 2 deletions

View File

@ -52,6 +52,11 @@ class PolicyCheckError(e.NeutronException):
message = _("Failed to check policy %(policy)s because %(reason)s.") message = _("Failed to check policy %(policy)s because %(reason)s.")
class PolicyRemoveAuthorizationError(e.NotAuthorized):
message = _("Failed to remove provided policy %(policy_id)s "
"because you are not authorized.")
class StateInvalid(e.BadRequest): class StateInvalid(e.BadRequest):
message = _("Unsupported port state: %(port_state)s.") message = _("Unsupported port state: %(port_state)s.")

View File

@ -37,10 +37,23 @@ class QosCoreResourceExtension(base.CoreResourceExtension):
raise n_exc.QosPolicyNotFound(policy_id=policy_id) raise n_exc.QosPolicyNotFound(policy_id=policy_id)
return obj return obj
def _check_policy_change_permission(self, context, old_policy):
"""An existing policy can be modified only if one of the following is
true:
the policy's tenant is the context's tenant
the policy is shared with the tenant
Using is_accessible expresses these conditions.
"""
if not (policy_object.QosPolicy.is_accessible(context, old_policy)):
raise n_exc.PolicyRemoveAuthorizationError(policy_id=old_policy.id)
def _update_port_policy(self, context, port, port_changes): def _update_port_policy(self, context, port, port_changes):
old_policy = policy_object.QosPolicy.get_port_policy( old_policy = policy_object.QosPolicy.get_port_policy(
context, port['id']) context.elevated(), port['id'])
if old_policy: if old_policy:
self._check_policy_change_permission(context, old_policy)
old_policy.detach_port(port['id']) old_policy.detach_port(port['id'])
qos_policy_id = port_changes.get(qos_consts.QOS_POLICY_ID) qos_policy_id = port_changes.get(qos_consts.QOS_POLICY_ID)
@ -51,8 +64,9 @@ class QosCoreResourceExtension(base.CoreResourceExtension):
def _update_network_policy(self, context, network, network_changes): def _update_network_policy(self, context, network, network_changes):
old_policy = policy_object.QosPolicy.get_network_policy( old_policy = policy_object.QosPolicy.get_network_policy(
context, network['id']) context.elevated(), network['id'])
if old_policy: if old_policy:
self._check_policy_change_permission(context, old_policy)
old_policy.detach_network(network['id']) old_policy.detach_network(network['id'])
qos_policy_id = network_changes.get(qos_consts.QOS_POLICY_ID) qos_policy_id = network_changes.get(qos_consts.QOS_POLICY_ID)

View File

@ -15,6 +15,7 @@
import mock import mock
from neutron.common import exceptions as n_exc
from neutron import context from neutron import context
from neutron.core_extensions import base as base_core from neutron.core_extensions import base as base_core
from neutron.core_extensions import qos as qos_core from neutron.core_extensions import qos as qos_core
@ -36,6 +37,7 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase):
policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy') policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy')
self.policy_m = policy_p.start() self.policy_m = policy_p.start()
self.context = context.get_admin_context() self.context = context.get_admin_context()
self.non_admin_context = context.Context('user_id', 'tenant_id')
def test_process_fields_no_qos_policy_id(self): def test_process_fields_no_qos_policy_id(self):
self.core_extension.process_fields( self.core_extension.process_fields(
@ -110,6 +112,51 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase):
old_qos_policy.detach_port.assert_called_once_with(port_id) old_qos_policy.detach_port.assert_called_once_with(port_id)
self.assertIsNone(actual_port['qos_policy_id']) self.assertIsNone(actual_port['qos_policy_id'])
def _process_port_updated_policy(self, context, shared,
policy_tenant_id):
with self._mock_plugin_loaded(True):
port_id = mock.sentinel.port_id
qos_policy_id = mock.sentinel.policy_id
actual_port = {'id': port_id,
qos_consts.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
old_qos_policy.shared = shared
old_qos_policy.tenant_id = policy_tenant_id
self.policy_m.get_port_policy = mock.Mock(
return_value=old_qos_policy)
self.core_extension.process_fields(
context, base_core.PORT,
{qos_consts.QOS_POLICY_ID: None},
actual_port)
old_qos_policy.detach_port.assert_called_once_with(port_id)
def test_process_resource_port_updated_remove_own_policy(self):
self._process_port_updated_policy(
context=self.non_admin_context,
shared=False,
policy_tenant_id=self.non_admin_context.tenant_id)
def test_process_resource_port_updated_admin_remove_provided_policy(self):
self._process_port_updated_policy(
context=self.context,
shared=False,
policy_tenant_id=self.non_admin_context.tenant_id)
def test_process_resource_port_updated_remove_shared_policy(self):
self._process_port_updated_policy(
context=self.non_admin_context,
shared=True,
policy_tenant_id=self.context.tenant_id)
def test_process_resource_port_updated_remove_provided_policy(self):
self.policy_m.is_accessible.return_value = False
self.assertRaises(n_exc.PolicyRemoveAuthorizationError,
self._process_port_updated_policy,
context=self.non_admin_context,
shared=False,
policy_tenant_id=self.context.tenant_id)
def test_process_resource_network_updated_no_policy(self): def test_process_resource_network_updated_no_policy(self):
with self._mock_plugin_loaded(True): with self._mock_plugin_loaded(True):
network_id = mock.Mock() network_id = mock.Mock()
@ -161,6 +208,49 @@ class QosCoreResourceExtensionTestCase(base.BaseTestCase):
old_qos_policy.detach_network.assert_called_once_with(network_id) old_qos_policy.detach_network.assert_called_once_with(network_id)
new_qos_policy.attach_network.assert_called_once_with(network_id) new_qos_policy.attach_network.assert_called_once_with(network_id)
def _process_network_updated_policy(self, context, shared,
policy_tenant_id):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.sentinel.policy_id
network_id = mock.sentinel.net_id
actual_network = {'id': network_id,
qos_consts.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
old_qos_policy.shared = shared
old_qos_policy.tenant_id = policy_tenant_id
self.policy_m.get_network_policy.return_value = old_qos_policy
self.core_extension.process_fields(
context, base_core.NETWORK,
{qos_consts.QOS_POLICY_ID: None}, actual_network)
old_qos_policy.detach_network.assert_called_once_with(network_id)
def test_process_fields_network_updated_remove_shared_policy(self):
self._process_network_updated_policy(
context=self.non_admin_context,
shared=True,
policy_tenant_id=self.context.tenant_id)
def test_process_fields_network_updated_remove_own_policy(self):
self._process_network_updated_policy(
context=self.non_admin_context,
shared=True,
policy_tenant_id=self.non_admin_context.tenant_id)
def test_process_fields_network_updated_admin_remove_provided_policy(self):
self._process_network_updated_policy(
context=self.context,
shared=True,
policy_tenant_id=self.non_admin_context.tenant_id)
def test_process_fields_network_updated_remove_provided_policy(self):
self.policy_m.is_accessible.return_value = False
self.assertRaises(n_exc.PolicyRemoveAuthorizationError,
self._process_network_updated_policy,
context=self.non_admin_context,
shared=False,
policy_tenant_id=self.context.tenant_id)
def test_extract_fields_plugin_not_loaded(self): def test_extract_fields_plugin_not_loaded(self):
with self._mock_plugin_loaded(False): with self._mock_plugin_loaded(False):
fields = self.core_extension.extract_fields(None, None) fields = self.core_extension.extract_fields(None, None)