Merge "Add precommit calls to the QoSDriver class"
This commit is contained in:
commit
e137e63db3
@ -52,8 +52,9 @@ Service side design
|
||||
* neutron.services.qos.drivers.base:
|
||||
the interface class for pluggable QoS drivers that are used to update
|
||||
backends about new {create, update, delete} events on any rule or policy
|
||||
change. The drivers also declare which QoS rules, VIF drivers and VNIC
|
||||
types are supported.
|
||||
change, including precommit events that some backends could need for
|
||||
synchronization reason. The drivers also declare which QoS rules,
|
||||
VIF drivers and VNIC types are supported.
|
||||
|
||||
* neutron.core_extensions.base:
|
||||
Contains an interface class to implement core resource (port/network)
|
||||
|
@ -212,6 +212,11 @@ class DuplicatedExtension(e.NeutronException):
|
||||
message = _("Found duplicate extension: %(alias)s.")
|
||||
|
||||
|
||||
class DriverCallError(e.MultipleExceptions):
|
||||
def __init__(self, exc_list=None):
|
||||
super(DriverCallError, self).__init__(exc_list or [])
|
||||
|
||||
|
||||
class DeviceIDNotOwnedByTenant(e.Conflict):
|
||||
message = _("The following device_id %(device_id)s is not owned by your "
|
||||
"tenant or matches another tenants router.")
|
||||
|
@ -110,6 +110,17 @@ class DriverBase(object):
|
||||
rules.
|
||||
"""
|
||||
|
||||
def create_policy_precommit(self, context, policy):
|
||||
"""Create policy precommit.
|
||||
|
||||
This method can be implemented by the specific driver subclass
|
||||
to handle the precommit event of a policy that is being created.
|
||||
|
||||
:param context: current running context information
|
||||
:param policy: a QoSPolicy object being created, which will have no
|
||||
rules.
|
||||
"""
|
||||
|
||||
def update_policy(self, context, policy):
|
||||
"""Update policy invocation.
|
||||
|
||||
@ -120,6 +131,16 @@ class DriverBase(object):
|
||||
:param policy: a QoSPolicy object being updated.
|
||||
"""
|
||||
|
||||
def update_policy_precommit(self, context, policy):
|
||||
"""Update policy precommit.
|
||||
|
||||
This method can be implemented by the specific driver subclass
|
||||
to handle update precommit event of a policy that is being updated.
|
||||
|
||||
:param context: current running context information
|
||||
:param policy: a QoSPolicy object being updated.
|
||||
"""
|
||||
|
||||
def delete_policy(self, context, policy):
|
||||
"""Delete policy invocation.
|
||||
|
||||
@ -129,3 +150,13 @@ class DriverBase(object):
|
||||
:param context: current running context information
|
||||
:param policy: a QoSPolicy object being deleted
|
||||
"""
|
||||
|
||||
def delete_policy_precommit(self, context, policy):
|
||||
"""Delete policy precommit.
|
||||
|
||||
This method can be implemented by the specific driver subclass
|
||||
to handle delete precommit event of a policy that is being deleted.
|
||||
|
||||
:param context: current running context information
|
||||
:param policy: a QoSPolicy object being deleted
|
||||
"""
|
||||
|
@ -14,13 +14,13 @@ from neutron_lib.api.definitions import portbindings
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import registry
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
|
||||
from neutron._i18n import _LE, _LW
|
||||
from neutron._i18n import _LW
|
||||
from neutron.api.rpc.callbacks import events as rpc_events
|
||||
from neutron.api.rpc.callbacks.producer import registry as rpc_registry
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.common import exceptions
|
||||
from neutron.objects.qos import policy as policy_object
|
||||
from neutron.services.qos import qos_consts
|
||||
|
||||
@ -86,14 +86,19 @@ class QosServiceDriverManager(object):
|
||||
|
||||
def call(self, method_name, *args, **kwargs):
|
||||
"""Helper method for calling a method across all extension drivers."""
|
||||
exc_list = []
|
||||
for driver in self._drivers:
|
||||
try:
|
||||
getattr(driver, method_name)(*args, **kwargs)
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE("Extension driver '%(name)s' failed in "
|
||||
"%(method)s"),
|
||||
{'name': driver.name, 'method': method_name})
|
||||
except Exception as exc:
|
||||
exception_msg = ("Extension driver '%(name)s' failed in "
|
||||
"%(method)s")
|
||||
exception_data = {'name': driver.name, 'method': method_name}
|
||||
LOG.exception(exception_msg, exception_data)
|
||||
exc_list.append(exc)
|
||||
|
||||
if exc_list:
|
||||
raise exceptions.DriverCallError(exc_list=exc_list)
|
||||
|
||||
if self.rpc_notifications_required:
|
||||
context = kwargs.get('context') or args[0]
|
||||
@ -102,10 +107,10 @@ class QosServiceDriverManager(object):
|
||||
# we don't push create_policy events since policies are empty
|
||||
# on creation, they only become of any use when rules get
|
||||
# attached to them.
|
||||
if method_name == 'update_policy':
|
||||
if method_name == qos_consts.UPDATE_POLICY:
|
||||
self.push_api.push(context, [policy_obj], rpc_events.UPDATED)
|
||||
|
||||
elif method_name == 'delete_policy':
|
||||
elif method_name == qos_consts.DELETE_POLICY:
|
||||
self.push_api.push(context, [policy_obj], rpc_events.DELETED)
|
||||
|
||||
def register_driver(self, driver):
|
||||
|
@ -37,3 +37,20 @@ QOS_PLUGIN = 'qos_plugin'
|
||||
# as 80% of bw_limit to ensure that at least limits for TCP traffic will work
|
||||
# fine.
|
||||
DEFAULT_BURST_RATE = 0.8
|
||||
|
||||
# Method names for QoSDriver
|
||||
PRECOMMIT_POSTFIX = '_precommit'
|
||||
CREATE_POLICY = 'create_policy'
|
||||
CREATE_POLICY_PRECOMMIT = CREATE_POLICY + PRECOMMIT_POSTFIX
|
||||
UPDATE_POLICY = 'update_policy'
|
||||
UPDATE_POLICY_PRECOMMIT = UPDATE_POLICY + PRECOMMIT_POSTFIX
|
||||
DELETE_POLICY = 'delete_policy'
|
||||
DELETE_POLICY_PRECOMMIT = DELETE_POLICY + PRECOMMIT_POSTFIX
|
||||
|
||||
QOS_CALL_METHODS = (
|
||||
CREATE_POLICY,
|
||||
CREATE_POLICY_PRECOMMIT,
|
||||
UPDATE_POLICY,
|
||||
UPDATE_POLICY_PRECOMMIT,
|
||||
DELETE_POLICY,
|
||||
DELETE_POLICY_PRECOMMIT, )
|
||||
|
@ -159,11 +159,13 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
# This cannot be done in other place of stacktrace, because neutron
|
||||
# needs to be backward compatible.
|
||||
policy['policy'].pop('tenant_id', None)
|
||||
|
||||
policy_obj = policy_object.QosPolicy(context, **policy['policy'])
|
||||
with db_api.context_manager.writer.using(context):
|
||||
policy_obj.create()
|
||||
self.driver_manager.call(qos_consts.CREATE_POLICY_PRECOMMIT,
|
||||
context, policy_obj)
|
||||
|
||||
self.driver_manager.call('create_policy', context, policy_obj)
|
||||
self.driver_manager.call(qos_consts.CREATE_POLICY, context, policy_obj)
|
||||
|
||||
return policy_obj
|
||||
|
||||
@ -181,11 +183,15 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
:returns: a QosPolicy object
|
||||
"""
|
||||
policy_data = policy['policy']
|
||||
with db_api.context_manager.writer.using(context):
|
||||
policy_obj = policy_object.QosPolicy(context, id=policy_id)
|
||||
policy_obj.update_fields(policy_data, reset_changes=True)
|
||||
policy_obj.update()
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY_PRECOMMIT,
|
||||
context, policy_obj)
|
||||
|
||||
self.driver_manager.call('update_policy', context, policy_obj)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY,
|
||||
context, policy_obj)
|
||||
|
||||
return policy_obj
|
||||
|
||||
@ -199,11 +205,15 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
|
||||
:returns: None
|
||||
"""
|
||||
with db_api.context_manager.writer.using(context):
|
||||
policy = policy_object.QosPolicy(context)
|
||||
policy.id = policy_id
|
||||
policy.delete()
|
||||
self.driver_manager.call(qos_consts.DELETE_POLICY_PRECOMMIT,
|
||||
context, policy)
|
||||
|
||||
self.driver_manager.call('delete_policy', context, policy)
|
||||
self.driver_manager.call(qos_consts.DELETE_POLICY,
|
||||
context, policy)
|
||||
|
||||
def _get_policy_obj(self, context, policy_id):
|
||||
"""Fetch a QoS policy.
|
||||
@ -291,8 +301,10 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
rule.create()
|
||||
policy.reload_rules()
|
||||
self.validate_policy(context, policy)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY_PRECOMMIT,
|
||||
context, policy)
|
||||
|
||||
self.driver_manager.call('update_policy', context, policy)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY, context, policy)
|
||||
|
||||
return rule
|
||||
|
||||
@ -328,8 +340,10 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
rule.update()
|
||||
policy.reload_rules()
|
||||
self.validate_policy(context, policy)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY_PRECOMMIT,
|
||||
context, policy)
|
||||
|
||||
self.driver_manager.call('update_policy', context, policy)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY, context, policy)
|
||||
|
||||
return rule
|
||||
|
||||
@ -353,8 +367,10 @@ class QoSPlugin(qos.QoSPluginBase):
|
||||
rule = policy.get_rule_by_id(rule_id)
|
||||
rule.delete()
|
||||
policy.reload_rules()
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY_PRECOMMIT,
|
||||
context, policy)
|
||||
|
||||
self.driver_manager.call('update_policy', context, policy)
|
||||
self.driver_manager.call(qos_consts.UPDATE_POLICY, context, policy)
|
||||
|
||||
@db_base_plugin_common.filter_fields
|
||||
@db_base_plugin_common.convert_result_to_dict
|
||||
|
@ -16,6 +16,7 @@ from neutron_lib import context
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.common import constants
|
||||
from neutron.common import exceptions
|
||||
from neutron.objects import ports as ports_object
|
||||
from neutron.objects.qos import rule as rule_object
|
||||
from neutron.services.qos.drivers import base as qos_driver_base
|
||||
@ -178,3 +179,25 @@ class TestQosDriversManagerRules(TestQosDriversManagerBase):
|
||||
set([qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
|
||||
qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH,
|
||||
qos_consts.RULE_TYPE_DSCP_MARKING]))
|
||||
|
||||
|
||||
class TestQosDriversCalls(TestQosDriversManagerBase):
|
||||
"""Test QoS driver calls"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestQosDriversCalls, self).setUp()
|
||||
self.driver_manager = self._create_manager_with_drivers(
|
||||
{'driver-A': {'is_loaded': True}})
|
||||
|
||||
def test_implemented_call_methods(self):
|
||||
for method in qos_consts.QOS_CALL_METHODS:
|
||||
with mock.patch.object(qos_driver_base.DriverBase, method) as \
|
||||
method_fnc:
|
||||
context = mock.Mock()
|
||||
policy = mock.Mock()
|
||||
self.driver_manager.call(method, context, policy)
|
||||
method_fnc.assert_called_once_with(context, policy)
|
||||
|
||||
def test_not_implemented_call_methods(self):
|
||||
self.assertRaises(exceptions.DriverCallError, self.driver_manager.call,
|
||||
'wrong_method', mock.Mock(), mock.Mock())
|
||||
|
@ -311,9 +311,24 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
@mock.patch(
|
||||
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
|
||||
'.create_rbac_policy')
|
||||
def test_add_policy(self, *mocks):
|
||||
@mock.patch('neutron.objects.qos.policy.QosPolicy')
|
||||
def test_add_policy(self, mock_qos_policy, mock_create_rbac_policy):
|
||||
mock_manager = mock.Mock()
|
||||
mock_manager.attach_mock(mock_qos_policy, 'QosPolicy')
|
||||
mock_manager.attach_mock(self.qos_plugin.driver_manager, 'driver')
|
||||
mock_manager.reset_mock()
|
||||
|
||||
self.qos_plugin.create_policy(self.ctxt, self.policy_data)
|
||||
self._validate_driver_params('create_policy')
|
||||
|
||||
policy_mock_call = mock.call.QosPolicy().create()
|
||||
create_precommit_mock_call = mock.call.driver.call(
|
||||
'create_policy_precommit', self.ctxt, mock.ANY)
|
||||
create_mock_call = mock.call.driver.call(
|
||||
'create_policy', self.ctxt, mock.ANY)
|
||||
self.assertTrue(
|
||||
mock_manager.mock_calls.index(policy_mock_call) <
|
||||
mock_manager.mock_calls.index(create_precommit_mock_call) <
|
||||
mock_manager.mock_calls.index(create_mock_call))
|
||||
|
||||
def test_add_policy_with_extra_tenant_keyword(self, *mocks):
|
||||
policy_id = uuidutils.generate_uuid()
|
||||
@ -340,18 +355,51 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
@mock.patch(
|
||||
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
|
||||
'.create_rbac_policy')
|
||||
def test_update_policy(self, *mocks):
|
||||
@mock.patch.object(policy_object.QosPolicy, 'update')
|
||||
def test_update_policy(self, mock_qos_policy_update,
|
||||
mock_create_rbac_policy):
|
||||
mock_manager = mock.Mock()
|
||||
mock_manager.attach_mock(mock_qos_policy_update, 'update')
|
||||
mock_manager.attach_mock(self.qos_plugin.driver_manager, 'driver')
|
||||
mock_manager.reset_mock()
|
||||
|
||||
fields = base_object.get_updatable_fields(
|
||||
policy_object.QosPolicy, self.policy_data['policy'])
|
||||
self.qos_plugin.update_policy(
|
||||
self.ctxt, self.policy.id, {'policy': fields})
|
||||
self._validate_driver_params('update_policy')
|
||||
|
||||
policy_update_mock_call = mock.call.update()
|
||||
update_precommit_mock_call = mock.call.driver.call(
|
||||
'update_policy_precommit', self.ctxt, mock.ANY)
|
||||
update_mock_call = mock.call.driver.call(
|
||||
'update_policy', self.ctxt, mock.ANY)
|
||||
self.assertTrue(
|
||||
mock_manager.mock_calls.index(policy_update_mock_call) <
|
||||
mock_manager.mock_calls.index(update_precommit_mock_call) <
|
||||
mock_manager.mock_calls.index(update_mock_call))
|
||||
|
||||
@mock.patch('neutron.objects.db.api.get_object', return_value=None)
|
||||
def test_delete_policy(self, *mocks):
|
||||
@mock.patch.object(policy_object.QosPolicy, 'delete')
|
||||
def test_delete_policy(self, mock_qos_policy_delete, mock_api_get_policy):
|
||||
mock_manager = mock.Mock()
|
||||
mock_manager.attach_mock(mock_qos_policy_delete, 'delete')
|
||||
mock_manager.attach_mock(self.qos_plugin.driver_manager, 'driver')
|
||||
mock_manager.reset_mock()
|
||||
|
||||
self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
|
||||
self._validate_driver_params('delete_policy')
|
||||
|
||||
policy_delete_mock_call = mock.call.delete()
|
||||
delete_precommit_mock_call = mock.call.driver.call(
|
||||
'delete_policy_precommit', self.ctxt, mock.ANY)
|
||||
delete_mock_call = mock.call.driver.call(
|
||||
'delete_policy', self.ctxt, mock.ANY)
|
||||
self.assertTrue(
|
||||
mock_manager.mock_calls.index(policy_delete_mock_call) <
|
||||
mock_manager.mock_calls.index(delete_precommit_mock_call) <
|
||||
mock_manager.mock_calls.index(delete_mock_call))
|
||||
|
||||
def test_create_policy_rule(self):
|
||||
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
|
||||
return_value=self.policy), mock.patch(
|
||||
@ -409,7 +457,13 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
self.ctxt, self.policy.id, self.rule_data)
|
||||
mock_qos_get_obj.assert_called_once_with(self.ctxt, id=_policy.id)
|
||||
|
||||
def test_update_policy_rule(self):
|
||||
@mock.patch.object(rule_object.QosBandwidthLimitRule, 'update')
|
||||
def test_update_policy_rule(self, mock_qos_rule_update):
|
||||
mock_manager = mock.Mock()
|
||||
mock_manager.attach_mock(mock_qos_rule_update, 'update')
|
||||
mock_manager.attach_mock(self.qos_plugin.driver_manager, 'driver')
|
||||
mock_manager.reset_mock()
|
||||
|
||||
_policy = policy_object.QosPolicy(
|
||||
self.ctxt, **self.policy_data['policy'])
|
||||
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
|
||||
@ -419,6 +473,16 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
self.ctxt, self.rule.id, self.policy.id, self.rule_data)
|
||||
self._validate_driver_params('update_policy')
|
||||
|
||||
rule_update_mock_call = mock.call.update()
|
||||
update_precommit_mock_call = mock.call.driver.call(
|
||||
'update_policy_precommit', self.ctxt, mock.ANY)
|
||||
update_mock_call = mock.call.driver.call(
|
||||
'update_policy', self.ctxt, mock.ANY)
|
||||
self.assertTrue(
|
||||
mock_manager.mock_calls.index(rule_update_mock_call) <
|
||||
mock_manager.mock_calls.index(update_precommit_mock_call) <
|
||||
mock_manager.mock_calls.index(update_mock_call))
|
||||
|
||||
def test_update_policy_rule_check_rule_min_less_than_max(self):
|
||||
_policy = self._get_policy()
|
||||
setattr(_policy, "rules", [self.rule])
|
||||
@ -501,7 +565,13 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
self.ctxt, self.rule.id, self.policy.id,
|
||||
self.rule_data)
|
||||
|
||||
def test_delete_policy_rule(self):
|
||||
@mock.patch.object(rule_object.QosBandwidthLimitRule, 'delete')
|
||||
def test_delete_policy_rule(self, mock_qos_rule_delete):
|
||||
mock_manager = mock.Mock()
|
||||
mock_manager.attach_mock(mock_qos_rule_delete, 'delete')
|
||||
mock_manager.attach_mock(self.qos_plugin.driver_manager, 'driver')
|
||||
mock_manager.reset_mock()
|
||||
|
||||
_policy = policy_object.QosPolicy(
|
||||
self.ctxt, **self.policy_data['policy'])
|
||||
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
|
||||
@ -511,6 +581,16 @@ class TestQosPlugin(base.BaseQosTestCase):
|
||||
self.ctxt, self.rule.id, _policy.id)
|
||||
self._validate_driver_params('update_policy')
|
||||
|
||||
rule_delete_mock_call = mock.call.delete()
|
||||
update_precommit_mock_call = mock.call.driver.call(
|
||||
'update_policy_precommit', self.ctxt, mock.ANY)
|
||||
update_mock_call = mock.call.driver.call(
|
||||
'update_policy', self.ctxt, mock.ANY)
|
||||
self.assertTrue(
|
||||
mock_manager.mock_calls.index(rule_delete_mock_call) <
|
||||
mock_manager.mock_calls.index(update_precommit_mock_call) <
|
||||
mock_manager.mock_calls.index(update_mock_call))
|
||||
|
||||
def test_delete_policy_rule_bad_policy(self):
|
||||
_policy = policy_object.QosPolicy(
|
||||
self.ctxt, **self.policy_data['policy'])
|
||||
|
Loading…
Reference in New Issue
Block a user