Merge "[OVN] Sync QoS policies"

This commit is contained in:
Zuul 2022-02-17 18:10:51 +00:00 committed by Gerrit Code Review
commit 4fe4d25f42
10 changed files with 375 additions and 67 deletions

View File

@ -79,7 +79,7 @@ oslo.versionedobjects==1.35.1
oslotest==3.2.0 oslotest==3.2.0
osprofiler==2.3.0 osprofiler==2.3.0
ovs==2.10.0 ovs==2.10.0
ovsdbapp==1.11.0 ovsdbapp==1.15.0
packaging==20.4 packaging==20.4
Paste==2.0.2 Paste==2.0.2
PasteDeploy==1.5.0 PasteDeploy==1.5.0

View File

@ -200,6 +200,7 @@ def main():
'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin', 'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin',
'neutron.services.segments.plugin.Plugin', 'neutron.services.segments.plugin.Plugin',
'port_forwarding', 'port_forwarding',
'qos'
] ]
else: else:
LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin) LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin)

View File

@ -34,13 +34,20 @@ OVN_QOS_DEFAULT_RULE_PRIORITY = 2002
class OVNClientQosExtension(object): class OVNClientQosExtension(object):
"""OVN client QoS extension""" """OVN client QoS extension"""
def __init__(self, driver): def __init__(self, driver=None, nb_idl=None):
LOG.info('Starting OVNClientQosExtension') LOG.info('Starting OVNClientQosExtension')
super(OVNClientQosExtension, self).__init__() super(OVNClientQosExtension, self).__init__()
self._driver = driver self._driver = driver
self._nb_idl = nb_idl
self._plugin_property = None self._plugin_property = None
self._plugin_l3_property = None self._plugin_l3_property = None
@property
def nb_idl(self):
if not self._nb_idl:
self._nb_idl = self._driver._nb_idl
return self._nb_idl
@property @property
def _plugin(self): def _plugin(self):
if self._plugin_property is None: if self._plugin_property is None:
@ -177,7 +184,8 @@ class OVNClientQosExtension(object):
return ovn_qos_rule return ovn_qos_rule
def _port_effective_qos_policy_id(self, port): @staticmethod
def port_effective_qos_policy_id(port):
"""Return port effective QoS policy """Return port effective QoS policy
If the port does not have any QoS policy reference or is a network If the port does not have any QoS policy reference or is a network
@ -193,30 +201,43 @@ class OVNClientQosExtension(object):
else: else:
return port['qos_network_policy_id'], 'network' return port['qos_network_policy_id'], 'network'
def _update_port_qos_rules(self, txn, port_id, network_id, qos_policy_id, def _delete_port_qos_rules(self, txn, port_id, network_id):
qos_rules):
# NOTE(ralonsoh): we don't use the transaction context because the
# QoS policy could belong to another user (network QoS policy).
admin_context = n_context.get_admin_context()
# Generate generic deletion rules for both directions. In case of # Generate generic deletion rules for both directions. In case of
# creating deletion rules, the rule content is irrelevant. # creating deletion rules, the rule content is irrelevant.
for ovn_rule in [self._ovn_qos_rule(direction, {}, port_id, for ovn_rule in [self._ovn_qos_rule(direction, {}, port_id,
network_id, delete=True) network_id, delete=True)
for direction in constants.VALID_DIRECTIONS]: for direction in constants.VALID_DIRECTIONS]:
txn.add(self._driver._nb_idl.qos_del(**ovn_rule)) txn.add(self.nb_idl.qos_del(**ovn_rule))
if not qos_policy_id: def _add_port_qos_rules(self, txn, port_id, network_id, qos_policy_id,
return # If no QoS policy is defined, there are no QoS rules. qos_rules):
# NOTE(ralonsoh): we don't use the transaction context because the
# QoS policy could belong to another user (network QoS policy).
admin_context = n_context.get_admin_context()
# TODO(ralonsoh): for update_network and update_policy operations, # TODO(ralonsoh): for update_network and update_policy operations,
# the QoS rules can be retrieved only once. # the QoS rules can be retrieved only once.
qos_rules = qos_rules or self._qos_rules(admin_context, qos_policy_id) qos_rules = qos_rules or self._qos_rules(admin_context, qos_policy_id)
for direction, rules in qos_rules.items(): for direction, rules in qos_rules.items():
# "delete=not rule": that means, when we don't have rules, we
# generate a "ovn_rule" to be used as input in a "qos_del" method.
ovn_rule = self._ovn_qos_rule(direction, rules, port_id, ovn_rule = self._ovn_qos_rule(direction, rules, port_id,
network_id) network_id, delete=not rules)
if ovn_rule: if rules:
txn.add(self._driver._nb_idl.qos_add(**ovn_rule)) # NOTE(ralonsoh): with "may_exist=True", the "qos_add" will
# create the QoS OVN rule or update the existing one.
txn.add(self.nb_idl.qos_add(**ovn_rule, may_exist=True))
else:
# Delete, if exists, the QoS rule in this direction.
txn.add(self.nb_idl.qos_del(**ovn_rule, if_exists=True))
def _update_port_qos_rules(self, txn, port_id, network_id, qos_policy_id,
qos_rules):
if not qos_policy_id:
self._delete_port_qos_rules(txn, port_id, network_id)
else:
self._add_port_qos_rules(txn, port_id, network_id, qos_policy_id,
qos_rules)
def create_port(self, txn, port): def create_port(self, txn, port):
self.update_port(txn, port, None, reset=True) self.update_port(txn, port, None, reset=True)
@ -239,9 +260,9 @@ class OVNClientQosExtension(object):
return return
qos_policy_id = (None if delete else qos_policy_id = (None if delete else
self._port_effective_qos_policy_id(port)[0]) self.port_effective_qos_policy_id(port)[0])
if not reset and not delete: if not reset and not delete:
original_qos_policy_id = self._port_effective_qos_policy_id( original_qos_policy_id = self.port_effective_qos_policy_id(
original_port)[0] original_port)[0]
if qos_policy_id == original_qos_policy_id: if qos_policy_id == original_qos_policy_id:
return # No QoS policy change return # No QoS policy change
@ -288,6 +309,13 @@ class OVNClientQosExtension(object):
return updated_port_ids, updated_fip_ids return updated_port_ids, updated_fip_ids
def _delete_fip_qos_rules(self, txn, fip_id, network_id):
if network_id:
lswitch_name = utils.ovn_name(network_id)
txn.add(self.nb_idl.qos_del_ext_ids(
lswitch_name,
{ovn_const.OVN_FIP_EXT_ID_KEY: fip_id}))
def create_floatingip(self, txn, floatingip): def create_floatingip(self, txn, floatingip):
self.update_floatingip(txn, floatingip) self.update_floatingip(txn, floatingip)
@ -295,20 +323,17 @@ class OVNClientQosExtension(object):
router_id = floatingip.get('router_id') router_id = floatingip.get('router_id')
qos_policy_id = (floatingip.get('qos_policy_id') or qos_policy_id = (floatingip.get('qos_policy_id') or
floatingip.get('qos_network_policy_id')) floatingip.get('qos_network_policy_id'))
if floatingip['floating_network_id']:
lswitch_name = utils.ovn_name(floatingip['floating_network_id'])
txn.add(self._driver._nb_idl.qos_del_ext_ids(
lswitch_name,
{ovn_const.OVN_FIP_EXT_ID_KEY: floatingip['id']}))
if not (router_id and qos_policy_id): if not (router_id and qos_policy_id):
return return self._delete_fip_qos_rules(
txn, floatingip['id'], floatingip['floating_network_id'])
admin_context = n_context.get_admin_context() admin_context = n_context.get_admin_context()
router_db = self._plugin_l3._get_router(admin_context, router_id) router_db = self._plugin_l3._get_router(admin_context, router_id)
gw_port_id = router_db.get('gw_port_id') gw_port_id = router_db.get('gw_port_id')
if not gw_port_id: if not gw_port_id:
return return self._delete_fip_qos_rules(
txn, floatingip['id'], floatingip['floating_network_id'])
if ovn_conf.is_ovn_distributed_floating_ip(): if ovn_conf.is_ovn_distributed_floating_ip():
# DVR, floating IP GW is in the same compute node as private port. # DVR, floating IP GW is in the same compute node as private port.
@ -319,13 +344,20 @@ class OVNClientQosExtension(object):
qos_rules = self._qos_rules(admin_context, qos_policy_id) qos_rules = self._qos_rules(admin_context, qos_policy_id)
for direction, rules in qos_rules.items(): for direction, rules in qos_rules.items():
# "delete=not rule": that means, when we don't have rules, we
# generate a "ovn_rule" to be used as input in a "qos_del" method.
ovn_rule = self._ovn_qos_rule( ovn_rule = self._ovn_qos_rule(
direction, rules, gw_port_id, direction, rules, gw_port_id,
floatingip['floating_network_id'], fip_id=floatingip['id'], floatingip['floating_network_id'], fip_id=floatingip['id'],
ip_address=floatingip['floating_ip_address'], ip_address=floatingip['floating_ip_address'],
resident_port=resident_port) resident_port=resident_port, delete=not rules)
if ovn_rule: if rules:
txn.add(self._driver._nb_idl.qos_add(**ovn_rule)) # NOTE(ralonsoh): with "may_exist=True", the "qos_add" will
# create the QoS OVN rule or update the existing one.
txn.add(self.nb_idl.qos_add(**ovn_rule, may_exist=True))
else:
# Delete, if exists, the QoS rule in this direction.
txn.add(self.nb_idl.qos_del(**ovn_rule, if_exists=True))
def delete_floatingip(self, txn, floatingip): def delete_floatingip(self, txn, floatingip):
self.update_floatingip(txn, floatingip) self.update_floatingip(txn, floatingip)
@ -343,7 +375,7 @@ class OVNClientQosExtension(object):
# TODO(ralonsoh): we need to benchmark this transaction in systems with # TODO(ralonsoh): we need to benchmark this transaction in systems with
# a huge amount of ports. This can take a while and could block other # a huge amount of ports. This can take a while and could block other
# operations. # operations.
with self._driver._nb_idl.transaction(check_error=True) as txn: with self.nb_idl.transaction(check_error=True) as txn:
for network_id in bound_networks: for network_id in bound_networks:
network = {'qos_policy_id': policy.id, 'id': network_id} network = {'qos_policy_id': policy.id, 'id': network_id}
port_ids, fip_ids = self.update_network( port_ids, fip_ids = self.update_network(

View File

@ -79,7 +79,7 @@ class OVNClient(object):
self._l3_plugin_property = None self._l3_plugin_property = None
# TODO(ralonsoh): handle the OVN client extensions with an ext. manager # TODO(ralonsoh): handle the OVN client extensions with an ext. manager
self._qos_driver = qos_extension.OVNClientQosExtension(self) self._qos_driver = qos_extension.OVNClientQosExtension(driver=self)
self._placement_extension = ( self._placement_extension = (
placement_extension.OVNClientPlacementExtension(self)) placement_extension.OVNClientPlacementExtension(self))
self._ovn_scheduler = l3_ovn_scheduler.get_scheduler() self._ovn_scheduler = l3_ovn_scheduler.get_scheduler()

View File

@ -31,6 +31,8 @@ from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron import manager from neutron import manager
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import qos \
as ovn_qos
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.services.segments import db as segments_db from neutron.services.segments import db as segments_db
@ -104,6 +106,8 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
self.sync_acls(ctx) self.sync_acls(ctx)
self.sync_routers_and_rports(ctx) self.sync_routers_and_rports(ctx)
self.migrate_to_stateless_fips(ctx) self.migrate_to_stateless_fips(ctx)
self.sync_port_qos_policies(ctx)
self.sync_fip_qos_policies(ctx)
def _create_port_in_ovn(self, ctx, port): def _create_port_in_ovn(self, ctx, port):
# Remove any old ACLs for the port to avoid creating duplicate ACLs. # Remove any old ACLs for the port to avoid creating duplicate ACLs.
@ -1217,6 +1221,36 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
LOG.debug('Port Groups Migration task finished') LOG.debug('Port Groups Migration task finished')
def sync_port_qos_policies(self, ctx):
"""Sync port QoS policies.
This method reads the port QoS policy assigned or the one inherited
from the network. Does not apply to "network" owned ports.
"""
LOG.debug('Port QoS policies migration task started')
ovn_qos_ext = ovn_qos.OVNClientQosExtension(nb_idl=self.ovn_api)
with db_api.CONTEXT_READER.using(ctx), \
self.ovn_api.transaction(check_error=True) as txn:
for port in self.core_plugin.get_ports(ctx):
if not ovn_qos_ext.port_effective_qos_policy_id(port)[0]:
continue
ovn_qos_ext.create_port(txn, port)
LOG.debug('Port QoS policies migration task finished')
def sync_fip_qos_policies(self, ctx):
"""Sync floating IP QoS policies."""
LOG.debug('Floating IP QoS policies migration task started')
ovn_qos_ext = ovn_qos.OVNClientQosExtension(nb_idl=self.ovn_api)
with db_api.CONTEXT_READER.using(ctx), \
self.ovn_api.transaction(check_error=True) as txn:
for fip in self.l3_plugin.get_floatingips(ctx):
if not fip.get('qos_policy_id'):
continue
ovn_qos_ext.create_floatingip(txn, fip)
LOG.debug('Floating IP QoS policies migration task finished')
class OvnSbSynchronizer(OvnDbSynchronizer): class OvnSbSynchronizer(OvnDbSynchronizer):
"""Synchronizer class for SB.""" """Synchronizer class for SB."""

View File

@ -46,25 +46,18 @@ QOS_RULES_2 = {
QOS_RULES_3 = { QOS_RULES_3 = {
constants.INGRESS_DIRECTION: { constants.INGRESS_DIRECTION: {
qos_constants.RULE_TYPE_BANDWIDTH_LIMIT: QOS_RULE_BW_1, qos_constants.RULE_TYPE_BANDWIDTH_LIMIT: QOS_RULE_BW_1}
qos_constants.RULE_TYPE_DSCP_MARKING: QOS_RULE_DSCP_1}
} }
class _OVNClient(object):
def __init__(self, nd_idl):
self._nb_idl = nd_idl
class TestOVNClientQosExtension(base.TestOVNFunctionalBase): class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def setUp(self, maintenance_worker=False): def setUp(self, maintenance_worker=False):
super(TestOVNClientQosExtension, self).setUp( super(TestOVNClientQosExtension, self).setUp(
maintenance_worker=maintenance_worker) maintenance_worker=maintenance_worker)
self._add_logical_switch() self._add_logical_switch()
_ovn_client = _OVNClient(self.nb_api) self.qos_driver = qos_extension.OVNClientQosExtension(
self.qos_driver = qos_extension.OVNClientQosExtension(_ovn_client) nb_idl=self.nb_api)
self.gw_port_id = 'gw_port_id' self.gw_port_id = 'gw_port_id'
self._mock_get_router = mock.patch.object(l3_db.L3_NAT_dbonly_mixin, self._mock_get_router = mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_get_router') '_get_router')
@ -93,7 +86,7 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
fip_id=fip_id, ip_address=ip_address) fip_id=fip_id, ip_address=ip_address)
with self.nb_api.transaction(check_error=True): with self.nb_api.transaction(check_error=True):
ls = self.qos_driver._driver._nb_idl.lookup( ls = self.qos_driver.nb_idl.lookup(
'Logical_Switch', ovn_utils.ovn_name(self.network_1)) 'Logical_Switch', ovn_utils.ovn_name(self.network_1))
self.assertEqual(len(rules), len(ls.qos_rules)) self.assertEqual(len(rules), len(ls.qos_rules))
for rule in ls.qos_rules: for rule in ls.qos_rules:
@ -116,7 +109,10 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def update_and_check(qos_rules): def update_and_check(qos_rules):
with self.nb_api.transaction(check_error=True) as txn: with self.nb_api.transaction(check_error=True) as txn:
self.mock_qos_rules.return_value = qos_rules _qos_rules = copy.deepcopy(qos_rules)
for direction in constants.VALID_DIRECTIONS:
_qos_rules[direction] = _qos_rules.get(direction, {})
self.mock_qos_rules.return_value = _qos_rules
self.qos_driver._update_port_qos_rules( self.qos_driver._update_port_qos_rules(
txn, port, self.network_1, 'qos1', None) txn, port, self.network_1, 'qos1', None)
self._check_rules(qos_rules, port, self.network_1) self._check_rules(qos_rules, port, self.network_1)
@ -128,7 +124,10 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def _update_fip_and_check(self, fip, qos_rules): def _update_fip_and_check(self, fip, qos_rules):
with self.nb_api.transaction(check_error=True) as txn: with self.nb_api.transaction(check_error=True) as txn:
self.mock_qos_rules.return_value = qos_rules _qos_rules = copy.deepcopy(qos_rules)
for direction in constants.VALID_DIRECTIONS:
_qos_rules[direction] = _qos_rules.get(direction, {})
self.mock_qos_rules.return_value = _qos_rules
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
self._check_rules(qos_rules, self.gw_port_id, self.network_1, self._check_rules(qos_rules, self.gw_port_id, self.network_1,
fip_id='fip_id', ip_address='1.2.3.4') fip_id='fip_id', ip_address='1.2.3.4')

View File

@ -15,18 +15,6 @@
from collections import namedtuple from collections import namedtuple
import netaddr import netaddr
from neutron.common.ovn import acl as acl_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.services.portforwarding.drivers.ovn.driver import \
OVNPortForwarding as ovn_pf
from neutron.services.segments import db as segments_db
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib.api.definitions import dns as dns_apidef from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import fip_pf_description as ext_pf_def from neutron_lib.api.definitions import fip_pf_description as ext_pf_def
from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def
@ -34,16 +22,33 @@ from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import port_security as ps from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import context from neutron_lib import context
from neutron_lib.services.qos import constants as qos_const
from oslo_utils import uuidutils from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import idlutils from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import constants as ovsdbapp_const from ovsdbapp import constants as ovsdbapp_const
from neutron.common.ovn import acl as acl_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions \
import qos as qos_extension
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.services.portforwarding.drivers.ovn.driver import \
OVNPortForwarding as ovn_pf
from neutron.services.revisions import revision_plugin
from neutron.services.segments import db as segments_db
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
class TestOvnNbSync(base.TestOVNFunctionalBase): class TestOvnNbSync(base.TestOVNFunctionalBase):
_extension_drivers = ['port_security', 'dns'] _extension_drivers = ['port_security', 'dns', 'qos', 'revision_plugin']
def setUp(self): def setUp(self, *args):
ovn_config.cfg.CONF.set_override('dns_domain', 'ovn.test') ovn_config.cfg.CONF.set_override('dns_domain', 'ovn.test')
super(TestOvnNbSync, self).setUp(maintenance_worker=True) super(TestOvnNbSync, self).setUp(maintenance_worker=True)
ext_mgr = test_extraroute.ExtraRouteTestExtensionManager() ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
@ -83,11 +88,18 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.match_old_mac_dhcp_subnets = [] self.match_old_mac_dhcp_subnets = []
self.expected_dns_records = [] self.expected_dns_records = []
self.expected_ports_with_unknown_addr = [] self.expected_ports_with_unknown_addr = []
self.expected_qos_records = []
self.ctx = context.get_admin_context() self.ctx = context.get_admin_context()
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True, ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn') group='ovn')
ovn_config.cfg.CONF.set_override( ovn_config.cfg.CONF.set_override(
'enable_distributed_floating_ip', True, group='ovn') 'enable_distributed_floating_ip', True, group='ovn')
self.rp = revision_plugin.RevisionPlugin()
self.qos_driver = qos_extension.OVNClientQosExtension(
nb_idl=self.nb_api)
def get_additional_service_plugins(self):
return {'qos': 'qos', 'segments': 'segments'}
def _api_for_resource(self, resource): def _api_for_resource(self, resource):
if resource in ['security-groups']: if resource in ['security-groups']:
@ -1499,6 +1511,20 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.assertRaises(AssertionError, self.assertCountEqual, self.assertRaises(AssertionError, self.assertCountEqual,
self.expected_dns_records, observed_dns_records) self.expected_dns_records, observed_dns_records)
def _validate_qos_records(self, should_match=True):
observed_qos_records = []
for qos_row in self.nb_api.tables['QoS'].rows.values():
observed_qos_records.append({
'action': qos_row.action, 'bandwidth': qos_row.bandwidth,
'direction': qos_row.direction, 'match': qos_row.match,
'external_ids': qos_row.external_ids})
if should_match:
self.assertCountEqual(self.expected_qos_records,
observed_qos_records)
else:
self.assertEqual([], observed_qos_records)
def _validate_resources(self, should_match=True): def _validate_resources(self, should_match=True):
self._validate_networks(should_match=should_match) self._validate_networks(should_match=should_match)
self._validate_metadata_ports(should_match=should_match) self._validate_metadata_ports(should_match=should_match)
@ -1553,6 +1579,152 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
def test_ovn_nb_sync_off(self): def test_ovn_nb_sync_off(self):
self._test_ovn_nb_sync_helper('off', should_match_after_sync=False) self._test_ovn_nb_sync_helper('off', should_match_after_sync=False)
def test_sync_port_qos_policies(self):
res = self._create_network(self.fmt, 'n1', True)
net = self.deserialize(self.fmt, res)['network']
self._create_subnet(self.fmt, net['id'], '10.0.0.0/24')
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_maxbw = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
max_kbps=1000, max_burst_kbps=800)
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
direction=constants.INGRESS_DIRECTION,
max_kbps=700, max_burst_kbps=600)
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_dscp = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_dscp['id'],
qos_const.RULE_TYPE_DSCP_MARKING, dscp_mark=14)
res = self._create_port(
self.fmt, net['id'], arg_list=('qos_policy_id', ),
name='n1-port1', device_owner='compute:nova',
qos_policy_id=qos_maxbw['id'])
port_1 = self.deserialize(self.fmt, res)['port']
res = self._create_port(
self.fmt, net['id'], arg_list=('qos_policy_id', ),
name='n1-port2', device_owner='compute:nova',
qos_policy_id=qos_dscp['id'])
port_2 = self.deserialize(self.fmt, res)['port']
# Check QoS policies have been correctly created in OVN DB.
self.expected_qos_records = [
{'action': {}, 'bandwidth': {'burst': 800, 'rate': 1000},
'direction': 'from-lport',
'match': 'inport == "%s"' % port_1['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_1['id']}},
{'action': {}, 'bandwidth': {'burst': 600, 'rate': 700},
'direction': 'to-lport',
'match': 'outport == "%s"' % port_1['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_1['id']}},
{'action': {'dscp': 14}, 'bandwidth': {},
'direction': 'from-lport',
'match': 'inport == "%s"' % port_2['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_2['id']}}]
self._validate_qos_records()
# Delete QoS policies from the OVN DB.
with self.nb_api.transaction(check_error=True) as txn:
for port in (port_1, port_2):
for ovn_rule in [self.qos_driver._ovn_qos_rule(
direction, {}, port['id'], port['network_id'],
delete=True)
for direction in constants.VALID_DIRECTIONS]:
txn.add(self.nb_api.qos_del(**ovn_rule))
self._validate_qos_records(should_match=False)
# Manually sync port QoS registers.
nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
self.plugin, self.mech_driver.nb_ovn, self.mech_driver.sb_ovn,
'log', self.mech_driver)
ctx = context.get_admin_context()
nb_synchronizer.sync_port_qos_policies(ctx)
self._validate_qos_records()
def _create_floatingip(self, fip_network_id, port_id, qos_policy_id):
body = {'tenant_id': self._tenant_id,
'floating_network_id': fip_network_id,
'port_id': port_id,
'qos_policy_id': qos_policy_id}
return self.l3_plugin.create_floatingip(self.context,
{'floatingip': body})
def test_sync_fip_qos_policies(self):
res = self._create_network(self.fmt, 'n1_ext', True,
arg_list=('router:external', ),
**{'router:external': True})
net_ext = self.deserialize(self.fmt, res)['network']
res = self._create_subnet(self.fmt, net_ext['id'], '10.0.0.0/24')
subnet_ext = self.deserialize(self.fmt, res)['subnet']
res = self._create_network(self.fmt, 'n1_int', True)
net_int = self.deserialize(self.fmt, res)['network']
self._create_subnet(self.fmt, net_int['id'], '10.10.0.0/24')
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_maxbw = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
max_kbps=1000, max_burst_kbps=800)
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
direction=constants.INGRESS_DIRECTION,
max_kbps=700, max_burst_kbps=600)
# Create a router with net_ext as GW network and net_int as internal
# one, and a floating IP on the external network.
data = {'name': 'r1', 'admin_state_up': True,
'tenant_id': self._tenant_id,
'external_gateway_info': {
'enable_snat': True,
'network_id': net_ext['id'],
'external_fixed_ips': [{'ip_address': '10.0.0.5',
'subnet_id': subnet_ext['id']}]}
}
router = self.l3_plugin.create_router(self.context, {'router': data})
net_int_prtr = self._make_port(self.fmt, net_int['id'],
name='n1_int-p-rtr')['port']
self.l3_plugin.add_router_interface(
self.context, router['id'], {'port_id': net_int_prtr['id']})
fip = self._create_floatingip(net_ext['id'], net_int_prtr['id'],
qos_maxbw['id'])
# Check QoS policies have been correctly created in OVN DB.
fip_match = ('%s == "%s" && ip4.%s == %s && '
'is_chassis_resident("%s")')
self.expected_qos_records = [
{'action': {}, 'bandwidth': {'burst': 600, 'rate': 700},
'direction': 'to-lport',
'external_ids': {'neutron:fip_id': fip['id']},
'match': fip_match % ('outport', router['gw_port_id'], 'dst',
fip['floating_ip_address'],
net_int_prtr['id'])},
{'action': {}, 'bandwidth': {'burst': 800, 'rate': 1000},
'direction': 'from-lport',
'external_ids': {'neutron:fip_id': fip['id']},
'match': fip_match % ('inport', router['gw_port_id'], 'src',
fip['floating_ip_address'],
net_int_prtr['id'])}]
self._validate_qos_records()
# Delete QoS policies from the OVN DB.
with self.nb_api.transaction(check_error=True) as txn:
lswitch_name = utils.ovn_name(net_ext['id'])
txn.add(self.nb_api.qos_del_ext_ids(
lswitch_name, {ovn_const.OVN_FIP_EXT_ID_KEY: fip['id']}))
self._validate_qos_records(should_match=False)
# Manually sync port QoS registers.
nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
self.plugin, self.mech_driver.nb_ovn, self.mech_driver.sb_ovn,
'log', self.mech_driver)
ctx = context.get_admin_context()
nb_synchronizer.sync_fip_qos_policies(ctx)
self._validate_qos_records()
class TestOvnSbSync(base.TestOVNFunctionalBase): class TestOvnSbSync(base.TestOVNFunctionalBase):

View File

@ -31,6 +31,7 @@ from neutron_lib.db import standard_attr
from neutron_lib import exceptions as lib_exc from neutron_lib import exceptions as lib_exc
from neutron_lib import fixture from neutron_lib import fixture
from neutron_lib.plugins import directory from neutron_lib.plugins import directory
from neutron_lib.services.qos import constants as qos_const
from neutron_lib.tests import tools from neutron_lib.tests import tools
from neutron_lib.utils import helpers from neutron_lib.utils import helpers
from neutron_lib.utils import net from neutron_lib.utils import net
@ -590,6 +591,58 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
raise webob.exc.HTTPClientError(code=res.status_int) raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res) return self.deserialize(fmt, res)
def _create_qos_rule(self, fmt, qos_policy_id, rule_type, max_kbps=None,
max_burst_kbps=None, dscp_mark=None, min_kbps=None,
direction=constants.EGRESS_DIRECTION,
expected_res_status=None, project_id=None,
set_context=False, is_admin=False):
# Accepted rule types: "bandwidth_limit", "dscp_marking" and
# "minimum_bandwidth"
self.assertIn(rule_type, [qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
qos_const.RULE_TYPE_DSCP_MARKING,
qos_const.RULE_TYPE_MINIMUM_BANDWIDTH])
project_id = project_id or self._tenant_id
type_req = rule_type + '_rule'
data = {type_req: {'project_id': project_id}}
if rule_type == qos_const.RULE_TYPE_BANDWIDTH_LIMIT:
data[type_req][qos_const.MAX_KBPS] = max_kbps
data[type_req][qos_const.MAX_BURST] = max_burst_kbps
data[type_req][qos_const.DIRECTION] = direction
elif rule_type == qos_const.RULE_TYPE_DSCP_MARKING:
data[type_req][qos_const.DSCP_MARK] = dscp_mark
else:
data[type_req][qos_const.MIN_KBPS] = min_kbps
data[type_req][qos_const.DIRECTION] = direction
route = 'qos/policies/%s/%s' % (qos_policy_id, type_req + 's')
qos_rule_req = self.new_create_request(route, data, fmt)
if set_context and project_id:
# create a specific auth context for this request
qos_rule_req.environ['neutron.context'] = context.Context(
'', project_id, is_admin=is_admin)
qos_rule_res = qos_rule_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, qos_rule_res.status_int)
return qos_rule_res
def _create_qos_policy(self, fmt, qos_policy_name=None,
expected_res_status=None, project_id=None,
set_context=False, is_admin=False):
project_id = project_id or self._tenant_id
name = qos_policy_name or uuidutils.generate_uuid()
data = {'policy': {'name': name,
'project_id': project_id}}
qos_req = self.new_create_request('policies', data, fmt)
if set_context and project_id:
# create a specific auth context for this request
qos_req.environ['neutron.context'] = context.Context(
'', project_id, is_admin=is_admin)
qos_policy_res = qos_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, qos_policy_res.status_int)
return qos_policy_res
def _api_for_resource(self, resource): def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports', 'subnetpools', if resource in ['networks', 'subnets', 'ports', 'subnetpools',
'security-groups']: 'security-groups']:

