Merge "LinuxBridge agent's QoS driver bw limit for egress traffic"

This commit is contained in:
Jenkins 2016-04-19 10:58:54 +00:00 committed by Gerrit Code Review
commit 16346b906e
7 changed files with 272 additions and 86 deletions

View File

@ -21,5 +21,8 @@ ip_exec: IpNetnsExecFilter, ip, root
# tc commands needed for QoS support
tc_replace_tbf: RegExpFilter, tc, root, tc, qdisc, replace, dev, .+, root, tbf, rate, .+, latency, .+, burst, .+
tc_delete: RegExpFilter, tc, root, tc, qdisc, del, dev, .+, root
tc_show: RegExpFilter, tc, root, tc, qdisc, show, dev, .+
tc_add_ingress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, ingress, handle, .+
tc_delete: RegExpFilter, tc, root, tc, qdisc, del, dev, .+, .+
tc_show_qdisc: RegExpFilter, tc, root, tc, qdisc, show, dev, .+
tc_show_filters: RegExpFilter, tc, root, tc, filter, show, dev, .+, parent, .+
tc_add_filter: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, all, prio, .+, basic, police, rate, .+, burst, .+, mtu, .+, drop

View File

@ -20,6 +20,9 @@ from neutron.agent.linux import ip_lib
from neutron.common import exceptions
INGRESS_QDISC_ID = "ffff:"
MAX_MTU_VALUE = 65535
SI_BASE = 1000
IEC_BASE = 1024
@ -35,6 +38,10 @@ UNITS = {
"t": 4
}
filters_pattern = re.compile(r"police \w+ rate (\w+) burst (\w+)")
tbf_pattern = re.compile(
r"qdisc (\w+) \w+: \w+ refcnt \d rate (\w+) burst (\w+) \w*")
class InvalidKernelHzValue(exceptions.NeutronException):
message = _("Kernel HZ value %(value)s is not valid. This value must be "
@ -94,39 +101,29 @@ class TcCommand(ip_lib.IPDevice):
ip_wrapper = ip_lib.IPWrapper(self.namespace)
return ip_wrapper.netns.execute(cmd, run_as_root=True, **kwargs)
def get_bw_limits(self):
return self._get_tbf_limits()
def get_filters_bw_limits(self, qdisc_id=INGRESS_QDISC_ID):
cmd = ['filter', 'show', 'dev', self.name, 'parent', qdisc_id]
cmd_result = self._execute_tc_cmd(cmd)
if not cmd_result:
return None, None
for line in cmd_result.split("\n"):
m = filters_pattern.match(line.strip())
if m:
#NOTE(slaweq): because tc is giving bw limit in SI units
# we need to calculate it as 1000bit = 1kbit:
bw_limit = convert_to_kilobits(m.group(1), SI_BASE)
#NOTE(slaweq): because tc is giving burst limit in IEC units
# we need to calculate it as 1024bit = 1kbit:
burst_limit = convert_to_kilobits(m.group(2), IEC_BASE)
return bw_limit, burst_limit
return None, None
def set_bw_limit(self, bw_limit, burst_limit, latency_value):
return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value)
def update_bw_limit(self, bw_limit, burst_limit, latency_value):
return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value)
def delete_bw_limit(self):
cmd = ['qdisc', 'del', 'dev', self.name, 'root']
# Return_code=2 is fine because it means
# "RTNETLINK answers: No such file or directory" what is fine when we
# are trying to delete qdisc
return self._execute_tc_cmd(cmd, extra_ok_codes=[2])
def get_burst_value(self, bw_limit, burst_limit):
min_burst_value = self._get_min_burst_value(bw_limit)
return max(min_burst_value, burst_limit)
def _get_min_burst_value(self, bw_limit):
# bw_limit [kbit] / HZ [1/s] = burst [kbit]
return float(bw_limit) / float(self.kernel_hz)
def _get_tbf_limits(self):
def get_tbf_bw_limits(self):
cmd = ['qdisc', 'show', 'dev', self.name]
cmd_result = self._execute_tc_cmd(cmd)
if not cmd_result:
return None, None
pattern = re.compile(
r"qdisc (\w+) \w+: \w+ refcnt \d rate (\w+) burst (\w+) \w*"
)
m = pattern.match(cmd_result)
m = tbf_pattern.match(cmd_result)
if not m:
return None, None
qdisc_name = m.group(1)
@ -140,9 +137,73 @@ class TcCommand(ip_lib.IPDevice):
burst_limit = convert_to_kilobits(m.group(3), IEC_BASE)
return bw_limit, burst_limit
def set_filters_bw_limit(self, bw_limit, burst_limit):
"""Set ingress qdisc and filter for police ingress traffic on device
This will allow to police traffic incoming to interface. It
means that it is fine to limit egress traffic from instance point of
view.
"""
#because replace of tc filters is not working properly and it's adding
# new filters each time instead of replacing existing one first old
# ingress qdisc should be deleted and then added new one so update will
# be called to do that:
return self.update_filters_bw_limit(bw_limit, burst_limit)
def set_tbf_bw_limit(self, bw_limit, burst_limit, latency_value):
"""Set token bucket filter qdisc on device
This will allow to limit speed of packets going out from interface. It
means that it is fine to limit ingress traffic from instance point of
view.
"""
return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value)
def update_filters_bw_limit(self, bw_limit, burst_limit,
qdisc_id=INGRESS_QDISC_ID):
self.delete_filters_bw_limit()
return self._set_filters_bw_limit(bw_limit, burst_limit, qdisc_id)
def update_tbf_bw_limit(self, bw_limit, burst_limit, latency_value):
return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value)
def delete_filters_bw_limit(self):
#NOTE(slaweq): For limit traffic egress from instance we need to use
# qdisc "ingress" because it is ingress traffic from interface POV:
self._delete_qdisc("ingress")
def delete_tbf_bw_limit(self):
self._delete_qdisc("root")
def _set_filters_bw_limit(self, bw_limit, burst_limit,
qdisc_id=INGRESS_QDISC_ID):
cmd = ['qdisc', 'add', 'dev', self.name, 'ingress',
'handle', qdisc_id]
self._execute_tc_cmd(cmd)
return self._add_policy_filter(bw_limit, burst_limit)
def _delete_qdisc(self, qdisc_name):
cmd = ['qdisc', 'del', 'dev', self.name, qdisc_name]
# Return_code=2 is fine because it means
# "RTNETLINK answers: No such file or directory" what is fine when we
# are trying to delete qdisc
return self._execute_tc_cmd(cmd, extra_ok_codes=[2])
def _get_filters_burst_value(self, bw_limit, burst_limit):
if not burst_limit:
# NOTE(slaweq): If burst value was not specified by user than it
# will be set as 80% of bw_limit to ensure that limit for TCP
# traffic will work well:
return float(bw_limit) * 0.8
return burst_limit
def _get_tbf_burst_value(self, bw_limit, burst_limit):
min_burst_value = float(bw_limit) / float(self.kernel_hz)
return max(min_burst_value, burst_limit)
def _replace_tbf_qdisc(self, bw_limit, burst_limit, latency_value):
burst = "%s%s" % (
self.get_burst_value(bw_limit, burst_limit), BURST_UNIT)
self._get_tbf_burst_value(bw_limit, burst_limit), BURST_UNIT)
latency = "%s%s" % (latency_value, LATENCY_UNIT)
rate_limit = "%s%s" % (bw_limit, BW_LIMIT_UNIT)
cmd = [
@ -153,3 +214,21 @@ class TcCommand(ip_lib.IPDevice):
'burst', burst
]
return self._execute_tc_cmd(cmd)
def _add_policy_filter(self, bw_limit, burst_limit,
qdisc_id=INGRESS_QDISC_ID):
rate_limit = "%s%s" % (bw_limit, BW_LIMIT_UNIT)
burst = "%s%s" % (
self._get_filters_burst_value(bw_limit, burst_limit), BURST_UNIT)
#NOTE(slaweq): it is made in exactly same way how openvswitch is doing
# it when configuing ingress traffic limit on port. It can be found in
# lib/netdev-linux.c#L4698 in openvswitch sources:
cmd = [
'filter', 'add', 'dev', self.name,
'parent', qdisc_id, 'protocol', 'all',
'prio', '49', 'basic', 'police',
'rate', rate_limit,
'burst', burst,
'mtu', MAX_MTU_VALUE,
'drop']
return self._execute_tc_cmd(cmd)

