Merge "Support pps limitation for openvswitch agent"
This commit is contained in:
commit
0e40dfe862
@ -1,4 +1,5 @@
|
||||
# Copyright (c) 2015 OpenStack Foundation
|
||||
# Copyright (c) 2021-2022 Chinaunicom
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -13,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import random
|
||||
|
||||
from neutron_lib import constants
|
||||
from neutron_lib.services.qos import constants as qos_consts
|
||||
@ -25,8 +27,242 @@ from neutron.services.qos.drivers.openvswitch import driver
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
MAX_RETIES = 1000
|
||||
|
||||
class QosOVSAgentDriver(qos.QosLinuxAgentDriver):
|
||||
|
||||
class MeterRuleManager(object):
|
||||
# This cache will be:
|
||||
# PORT_METER_ID = {"port_id_1_ingress": 1,
|
||||
# "port_id_1_egress: 2,
|
||||
# "port_id_2_ingress": 3,
|
||||
# "port_id_2_egress: 4}
|
||||
PORT_METER_ID = {}
|
||||
# This will be:
|
||||
# PORT_INFO_INGRESS = {"port_id_1": (of_port_name, mac_1, local_vlan_1),
|
||||
# "port_id_2": (of_port_name, mac_2, local_vlan_2),
|
||||
PORT_INFO_INGRESS = {}
|
||||
# PORT_INFO_EGRESS = {"port_id_1": (of_port_name, mac_1, of_port_1),
|
||||
# "port_id_2": (of_port_name, mac_2, of_port_2),
|
||||
PORT_INFO_EGRESS = {}
|
||||
|
||||
def __init__(self, br_int):
|
||||
self.br_int = br_int
|
||||
self._init_max_meter_id()
|
||||
|
||||
def _init_max_meter_id(self):
|
||||
self.max_meter = 0
|
||||
features = self.br_int.list_meter_features()
|
||||
for f in features:
|
||||
if f["max_meter"] > 0:
|
||||
self.max_meter = f["max_meter"]
|
||||
break
|
||||
|
||||
def get_data_key(self, port_id, direction):
|
||||
return "%s_%s" % (port_id, direction)
|
||||
|
||||
def load_port_meter_id(self, port_name, port_id, direction):
|
||||
key = self.get_data_key(port_id, direction)
|
||||
meter_id = self.br_int.get_value_from_other_config(
|
||||
port_name, key, value_type=int)
|
||||
if meter_id:
|
||||
self.PORT_METER_ID[key] = meter_id
|
||||
else:
|
||||
LOG.warning("Failed to load port %(port)s meter id in "
|
||||
"direction %(direction)s",
|
||||
{"direction": direction,
|
||||
"port": port_id})
|
||||
return meter_id
|
||||
|
||||
def store_port_meter_id_to_ovsdb(self, port_name, port_id,
|
||||
direction, meter_id):
|
||||
key = self.get_data_key(port_id, direction)
|
||||
self.br_int.set_value_to_other_config(
|
||||
port_name, key, meter_id)
|
||||
|
||||
def clean_port_meter_id_from_ovsdb(self, port_name, port_id, direction):
|
||||
key = self.get_data_key(port_id, direction)
|
||||
self.br_int.remove_value_from_other_config(
|
||||
port_name, key)
|
||||
|
||||
def generate_meter_id(self):
|
||||
if self.max_meter <= 0:
|
||||
return
|
||||
used_meter_ids = self.PORT_METER_ID.values()
|
||||
cid = None
|
||||
times = 0
|
||||
while not cid or cid in used_meter_ids:
|
||||
cid = random.randint(1, self.max_meter)
|
||||
times += 1
|
||||
if times >= MAX_RETIES:
|
||||
LOG.warning("Failed to allocate meter "
|
||||
"id after %d retries", times)
|
||||
return
|
||||
return cid
|
||||
|
||||
def allocate_meter_id(self, port_id, direction):
|
||||
meter_id = self.generate_meter_id()
|
||||
if not meter_id:
|
||||
return
|
||||
key = self.get_data_key(port_id, direction)
|
||||
self.PORT_METER_ID[key] = meter_id
|
||||
return meter_id
|
||||
|
||||
def remove_port_meter_id(self, port_id, direction):
|
||||
key = self.get_data_key(port_id, direction)
|
||||
return self.PORT_METER_ID.pop(key, None)
|
||||
|
||||
def set_port_info_ingress(self, port_id, port_name, mac, vlan):
|
||||
self.PORT_INFO_INGRESS[port_id] = (port_name, mac, vlan)
|
||||
|
||||
def remove_port_info_ingress(self, port_id):
|
||||
return self.PORT_INFO_INGRESS.pop(port_id, (None, None, None))
|
||||
|
||||
def set_port_info_egress(self, port_id, port_name, mac, ofport):
|
||||
self.PORT_INFO_EGRESS[port_id] = (port_name, mac, ofport)
|
||||
|
||||
def remove_port_info_egress(self, port_id):
|
||||
return self.PORT_INFO_EGRESS.pop(port_id, (None, None, None))
|
||||
|
||||
|
||||
class OVSPacketRatelimitDriver(object):
|
||||
|
||||
SUPPORT_METER = None
|
||||
|
||||
def check_meter_features(self):
|
||||
features = self.br_int.list_meter_features()
|
||||
for f in features:
|
||||
if (f["max_meter"] != 0 and f["band_types"] != 0 and
|
||||
f["capabilities"] != 0 and f["max_bands"] != 0):
|
||||
return True
|
||||
return False
|
||||
|
||||
@property
|
||||
def support_meter(self):
|
||||
if self.SUPPORT_METER is None:
|
||||
self.SUPPORT_METER = self.check_meter_features()
|
||||
return self.SUPPORT_METER
|
||||
|
||||
def create_packet_rate_limit(self, port, rule):
|
||||
self.update_packet_rate_limit(port, rule)
|
||||
|
||||
def update_packet_rate_limit(self, port, rule):
|
||||
if not self.support_meter:
|
||||
LOG.debug("Meter feature is not supported by ovs %s bridge",
|
||||
self.br_int.br_name)
|
||||
return
|
||||
|
||||
LOG.debug("Update packet rate limit for port: %s", port)
|
||||
vif_port = port.get('vif_port')
|
||||
if not vif_port:
|
||||
port_id = port.get('port_id')
|
||||
LOG.debug("update_packet_rate_limit was received for port %s but "
|
||||
"vif_port was not found. It seems that port is already "
|
||||
"deleted", port_id)
|
||||
return
|
||||
self.ports[port['port_id']][(qos_consts.RULE_TYPE_PACKET_RATE_LIMIT,
|
||||
rule.direction)] = port
|
||||
|
||||
self._update_packet_rate_limit(vif_port, rule, rule.direction)
|
||||
|
||||
def delete_packet_rate_limit(self, port):
|
||||
if not self.support_meter:
|
||||
LOG.debug("Meter feature is not supported by ovs bridge")
|
||||
return
|
||||
self._delete_packet_rate_limit(port, constants.EGRESS_DIRECTION)
|
||||
|
||||
def delete_packet_rate_limit_ingress(self, port):
|
||||
if not self.support_meter:
|
||||
LOG.debug("Meter feature is not supported by ovs bridge")
|
||||
return
|
||||
self._delete_packet_rate_limit(port, constants.INGRESS_DIRECTION)
|
||||
|
||||
def _delete_packet_rate_limit(self, port, direction):
|
||||
port_id = port.get('port_id')
|
||||
LOG.debug("Delete %(direction)s packet rate limit for port %(port)s.",
|
||||
{"direction": direction,
|
||||
"port": port_id})
|
||||
|
||||
port = self.ports[port_id].pop(
|
||||
(qos_consts.RULE_TYPE_PACKET_RATE_LIMIT, direction), None)
|
||||
|
||||
meter_id = self.meter_cache.remove_port_meter_id(
|
||||
port_id, direction)
|
||||
|
||||
if direction == constants.INGRESS_DIRECTION:
|
||||
port_name, mac, local_vlan = (
|
||||
self.meter_cache.remove_port_info_ingress(port_id))
|
||||
if mac is not None and local_vlan is not None:
|
||||
self.br_int.remove_meter_from_port(
|
||||
direction, mac, local_vlan=local_vlan)
|
||||
if port_name is not None:
|
||||
self.meter_cache.clean_port_meter_id_from_ovsdb(
|
||||
port_name, port_id, direction)
|
||||
else:
|
||||
port_name, mac, ofport = (
|
||||
self.meter_cache.remove_port_info_egress(port_id))
|
||||
if mac is not None and ofport is not None:
|
||||
self.br_int.remove_meter_from_port(
|
||||
direction, mac, in_port=ofport)
|
||||
if port_name is not None:
|
||||
self.meter_cache.clean_port_meter_id_from_ovsdb(
|
||||
port_name, port_id, direction)
|
||||
|
||||
if meter_id:
|
||||
self.br_int.delete_meter(meter_id)
|
||||
|
||||
def _update_packet_rate_limit(self, vif_port, rule, direction):
|
||||
port_name = vif_port.port_name
|
||||
max_kpps = rule.max_kpps * 1000
|
||||
max_burst_kpps = rule.max_burst_kpps * 1000 or 0
|
||||
LOG.debug("Update port %(port)s %(direction)s packet rate limit "
|
||||
"with rate: %(rate)s, burst: %(burst)s",
|
||||
{"port": vif_port.vif_id,
|
||||
"direction": direction,
|
||||
"rate": rule.max_kpps,
|
||||
"burst": rule.max_burst_kpps})
|
||||
|
||||
meter_id = self.meter_cache.load_port_meter_id(
|
||||
port_name, vif_port.vif_id, direction)
|
||||
if not meter_id:
|
||||
meter_id = self.meter_cache.allocate_meter_id(
|
||||
vif_port.vif_id, direction)
|
||||
if not meter_id:
|
||||
LOG.warning("Failed to retrieve and re-allocate meter id, "
|
||||
"skipping updating port %(port)s "
|
||||
"%(direction)s packet rate limit",
|
||||
{"port": vif_port.vif_id,
|
||||
"direction": direction})
|
||||
return
|
||||
self.meter_cache.store_port_meter_id_to_ovsdb(
|
||||
port_name, vif_port.vif_id, direction, meter_id)
|
||||
|
||||
try:
|
||||
self.br_int.create_meter(meter_id, max_kpps,
|
||||
burst=max_burst_kpps)
|
||||
except Exception:
|
||||
self.br_int.update_meter(meter_id, max_kpps,
|
||||
burst=max_burst_kpps)
|
||||
|
||||
local_vlan = self.br_int.get_port_tag_by_name(port_name)
|
||||
|
||||
if direction == constants.INGRESS_DIRECTION:
|
||||
self.meter_cache.set_port_info_ingress(
|
||||
vif_port.vif_id,
|
||||
port_name, vif_port.vif_mac, local_vlan)
|
||||
self.br_int.apply_meter_to_port(
|
||||
meter_id, direction, vif_port.vif_mac,
|
||||
local_vlan=local_vlan)
|
||||
else:
|
||||
self.meter_cache.set_port_info_egress(
|
||||
vif_port.vif_id,
|
||||
port_name, vif_port.vif_mac, vif_port.ofport)
|
||||
self.br_int.apply_meter_to_port(
|
||||
meter_id, direction, vif_port.vif_mac,
|
||||
in_port=vif_port.ofport)
|
||||
|
||||
|
||||
class QosOVSAgentDriver(qos.QosLinuxAgentDriver,
|
||||
OVSPacketRatelimitDriver):
|
||||
|
||||
SUPPORTED_RULES = driver.SUPPORTED_RULES
|
||||
|
||||
@ -56,6 +292,7 @@ class QosOVSAgentDriver(qos.QosLinuxAgentDriver):
|
||||
self.br_int = self.agent_api.request_int_br()
|
||||
self.cookie = self.br_int.default_cookie
|
||||
self._qos_bandwidth_initialize()
|
||||
self.meter_cache = MeterRuleManager(self.br_int)
|
||||
|
||||
def create_bandwidth_limit(self, port, rule):
|
||||
self.update_bandwidth_limit(port, rule)
|
||||
|
@ -36,6 +36,14 @@ SUPPORTED_RULES = {
|
||||
qos_consts.DIRECTION: {
|
||||
'type:values': constants.VALID_DIRECTIONS}
|
||||
},
|
||||
qos_consts.RULE_TYPE_PACKET_RATE_LIMIT: {
|
||||
qos_consts.MAX_KPPS: {
|
||||
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
|
||||
qos_consts.MAX_BURST_KPPS: {
|
||||
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
|
||||
qos_consts.DIRECTION: {
|
||||
'type:values': constants.VALID_DIRECTIONS}
|
||||
},
|
||||
qos_consts.RULE_TYPE_DSCP_MARKING: {
|
||||
qos_consts.DSCP_MARK: {'type:values': constants.VALID_DSCP_MARKS}
|
||||
},
|
||||
|
@ -51,11 +51,20 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
{'phys1': ovs_bridge.OVSAgentBridge(
|
||||
'br-phys1', os_ken_app=os_ken_app)})
|
||||
self.qos_driver.consume_api(self.agent_api)
|
||||
mock.patch.object(
|
||||
qos_driver.MeterRuleManager, '_init_max_meter_id').start()
|
||||
self.qos_driver.initialize()
|
||||
self.qos_driver.br_int = mock.Mock()
|
||||
self.qos_driver.br_int.get_dp = mock.Mock(return_value=(mock.Mock(),
|
||||
mock.Mock(),
|
||||
mock.Mock()))
|
||||
self.qos_driver.meter_cache.br_int = self.qos_driver.br_int
|
||||
self.qos_driver.meter_cache.max_meter = 65535
|
||||
self.qos_driver.br_int.list_meter_features = mock.Mock(
|
||||
return_value=[{"max_meter": 65535,
|
||||
"band_types": 2,
|
||||
"capabilities": 15,
|
||||
"max_bands": 8}])
|
||||
self.qos_driver.br_int.get_egress_bw_limit_for_port = mock.Mock(
|
||||
return_value=(1000, 10))
|
||||
self.get_egress = self.qos_driver.br_int.get_egress_bw_limit_for_port
|
||||
@ -68,12 +77,28 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
self.qos_driver.br_int.create_egress_bw_limit_for_port)
|
||||
self.update_ingress = (
|
||||
self.qos_driver.br_int.update_ingress_bw_limit_for_port)
|
||||
|
||||
self.apply_meter_to_port = (
|
||||
self.qos_driver.br_int.apply_meter_to_port)
|
||||
self.remove_meter_from_port = (
|
||||
self.qos_driver.br_int.remove_meter_from_port)
|
||||
self.delete_meter = (
|
||||
self.qos_driver.br_int.delete_meter)
|
||||
self.create_meter = (
|
||||
self.qos_driver.br_int.create_meter)
|
||||
self.update_meter = (
|
||||
self.qos_driver.br_int.update_meter)
|
||||
|
||||
self.rules = [
|
||||
self._create_bw_limit_rule_obj(constants.EGRESS_DIRECTION),
|
||||
self._create_bw_limit_rule_obj(constants.INGRESS_DIRECTION),
|
||||
self._create_pps_limit_rule_obj(constants.EGRESS_DIRECTION),
|
||||
self._create_pps_limit_rule_obj(constants.INGRESS_DIRECTION),
|
||||
self._create_dscp_marking_rule_obj()]
|
||||
self.qos_policy = self._create_qos_policy_obj(self.rules)
|
||||
self.port = self._create_fake_port(self.qos_policy.id)
|
||||
self.qos_driver.br_int.get_port_tag_by_name = mock.Mock(
|
||||
return_value=1)
|
||||
|
||||
def _create_bw_limit_rule_obj(self, direction):
|
||||
rule_obj = rule.QosBandwidthLimitRule()
|
||||
@ -91,6 +116,15 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
rule_obj.obj_reset_changes()
|
||||
return rule_obj
|
||||
|
||||
def _create_pps_limit_rule_obj(self, direction):
|
||||
rule_obj = rule.QosPacketRateLimitRule()
|
||||
rule_obj.id = uuidutils.generate_uuid()
|
||||
rule_obj.max_kpps = 2000
|
||||
rule_obj.max_burst_kpps = 200
|
||||
rule_obj.direction = direction
|
||||
rule_obj.obj_reset_changes()
|
||||
return rule_obj
|
||||
|
||||
def _create_qos_policy_obj(self, rules):
|
||||
policy_dict = {'id': uuidutils.generate_uuid(),
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
@ -108,17 +142,24 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
def _create_fake_port(self, policy_id):
|
||||
self.port_name = 'fakeport'
|
||||
|
||||
port_id = uuidutils.generate_uuid()
|
||||
|
||||
class FakeVifPort(object):
|
||||
port_name = self.port_name
|
||||
ofport = 111
|
||||
vif_id = port_id
|
||||
vif_mac = "aa:bb:cc:dd:ee:ff"
|
||||
|
||||
return {'vif_port': FakeVifPort(),
|
||||
'qos_policy_id': policy_id,
|
||||
'qos_network_policy_id': None,
|
||||
'port_id': uuidutils.generate_uuid(),
|
||||
'port_id': port_id,
|
||||
'device_owner': uuidutils.generate_uuid()}
|
||||
|
||||
def test_create_new_rules(self):
|
||||
self.qos_driver.br_int.get_value_from_other_config = mock.Mock()
|
||||
self.qos_driver.br_int.set_value_to_other_config = mock.Mock()
|
||||
|
||||
self.qos_driver.br_int.get_egress_bw_limit_for_port = mock.Mock(
|
||||
return_value=(None, None))
|
||||
self.qos_driver.create(self.port, self.qos_policy)
|
||||
@ -132,6 +173,19 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
self.rules[1].max_burst_kbps)
|
||||
self._assert_dscp_rule_create_updated()
|
||||
|
||||
self.create_meter.assert_has_calls(
|
||||
[mock.call(mock.ANY, self.rules[2].max_kpps * 1000,
|
||||
burst=self.rules[2].max_burst_kpps * 1000),
|
||||
mock.call(mock.ANY, self.rules[3].max_kpps * 1000,
|
||||
burst=self.rules[3].max_burst_kpps * 1000)])
|
||||
self.apply_meter_to_port.assert_has_calls(
|
||||
[mock.call(mock.ANY, constants.EGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
in_port=111),
|
||||
mock.call(mock.ANY, constants.INGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
local_vlan=1)])
|
||||
|
||||
def test_create_existing_rules(self):
|
||||
self.qos_driver.create(self.port, self.qos_policy)
|
||||
self._assert_rules_create_updated()
|
||||
@ -149,12 +203,24 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
self.create_egress.assert_not_called()
|
||||
self.update_ingress.assert_not_called()
|
||||
|
||||
self.create_meter.assert_not_called()
|
||||
self.apply_meter_to_port.assert_not_called()
|
||||
|
||||
def _test_delete_rules(self, qos_policy):
|
||||
self.qos_driver.create(self.port, qos_policy)
|
||||
self.qos_driver.delete(self.port, qos_policy)
|
||||
self.delete_egress.assert_called_once_with(self.port_name)
|
||||
self.delete_ingress.assert_called_once_with(self.port_name)
|
||||
|
||||
self.assertEqual(2, self.delete_meter.call_count)
|
||||
self.remove_meter_from_port.assert_has_calls(
|
||||
[mock.call(constants.EGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
in_port=111),
|
||||
mock.call(constants.INGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
local_vlan=1)])
|
||||
|
||||
def _test_delete_rules_no_policy(self):
|
||||
self.qos_driver.delete(self.port)
|
||||
self.delete_egress.assert_called_once_with(self.port_name)
|
||||
@ -172,6 +238,10 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
self.qos_driver.delete(port, self.qos_policy)
|
||||
self.delete_egress.assert_not_called()
|
||||
self.delete_ingress.assert_not_called()
|
||||
self.delete_meter.assert_not_called()
|
||||
|
||||
self.delete_meter.assert_not_called()
|
||||
self.remove_meter_from_port.assert_not_called()
|
||||
|
||||
def _assert_rules_create_updated(self):
|
||||
self.create_egress.assert_called_once_with(
|
||||
@ -181,6 +251,19 @@ class QosOVSAgentDriverTestCase(ovs_test_base.OVSAgentConfigTestBase):
|
||||
self.port_name, self.rules[1].max_kbps,
|
||||
self.rules[1].max_burst_kbps)
|
||||
|
||||
self.create_meter.assert_has_calls(
|
||||
[mock.call(mock.ANY, self.rules[2].max_kpps * 1000,
|
||||
burst=self.rules[2].max_burst_kpps * 1000),
|
||||
mock.call(mock.ANY, self.rules[3].max_kpps * 1000,
|
||||
burst=self.rules[3].max_burst_kpps * 1000)])
|
||||
self.apply_meter_to_port.assert_has_calls(
|
||||
[mock.call(mock.ANY, constants.EGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
in_port=111),
|
||||
mock.call(mock.ANY, constants.INGRESS_DIRECTION,
|
||||
"aa:bb:cc:dd:ee:ff",
|
||||
local_vlan=1)])
|
||||
|
||||
def _assert_dscp_rule_create_updated(self):
|
||||
# Assert install_instructions is the last call
|
||||
self.assertEqual(
|
||||
|
Loading…
x
Reference in New Issue
Block a user