View File

@ -75,7 +75,8 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.txn = _Context() self.txn = _Context()
mock_driver = mock.Mock() mock_driver = mock.Mock()
mock_driver._nb_idl.transaction.return_value = self.txn mock_driver._nb_idl.transaction.return_value = self.txn
self.qos_driver = qos_extension.OVNClientQosExtension(mock_driver) self.qos_driver = qos_extension.OVNClientQosExtension(
driver=mock_driver)
self._mock_rules = mock.patch.object(self.qos_driver, self._mock_rules = mock.patch.object(self.qos_driver,
'_update_port_qos_rules') '_update_port_qos_rules')
self.mock_rules = self._mock_rules.start() self.mock_rules = self._mock_rules.start()
@ -246,28 +247,28 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
def test__port_effective_qos_policy_id(self): def test__port_effective_qos_policy_id(self):
port = {'qos_policy_id': 'qos1'} port = {'qos_policy_id': 'qos1'}
self.assertEqual(('qos1', 'port'), self.assertEqual(('qos1', 'port'),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_network_policy_id': 'qos1'} port = {'qos_network_policy_id': 'qos1'}
self.assertEqual(('qos1', 'network'), self.assertEqual(('qos1', 'network'),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': 'qos_port', port = {'qos_policy_id': 'qos_port',
'qos_network_policy_id': 'qos_network'} 'qos_network_policy_id': 'qos_network'}
self.assertEqual(('qos_port', 'port'), self.assertEqual(('qos_port', 'port'),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
port = {} port = {}
self.assertEqual((None, None), self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': None, 'qos_network_policy_id': None} port = {'qos_policy_id': None, 'qos_network_policy_id': None}
self.assertEqual((None, None), self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': 'qos1', 'device_owner': 'neutron:port'} port = {'qos_policy_id': 'qos1', 'device_owner': 'neutron:port'}
self.assertEqual((None, None), self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port)) self.qos_driver.port_effective_qos_policy_id(port))
def test_update_port(self): def test_update_port(self):
port = self.ports[0] port = self.ports[0]
@ -512,6 +513,11 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
mock_update_fip.assert_called_once_with(self.txn, fip) mock_update_fip.assert_called_once_with(self.txn, fip)
def test_update_floatingip(self): def test_update_floatingip(self):
# NOTE(ralonsoh): this rule will always apply:
# - If the FIP is being deleted, "qos_del_ext_ids" is called;
# "qos_add" and "qos_del" won't.
# - If the FIP is added or updated, "qos_del_ext_ids" won't be called
# and "qos_add" or "qos_del" will, depending on the rule directions.
nb_idl = self.qos_driver._driver._nb_idl nb_idl = self.qos_driver._driver._nb_idl
fip = self.fips[0] fip = self.fips[0]
original_fip = self.fips[1] original_fip = self.fips[1]
@ -521,6 +527,7 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called() nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock() nb_idl.reset_mock()
# Attach a port and a router, not QoS policy # Attach a port and a router, not QoS policy
@ -530,14 +537,19 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called() nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock() nb_idl.reset_mock()
# Add a QoS policy # Add a QoS policy
fip.qos_policy_id = self.qos_policies[0].id fip.qos_policy_id = self.qos_policies[0].id
fip.update() fip.update()
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_not_called()
# QoS DSCP rule has only egress direction, ingress one is deleted.
# Check "OVNClientQosExtension.update_floatingip" and how the OVN QoS
# rules are added (if there is a rule in this direction) or deleted.
nb_idl.qos_add.assert_called_once() nb_idl.qos_add.assert_called_once()
nb_idl.qos_del.assert_called_once()
nb_idl.reset_mock() nb_idl.reset_mock()
# Remove QoS # Remove QoS
@ -548,14 +560,16 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called() nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock() nb_idl.reset_mock()
# Add network QoS policy # Add network QoS policy
fip.qos_network_policy_id = self.qos_policies[0].id fip.qos_network_policy_id = self.qos_policies[0].id
fip.update() fip.update()
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_not_called()
nb_idl.qos_add.assert_called_once() nb_idl.qos_add.assert_called_once()
nb_idl.qos_del.assert_called_once()
nb_idl.reset_mock() nb_idl.reset_mock()
# Add again another QoS policy # Add again another QoS policy
@ -564,8 +578,9 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
original_fip.qos_policy_id = None original_fip.qos_policy_id = None
original_fip.update() original_fip.update()
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_not_called()
nb_idl.qos_add.assert_called_once() nb_idl.qos_add.assert_called_once()
nb_idl.qos_del.assert_called_once()
nb_idl.reset_mock() nb_idl.reset_mock()
# Detach the port and the router # Detach the port and the router
@ -579,6 +594,7 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip) self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called() nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock() nb_idl.reset_mock()
# Force reset (delete any QoS) # Force reset (delete any QoS)
@ -587,3 +603,4 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip_dict) self.qos_driver.update_floatingip(txn, fip_dict)
nb_idl.qos_del_ext_ids.assert_called_once() nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called() nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()

View File

@ -46,7 +46,7 @@ osprofiler>=2.3.0 # Apache-2.0
os-ken>=2.2.0 # Apache-2.0 os-ken>=2.2.0 # Apache-2.0
os-resource-classes>=1.1.0 # Apache-2.0 os-resource-classes>=1.1.0 # Apache-2.0
ovs>=2.10.0 # Apache-2.0 ovs>=2.10.0 # Apache-2.0
ovsdbapp>=1.11.0 # Apache-2.0 ovsdbapp>=1.15.0 # Apache-2.0
packaging>=20.4 # Apache-2.0 packaging>=20.4 # Apache-2.0
psutil>=5.3.0 # BSD psutil>=5.3.0 # BSD
pyroute2>=0.6.4;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2) pyroute2>=0.6.4;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2)