diff --git a/doc/source/devref/quality_of_service.rst b/doc/source/devref/quality_of_service.rst index 97170cd7afc..788e05e8b02 100644 --- a/doc/source/devref/quality_of_service.rst +++ b/doc/source/devref/quality_of_service.rst @@ -362,17 +362,44 @@ Linux bridge The Linux bridge implementation relies on the new tc_lib functions: -* set_bw_limit -* update_bw_limit -* delete_bw_limit +* set_bw +* delete_bw -The ingress bandwidth limit is configured on the tap port by setting a simple -`tc-tbf `_ queueing discipline (qdisc) on the -port. It requires a value of HZ parameter configured in kernel on the host. -This value is necessary to calculate the minimal burst value which is set in -tc. Details about how it is calculated can be found in -`here `_. This solution is similar to Open -vSwitch implementation. +Only egress direction traffic shaping, from the instance point of view, is +implemented. Traffic shaping is done by a classful Traffic Control qdisq +called Class Based Queueing +(`Classful Queueing Disciplines `_). +This shaping algorithm is implemented by +`tc-htb `_, replacing the former one used +`tc-tbf `_. + +Traffic shaping is executed only for interface egress traffic. Because the +traffic coming from the instance is considered as ingress traffic by the +interface, shaping is not possible. To solve this, a new element is introduced: +`Intermediate Functional Block `_, +a pseudo network interface used to redirect all the ingress traffic from the TAP +port to the egress IFB queue. The QoS rules are applied on this IFB +interface associated to the TAP port:: + + +-----------------+ + | [Instance veth] | + +-----------------+ + | (egress traffic, instance point of view) + | + | +--------------------------+ + \ / [tap port] | [IFB] \ / + +-------------------------------+ | +-------------------------------+ + | ingress queue | egress queue | | | ingress queue | egress queue | + | (filter: | | | | | (QoS rules, | + | redir IFB) | | | | | tc-htb) | + +-------------------------------+ | +-------------------------------+ + | | | shaped + +--------------------------------+ | traffic + \ / + +Traffic in the tap port is redirected (mirrored) to the IFB using a Traffic +Control filter +(`Filter Actions `_). Notification driver design -------------------------- diff --git a/etc/neutron/rootwrap.d/linuxbridge-plugin.filters b/etc/neutron/rootwrap.d/linuxbridge-plugin.filters index f0934357ba6..90cd4c53584 100644 --- a/etc/neutron/rootwrap.d/linuxbridge-plugin.filters +++ b/etc/neutron/rootwrap.d/linuxbridge-plugin.filters @@ -20,9 +20,12 @@ find: RegExpFilter, find, root, find, /sys/class/net, -maxdepth, 1, -type, l, -p ip_exec: IpNetnsExecFilter, ip, root # tc commands needed for QoS support -tc_replace_tbf: RegExpFilter, tc, root, tc, qdisc, replace, dev, .+, root, tbf, rate, .+, latency, .+, burst, .+ -tc_add_ingress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, ingress, handle, .+ -tc_delete: RegExpFilter, tc, root, tc, qdisc, del, dev, .+, .+ +tc_add_qdisc: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, (root|parent .+), handle, .+, htb +tc_add_qdisc_ingress: RegExpFilter, tc, root, tc, qdisc, add, dev, .+, ingress, handle, .+ tc_show_qdisc: RegExpFilter, tc, root, tc, qdisc, show, dev, .+ -tc_show_filters: RegExpFilter, tc, root, tc, filter, show, dev, .+, parent, .+ -tc_add_filter: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, all, prio, .+, basic, police, rate, .+, burst, .+, mtu, .+, drop +tc_del_qdisc: RegExpFilter, tc, root, tc, qdisc, del, dev, .+, (root|ingress|parent .+) +tc_add_class: RegExpFilter, tc, root, tc, class, replace, dev, .+, parent, .+, classid, .+, .+, rate, .+ +tc_add_class_max: RegExpFilter, tc, root, tc, class, replace, dev, .+, parent, .+, classid, .+, .+, rate, .+, ceil, .+, burst, .+ +tc_show_class: RegExpFilter, tc, root, tc, class, show, dev, .+ +tc_show_filter: RegExpFilter, tc, root, tc, filter, show, dev, .+, parent, .+ +tc_add_filter_ifb: RegExpFilter, tc, root, tc, filter, add, dev, .+, parent, .+, protocol, all, u32, match, u32, 0, 0, action, mirred, egress, redirect, dev, .+ diff --git a/neutron/agent/linux/ip_lib.py b/neutron/agent/linux/ip_lib.py index fce568fda28..2b27fcc41c9 100644 --- a/neutron/agent/linux/ip_lib.py +++ b/neutron/agent/linux/ip_lib.py @@ -208,6 +208,15 @@ class IPWrapper(SubProcessBase): self._as_root([], 'link', ('add', name, 'type', 'dummy')) return IPDevice(name, namespace=self.namespace) + def add_ifb(self, name): + """Create a Linux IFB type interface with the given name.""" + self._as_root([], 'link', ('add', name, 'type', 'ifb')) + return IPDevice(name, namespace=self.namespace) + + def del_ifb(self, name): + """Delete a Linux IFB type interface with the given name.""" + self._as_root([], 'link', ('del', name)) + def ensure_namespace(self, name): if not self.netns.exists(name): ip = self.netns.add(name) diff --git a/neutron/agent/linux/tc_lib.py b/neutron/agent/linux/tc_lib.py index df6de290b04..16459f03b20 100644 --- a/neutron/agent/linux/tc_lib.py +++ b/neutron/agent/linux/tc_lib.py @@ -13,22 +13,31 @@ # License for the specific language governing permissions and limitations # under the License. +import collections +import math import re from neutron_lib import exceptions +from oslo_log import log as logging from neutron._i18n import _ from neutron.agent.linux import ip_lib +from neutron.common import constants from neutron.services.qos import qos_consts -INGRESS_QDISC_ID = "ffff:" -MAX_MTU_VALUE = 65535 +LOG = logging.getLogger(__name__) + +ROOT_QDISC = "root" +INGRESS_QDISC = "ingress" +INGRESS_QDISC_HEX = "ffff:fff1" +INGRESS_QDISC_HANDLE = "ffff:" +QDISC_TYPE_HTB = "htb" +QDISC_TYPE_DEFAULT = "pfifo_fast" SI_BASE = 1000 IEC_BASE = 1024 -LATENCY_UNIT = "ms" BW_LIMIT_UNIT = "kbit" # kilobits per second in tc's notation BURST_UNIT = "kbit" # kilobits in tc's notation @@ -40,21 +49,32 @@ UNITS = { "t": 4 } -filters_pattern = re.compile(r"police \w+ rate (\w+) burst (\w+)") -tbf_pattern = re.compile( - r"qdisc (\w+) \w+: \w+ refcnt \d rate (\w+) burst (\w+) \w*") - - -class InvalidKernelHzValue(exceptions.NeutronException): - message = _("Kernel HZ value %(value)s is not valid. This value must be " - "greater than 0.") - class InvalidUnit(exceptions.NeutronException): message = _("Unit name '%(unit)s' is not valid.") -def convert_to_kilobits(value, base): +class InvalidPolicyClassParameters(exceptions.NeutronException): + message = _("'rate' or 'ceil' parameters must be defined") + + +def kilobits_to_bits(value, base): + return value * base + + +def bits_to_kilobits(value, base): + return int(math.ceil(float(value) / base)) + + +def bytes_to_bits(value): + return value * 8 + + +def bits_to_bytes(value): + return int(value / 8) + + +def convert_to_kilo(value, base): value = value.lower() if "bit" in value: input_in_bits = True @@ -81,23 +101,8 @@ def convert_to_kilobits(value, base): return bits_to_kilobits(bits_value, base) -def bytes_to_bits(value): - return value * 8 - - -def bits_to_kilobits(value, base): - #NOTE(slaweq): round up that even 1 bit will give 1 kbit as a result - return int((value + (base - 1)) / base) - - class TcCommand(ip_lib.IPDevice): - def __init__(self, name, kernel_hz, namespace=None): - if kernel_hz <= 0: - raise InvalidKernelHzValue(value=kernel_hz) - super(TcCommand, self).__init__(name, namespace=namespace) - self.kernel_hz = kernel_hz - def _execute_tc_cmd(self, cmd, **kwargs): cmd = ['tc'] + cmd ip_wrapper = ip_lib.IPWrapper(self.namespace) @@ -111,131 +116,248 @@ class TcCommand(ip_lib.IPDevice): rate to ensure that limit for TCP traffic will work well """ if not burst_limit: - return float(bw_limit) * qos_consts.DEFAULT_BURST_RATE + return int(float(bw_limit) * qos_consts.DEFAULT_BURST_RATE) return burst_limit - def get_filters_bw_limits(self, qdisc_id=INGRESS_QDISC_ID): - cmd = ['filter', 'show', 'dev', self.name, 'parent', qdisc_id] - cmd_result = self._execute_tc_cmd(cmd) - if not cmd_result: - return None, None - for line in cmd_result.split("\n"): - m = filters_pattern.match(line.strip()) - if m: - #NOTE(slaweq): because tc is giving bw limit in SI units - # we need to calculate it as 1000bit = 1kbit: - bw_limit = convert_to_kilobits(m.group(1), SI_BASE) - #NOTE(slaweq): because tc is giving burst limit in IEC units - # we need to calculate it as 1024bit = 1kbit: - burst_limit = convert_to_kilobits(m.group(2), IEC_BASE) - return bw_limit, burst_limit - return None, None + def set_bw(self, max, burst, min, direction): + max = kilobits_to_bits(max, SI_BASE) if max else max + burst = (bits_to_bytes(kilobits_to_bits(burst, IEC_BASE)) if burst + else burst) + min = kilobits_to_bits(min, SI_BASE) if min else min + if direction == constants.EGRESS_DIRECTION: + return self._set_ingress_bw(max, burst, min) + else: + raise NotImplementedError() - def get_tbf_bw_limits(self): - cmd = ['qdisc', 'show', 'dev', self.name] - cmd_result = self._execute_tc_cmd(cmd) - if not cmd_result: - return None, None - m = tbf_pattern.match(cmd_result) - if not m: - return None, None - qdisc_name = m.group(1) - if qdisc_name != "tbf": - return None, None - #NOTE(slaweq): because tc is giving bw limit in SI units - # we need to calculate it as 1000bit = 1kbit: - bw_limit = convert_to_kilobits(m.group(2), SI_BASE) - #NOTE(slaweq): because tc is giving burst limit in IEC units - # we need to calculate it as 1024bit = 1kbit: - burst_limit = convert_to_kilobits(m.group(3), IEC_BASE) - return bw_limit, burst_limit + def delete_bw(self, direction): + if direction == constants.EGRESS_DIRECTION: + return self._delete_ingress() + else: + raise NotImplementedError() - def set_filters_bw_limit(self, bw_limit, burst_limit): - """Set ingress qdisc and filter for police ingress traffic on device + def get_limits(self, direction): + if direction == constants.EGRESS_DIRECTION: + return self._get_ingress_limits() + else: + raise NotImplementedError() - This will allow to police traffic incoming to interface. It - means that it is fine to limit egress traffic from instance point of - view. - """ - #because replace of tc filters is not working properly and it's adding - # new filters each time instead of replacing existing one first old - # ingress qdisc should be deleted and then added new one so update will - # be called to do that: - return self.update_filters_bw_limit(bw_limit, burst_limit) + def _set_ingress_bw(self, max, burst, min): + self._add_policy_qdisc(INGRESS_QDISC, INGRESS_QDISC_HANDLE) + self._configure_ifb(max=max, burst=burst, min=min) - def set_tbf_bw_limit(self, bw_limit, burst_limit, latency_value): - """Set token bucket filter qdisc on device + def _delete_ingress(self): + ifb = self._find_mirrored_ifb() + if ifb: + self._del_ifb(ifb) + self._del_policy_qdisc(INGRESS_QDISC) - This will allow to limit speed of packets going out from interface. It - means that it is fine to limit ingress traffic from instance point of - view. - """ - return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value) + def _add_policy_qdisc(self, parent, handle, qdisc_type=None, dev=None): + def check_qdisc(qdisc, qdisc_type, handle, parent, device): + if not qdisc or qdisc.get('type') == QDISC_TYPE_DEFAULT: + return False + elif ((qdisc_type and (qdisc.get('type') != qdisc_type or + qdisc.get('handle') != handle)) or + (not qdisc_type and qdisc.get('handle') != handle)): + self._del_policy_qdisc(parent, dev=device) + return False + return True - def update_filters_bw_limit(self, bw_limit, burst_limit, - qdisc_id=INGRESS_QDISC_ID): - self.delete_filters_bw_limit() - return self._set_filters_bw_limit(bw_limit, burst_limit, qdisc_id) + device = str(dev) if dev else self.name + qdisc = self._show_policy_qdisc(parent, dev=device) + if check_qdisc(qdisc, qdisc_type, handle, parent, device): + return + cmd = ['qdisc', 'add', 'dev', device] + if parent in [ROOT_QDISC, INGRESS_QDISC]: + cmd += [parent] + else: + cmd += ['parent', parent] + cmd += ['handle', handle] + if qdisc_type: + cmd += [qdisc_type] - def update_tbf_bw_limit(self, bw_limit, burst_limit, latency_value): - return self._replace_tbf_qdisc(bw_limit, burst_limit, latency_value) + LOG.debug("Add policy qdisc cmd: %s", cmd) + return self._execute_tc_cmd(cmd) - def delete_filters_bw_limit(self): - #NOTE(slaweq): For limit traffic egress from instance we need to use - # qdisc "ingress" because it is ingress traffic from interface POV: - self._delete_qdisc("ingress") + def _del_policy_qdisc(self, parent, dev=None): + device = str(dev) if dev else self.name + if not self._show_policy_qdisc(parent, dev=device): + return + cmd = ['qdisc', 'del', 'dev', device] + if parent in [ROOT_QDISC, INGRESS_QDISC]: + cmd += [parent] + else: + cmd += ['parent', parent] - def delete_tbf_bw_limit(self): - self._delete_qdisc("root") - - def _set_filters_bw_limit(self, bw_limit, burst_limit, - qdisc_id=INGRESS_QDISC_ID): - cmd = ['qdisc', 'add', 'dev', self.name, 'ingress', - 'handle', qdisc_id] + LOG.debug("Delete policy qdisc cmd: %s", cmd) self._execute_tc_cmd(cmd) - return self._add_policy_filter(bw_limit, burst_limit) - def _delete_qdisc(self, qdisc_name): - cmd = ['qdisc', 'del', 'dev', self.name, qdisc_name] - # Return_code=2 is fine because it means - # "RTNETLINK answers: No such file or directory" what is fine when we - # are trying to delete qdisc - return self._execute_tc_cmd(cmd, extra_ok_codes=[2]) + def _list_policy_qdisc(self, dev=None): + device = str(dev) if dev else self.name + cmd = ['qdisc', 'show', 'dev', device] + LOG.debug("List policy qdisc cmd: %s", cmd) + result = self._execute_tc_cmd(cmd) + pat = re.compile(r'qdisc (\w+) (\w+\:) (root|parent (\w*\:\w+))') + qdiscs = collections.defaultdict(dict) + for match in (pat.match(line) for line in result.splitlines() + if pat.match(line)): + qdisc = {} + qdisc['type'] = match.groups()[0] + qdisc['handle'] = match.groups()[1] + if match.groups()[2] == ROOT_QDISC: + qdisc['parentid'] = ROOT_QDISC + else: + qdisc['parentid'] = match.groups()[3] + qdisc_ref = INGRESS_QDISC if qdisc['parentid'] == \ + INGRESS_QDISC_HEX else qdisc['parentid'] + qdiscs[qdisc_ref] = qdisc - def _get_tbf_burst_value(self, bw_limit, burst_limit): - min_burst_value = float(bw_limit) / float(self.kernel_hz) - return max(min_burst_value, burst_limit) + LOG.debug("List of policy qdiscs: %s", qdiscs) + return qdiscs - def _replace_tbf_qdisc(self, bw_limit, burst_limit, latency_value): - burst = "%s%s" % ( - self._get_tbf_burst_value(bw_limit, burst_limit), BURST_UNIT) - latency = "%s%s" % (latency_value, LATENCY_UNIT) - rate_limit = "%s%s" % (bw_limit, BW_LIMIT_UNIT) - cmd = [ - 'qdisc', 'replace', 'dev', self.name, - 'root', 'tbf', - 'rate', rate_limit, - 'latency', latency, - 'burst', burst - ] + def _show_policy_qdisc(self, parent, dev=None): + device = str(dev) if dev else self.name + return self._list_policy_qdisc(device).get(parent) + + def _add_policy_class(self, parent, classid, qdisc_type, rate=None, + ceil=None, burst=None, dev=None): + """Add new TC class""" + device = str(dev) if dev else self.name + policy = self._show_policy_class(classid, dev=device) + if policy: + rate = (kilobits_to_bits(policy['rate'], SI_BASE) if not rate + else rate) + ceil = (kilobits_to_bits(policy['ceil'], SI_BASE) if not ceil + else ceil) + burst = (bits_to_bytes(kilobits_to_bits(policy['burst'], IEC_BASE)) + if not burst else burst) + + if not rate and not ceil: + raise InvalidPolicyClassParameters + if not rate: + rate = ceil + + cmd = self._cmd_policy_class(classid, qdisc_type, rate, device, parent, + ceil, burst) + LOG.debug("Add/replace policy class cmd: %s", cmd) return self._execute_tc_cmd(cmd) - def _add_policy_filter(self, bw_limit, burst_limit, - qdisc_id=INGRESS_QDISC_ID): - rate_limit = "%s%s" % (bw_limit, BW_LIMIT_UNIT) - burst = "%s%s" % ( - self.get_ingress_qdisc_burst_value(bw_limit, burst_limit), - BURST_UNIT - ) - #NOTE(slaweq): it is made in exactly same way how openvswitch is doing - # it when configuing ingress traffic limit on port. It can be found in - # lib/netdev-linux.c#L4698 in openvswitch sources: - cmd = [ - 'filter', 'add', 'dev', self.name, - 'parent', qdisc_id, 'protocol', 'all', - 'prio', '49', 'basic', 'police', - 'rate', rate_limit, - 'burst', burst, - 'mtu', MAX_MTU_VALUE, - 'drop'] + def _cmd_policy_class(self, classid, qdisc_type, rate, device, parent, + ceil, burst): + cmd = ['class', 'replace', 'dev', device] + if parent: + cmd += ['parent', parent] + rate = 8 if rate < 8 else rate + cmd += ['classid', classid, qdisc_type, 'rate', rate] + if ceil: + ceil = rate if ceil < rate else ceil + cmd += ['ceil', ceil] + if burst: + cmd += ['burst', burst] + return cmd + + def _list_policy_class(self, dev=None): + device = str(dev) if dev else self.name + cmd = ['class', 'show', 'dev', device] + result = self._execute_tc_cmd(cmd, check_exit_code=False) + if not result: + return {} + classes = collections.defaultdict(dict) + pat = re.compile(r'class (\S+) ([0-9a-fA-F]+\:[0-9a-fA-F]+) ' + r'(root|parent ([0-9a-fA-F]+\:[0-9a-fA-F]+))' + r'( prio ([0-9]+))* rate (\w+) ceil (\w+) burst (\w+)' + r' cburst (\w+)') + for match in (pat.match(line) for line in result.splitlines() + if pat.match(line)): + _class = {} + _class['type'] = match.groups()[0] + classid = match.groups()[1] + if match.groups()[2] == ROOT_QDISC: + _class['parentid'] = None + else: + _class['parentid'] = match.groups()[3] + _class['prio'] = match.groups()[5] + _class['rate'] = convert_to_kilo(match.groups()[6], SI_BASE) + _class['ceil'] = convert_to_kilo(match.groups()[7], SI_BASE) + _class['burst'] = convert_to_kilo(match.groups()[8], IEC_BASE) + _class['cburst'] = convert_to_kilo(match.groups()[9], IEC_BASE) + classes[classid] = _class + LOG.debug("Policy classes: %s", classes) + return classes + + def _show_policy_class(self, classid, dev=None): + device = str(dev) if dev else self.name + return self._list_policy_class(device).get(classid) + + def _add_policy_filter(self, parent, protocol, filter, dev=None, + action=None): + """Add a new filter""" + device = str(dev) if dev else self.name + cmd = ['filter', 'add', 'dev', device, 'parent', parent] + cmd += ['protocol'] + protocol + cmd += filter + if action: + cmd += ['action'] + action + + LOG.debug("Add policy filter cmd: %s", cmd) return self._execute_tc_cmd(cmd) + + def _list_policy_filters(self, parent, dev=None): + """Returns the output of showing the filters in a device""" + device = dev if dev else self.name + cmd = ['filter', 'show', 'dev', device, 'parent', parent] + LOG.debug("List policy filter cmd: %s", cmd) + return self._execute_tc_cmd(cmd) + + def _add_ifb(self, dev_name): + """Create a new IFB device""" + ns_ip = ip_lib.IPWrapper(namespace=self.namespace) + if self._find_mirrored_ifb(): + ifb = ip_lib.IPDevice(dev_name, namespace=self.namespace) + if not ifb.exists(): + self._del_ifb(dev_name=dev_name) + ifb = ns_ip.add_ifb(dev_name) + else: + self._del_ifb(dev_name=dev_name) + ifb = ns_ip.add_ifb(dev_name) + + ifb.disable_ipv6() + ifb.link.set_up() + return ifb + + def _del_ifb(self, dev_name): + """Delete a IFB device""" + ns_ip = ip_lib.IPWrapper(namespace=self.namespace) + devices = ns_ip.get_devices(exclude_loopback=True) + for device in (dev for dev in devices if dev.name == dev_name): + ns_ip.del_ifb(device.name) + + def _find_mirrored_ifb(self): + """Return the name of the IFB device where the traffic is mirrored""" + ifb_name = self.name.replace("tap", "ifb") + ifb = ip_lib.IPDevice(ifb_name, namespace=self.namespace) + if not ifb.exists(): + return None + return ifb_name + + def _configure_ifb(self, max=None, burst=None, min=None): + ifb = self._find_mirrored_ifb() + if not ifb: + ifb = self.name.replace("tap", "ifb") + self._add_ifb(ifb) + protocol = ['all', 'u32'] + filter = ['match', 'u32', '0', '0'] + action = ['mirred', 'egress', 'redirect', 'dev', '%s' % ifb] + self._add_policy_filter(INGRESS_QDISC_HANDLE, protocol, filter, + dev=self.name, action=action) + self._add_policy_qdisc(ROOT_QDISC, "1:", qdisc_type=QDISC_TYPE_HTB, + dev=ifb) + self._add_policy_class("1:", "1:1", QDISC_TYPE_HTB, rate=min, + ceil=max, burst=burst, dev=ifb) + + def _get_ingress_limits(self): + ifb = self._find_mirrored_ifb() + if ifb: + policy = self._show_policy_class("1:1", dev=ifb) + if policy: + return policy['ceil'], policy['burst'], policy['rate'] + return None, None, None diff --git a/neutron/common/utils.py b/neutron/common/utils.py index 5bb069fccde..fddfea2f0f4 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -718,7 +718,8 @@ def transaction_guard(f): return inner -def wait_until_true(predicate, timeout=60, sleep=1, exception=None): +def wait_until_true(predicate, timeout=60, sleep=1, exception=None, + initial_sleep=0): """ Wait until callable predicate is evaluated as True @@ -730,6 +731,7 @@ def wait_until_true(predicate, timeout=60, sleep=1, exception=None): (default) then WaitTimeout exception is raised. """ try: + eventlet.sleep(initial_sleep) with eventlet.timeout.Timeout(timeout): while not predicate(): eventlet.sleep(sleep) diff --git a/neutron/conf/plugins/ml2/drivers/linuxbridge.py b/neutron/conf/plugins/ml2/drivers/linuxbridge.py index b3659dbf1ba..f9049ca647b 100644 --- a/neutron/conf/plugins/ml2/drivers/linuxbridge.py +++ b/neutron/conf/plugins/ml2/drivers/linuxbridge.py @@ -19,8 +19,6 @@ from neutron._i18n import _ DEFAULT_BRIDGE_MAPPINGS = [] DEFAULT_INTERFACE_MAPPINGS = [] DEFAULT_VXLAN_GROUP = '224.0.0.1' -DEFAULT_KERNEL_HZ_VALUE = 250 # [Hz] -DEFAULT_TC_TBF_LATENCY = 50 # [ms] vxlan_opts = [ cfg.BoolOpt('enable_vxlan', default=True, @@ -76,20 +74,7 @@ bridge_opts = [ help=_("List of :")), ] -qos_options = [ - cfg.IntOpt('kernel_hz', default=DEFAULT_KERNEL_HZ_VALUE, - help=_("Value of host kernel tick rate (hz) for calculating " - "minimum burst value in bandwidth limit rules for " - "a port with QoS. See kernel configuration file for " - "HZ value and tc-tbf manual for more information.")), - cfg.IntOpt('tbf_latency', default=DEFAULT_TC_TBF_LATENCY, - help=_("Value of latency (ms) for calculating size of queue " - "for a port with QoS. See tc-tbf manual for more " - "information.")) -] - def register_linuxbridge_opts(cfg=cfg.CONF): cfg.register_opts(vxlan_opts, "VXLAN") cfg.register_opts(bridge_opts, "LINUX_BRIDGE") - cfg.register_opts(qos_options, "QOS") diff --git a/neutron/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/qos_driver.py b/neutron/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/qos_driver.py index dae46c792bb..7eb48a6ee70 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/qos_driver.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/qos_driver.py @@ -12,7 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_config import cfg +import collections + from oslo_log import helpers as log_helpers from oslo_log import log @@ -23,12 +24,18 @@ from neutron.agent.linux import tc_lib import neutron.common.constants as const from neutron.plugins.ml2.drivers.linuxbridge.mech_driver import ( mech_linuxbridge) +from neutron.services.qos import qos_consts LOG = log.getLogger(__name__) class QosLinuxbridgeAgentDriver(qos.QosLinuxAgentDriver): + # TODO(ralonsoh): + # - All driver calls should include the rule parameter, including + # the delete function, to have the 'direction' parameter. This QoS + # extension modification is going to be implemented in + # https://review.openstack.org/#/c/341186/ SUPPORTED_RULES = ( mech_linuxbridge.LinuxbridgeMechanismDriver.supported_qos_rule_types ) @@ -38,6 +45,10 @@ class QosLinuxbridgeAgentDriver(qos.QosLinuxAgentDriver): IPTABLES_DIRECTION_PREFIX = {const.INGRESS_DIRECTION: "i", const.EGRESS_DIRECTION: "o"} + def __init__(self): + super(QosLinuxbridgeAgentDriver, self).__init__() + self._port_rules = collections.defaultdict(dict) + def initialize(self): LOG.info(_LI("Initializing Linux bridge QoS extension")) self.iptables_manager = iptables_manager.IptablesManager(use_ipv6=True) @@ -58,22 +69,41 @@ class QosLinuxbridgeAgentDriver(qos.QosLinuxAgentDriver): @log_helpers.log_method_call def create_bandwidth_limit(self, port, rule): - tc_wrapper = self._get_tc_wrapper(port) - tc_wrapper.set_filters_bw_limit( - rule.max_kbps, self._get_egress_burst_value(rule) - ) + self.update_bandwidth_limit(port, rule) @log_helpers.log_method_call def update_bandwidth_limit(self, port, rule): - tc_wrapper = self._get_tc_wrapper(port) - tc_wrapper.update_filters_bw_limit( - rule.max_kbps, self._get_egress_burst_value(rule) - ) + device = port.get('device') + port_id = port.get('port_id') + if not device: + LOG.debug("update_bandwidth_limit was received for port %s but " + "device was not found. It seems that port is already " + "deleted", port_id) + return + + self._port_rules[port_id][qos_consts.RULE_TYPE_BANDWIDTH_LIMIT] = rule + max, burst, min = self._get_port_bw_parameters(port_id) + tc_wrapper = tc_lib.TcCommand(device) + tc_wrapper.set_bw(max, burst, min, const.EGRESS_DIRECTION) @log_helpers.log_method_call def delete_bandwidth_limit(self, port): - tc_wrapper = self._get_tc_wrapper(port) - tc_wrapper.delete_filters_bw_limit() + device = port.get('device') + port_id = port.get('port_id') + if not device: + LOG.debug("delete_bandwidth_limit was received for port %s but " + "device was not found. It seems that port is already " + "deleted", port_id) + return + + self._port_rules[port_id].pop(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT, + None) + max, burst, min = self._get_port_bw_parameters(port_id) + tc_wrapper = tc_lib.TcCommand(device) + if not min: + tc_wrapper.delete_bw(const.EGRESS_DIRECTION) + else: + tc_wrapper.set_bw(max, burst, min, const.EGRESS_DIRECTION) @log_helpers.log_method_call def create_dscp_marking(self, port, rule): @@ -143,8 +173,53 @@ class QosLinuxbridgeAgentDriver(qos.QosLinuxAgentDriver): "mangle", chain_name, ip_version=ip_version) return len(rules_in_chain) == 0 - def _get_tc_wrapper(self, port): - return tc_lib.TcCommand( - port['device'], - cfg.CONF.QOS.kernel_hz, - ) + @log_helpers.log_method_call + def create_minimum_bandwidth(self, port, rule): + self.update_minimum_bandwidth(port, rule) + + @log_helpers.log_method_call + def update_minimum_bandwidth(self, port, rule): + device = port.get('device') + port_id = port.get('port_id') + if not device: + LOG.debug("update_minimum_bandwidth was received for port %s but " + "device was not found. It seems that port is already " + "deleted", port_id) + return + + self._port_rules[port_id][ + qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH] = rule + max, burst, min = self._get_port_bw_parameters(port_id) + tc_wrapper = tc_lib.TcCommand(device) + tc_wrapper.set_bw(max, burst, min, const.EGRESS_DIRECTION) + + @log_helpers.log_method_call + def delete_minimum_bandwidth(self, port): + device = port.get('device') + port_id = port.get('port_id') + if not device: + LOG.debug("delete_minimum_bandwidth was received for port %s but " + "device was not found. It seems that port is already " + "deleted", port_id) + return + + self._port_rules[port_id].pop(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH, + None) + max, burst, min = self._get_port_bw_parameters(port_id) + tc_wrapper = tc_lib.TcCommand(device) + if not max and not burst: + tc_wrapper.delete_bw(const.EGRESS_DIRECTION) + else: + tc_wrapper.set_bw(max, burst, min, const.EGRESS_DIRECTION) + + def _get_port_bw_parameters(self, port_id): + rules = self._port_rules[port_id] + if not rules: + return None, None, None + rule_min = rules.get(qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH) + rule_limit = rules.get(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT) + min = rule_min.min_kbps if rule_min else None + max = rule_limit.max_kbps if rule_limit else None + burst = (self._get_egress_burst_value(rule_limit) if rule_limit else + None) + return max, burst, min diff --git a/neutron/plugins/ml2/drivers/linuxbridge/mech_driver/mech_linuxbridge.py b/neutron/plugins/ml2/drivers/linuxbridge/mech_driver/mech_linuxbridge.py index c1eb7b3ee81..1eaac071bc7 100644 --- a/neutron/plugins/ml2/drivers/linuxbridge/mech_driver/mech_linuxbridge.py +++ b/neutron/plugins/ml2/drivers/linuxbridge/mech_driver/mech_linuxbridge.py @@ -33,7 +33,8 @@ class LinuxbridgeMechanismDriver(mech_agent.SimpleAgentMechanismDriverBase): """ supported_qos_rule_types = [qos_consts.RULE_TYPE_BANDWIDTH_LIMIT, - qos_consts.RULE_TYPE_DSCP_MARKING] + qos_consts.RULE_TYPE_DSCP_MARKING, + qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH] def __init__(self): sg_enabled = securitygroups_rpc.is_firewall_enabled() diff --git a/neutron/tests/fullstack/resources/client.py b/neutron/tests/fullstack/resources/client.py index 7f14f3a4428..ed3b14e7d7d 100644 --- a/neutron/tests/fullstack/resources/client.py +++ b/neutron/tests/fullstack/resources/client.py @@ -176,6 +176,22 @@ class ClientFixture(fixtures.Fixture): return rule['dscp_marking_rule'] + def create_minimum_bandwidth_rule(self, tenant_id, qos_policy_id, + min_bw=None): + rule = {'tenant_id': tenant_id} + if min_bw: + rule['min_kbps'] = min_bw + rule = self.client.create_minimum_bandwidth_rule( + policy=qos_policy_id, + body={'minimum_bandwidth_rule': rule}) + + self.addCleanup( + _safe_method(self.client.delete_minimum_bandwidth_rule), + rule['minimum_bandwidth_rule']['id'], + qos_policy_id) + + return rule['minimum_bandwidth_rule'] + def create_trunk(self, tenant_id, port_id, name=None, admin_state_up=None, sub_ports=None): """Create a trunk via API. diff --git a/neutron/tests/fullstack/test_qos.py b/neutron/tests/fullstack/test_qos.py index 3c6239b52be..5f894a317c2 100644 --- a/neutron/tests/fullstack/test_qos.py +++ b/neutron/tests/fullstack/test_qos.py @@ -18,6 +18,7 @@ from neutron_lib import constants from oslo_utils import uuidutils from neutron.agent.linux import tc_lib +from neutron.common import constants as common_consts from neutron.common import utils from neutron.services.qos import qos_consts from neutron.tests.common.agents import l2_extensions @@ -27,8 +28,6 @@ from neutron.tests.fullstack.resources import machine from neutron.tests.fullstack import utils as fullstack_utils from neutron.tests.unit import testlib_api -from neutron.conf.plugins.ml2.drivers import linuxbridge as \ - linuxbridge_agent_config from neutron.plugins.ml2.drivers.linuxbridge.agent import \ linuxbridge_neutron_agent as linuxbridge_agent from neutron.plugins.ml2.drivers.openvswitch.mech_driver import \ @@ -39,9 +38,22 @@ load_tests = testlib_api.module_load_tests BANDWIDTH_BURST = 100 BANDWIDTH_LIMIT = 500 +MINIMUM_BANDWIDTH = 200 DSCP_MARK = 16 +def _check_bw_limits(tc, limit, burst, min): + # NOTE(ralonsoh): once QoS bw limit rule has 'direction' parameter, this + # should be included in the function call. Now EGRESS_DIRECTION is forced. + observed = tc.get_limits(common_consts.EGRESS_DIRECTION) + if not (limit or burst or min): + return observed == (limit, burst, min) + elif not min and (limit or burst): + return observed == (limit, burst, limit) + elif not (limit or burst) and min: + return observed[2] == min + + class BaseQoSRuleTestCase(object): of_interface = None ovsdb_interface = None @@ -104,7 +116,7 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase): def _wait_for_bw_rule_removed(self, vm): # No values are provided when port doesn't have qos policy - self._wait_for_bw_rule_applied(vm, None, None) + self._wait_for_bw_rule_applied(vm) def _add_bw_limit_rule(self, limit, burst, qos_policy): qos_policy_id = qos_policy['id'] @@ -124,7 +136,8 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase): BANDWIDTH_LIMIT, BANDWIDTH_BURST)]) bw_rule = qos_policy['rules'][0] - self._wait_for_bw_rule_applied(vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST) + self._wait_for_bw_rule_applied(vm, limit=BANDWIDTH_LIMIT, + burst=BANDWIDTH_BURST) qos_policy_id = qos_policy['id'] self.client.delete_bandwidth_limit_rule(bw_rule['id'], qos_policy_id) @@ -138,14 +151,64 @@ class _TestBwLimitQoS(BaseQoSRuleTestCase): ) new_rule = self.safe_client.create_bandwidth_limit_rule( self.tenant_id, qos_policy_id, new_limit) - self._wait_for_bw_rule_applied(vm, new_limit, new_expected_burst) + self._wait_for_bw_rule_applied(vm, limit=new_limit, + burst=new_expected_burst) # Update qos policy rule id self.client.update_bandwidth_limit_rule( new_rule['id'], qos_policy_id, body={'bandwidth_limit_rule': {'max_kbps': BANDWIDTH_LIMIT, 'max_burst_kbps': BANDWIDTH_BURST}}) - self._wait_for_bw_rule_applied(vm, BANDWIDTH_LIMIT, BANDWIDTH_BURST) + self._wait_for_bw_rule_applied(vm, limit=BANDWIDTH_LIMIT, + burst=BANDWIDTH_BURST) + + # Remove qos policy from port + self.client.update_port( + vm.neutron_port['id'], + body={'port': {'qos_policy_id': None}}) + self._wait_for_bw_rule_removed(vm) + + +class _TestMinimumBwQoS(BaseQoSRuleTestCase): + + number_of_hosts = 1 + + def _wait_for_bw_rule_removed(self, vm): + # No values are provided when port doesn't have qos policy + self._wait_for_bw_rule_applied(vm) + + def _add_min_bw_rule(self, min, qos_policy): + qos_policy_id = qos_policy['id'] + rule = self.safe_client.create_minimum_bandwidth_rule( + self.tenant_id, qos_policy_id, min) + # Make it consistent with GET reply + rule['type'] = qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH + rule['qos_policy_id'] = qos_policy_id + qos_policy['rules'].append(rule) + + def test_min_bw_qos_policy_rule_lifecycle(self): + new_min = MINIMUM_BANDWIDTH + 100 + + # Create port with qos policy attached + vm, qos_policy = self._prepare_vm_with_qos_policy( + [functools.partial(self._add_min_bw_rule, MINIMUM_BANDWIDTH)]) + bw_rule = qos_policy['rules'][0] + + self._wait_for_bw_rule_applied(vm, min=MINIMUM_BANDWIDTH) + qos_policy_id = qos_policy['id'] + + self.client.delete_minimum_bandwidth_rule(bw_rule['id'], qos_policy_id) + self._wait_for_bw_rule_removed(vm) + + new_rule = self.safe_client.create_minimum_bandwidth_rule( + self.tenant_id, qos_policy_id, new_min) + self._wait_for_bw_rule_applied(vm, min=new_min) + + # Update qos policy rule id + self.client.update_minimum_bandwidth_rule( + new_rule['id'], qos_policy_id, + body={'minimum_bandwidth_rule': {'min_kbps': MINIMUM_BANDWIDTH}}) + self._wait_for_bw_rule_applied(vm, min=MINIMUM_BANDWIDTH) # Remove qos policy from port self.client.update_port( @@ -158,25 +221,32 @@ class TestBwLimitQoSOvs(_TestBwLimitQoS, base.BaseFullStackTestCase): l2_agent_type = constants.AGENT_TYPE_OVS scenarios = fullstack_utils.get_ovs_interface_scenarios() - def _wait_for_bw_rule_applied(self, vm, limit, burst): + def _wait_for_bw_rule_applied(self, vm, limit=None, burst=None): utils.wait_until_true( lambda: vm.bridge.get_egress_bw_limit_for_port( - vm.port.name) == (limit, burst)) + vm.port.name) == (limit, burst), + initial_sleep=2) class TestBwLimitQoSLinuxbridge(_TestBwLimitQoS, base.BaseFullStackTestCase): l2_agent_type = constants.AGENT_TYPE_LINUXBRIDGE - def _wait_for_bw_rule_applied(self, vm, limit, burst): + def _wait_for_bw_rule_applied(self, vm, limit=None, burst=None, min=None): port_name = linuxbridge_agent.LinuxBridgeManager.get_tap_device_name( vm.neutron_port['id']) - tc = tc_lib.TcCommand( - port_name, - linuxbridge_agent_config.DEFAULT_KERNEL_HZ_VALUE, - namespace=vm.host.host_namespace - ) - utils.wait_until_true( - lambda: tc.get_filters_bw_limits() == (limit, burst)) + tc = tc_lib.TcCommand(port_name, namespace=vm.host.host_namespace) + utils.wait_until_true(lambda: _check_bw_limits(tc, limit, burst, min)) + + +class TestMinimumBwQoSLinuxbridge(_TestMinimumBwQoS, + base.BaseFullStackTestCase): + l2_agent_type = constants.AGENT_TYPE_LINUXBRIDGE + + def _wait_for_bw_rule_applied(self, vm, limit=None, burst=None, min=None): + port_name = linuxbridge_agent.LinuxBridgeManager.get_tap_device_name( + vm.neutron_port['id']) + tc = tc_lib.TcCommand(port_name, namespace=vm.host.host_namespace) + utils.wait_until_true(lambda: _check_bw_limits(tc, limit, burst, min)) class _TestDscpMarkingQoS(BaseQoSRuleTestCase): diff --git a/neutron/tests/functional/agent/linux/test_tc_lib.py b/neutron/tests/functional/agent/linux/test_tc_lib.py index ebba13d13b6..5395c409a98 100644 --- a/neutron/tests/functional/agent/linux/test_tc_lib.py +++ b/neutron/tests/functional/agent/linux/test_tc_lib.py @@ -17,12 +17,10 @@ from neutron.agent.linux import ip_lib from neutron.agent.linux import tc_lib from neutron.tests.functional import base as functional_base -TEST_HZ_VALUE = 250 -LATENCY = 50 -BW_LIMIT = 1024 -BURST = 512 - -BASE_DEV_NAME = "test_tap" +BW_LIMIT = 100 +BURST = 50 +BW_MIN = 25 +DIRECTION_EGRESS = 'egress' class TcLibTestCase(functional_base.BaseSudoTestCase): @@ -38,48 +36,44 @@ class TcLibTestCase(functional_base.BaseSudoTestCase): self.addCleanup(tap_device.link.delete) tap_device.link.set_up() - def test_filters_bandwidth_limit(self): - device_name = "%s_filters" % BASE_DEV_NAME + def test_bandwidth_limit(self): + device_name = "tap_testmax" self.create_device(device_name) - tc = tc_lib.TcCommand(device_name, TEST_HZ_VALUE) + tc = tc_lib.TcCommand(device_name) - tc.set_filters_bw_limit(BW_LIMIT, BURST) - bw_limit, burst = tc.get_filters_bw_limits() + tc.set_bw(BW_LIMIT, BURST, None, DIRECTION_EGRESS) + bw_limit, burst, _ = tc.get_limits(DIRECTION_EGRESS) self.assertEqual(BW_LIMIT, bw_limit) self.assertEqual(BURST, burst) - new_bw_limit = BW_LIMIT + 500 + new_bw_limit = BW_LIMIT + 100 new_burst = BURST + 50 - tc.update_filters_bw_limit(new_bw_limit, new_burst) - bw_limit, burst = tc.get_filters_bw_limits() + tc.set_bw(new_bw_limit, new_burst, None, DIRECTION_EGRESS) + bw_limit, burst, _ = tc.get_limits(DIRECTION_EGRESS) self.assertEqual(new_bw_limit, bw_limit) self.assertEqual(new_burst, burst) - tc.delete_filters_bw_limit() - bw_limit, burst = tc.get_filters_bw_limits() + tc.delete_bw(DIRECTION_EGRESS) + bw_limit, burst, _ = tc.get_limits(DIRECTION_EGRESS) self.assertIsNone(bw_limit) self.assertIsNone(burst) - def test_tbf_bandwidth_limit(self): - device_name = "%s_tbf" % BASE_DEV_NAME + def test_minimum_bandwidth(self): + device_name = "tap_testmin" self.create_device(device_name) - tc = tc_lib.TcCommand(device_name, TEST_HZ_VALUE) + tc = tc_lib.TcCommand(device_name) - tc.set_tbf_bw_limit(BW_LIMIT, BURST, LATENCY) - bw_limit, burst = tc.get_tbf_bw_limits() - self.assertEqual(BW_LIMIT, bw_limit) - self.assertEqual(BURST, burst) + tc.set_bw(None, None, BW_MIN, DIRECTION_EGRESS) + _, _, bw_min = tc.get_limits(DIRECTION_EGRESS) + self.assertEqual(BW_MIN, bw_min) - new_bw_limit = BW_LIMIT + 500 - new_burst = BURST + 50 + new_bw_min = BW_MIN + 50 - tc.update_tbf_bw_limit(new_bw_limit, new_burst, LATENCY) - bw_limit, burst = tc.get_tbf_bw_limits() - self.assertEqual(new_bw_limit, bw_limit) - self.assertEqual(new_burst, burst) + tc.set_bw(None, None, new_bw_min, DIRECTION_EGRESS) + _, _, bw_min = tc.get_limits(DIRECTION_EGRESS) + self.assertEqual(new_bw_min, bw_min) - tc.delete_tbf_bw_limit() - bw_limit, burst = tc.get_tbf_bw_limits() - self.assertIsNone(bw_limit) - self.assertIsNone(burst) + tc.delete_bw(DIRECTION_EGRESS) + _, _, bw_min = tc.get_limits(DIRECTION_EGRESS) + self.assertIsNone(bw_min) diff --git a/neutron/tests/unit/agent/linux/test_ip_lib.py b/neutron/tests/unit/agent/linux/test_ip_lib.py index 067247324c2..f13fe25561f 100644 --- a/neutron/tests/unit/agent/linux/test_ip_lib.py +++ b/neutron/tests/unit/agent/linux/test_ip_lib.py @@ -389,6 +389,21 @@ class TestIpWrapper(base.BaseTestCase): run_as_root=True, namespace=None, log_fail_as_error=True) + def test_add_ifb(self): + ip_lib.IPWrapper().add_ifb('ifb-dummy0') + self.execute.assert_called_once_with([], 'link', + ('add', 'ifb-dummy0', + 'type', 'ifb'), + run_as_root=True, namespace=None, + log_fail_as_error=True) + + def test_del_ifb(self): + ip_lib.IPWrapper().del_ifb('ifb-dummy0') + self.execute.assert_called_once_with([], 'link', + ('del', 'ifb-dummy0'), + run_as_root=True, namespace=None, + log_fail_as_error=True) + def test_get_device(self): dev = ip_lib.IPWrapper(namespace='ns').device('eth0') self.assertEqual(dev.namespace, 'ns') diff --git a/neutron/tests/unit/agent/linux/test_tc_lib.py b/neutron/tests/unit/agent/linux/test_tc_lib.py index 4a3fcf726ba..f33aaa9231d 100644 --- a/neutron/tests/unit/agent/linux/test_tc_lib.py +++ b/neutron/tests/unit/agent/linux/test_tc_lib.py @@ -13,80 +13,69 @@ # License for the specific language governing permissions and limitations # under the License. +import math import mock +import testtools +from neutron.agent.linux import ip_lib from neutron.agent.linux import tc_lib from neutron.services.qos import qos_consts from neutron.tests import base DEVICE_NAME = "tap_device" -KERNEL_HZ_VALUE = 1000 BW_LIMIT = 2000 # [kbps] BURST = 100 # [kbit] -LATENCY = 50 # [ms] - -TC_QDISC_OUTPUT = ( - 'qdisc tbf 8011: root refcnt 2 rate %(bw)skbit burst %(burst)skbit ' - 'lat 50.0ms \n') % {'bw': BW_LIMIT, 'burst': BURST} - -TC_FILTERS_OUTPUT = ( - 'filter protocol all pref 49152 u32 \nfilter protocol all pref ' - '49152 u32 fh 800: ht divisor 1 \nfilter protocol all pref 49152 u32 fh ' - '800::800 order 2048 key ht 800 \n match 00000000/00000000 at 0\n ' - 'police 0x1e rate %(bw)skbit burst %(burst)skbit mtu 2Kb action \n' - 'drop overhead 0b \n ref 1 bind 1' -) % {'bw': BW_LIMIT, 'burst': BURST} class BaseUnitConversionTest(object): - def test_convert_to_kilobits_bare_value(self): - value = "1000" - expected_value = 8 # kbit + def test_convert_to_kilo_bare_value(self): + value = "10000" + expected_value = int(math.ceil(float(80000) / self.base_unit)) # kbit self.assertEqual( expected_value, - tc_lib.convert_to_kilobits(value, self.base_unit) + tc_lib.convert_to_kilo(value, self.base_unit) ) - def test_convert_to_kilobits_bytes_value(self): - value = "1000b" - expected_value = 8 # kbit + def test_convert_to_kilo_bytes_value(self): + value = "10000b" + expected_value = int(math.ceil(float(80000) / self.base_unit)) # kbit self.assertEqual( expected_value, - tc_lib.convert_to_kilobits(value, self.base_unit) + tc_lib.convert_to_kilo(value, self.base_unit) ) - def test_convert_to_kilobits_bits_value(self): + def test_convert_to_kilo_bits_value(self): value = "1000bit" - expected_value = tc_lib.bits_to_kilobits(1000, self.base_unit) + expected_value = int(math.ceil(float(1000) / self.base_unit)) self.assertEqual( expected_value, - tc_lib.convert_to_kilobits(value, self.base_unit) + tc_lib.convert_to_kilo(value, self.base_unit) ) - def test_convert_to_kilobits_megabytes_value(self): + def test_convert_to_kilo_megabytes_value(self): value = "1m" - expected_value = tc_lib.bits_to_kilobits( - self.base_unit ** 2 * 8, self.base_unit) + expected_value = int(math.ceil(float(self.base_unit ** 2 * 8) / + self.base_unit)) self.assertEqual( expected_value, - tc_lib.convert_to_kilobits(value, self.base_unit) + tc_lib.convert_to_kilo(value, self.base_unit) ) - def test_convert_to_kilobits_megabits_value(self): + def test_convert_to_kilo_megabits_value(self): value = "1mbit" - expected_value = tc_lib.bits_to_kilobits( - self.base_unit ** 2, self.base_unit) + expected_value = int(math.ceil(float(self.base_unit ** 2) / + self.base_unit)) self.assertEqual( expected_value, - tc_lib.convert_to_kilobits(value, self.base_unit) + tc_lib.convert_to_kilo(value, self.base_unit) ) def test_convert_to_bytes_wrong_unit(self): value = "1Zbit" self.assertRaises( tc_lib.InvalidUnit, - tc_lib.convert_to_kilobits, value, self.base_unit + tc_lib.convert_to_kilo, value, self.base_unit ) def test_bytes_to_bits(self): @@ -139,166 +128,659 @@ class TestIECUnitConversions(BaseUnitConversionTest, base.BaseTestCase): class TestTcCommand(base.BaseTestCase): + MAX_RATE = 10000 + BURST_RATE = 8000 + CBURST_RATE = 1500 + MIN_RATE = 1500 + RATE_LIMIT = 8 + DIRECTION_EGRESS = 'egress' + DIRECTION_INGRESS = 'ingress' + DEVICE_NAME = 'tap-test-dev' + IFB_NAME = 'ifb-test-dev' + CLASS_PARENT = '10:' + CLASSID = '10:1' + QDISC_PARENT = '20:2' + QDISC_HANDLE = '30:' + QDISC_ROOT = 'root' + QDISC_INGRESS = 'ingress' + QDISC_INGRESS_HANDLE = 'ffff:' + FILTER_PARENT = CLASS_PARENT + FILTER_PROTOCOL = ['all', 'u32'] + FILTER_FILTER = ['match', 'u32', '0', '0'] + FILTER_ACTION = ['mirred', 'egress', 'redirect', 'dev', IFB_NAME] + TYPE_HTB = 'htb' + + def _call_qdisc_add(self, device, parent, handle, qdisc_type): + cmd = ['tc', 'qdisc', 'add', 'dev', device] + if parent in [self.QDISC_ROOT, self.QDISC_INGRESS]: + cmd += [parent] + else: + cmd += ['parent', parent] + qdisc_type = '' if qdisc_type is None else qdisc_type + cmd += ['handle', handle, qdisc_type] + return cmd + + def _call_qdisc_del(self, device, parent): + cmd = ['tc', 'qdisc', 'del', 'dev', device] + if parent in [self.QDISC_ROOT, self.QDISC_INGRESS]: + cmd += [parent] + else: + cmd += ['parent', parent] + return cmd + + @staticmethod + def _call_qdisc_show(device): + return ['tc', 'qdisc', 'show', 'dev', device] + + def _call_class_replace(self, device, parent, classid, type, rate, ceil, + burst): + cmd = ['class', 'replace', 'dev', device] + if parent: + cmd += ['parent', parent] + rate = self.RATE_LIMIT if rate < self.RATE_LIMIT else rate + cmd += ['classid', classid, type, 'rate', rate] + if ceil: + ceil = rate if ceil < rate else ceil + cmd += ['ceil', ceil] + if burst: + cmd += ['burst', burst] + return cmd + + @staticmethod + def _call_class_show(device): + return ['tc', 'class', 'show', 'dev', device] + + @staticmethod + def _call_filter_add(device, parent, protocol, filter, action): + cmd = ['tc', 'filter', 'add', 'dev', device, 'parent', parent, + 'protocol'] + protocol + filter + if action: + cmd += ['action'] + action + return cmd + + @staticmethod + def _call_filter_show(device, parent): + return ['tc', 'filter', 'show', 'dev', device, 'parent', parent] + def setUp(self): super(TestTcCommand, self).setUp() - self.tc = tc_lib.TcCommand(DEVICE_NAME, KERNEL_HZ_VALUE) - self.bw_limit = "%s%s" % (BW_LIMIT, tc_lib.BW_LIMIT_UNIT) - self.burst = "%s%s" % (BURST, tc_lib.BURST_UNIT) - self.latency = "%s%s" % (LATENCY, tc_lib.LATENCY_UNIT) + self.tc = tc_lib.TcCommand(self.DEVICE_NAME) self.execute = mock.patch('neutron.agent.common.utils.execute').start() - def test_check_kernel_hz_lower_then_zero(self): - self.assertRaises( - tc_lib.InvalidKernelHzValue, - tc_lib.TcCommand, DEVICE_NAME, 0 - ) - self.assertRaises( - tc_lib.InvalidKernelHzValue, - tc_lib.TcCommand, DEVICE_NAME, -100 - ) + def test_set_bw_egress(self): + with mock.patch.object(self.tc, '_set_ingress_bw') as \ + mock_set_ingress_bw: + self.tc.set_bw(self.MAX_RATE, + self.BURST_RATE, + self.MIN_RATE, + self.DIRECTION_EGRESS) + mock_set_ingress_bw.assert_called_once_with( + self.MAX_RATE * tc_lib.SI_BASE, + (self.BURST_RATE * tc_lib.IEC_BASE) / 8, + self.MIN_RATE * tc_lib.SI_BASE) - def test_get_filters_bw_limits(self): - self.execute.return_value = TC_FILTERS_OUTPUT - bw_limit, burst_limit = self.tc.get_filters_bw_limits() - self.assertEqual(BW_LIMIT, bw_limit) - self.assertEqual(BURST, burst_limit) + def test_set_bw_ingress(self): + with testtools.ExpectedException(NotImplementedError): + self.tc.set_bw(self.MAX_RATE, self.BURST_RATE, self.MIN_RATE, + self.DIRECTION_INGRESS) - def test_get_filters_bw_limits_when_output_not_match(self): - output = ( - "Some different " - "output from command:" - "tc filters show dev XXX parent ffff:" - ) - self.execute.return_value = output - bw_limit, burst_limit = self.tc.get_filters_bw_limits() - self.assertIsNone(bw_limit) - self.assertIsNone(burst_limit) + def test_delete_bw_egress(self): + with mock.patch.object(self.tc, '_delete_ingress') as \ + mock_delete_ingress: + self.tc.delete_bw(self.DIRECTION_EGRESS) + mock_delete_ingress.assert_called_once_with() - def test_get_filters_bw_limits_when_wrong_units(self): - output = TC_FILTERS_OUTPUT.replace("kbit", "Xbit") - self.execute.return_value = output - self.assertRaises(tc_lib.InvalidUnit, self.tc.get_filters_bw_limits) + def test_delete_bw_ingress(self): + with testtools.ExpectedException(NotImplementedError): + self.tc.delete_bw(self.DIRECTION_INGRESS) - def test_get_tbf_bw_limits(self): - self.execute.return_value = TC_QDISC_OUTPUT - bw_limit, burst_limit = self.tc.get_tbf_bw_limits() - self.assertEqual(BW_LIMIT, bw_limit) - self.assertEqual(BURST, burst_limit) + def test_set_ingress_bw(self): + with mock.patch.object(self.tc, '_add_policy_qdisc') as \ + mock_add_policy_qdisc, \ + mock.patch.object(self.tc, '_configure_ifb') as \ + mock_configure_ifb: + self.tc._set_ingress_bw(self.MAX_RATE, self.BURST_RATE, + self.MIN_RATE) + mock_add_policy_qdisc.assert_called_once_with( + tc_lib.INGRESS_QDISC, tc_lib.INGRESS_QDISC_HANDLE) + mock_configure_ifb.assert_called_once_with( + max=self.MAX_RATE, burst=self.BURST_RATE, + min=self.MIN_RATE) - def test_get_tbf_bw_limits_when_wrong_qdisc(self): - output = TC_QDISC_OUTPUT.replace("tbf", "different_qdisc") - self.execute.return_value = output - bw_limit, burst_limit = self.tc.get_tbf_bw_limits() - self.assertIsNone(bw_limit) - self.assertIsNone(burst_limit) + def test_delete_ingress_no_ifb(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=None) as mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_del_policy_qdisc') as \ + mock_del_policy_qdisc: + self.tc._delete_ingress() + mock_find_mirrored_ifb.assert_called_once_with() + mock_del_policy_qdisc.assert_called_once_with(tc_lib.INGRESS_QDISC) - def test_get_tbf_bw_limits_when_wrong_units(self): - output = TC_QDISC_OUTPUT.replace("kbit", "Xbit") - self.execute.return_value = output - self.assertRaises(tc_lib.InvalidUnit, self.tc.get_tbf_bw_limits) + def test_delete_ingress_with_ifb(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=self.IFB_NAME) as mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_del_policy_qdisc') as \ + mock_del_policy_qdisc, \ + mock.patch.object(self.tc, '_del_ifb') as mock_del_ifb: + self.tc._delete_ingress() + mock_find_mirrored_ifb.assert_called_once_with() + mock_del_policy_qdisc.assert_called_once_with(tc_lib.INGRESS_QDISC) + mock_del_ifb.assert_called_once_with(self.IFB_NAME) - def test_set_tbf_bw_limit(self): - self.tc.set_tbf_bw_limit(BW_LIMIT, BURST, LATENCY) - self.execute.assert_called_once_with( - ["tc", "qdisc", "replace", "dev", DEVICE_NAME, - "root", "tbf", "rate", self.bw_limit, - "latency", self.latency, - "burst", self.burst], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=None - ) + def test_add_policy_qdisc_no_qdisc(self): + with mock.patch.object(self.tc, '_show_policy_qdisc', + return_value=None) as \ + mock_show_policy_qdisc: + self.tc._add_policy_qdisc(self.QDISC_PARENT, self.QDISC_HANDLE) + mock_show_policy_qdisc.assert_called_once_with( + self.QDISC_PARENT, dev=self.DEVICE_NAME) - def test_update_filters_bw_limit(self): - self.tc.update_filters_bw_limit(BW_LIMIT, BURST) - self.execute.assert_has_calls([ - mock.call( - ["tc", "qdisc", "del", "dev", DEVICE_NAME, "ingress"], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=[2] - ), - mock.call( - ['tc', 'qdisc', 'add', 'dev', DEVICE_NAME, "ingress", - "handle", tc_lib.INGRESS_QDISC_ID], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=None - ), - mock.call( - ['tc', 'filter', 'add', 'dev', DEVICE_NAME, - 'parent', tc_lib.INGRESS_QDISC_ID, 'protocol', 'all', - 'prio', '49', 'basic', 'police', - 'rate', self.bw_limit, - 'burst', self.burst, - 'mtu', tc_lib.MAX_MTU_VALUE, - 'drop'], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=None - )] - ) + def test_add_policy_qdisc_existing_qdisc(self): + with mock.patch.object(self.tc, '_show_policy_qdisc') as \ + mock_show_policy_qdisc, \ + mock.patch.object(self.tc, '_del_policy_qdisc') as \ + mock_del_policy_qdisc: + qdisc = {'type': self.TYPE_HTB, + 'handle': self.QDISC_HANDLE, + 'parentid': 'parent1'} + mock_show_policy_qdisc.return_value = qdisc + self.tc._add_policy_qdisc(self.QDISC_PARENT, + self.QDISC_HANDLE, qdisc_type=self.TYPE_HTB) + mock_show_policy_qdisc.assert_called_once_with( + self.QDISC_PARENT, dev=self.DEVICE_NAME) + mock_del_policy_qdisc.assert_not_called() - def test_update_tbf_bw_limit(self): - self.tc.update_tbf_bw_limit(BW_LIMIT, BURST, LATENCY) - self.execute.assert_called_once_with( - ["tc", "qdisc", "replace", "dev", DEVICE_NAME, - "root", "tbf", "rate", self.bw_limit, - "latency", self.latency, - "burst", self.burst], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=None - ) + def _add_policy_qdisc_parent_type(self, parent, type): + with mock.patch.object(self.tc, '_show_policy_qdisc') as \ + mock_show_policy_qdisc, \ + mock.patch.object(self.tc, '_del_policy_qdisc') as \ + mock_del_policy_qdisc: + qdisc = {'type': 'type1', + 'handle': 'handle1', + 'parentid': 'parent1'} + mock_show_policy_qdisc.return_value = qdisc + self.tc._add_policy_qdisc(parent, self.QDISC_HANDLE, + qdisc_type=type) + mock_show_policy_qdisc.assert_called_once_with( + parent, dev=self.DEVICE_NAME) + mock_del_policy_qdisc.assert_called_once_with(parent, + dev=self.DEVICE_NAME) + cmd = self._call_qdisc_add(self.DEVICE_NAME, parent, + self.QDISC_HANDLE, type) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, log_fail_as_error=True, run_as_root=True) - def test_delete_filters_bw_limit(self): - self.tc.delete_filters_bw_limit() - self.execute.assert_called_once_with( - ["tc", "qdisc", "del", "dev", DEVICE_NAME, "ingress"], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=[2] - ) + def test_add_policy_qdisc_root_parent(self): + self._add_policy_qdisc_parent_type(self.QDISC_ROOT, self.TYPE_HTB) - def test_delete_tbf_bw_limit(self): - self.tc.delete_tbf_bw_limit() - self.execute.assert_called_once_with( - ["tc", "qdisc", "del", "dev", DEVICE_NAME, "root"], - run_as_root=True, - check_exit_code=True, - log_fail_as_error=True, - extra_ok_codes=[2] - ) + def test_add_policy_qdisc_ingress_parent(self): + self._add_policy_qdisc_parent_type(self.QDISC_INGRESS, self.TYPE_HTB) + + def test_add_policy_qdisc_other_parent(self): + self._add_policy_qdisc_parent_type(self.QDISC_PARENT, self.TYPE_HTB) + + def _add_policy_qdisc_no_qdisc_type(self): + self._add_policy_qdisc_parent_type(self.QDISC_PARENT, None) + + def test_del_policy_qdisc(self): + with mock.patch.object(self.tc, '_show_policy_qdisc', + return_value=True): + self.tc._del_policy_qdisc(self.QDISC_PARENT) + cmd = self._call_qdisc_del(self.DEVICE_NAME, self.QDISC_PARENT) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, log_fail_as_error=True, run_as_root=True) + + def test_del_policy_qdisc_root_parent(self): + with mock.patch.object(self.tc, '_show_policy_qdisc', + return_value=True): + self.tc._del_policy_qdisc(self.QDISC_ROOT) + cmd = self._call_qdisc_del(self.DEVICE_NAME, self.QDISC_ROOT) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, log_fail_as_error=True, run_as_root=True) + + def test_del_policy_qdisc_no_qdisc(self): + with mock.patch.object(self.tc, '_show_policy_qdisc', + return_value=False): + self.tc._del_policy_qdisc(self.QDISC_ROOT) + self.execute.assert_not_called() + + def test_list_policy_qdisc(self): + qdisc_out = 'qdisc htb 1: root refcnt 2 r2q 10 default 0 ' + qdisc_out += 'direct_packets_stat 138 direct_qlen 32\n' + qdisc_out += 'qdisc htb 10: parent 1:1 r2q 10 default 0 ' + qdisc_out += 'direct_packets_stat 0 direct_qlen 32\n' + qdisc_out += 'qdisc ingress ffff: parent ffff:fff1 ----------------' + self.execute.return_value = qdisc_out + ret_value = self.tc._list_policy_qdisc() + cmd = self._call_qdisc_show(self.DEVICE_NAME) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, + log_fail_as_error=True, + run_as_root=True) + qdiscs = {'1:1': {'handle': '10:', + 'type': 'htb', + 'parentid': '1:1'}, + 'root': {'handle': '1:', + 'type': 'htb', + 'parentid': 'root'}, + 'ingress': {'handle': 'ffff:', + 'type': 'ingress', + 'parentid': 'ffff:fff1'}} + self.assertEqual(qdiscs, ret_value) + + def test_list_policy_qdisc_no_match(self): + self.execute.return_value = 'no matches' + ret_value = self.tc._list_policy_qdisc() + cmd = self._call_qdisc_show(self.DEVICE_NAME) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, + log_fail_as_error=True, + run_as_root=True) + qdiscs = {} + self.assertEqual(qdiscs, ret_value) + + def test_show_policy_qdisc(self): + with mock.patch.object(self.tc, '_list_policy_qdisc') as \ + mock_list_policy_qdisc: + self.tc._show_policy_qdisc(self.QDISC_PARENT) + mock_list_policy_qdisc.assert_called_once_with(self.DEVICE_NAME) + + def test_add_policy_class_existing_class_set_min_bw(self): + with mock.patch.object(self.tc, '_show_policy_class') as \ + mock_show_policy_class, \ + mock.patch.object(self.tc, '_cmd_policy_class') as \ + mock_cmd_policy_class: + classes = {'type': self.TYPE_HTB, + 'parentid': self.CLASS_PARENT, + 'prio': 0, + 'rate': self.MIN_RATE + 1, + 'ceil': self.MAX_RATE, + 'burst': self.BURST_RATE, + 'cburst': self.CBURST_RATE} + mock_show_policy_class.return_value = classes + _min = tc_lib.kilobits_to_bits(self.MIN_RATE, tc_lib.SI_BASE) + _max = tc_lib.kilobits_to_bits(self.MAX_RATE, tc_lib.SI_BASE) + _burst = tc_lib.bits_to_bytes(tc_lib.kilobits_to_bits( + self.BURST_RATE, tc_lib.IEC_BASE)) + cmd = self._call_class_replace(self.DEVICE_NAME, + self.CLASS_PARENT, self.CLASSID, self.TYPE_HTB, _min, + None, None) + mock_cmd_policy_class.return_value = cmd + self.tc._add_policy_class(self.CLASS_PARENT, self.CLASSID, + self.TYPE_HTB, rate=_min) + mock_show_policy_class.assert_called_once_with( + self.CLASSID, dev=self.DEVICE_NAME) + mock_cmd_policy_class.assert_called_once_with(self.CLASSID, + self.TYPE_HTB, _min, self.DEVICE_NAME, self.CLASS_PARENT, + _max, _burst) + self.execute.assert_called_once_with(['tc'] + cmd, + check_exit_code=True, extra_ok_codes=None, + log_fail_as_error=True, run_as_root=True) + + def test_add_policy_class_existing_class_set_bw_limit(self): + with mock.patch.object(self.tc, '_show_policy_class') as \ + mock_show_policy_class, \ + mock.patch.object(self.tc, '_cmd_policy_class') as \ + mock_cmd_policy_class: + classes = {'type': self.TYPE_HTB, + 'parentid': self.CLASS_PARENT, + 'prio': 0, + 'rate': self.MIN_RATE, + 'ceil': self.MAX_RATE + 1, + 'burst': self.BURST_RATE + 1, + 'cburst': self.CBURST_RATE} + mock_show_policy_class.return_value = classes + _min = tc_lib.kilobits_to_bits(self.MIN_RATE, tc_lib.SI_BASE) + _max = tc_lib.kilobits_to_bits(self.MAX_RATE, tc_lib.SI_BASE) + _burst = tc_lib.bits_to_bytes(tc_lib.kilobits_to_bits( + self.BURST_RATE, tc_lib.IEC_BASE)) + cmd = ['tc'] + self._call_class_replace(self.DEVICE_NAME, + self.CLASS_PARENT, self.CLASSID, self.TYPE_HTB, _min, + _max, _burst) + mock_cmd_policy_class.return_value = cmd + self.tc._add_policy_class(self.CLASS_PARENT, self.CLASSID, + self.TYPE_HTB, ceil=_max, burst=_burst) + mock_show_policy_class.assert_called_once_with( + self.CLASSID, dev=self.DEVICE_NAME) + mock_cmd_policy_class.assert_called_once_with(self.CLASSID, + self.TYPE_HTB, _min, self.DEVICE_NAME, self.CLASS_PARENT, + _max, _burst) + self.execute.assert_called_once_with(['tc'] + cmd, + check_exit_code=True, extra_ok_codes=None, + log_fail_as_error=True, run_as_root=True) + + def test_add_policy_class_non_existing_class(self): + with mock.patch.object(self.tc, '_show_policy_class', + return_value={}) as mock_show_policy_class, \ + mock.patch.object(self.tc, '_cmd_policy_class') as \ + mock_cmd_policy_class: + _min = tc_lib.kilobits_to_bits(self.MIN_RATE, tc_lib.SI_BASE) + cmd = ['tc'] + self._call_class_replace(self.DEVICE_NAME, + self.CLASS_PARENT, self.CLASSID, self.TYPE_HTB, _min, + None, None) + mock_cmd_policy_class.return_value = cmd + self.tc._add_policy_class(self.CLASS_PARENT, self.CLASSID, + self.TYPE_HTB, rate=_min) + mock_show_policy_class.assert_called_once_with( + self.CLASSID, dev=self.DEVICE_NAME) + mock_cmd_policy_class.assert_called_once_with(self.CLASSID, + self.TYPE_HTB, _min, self.DEVICE_NAME, self.CLASS_PARENT, + None, None) + self.execute.assert_called_once_with(['tc'] + cmd, + check_exit_code=True, extra_ok_codes=None, + log_fail_as_error=True, run_as_root=True) + + def test_add_policy_class_no_rate_no_ceil(self): + with testtools.ExpectedException(tc_lib.InvalidPolicyClassParameters): + self.tc._add_policy_class(self.CLASS_PARENT, self.CLASSID, + self.TYPE_HTB, rate=None, ceil=None) + + def test_cmd_policy_class(self): + cmd_out = self.tc._cmd_policy_class(self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.DEVICE_NAME, + self.CLASS_PARENT, self.MAX_RATE, + self.BURST_RATE) + cmd_ref = self._call_class_replace(self.DEVICE_NAME, self.CLASS_PARENT, + self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.MAX_RATE, + self.BURST_RATE) + self.assertEqual(cmd_ref, cmd_out) + + def test_cmd_policy_class_no_parent(self): + cmd_out = self.tc._cmd_policy_class(self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.DEVICE_NAME, + None, self.MAX_RATE, + self.BURST_RATE) + cmd_ref = self._call_class_replace(self.DEVICE_NAME, None, + self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.MAX_RATE, + self.BURST_RATE) + self.assertEqual(cmd_ref, cmd_out) + + def test_cmd_policy_class_rate_less_8(self): + cmd_out = self.tc._cmd_policy_class(self.CLASSID, self.TYPE_HTB, + 5, self.DEVICE_NAME, + self.CLASS_PARENT, None, None) + cmd_ref = self._call_class_replace(self.DEVICE_NAME, self.CLASS_PARENT, + self.CLASSID, self.TYPE_HTB, + self.RATE_LIMIT, None, None) + self.assertEqual(cmd_ref, cmd_out) + + def test_cmd_policy_class_no_ceil(self): + cmd_out = self.tc._cmd_policy_class(self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.DEVICE_NAME, + self.CLASS_PARENT, None, + self.BURST_RATE) + cmd_ref = self._call_class_replace(self.DEVICE_NAME, self.CLASS_PARENT, + self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, None, + self.BURST_RATE) + self.assertEqual(cmd_ref, cmd_out) + + def test_cmd_policy_class_no_burst(self): + cmd_out = self.tc._cmd_policy_class(self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, self.DEVICE_NAME, + self.CLASS_PARENT, None, None) + cmd_ref = self._call_class_replace(self.DEVICE_NAME, self.CLASS_PARENT, + self.CLASSID, self.TYPE_HTB, + self.MIN_RATE, None, None) + self.assertEqual(cmd_ref, cmd_out) + + def test_list_policy_class(self): + class_out = 'class htb 1:1 root rate 300000bit ceil 300000bit burst ' + class_out += '2560b cburst 2688b\n' + class_out += 'class htb 1:10 parent 1:1 prio 0 rate 24000bit ceil ' + class_out += '300000bit burst 2560b cburst 2688b\n' + class_out += 'class htb 1:20 parent 1:1 prio 1 rate 24000bit ceil ' + class_out += '300000bit burst 2560b cburst 2688b' + self.execute.return_value = class_out + ret_val = self.tc._list_policy_class() + cmd = self._call_class_show(self.DEVICE_NAME) + self.execute.assert_called_once_with(cmd, check_exit_code=False, + extra_ok_codes=None, + log_fail_as_error=True, + run_as_root=True) + expected = {'1:1': {'prio': None, 'burst': 20, 'ceil': 300, + 'rate': 300, 'parentid': None, 'cburst': 21, + 'type': 'htb'}, + '1:10': {'prio': '0', 'burst': 20, 'ceil': 300, 'rate': 24, + 'parentid': '1:1', 'cburst': 21, 'type': 'htb'}, + '1:20': {'prio': '1', 'burst': 20, 'ceil': 300, 'rate': 24, + 'parentid': '1:1', 'cburst': 21, 'type': 'htb'}} + self.assertEqual(expected, ret_val) + + def test_show_policy_class(self): + with mock.patch.object(self.tc, '_list_policy_class') as \ + mock_list_policy_class: + classes = {self.CLASSID: {'prio': None, 'burst': 20, 'ceil': 300, + 'rate': 300, 'parentid': None, + 'cburst': 21, 'type': 'htb'}} + mock_list_policy_class.return_value = classes + ret_val = self.tc._show_policy_class(self.CLASSID) + mock_list_policy_class.assert_called_once_with(self.DEVICE_NAME) + self.assertEqual(classes[self.CLASSID], ret_val) + + def test_add_policy_filter_with_action(self): + self.tc._add_policy_filter(self.FILTER_PARENT, self.FILTER_PROTOCOL, + self.FILTER_FILTER, + action=self.FILTER_ACTION) + cmd = self._call_filter_add(self.DEVICE_NAME, self.FILTER_PARENT, + self.FILTER_PROTOCOL, self.FILTER_FILTER, + self.FILTER_ACTION) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, + log_fail_as_error=True, + run_as_root=True) + + def test_add_policy_filter_without_action(self): + self.tc._add_policy_filter(self.FILTER_PARENT, self.FILTER_PROTOCOL, + self.FILTER_FILTER) + cmd = self._call_filter_add(self.DEVICE_NAME, self.FILTER_PARENT, + self.FILTER_PROTOCOL, self.FILTER_FILTER, + None) + self.execute.assert_called_once_with(cmd, check_exit_code=True, + extra_ok_codes=None, + log_fail_as_error=True, + run_as_root=True) + + def test_list_policy_filters_root_parent(self): + self.tc._list_policy_filters(self.QDISC_ROOT) + cmd = self._call_filter_show(self.DEVICE_NAME, + self.QDISC_ROOT) + self.execute.assert_called_once_with(cmd, extra_ok_codes=None, + log_fail_as_error=True, + check_exit_code=True, + run_as_root=True) + + def test_list_policy_filters_other_parent(self): + self.tc._list_policy_filters(self.QDISC_INGRESS_HANDLE) + cmd = self._call_filter_show(self.DEVICE_NAME, + self.QDISC_INGRESS_HANDLE) + self.execute.assert_called_once_with(cmd, extra_ok_codes=None, + log_fail_as_error=True, + check_exit_code=True, + run_as_root=True) + + @mock.patch.object(ip_lib.IPWrapper, "add_ifb") + @mock.patch.object(ip_lib.IPDevice, "exists") + @mock.patch.object(ip_lib.IPDevice, "disable_ipv6") + @mock.patch.object(ip_lib.IpLinkCommand, "set_up") + def test_add_ifb_existing_ifb(self, mock_set_up, mock_disable_ipv6, + mock_exists, mock_add_ifb): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=True): + mock_exists.return_value = True + self.tc._add_ifb(self.DEVICE_NAME) + mock_add_ifb.assert_not_called() + mock_exists.assert_called_once_with() + mock_disable_ipv6.assert_called_once_with() + mock_set_up.assert_called_once_with() + + @mock.patch.object(ip_lib.IPWrapper, "add_ifb") + @mock.patch.object(ip_lib.IPDevice, "exists") + @mock.patch.object(ip_lib.IPDevice, "disable_ipv6") + @mock.patch.object(ip_lib.IpLinkCommand, "set_up") + def test_add_ifb_non_existing_ifb(self, mock_set_up, mock_disable_ipv6, + mock_exists, + mock_add_ifb): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=True), \ + mock.patch.object(self.tc, '_del_ifb') as mock_del_ifb: + mock_exists.return_value = False + mock_add_ifb.return_value = ip_lib.IPDevice(self.DEVICE_NAME) + self.tc._add_ifb(self.DEVICE_NAME) + mock_add_ifb.assert_called_once_with(self.DEVICE_NAME) + mock_exists.assert_called_once_with() + mock_del_ifb.assert_called_once_with(dev_name=self.DEVICE_NAME) + mock_disable_ipv6.assert_called_once_with() + mock_set_up.assert_called_once_with() + + @mock.patch.object(ip_lib.IPWrapper, "add_ifb") + @mock.patch.object(ip_lib.IPDevice, "disable_ipv6") + @mock.patch.object(ip_lib.IpLinkCommand, "set_up") + def test_add_ifb_not_found(self, mock_set_up, mock_disable_ipv6, + mock_add_ifb): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=False), \ + mock.patch.object(self.tc, '_del_ifb') as mock_del_ifb: + mock_add_ifb.return_value = ip_lib.IPDevice(self.DEVICE_NAME) + self.tc._add_ifb(self.DEVICE_NAME) + mock_add_ifb.assert_called_once_with(self.DEVICE_NAME) + mock_del_ifb.assert_called_once_with(dev_name=self.DEVICE_NAME) + mock_disable_ipv6.assert_called_once_with() + mock_set_up.assert_called_once_with() + + @mock.patch.object(ip_lib.IPWrapper, "del_ifb") + @mock.patch.object(ip_lib.IPWrapper, "get_devices") + def test_del_ifb_existing_netdevice(self, mock_get_devices, mock_del_ifb): + ret_val = [ip_lib.IPDevice('other_name'), + ip_lib.IPDevice(self.DEVICE_NAME)] + mock_get_devices.return_value = ret_val + self.tc._del_ifb(self.DEVICE_NAME) + mock_del_ifb.assert_called_once_with(self.DEVICE_NAME) + + @mock.patch.object(ip_lib.IPWrapper, "del_ifb") + @mock.patch.object(ip_lib.IPWrapper, "get_devices") + def test_del_ifb_not_existing_netdevice(self, mock_get_devices, + mock_del_ifb): + ret_val = [ip_lib.IPDevice('other_name'), + ip_lib.IPDevice('another_name')] + mock_get_devices.return_value = ret_val + self.tc._del_ifb(self.DEVICE_NAME) + mock_del_ifb.assert_not_called() + + @mock.patch.object(ip_lib.IPWrapper, "del_ifb") + @mock.patch.object(ip_lib.IPWrapper, "get_devices") + def test_del_ifb_no_netdevices(self, mock_get_devices, mock_del_ifb): + mock_get_devices.return_value = [] + self.tc._del_ifb(self.DEVICE_NAME) + mock_del_ifb.assert_not_called() + + @mock.patch.object(ip_lib.IPDevice, "exists") + def test_find_mirrored_ifb(self, mock_ipdevice_exists): + ifb_name = self.tc._name.replace("tap", "ifb") + mock_ipdevice_exists.return_value = True + ret = self.tc._find_mirrored_ifb() + self.assertEqual(ifb_name, ret) + mock_ipdevice_exists.return_value = False + ret = self.tc._find_mirrored_ifb() + self.assertIsNone(ret) + + def test_configure_ifb_non_existing_ifb(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=None) as \ + mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_add_ifb', + return_value=self.IFB_NAME) as \ + mock_add_ifb, \ + mock.patch.object(self.tc, '_add_policy_qdisc') as \ + mock_add_policy_qdisc, \ + mock.patch.object(self.tc, '_add_policy_class') as \ + mock_add_policy_class, \ + mock.patch.object(self.tc, '_add_policy_filter') as \ + mock_add_policy_filter: + self.tc._configure_ifb(max=self.MAX_RATE, burst=self.BURST_RATE, + min=self.MIN_RATE) + mock_find_mirrored_ifb.assert_called_once_with() + mock_add_ifb.assert_called_once_with(self.IFB_NAME) + mock_add_policy_filter.assert_called_once_with( + self.QDISC_INGRESS_HANDLE, self.FILTER_PROTOCOL, + self.FILTER_FILTER, dev=self.DEVICE_NAME, + action=self.FILTER_ACTION) + mock_add_policy_qdisc.assert_called_once_with( + self.QDISC_ROOT, "1:", qdisc_type=self.TYPE_HTB, + dev=self.IFB_NAME) + mock_add_policy_class.assert_called_once_with("1:", "1:1", + self.TYPE_HTB, rate=self.MIN_RATE, ceil=self.MAX_RATE, + burst=self.BURST_RATE, dev=self.IFB_NAME) + + def test_configure_ifb_existing_ifb(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=self.IFB_NAME) as \ + mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_add_ifb', + return_value=self.IFB_NAME) as \ + mock_add_ifb, \ + mock.patch.object(self.tc, '_add_policy_qdisc') as \ + mock_add_policy_qdisc, \ + mock.patch.object(self.tc, '_add_policy_class') as \ + mock_add_policy_class: + self.tc._configure_ifb(max=self.MAX_RATE, burst=self.BURST_RATE, + min=self.MIN_RATE) + mock_find_mirrored_ifb.assert_called_once_with() + mock_add_ifb.assert_not_called() + mock_add_policy_qdisc.assert_called_once_with( + self.QDISC_ROOT, "1:", qdisc_type=self.TYPE_HTB, + dev=self.IFB_NAME) + mock_add_policy_class.assert_called_once_with("1:", "1:1", + self.TYPE_HTB, rate=self.MIN_RATE, ceil=self.MAX_RATE, + burst=self.BURST_RATE, dev=self.IFB_NAME) def test_get_ingress_qdisc_burst_value_burst_not_none(self): self.assertEqual( BURST, self.tc.get_ingress_qdisc_burst_value(BW_LIMIT, BURST) ) - def test_get_ingress_qdisc_burst_no_burst_value_given(self): + def test_get_ingress_qdisc_burst_value_no_burst_value_given(self): expected_burst = BW_LIMIT * qos_consts.DEFAULT_BURST_RATE self.assertEqual( expected_burst, self.tc.get_ingress_qdisc_burst_value(BW_LIMIT, None) ) - def test_get_ingress_qdisc_burst_burst_value_zero(self): + def test_get_ingress_qdisc_burst_value_burst_value_zero(self): expected_burst = BW_LIMIT * qos_consts.DEFAULT_BURST_RATE self.assertEqual( expected_burst, self.tc.get_ingress_qdisc_burst_value(BW_LIMIT, 0) ) - def test__get_tbf_burst_value_when_burst_bigger_then_minimal(self): - result = self.tc._get_tbf_burst_value(BW_LIMIT, BURST) - self.assertEqual(BURST, result) + def test_get_ingress_limits_no_ifb(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=None) as \ + mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_show_policy_class') as \ + mock_show_policy_class: + max_bw, burst, min_bw = self.tc._get_ingress_limits() + mock_find_mirrored_ifb.assert_called_once_with() + mock_show_policy_class.assert_not_called() + self.assertIsNone(max_bw) + self.assertIsNone(burst) + self.assertIsNone(min_bw) - def test__get_tbf_burst_value_when_burst_smaller_then_minimal(self): - result = self.tc._get_tbf_burst_value(BW_LIMIT, 0) - self.assertEqual(2, result) + def test_get_ingress_limits_ifb_present(self): + with mock.patch.object(self.tc, '_find_mirrored_ifb', + return_value=self.IFB_NAME) as \ + mock_find_mirrored_ifb, \ + mock.patch.object(self.tc, '_show_policy_class') as \ + mock_show_policy_class: + classes = {'rate': self.MIN_RATE, + 'ceil': self.MAX_RATE, + 'burst': self.BURST_RATE} + mock_show_policy_class.return_value = classes + max_bw, burst, min_bw = self.tc._get_ingress_limits() + mock_find_mirrored_ifb.assert_called_once_with() + mock_show_policy_class.assert_called_once_with("1:1", + dev=self.IFB_NAME) + self.assertEqual((self.MAX_RATE, self.BURST_RATE, self.MIN_RATE), + (max_bw, burst, min_bw)) diff --git a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/test_qos_driver.py b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/test_qos_driver.py index 46ec14eb8e6..4f58bee56b5 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/test_qos_driver.py +++ b/neutron/tests/unit/plugins/ml2/drivers/linuxbridge/agent/extension_drivers/test_qos_driver.py @@ -12,39 +12,68 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import mock +import uuid -from oslo_config import cfg from oslo_utils import uuidutils +from neutron.agent.l2.extensions import qos_linux as qos_extensions from neutron.agent.linux import tc_lib from neutron.objects.qos import rule -from neutron.plugins.ml2.drivers.linuxbridge.agent.common import config # noqa from neutron.plugins.ml2.drivers.linuxbridge.agent.extension_drivers import ( qos_driver) +from neutron.services.qos import qos_consts from neutron.tests import base -TEST_LATENCY_VALUE = 100 DSCP_VALUE = 32 +class FakeVifPort(object): + ofport = 99 + port_name = 'name' + vif_mac = 'aa:bb:cc:11:22:33' + + class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase): + POLICY_ID = uuid.uuid4().hex + DEVICE_NAME = 'fake_tap' + ACTION_CREATE = 'create' + ACTION_DELETE = 'delete' + RULE_MAX = 4000 + RULE_MIN = 1000 + RULE_BURST = 800 + RULE_DIRECTION_EGRESS = 'egress' def setUp(self): super(QosLinuxbridgeAgentDriverTestCase, self).setUp() - cfg.CONF.set_override("tbf_latency", TEST_LATENCY_VALUE, "QOS") self.qos_driver = qos_driver.QosLinuxbridgeAgentDriver() self.qos_driver.initialize() self.rule_bw_limit = self._create_bw_limit_rule_obj() self.rule_dscp_marking = self._create_dscp_marking_rule_obj() + self.get_egress_burst_value = mock.patch.object( + qos_extensions.QosLinuxAgentDriver, "_get_egress_burst_value") + self.mock_get_egress_burst_value = self.get_egress_burst_value.start() + self.mock_get_egress_burst_value.return_value = self.RULE_BURST + self.rule_bw_limit = self._create_bw_limit_rule_obj() + self.rule_min_bw = self._create_min_bw_rule_obj() self.port = self._create_fake_port(uuidutils.generate_uuid()) + self._ports = collections.defaultdict(dict) def _create_bw_limit_rule_obj(self): rule_obj = rule.QosBandwidthLimitRule() rule_obj.id = uuidutils.generate_uuid() - rule_obj.max_kbps = 2 - rule_obj.max_burst_kbps = 200 + rule_obj.max_kbps = self.RULE_MAX + rule_obj.max_burst_kbps = self.RULE_BURST + rule_obj.obj_reset_changes() + return rule_obj + + def _create_min_bw_rule_obj(self): + rule_obj = rule.QosMinimumBandwidthRule() + rule_obj.id = uuidutils.generate_uuid() + rule_obj.min_kbps = self.RULE_MAX + rule_obj.direction = self.RULE_DIRECTION_EGRESS rule_obj.obj_reset_changes() return rule_obj @@ -58,7 +87,9 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase): def _create_fake_port(self, policy_id): return {'qos_policy_id': policy_id, 'network_qos_policy_id': None, - 'device': 'fake_tap'} + 'device': self.DEVICE_NAME, + 'port_id': uuid.uuid4(), + 'vif_port': FakeVifPort()} def _dscp_mark_chain_name(self, device): return "qos-o%s" % device[3:] @@ -73,32 +104,49 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase): def _dscp_rule_tag(self, device): return "dscp-%s" % device - def test_create_bandwidth_limit(self): - with mock.patch.object( - tc_lib.TcCommand, "set_filters_bw_limit" - ) as set_bw_limit: - self.qos_driver.create_bandwidth_limit(self.port, - self.rule_bw_limit) - set_bw_limit.assert_called_once_with( - self.rule_bw_limit.max_kbps, self.rule_bw_limit.max_burst_kbps, - ) - - def test_update_bandwidth_limit(self): - with mock.patch.object( - tc_lib.TcCommand, "update_filters_bw_limit" - ) as update_bw_limit: + @mock.patch.object(tc_lib.TcCommand, "set_bw") + def test_update_bandwidth_limit(self, mock_set_bw): + with mock.patch.object(self.qos_driver, '_get_port_bw_parameters') as \ + mock_bw_param: + mock_bw_param.return_value = (self.rule_bw_limit.max_kbps, + self.rule_bw_limit.max_burst_kbps, + None) self.qos_driver.update_bandwidth_limit(self.port, self.rule_bw_limit) - update_bw_limit.assert_called_once_with( + mock_set_bw.assert_called_once_with( self.rule_bw_limit.max_kbps, self.rule_bw_limit.max_burst_kbps, - ) + None, self.RULE_DIRECTION_EGRESS) + mock_bw_param.assert_called_once_with(self.port['port_id']) - def test_delete_bandwidth_limit(self): - with mock.patch.object( - tc_lib.TcCommand, "delete_filters_bw_limit" - ) as delete_bw_limit: + @mock.patch.object(tc_lib.TcCommand, "delete_bw") + def test_delete_bandwidth_limit(self, mock_delete_bw): + with mock.patch.object(self.qos_driver, '_get_port_bw_parameters') as \ + mock_bw_param: + mock_bw_param.return_value = (None, None, None) self.qos_driver.delete_bandwidth_limit(self.port) - delete_bw_limit.assert_called_once_with() + mock_delete_bw.assert_called_once_with(self.RULE_DIRECTION_EGRESS) + mock_bw_param.assert_called_once_with(self.port['port_id']) + + @mock.patch.object(tc_lib.TcCommand, "set_bw") + def test_update_minimum_bandwidth(self, mock_set_bw): + with mock.patch.object(self.qos_driver, '_get_port_bw_parameters') as \ + mock_bw_param: + mock_bw_param.return_value = (None, None, + self.rule_min_bw.min_kbps) + self.qos_driver.update_minimum_bandwidth(self.port, + self.rule_min_bw) + mock_set_bw.assert_called_once_with(None, None, + self.rule_min_bw.min_kbps, self.RULE_DIRECTION_EGRESS) + mock_bw_param.assert_called_once_with(self.port['port_id']) + + @mock.patch.object(tc_lib.TcCommand, "delete_bw") + def test_delete_minimum_bandwidth(self, mock_delete_bw): + with mock.patch.object(self.qos_driver, '_get_port_bw_parameters') as \ + mock_bw_param: + mock_bw_param.return_value = (None, None, None) + self.qos_driver.delete_minimum_bandwidth(self.port) + mock_delete_bw.assert_called_once_with(self.RULE_DIRECTION_EGRESS) + mock_bw_param.assert_called_once_with(self.port['port_id']) def test_create_dscp_marking(self): expected_calls = [ @@ -195,3 +243,23 @@ class QosLinuxbridgeAgentDriverTestCase(base.BaseTestCase): ]) iptables_manager.ipv4['mangle'].remove_chain.assert_not_called() iptables_manager.ipv4['mangle'].remove_rule.assert_not_called() + + def test_get_port_bw_parameters_existing_port(self): + port_id = 'port_id_1' + self.qos_driver._port_rules[port_id][ + qos_consts.RULE_TYPE_MINIMUM_BANDWIDTH] = self.rule_min_bw + self.qos_driver._port_rules[port_id][ + qos_consts.RULE_TYPE_BANDWIDTH_LIMIT] = self.rule_bw_limit + max, burst, min = self.qos_driver._get_port_bw_parameters(port_id) + self.assertEqual(self.rule_bw_limit.max_kbps, max) + self.assertEqual(self.rule_bw_limit.max_burst_kbps, burst) + self.assertEqual(self.rule_min_bw.min_kbps, min) + self.mock_get_egress_burst_value.assert_called_once_with( + self.rule_bw_limit) + + def test_get_port_bw_parameters_not_existing_port(self): + port_id = 'port_id_1' + max, burst, min = self.qos_driver._get_port_bw_parameters(port_id) + self.assertIsNone(max) + self.assertIsNone(burst) + self.assertIsNone(min) diff --git a/releasenotes/notes/add-minimum-bandwidth-support-linuxbridge-9dc9d4458d8affef.yaml b/releasenotes/notes/add-minimum-bandwidth-support-linuxbridge-9dc9d4458d8affef.yaml new file mode 100644 index 00000000000..05da4fcfa7e --- /dev/null +++ b/releasenotes/notes/add-minimum-bandwidth-support-linuxbridge-9dc9d4458d8affef.yaml @@ -0,0 +1,6 @@ +--- +features: + - Linux Bridge now supports egress minimum bandwidth configuration. +deprecations: + - Configuration parameters ``kernel_hz`` and ``tbf_latency`` in ``QoS`` + section have been removed, because tc-tbf is no longer used.