View File

@ -37,21 +37,21 @@ class QosLinuxbridgeAgentDriver(qos.QosAgentDriver):
@log_helpers.log_method_call
def create_bandwidth_limit(self, port, rule):
tc_wrapper = self._get_tc_wrapper(port)
tc_wrapper.set_bw_limit(
rule.max_kbps, rule.max_burst_kbps, cfg.CONF.QOS.tbf_latency
tc_wrapper.set_filters_bw_limit(
rule.max_kbps, rule.max_burst_kbps
)
@log_helpers.log_method_call
def update_bandwidth_limit(self, port, rule):
tc_wrapper = self._get_tc_wrapper(port)
tc_wrapper.update_bw_limit(
rule.max_kbps, rule.max_burst_kbps, cfg.CONF.QOS.tbf_latency
tc_wrapper.update_filters_bw_limit(
rule.max_kbps, rule.max_burst_kbps
)
@log_helpers.log_method_call
def delete_bandwidth_limit(self, port):
tc_wrapper = self._get_tc_wrapper(port)
tc_wrapper.delete_bw_limit()
tc_wrapper.delete_filters_bw_limit()
def _get_tc_wrapper(self, port):
return tc_lib.TcCommand(

View File

@ -55,7 +55,7 @@ def _wait_for_rule_applied_linuxbridge_agent(vm, limit, burst):
namespace=vm.host.host_namespace
)
utils.wait_until_true(
lambda: tc.get_bw_limits() == (limit, burst))
lambda: tc.get_filters_bw_limits() == (limit, burst))
def _wait_for_rule_applied(vm, limit, burst):

View File

@ -26,44 +26,64 @@ LATENCY = 50
BW_LIMIT = 1024
BURST = 512
DEV_NAME = "test_tap"
MAC_ADDRESS = "fa:16:3e:01:01:01"
BASE_DEV_NAME = "test_tap"
class TcLibTestCase(functional_base.BaseSudoTestCase):
def setUp(self):
super(TcLibTestCase, self).setUp()
self.create_device()
self.tc = tc_lib.TcCommand(DEV_NAME, TEST_HZ_VALUE)
def create_device(self):
"""Create a tuntap with the specified attributes.
def create_device(self, name):
"""Create a tuntap with the specified name.
The device is cleaned up at the end of the test.
"""
ip = ip_lib.IPWrapper()
tap_device = ip.add_tuntap(DEV_NAME)
tap_device = ip.add_tuntap(name)
self.addCleanup(tap_device.link.delete)
tap_device.link.set_address(MAC_ADDRESS)
tap_device.link.set_up()
def test_bandwidth_limit(self):
self.tc.set_bw_limit(BW_LIMIT, BURST, LATENCY)
bw_limit, burst = self.tc.get_bw_limits()
def test_filters_bandwidth_limit(self):
device_name = "%s_filters" % BASE_DEV_NAME
self.create_device(device_name)
tc = tc_lib.TcCommand(device_name, TEST_HZ_VALUE)
tc.set_filters_bw_limit(BW_LIMIT, BURST)
bw_limit, burst = tc.get_filters_bw_limits()
self.assertEqual(BW_LIMIT, bw_limit)
self.assertEqual(BURST, burst)
new_bw_limit = BW_LIMIT + 500
new_burst = BURST + 50
self.tc.update_bw_limit(new_bw_limit, new_burst, LATENCY)
bw_limit, burst = self.tc.get_bw_limits()
tc.update_filters_bw_limit(new_bw_limit, new_burst)
bw_limit, burst = tc.get_filters_bw_limits()
self.assertEqual(new_bw_limit, bw_limit)
self.assertEqual(new_burst, burst)
self.tc.delete_bw_limit()
bw_limit, burst = self.tc.get_bw_limits()
tc.delete_filters_bw_limit()
bw_limit, burst = tc.get_filters_bw_limits()
self.assertIsNone(bw_limit)
self.assertIsNone(burst)
def test_tbf_bandwidth_limit(self):
device_name = "%s_tbf" % BASE_DEV_NAME
self.create_device(device_name)
tc = tc_lib.TcCommand(device_name, TEST_HZ_VALUE)
tc.set_tbf_bw_limit(BW_LIMIT, BURST, LATENCY)
bw_limit, burst = tc.get_tbf_bw_limits()
self.assertEqual(BW_LIMIT, bw_limit)
self.assertEqual(BURST, burst)
new_bw_limit = BW_LIMIT + 500
new_burst = BURST + 50
tc.update_tbf_bw_limit(new_bw_limit, new_burst, LATENCY)
bw_limit, burst = tc.get_tbf_bw_limits()
self.assertEqual(new_bw_limit, bw_limit)
self.assertEqual(new_burst, burst)
tc.delete_tbf_bw_limit()
bw_limit, burst = tc.get_tbf_bw_limits()
self.assertIsNone(bw_limit)
self.assertIsNone(burst)

View File

@ -24,10 +24,18 @@ BW_LIMIT = 2000 # [kbps]
BURST = 100 # [kbit]
LATENCY = 50 # [ms]
TC_OUTPUT = (
TC_QDISC_OUTPUT = (
'qdisc tbf 8011: root refcnt 2 rate %(bw)skbit burst %(burst)skbit '
'lat 50.0ms \n') % {'bw': BW_LIMIT, 'burst': BURST}
TC_FILTERS_OUTPUT = (
'filter protocol all pref 49152 u32 \nfilter protocol all pref '
'49152 u32 fh 800: ht divisor 1 \nfilter protocol all pref 49152 u32 fh '
'800::800 order 2048 key ht 800 \n match 00000000/00000000 at 0\n '
'police 0x1e rate %(bw)skbit burst %(burst)skbit mtu 2Kb action \n'
'drop overhead 0b \n ref 1 bind 1'
) % {'bw': BW_LIMIT, 'burst': BURST}
class BaseUnitConversionTest(object):
@ -148,26 +156,48 @@ class TestTcCommand(base.BaseTestCase):
tc_lib.TcCommand, DEVICE_NAME, -100
)
def test_get_bw_limits(self):
self.execute.return_value = TC_OUTPUT
bw_limit, burst_limit = self.tc.get_bw_limits()
def test_get_filters_bw_limits(self):
self.execute.return_value = TC_FILTERS_OUTPUT
bw_limit, burst_limit = self.tc.get_filters_bw_limits()
self.assertEqual(BW_LIMIT, bw_limit)
self.assertEqual(BURST, burst_limit)
def test_get_bw_limits_when_wrong_qdisc(self):
output = TC_OUTPUT.replace("tbf", "different_qdisc")
def test_get_filters_bw_limits_when_output_not_match(self):
output = (
"Some different "
"output from command:"
"tc filters show dev XXX parent ffff:"
)
self.execute.return_value = output
bw_limit, burst_limit = self.tc.get_bw_limits()
bw_limit, burst_limit = self.tc.get_filters_bw_limits()
self.assertIsNone(bw_limit)
self.assertIsNone(burst_limit)
def test_get_bw_limits_when_wrong_units(self):
output = TC_OUTPUT.replace("kbit", "Xbit")
def test_get_filters_bw_limits_when_wrong_units(self):
output = TC_FILTERS_OUTPUT.replace("kbit", "Xbit")
self.execute.return_value = output
self.assertRaises(tc_lib.InvalidUnit, self.tc.get_bw_limits)
self.assertRaises(tc_lib.InvalidUnit, self.tc.get_filters_bw_limits)
def test_set_bw_limit(self):
self.tc.set_bw_limit(BW_LIMIT, BURST, LATENCY)
def test_get_tbf_bw_limits(self):
self.execute.return_value = TC_QDISC_OUTPUT
bw_limit, burst_limit = self.tc.get_tbf_bw_limits()
self.assertEqual(BW_LIMIT, bw_limit)
self.assertEqual(BURST, burst_limit)
def test_get_tbf_bw_limits_when_wrong_qdisc(self):
output = TC_QDISC_OUTPUT.replace("tbf", "different_qdisc")
self.execute.return_value = output
bw_limit, burst_limit = self.tc.get_tbf_bw_limits()
self.assertIsNone(bw_limit)
self.assertIsNone(burst_limit)
def test_get_tbf_bw_limits_when_wrong_units(self):
output = TC_QDISC_OUTPUT.replace("kbit", "Xbit")
self.execute.return_value = output
self.assertRaises(tc_lib.InvalidUnit, self.tc.get_tbf_bw_limits)
def test_set_tbf_bw_limit(self):
self.tc.set_tbf_bw_limit(BW_LIMIT, BURST, LATENCY)
self.execute.assert_called_once_with(
["tc", "qdisc", "replace", "dev", DEVICE_NAME,
"root", "tbf", "rate", self.bw_limit,
@ -179,8 +209,41 @@ class TestTcCommand(base.BaseTestCase):
extra_ok_codes=None
)
def test_update_bw_limit(self):
self.tc.update_bw_limit(BW_LIMIT, BURST, LATENCY)
def test_update_filters_bw_limit(self):
self.tc.update_filters_bw_limit(BW_LIMIT, BURST)
self.execute.assert_has_calls([
mock.call(
["tc", "qdisc", "del", "dev", DEVICE_NAME, "ingress"],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=[2]
),
mock.call(
['tc', 'qdisc', 'add', 'dev', DEVICE_NAME, "ingress",
"handle", tc_lib.INGRESS_QDISC_ID],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
),
mock.call(
['tc', 'filter', 'add', 'dev', DEVICE_NAME,
'parent', tc_lib.INGRESS_QDISC_ID, 'protocol', 'all',
'prio', '49', 'basic', 'police',
'rate', self.bw_limit,
'burst', self.burst,
'mtu', tc_lib.MAX_MTU_VALUE,
'drop'],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=None
)]
)
def test_update_tbf_bw_limit(self):
self.tc.update_tbf_bw_limit(BW_LIMIT, BURST, LATENCY)
self.execute.assert_called_once_with(
["tc", "qdisc", "replace", "dev", DEVICE_NAME,
"root", "tbf", "rate", self.bw_limit,
@ -192,8 +255,18 @@ class TestTcCommand(base.BaseTestCase):
extra_ok_codes=None
)
def test_delete_bw_limit(self):
self.tc.delete_bw_limit()
def test_delete_filters_bw_limit(self):
self.tc.delete_filters_bw_limit()
self.execute.assert_called_once_with(
["tc", "qdisc", "del", "dev", DEVICE_NAME, "ingress"],
run_as_root=True,
check_exit_code=True,
log_fail_as_error=True,
extra_ok_codes=[2]
)
def test_delete_tbf_bw_limit(self):
self.tc.delete_tbf_bw_limit()
self.execute.assert_called_once_with(
["tc", "qdisc", "del", "dev", DEVICE_NAME, "root"],
run_as_root=True,
@ -202,16 +275,29 @@ class TestTcCommand(base.BaseTestCase):
extra_ok_codes=[2]
)
def test_burst_value_when_burst_bigger_then_minimal(self):
result = self.tc.get_burst_value(BW_LIMIT, BURST)
def test__get_filters_burst_value_burst_not_none(self):
self.assertEqual(
BURST, self.tc._get_filters_burst_value(BW_LIMIT, BURST)
)
def test__get_filters_burst_no_burst_value_given(self):
expected_burst = BW_LIMIT * 0.8
self.assertEqual(
expected_burst,
self.tc._get_filters_burst_value(BW_LIMIT, None)
)
def test__get_filters_burst_burst_value_zero(self):
expected_burst = BW_LIMIT * 0.8
self.assertEqual(
expected_burst,
self.tc._get_filters_burst_value(BW_LIMIT, 0)
)
def test__get_tbf_burst_value_when_burst_bigger_then_minimal(self):
result = self.tc._get_tbf_burst_value(BW_LIMIT, BURST)
self.assertEqual(BURST, result)
def test_burst_value_when_burst_smaller_then_minimal(self):
result = self.tc.get_burst_value(BW_LIMIT, 0)
self.assertEqual(2, result)
def test__get_min_burst_value_in_bits(self):
result = self.tc._get_min_burst_value(BW_LIMIT)
#if input is 2000kbit and kernel_hz is configured to 1000 then
# min_burst should be 2 kbit
def test__get_tbf_burst_value_when_burst_smaller_then_minimal(self):
result = self.tc._get_tbf_burst_value(BW_LIMIT, 0)
self.assertEqual(2, result)

View File

@ -53,27 +53,25 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase):
def test_create_rule(self):
with mock.patch.object(
tc_lib.TcCommand, "set_bw_limit"
tc_lib.TcCommand, "set_filters_bw_limit"
) as set_bw_limit:
self.qos_driver.create_bandwidth_limit(self.port, self.rule)
set_bw_limit.assert_called_once_with(
self.rule.max_kbps, self.rule.max_burst_kbps,
TEST_LATENCY_VALUE
)
def test_update_rule(self):
with mock.patch.object(
tc_lib.TcCommand, "update_bw_limit"
tc_lib.TcCommand, "update_filters_bw_limit"
) as update_bw_limit:
self.qos_driver.update_bandwidth_limit(self.port, self.rule)
update_bw_limit.assert_called_once_with(
self.rule.max_kbps, self.rule.max_burst_kbps,
TEST_LATENCY_VALUE
)
def test_delete_rule(self):
with mock.patch.object(
tc_lib.TcCommand, "delete_bw_limit"
tc_lib.TcCommand, "delete_filters_bw_limit"
) as delete_bw_limit:
self.qos_driver.delete_bandwidth_limit(self.port)
delete_bw_limit.assert_called_once_with()