Add support for Network QoS rule commands

Added following commands:
  - network qos rule create --type minimum-bandwidth
                                   dscp-marking
                                   limit-bandwidth
  - network qos rule delete
  - network qos rule list
  - network qos rule set
  - network qos rule show

Closes-Bug: 1609472
Depends-On: I2e8869750024a8ccbc7777b95fe8ef6e26ec0885
Depends-On: Ife549ff6499217ca65e2554be8ef86ea7866b2d8

Change-Id: Ib3e1951f0917f5f23c8d9e0a380d19da2b3af5f0
This commit is contained in:
Rodolfo Alonso Hernandez 2016-08-05 09:57:24 +01:00
parent 1957690754
commit 6b114cd98f
10 changed files with 1815 additions and 74 deletions

View File

@ -0,0 +1,165 @@
================
network qos rule
================
A **Network QoS rule** specifies a rule defined in a Network QoS policy; its
type is defined by the parameter 'type'. Can be assigned, within a Network QoS
policy, to a port or a network. Each Network QoS policy can contain several
rules, each of them
Network v2
network qos rule create
-----------------------
Create new Network QoS rule
.. program:: network qos rule create
.. code:: bash
os network qos rule create
--type <type>
[--max-kbps <max-kbps>]
[--max-burst-kbits <max-burst-kbits>]
[--dscp-marks <dscp-marks>]
[--min-kbps <min-kbps>]
[--ingress | --egress]
<qos-policy>
.. option:: --type <type>
QoS rule type (minimum-bandwidth, dscp-marking, bandwidth-limit)
.. option:: --max-kbps <min-kbps>
Maximum bandwidth in kbps
.. option:: --max-burst-kbits <max-burst-kbits>
Maximum burst in kilobits, 0 means automatic
.. option:: --dscp-mark <dscp-mark>
DSCP mark: value can be 0, even numbers from 8-56, excluding 42, 44, 50,
52, and 54
.. option:: --min-kbps <min-kbps>
Minimum guaranteed bandwidth in kbps
.. option:: --ingress
Ingress traffic direction from the project point of view
.. option:: --egress
Egress traffic direction from the project point of view
.. describe:: <qos-policy>
QoS policy that contains the rule (name or ID)
network qos rule delete
-----------------------
Delete Network QoS rule
.. program:: network qos rule delete
.. code:: bash
os network qos rule delete
<qos-policy>
<rule-id>
.. describe:: <qos-policy>
QoS policy that contains the rule (name or ID)
.. describe:: <rule-id>
Network QoS rule to delete (ID)
network qos rule list
---------------------
List Network QoS rules
.. program:: network qos rule list
.. code:: bash
os network qos rule list
<qos-policy>
.. describe:: <qos-policy>
QoS policy that contains the rule (name or ID)
network qos rule set
--------------------
Set Network QoS rule properties
.. program:: network qos rule set
.. code:: bash
os network qos rule set
[--max-kbps <max-kbps>]
[--max-burst-kbits <max-burst-kbits>]
[--dscp-marks <dscp-marks>]
[--min-kbps <min-kbps>]
[--ingress | --egress]
<qos-policy>
<rule-id>
.. option:: --max-kbps <min-kbps>
Maximum bandwidth in kbps
.. option:: --max-burst-kbits <max-burst-kbits>
Maximum burst in kilobits, 0 means automatic
.. option:: --dscp-mark <dscp-mark>
DSCP mark: value can be 0, even numbers from 8-56, excluding 42, 44, 50,
52, and 54
.. option:: --min-kbps <min-kbps>
Minimum guaranteed bandwidth in kbps
.. option:: --ingress
Ingress traffic direction from the project point of view
.. option:: --egress
Egress traffic direction from the project point of view
.. describe:: <qos-policy>
QoS policy that contains the rule (name or ID)
.. describe:: <rule-id>
Network QoS rule to delete (ID)
network qos rule show
---------------------
Display Network QoS rule details
.. program:: network qos rule show
.. code:: bash
os network qos rule show
<qos-policy>
<rule-id>
.. describe:: <qos-policy>
QoS policy that contains the rule (name or ID)
.. describe:: <rule-id>
Network QoS rule to delete (ID)

View File

@ -113,6 +113,7 @@ referring to both Compute and Volume quotas.
* ``network agent``: (**Network**) - A network agent is an agent that handles various tasks used to implement virtual networks
* ``network meter``: (**Network**) - allow traffic metering in a network
* ``network rbac``: (**Network**) - an RBAC policy for network resources
* ``network qos rule``: (**Network**) - a QoS rule for network resources
* ``network qos policy``: (**Network**) - a QoS policy for network resources
* ``network qos rule type``: (**Network**) - list of QoS available rule types
* ``network segment``: (**Network**) - a segment of a virtual network

View File

@ -0,0 +1,356 @@
# Copyright (c) 2016, Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import logging
import six
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from openstackclient.i18n import _
from openstackclient.network import sdk_utils
LOG = logging.getLogger(__name__)
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
RULE_TYPE_DSCP_MARKING = 'dscp-marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
REQUIRED_PARAMETERS = {
RULE_TYPE_MINIMUM_BANDWIDTH: ['min_kbps', 'direction'],
RULE_TYPE_DSCP_MARKING: ['dscp_mark'],
RULE_TYPE_BANDWIDTH_LIMIT: ['max_kbps', 'max_burst_kbps']}
DIRECTION_EGRESS = 'egress'
DIRECTION_INGRESS = 'ingress'
DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 46, 48, 56]
ACTION_CREATE = 'create'
ACTION_DELETE = 'delete'
ACTION_FIND = 'find'
ACTION_SET = 'update'
ACTION_SHOW = 'get'
def _get_columns(item):
column_map = {
'tenant_id': 'project_id',
}
return sdk_utils.get_osc_show_columns_for_sdk_resource(item, column_map)
def _check_type_parameters(attrs, type, is_create):
req_params = REQUIRED_PARAMETERS[type]
notreq_params = list(itertools.chain(
*[v for k, v in six.iteritems(REQUIRED_PARAMETERS) if k != type]))
if is_create and None in map(attrs.get, req_params):
msg = (_('"Create" rule command for type "%(rule_type)s" requires '
'arguments %(args)s') % {'rule_type': type,
'args': ", ".join(req_params)})
raise exceptions.CommandError(msg)
if set(six.iterkeys(attrs)) & set(notreq_params):
msg = (_('Rule type "%(rule_type)s" only requires arguments %(args)s')
% {'rule_type': type, 'args': ", ".join(req_params)})
raise exceptions.CommandError(msg)
def _get_attrs(network_client, parsed_args, is_create=False):
attrs = {}
qos = network_client.find_qos_policy(parsed_args.qos_policy,
ignore_missing=False)
attrs['qos_policy_id'] = qos.id
if not is_create:
attrs['id'] = parsed_args.id
rule_type = _find_rule_type(qos, parsed_args.id)
if not rule_type:
msg = (_('Rule ID %(rule_id)s not found') %
{'rule_id': parsed_args.id})
raise exceptions.CommandError(msg)
else:
if not parsed_args.type:
msg = _('"Create" rule command requires argument "type"')
raise exceptions.CommandError(msg)
rule_type = parsed_args.type
if parsed_args.max_kbps:
attrs['max_kbps'] = parsed_args.max_kbps
if parsed_args.max_burst_kbits:
# NOTE(ralonsoh): this parameter must be changed in SDK and then in
# Neutron API, from 'max_burst_kbps' to
# 'max_burst_kbits'
attrs['max_burst_kbps'] = parsed_args.max_burst_kbits
if parsed_args.dscp_mark:
attrs['dscp_mark'] = parsed_args.dscp_mark
if parsed_args.min_kbps:
attrs['min_kbps'] = parsed_args.min_kbps
if parsed_args.ingress:
attrs['direction'] = 'ingress'
if parsed_args.egress:
attrs['direction'] = 'egress'
_check_type_parameters(attrs, rule_type, is_create)
return attrs
def _get_item_properties(item, fields):
"""Return a tuple containing the item properties."""
row = []
for field in fields:
row.append(item.get(field, ''))
return tuple(row)
def _rule_action_call(client, action, rule_type):
rule_type = rule_type.replace('-', '_')
func_name = '%(action)s_qos_%(rule_type)s_rule' % {'action': action,
'rule_type': rule_type}
return getattr(client, func_name)
def _find_rule_type(qos, rule_id):
for rule in (r for r in qos.rules if r['id'] == rule_id):
return rule['type'].replace('_', '-')
return None
def _add_rule_arguments(parser):
parser.add_argument(
'--max-kbps',
dest='max_kbps',
metavar='<max-kbps>',
type=int,
help=_('Maximum bandwidth in kbps')
)
parser.add_argument(
'--max-burst-kbits',
dest='max_burst_kbits',
metavar='<max-burst-kbits>',
type=int,
help=_('Maximum burst in kilobits, 0 means automatic')
)
parser.add_argument(
'--dscp-mark',
dest='dscp_mark',
metavar='<dscp-mark>',
type=int,
help=_('DSCP mark: value can be 0, even numbers from 8-56, '
'excluding 42, 44, 50, 52, and 54')
)
parser.add_argument(
'--min-kbps',
dest='min_kbps',
metavar='<min-kbps>',
type=int,
help=_('Minimum guaranteed bandwidth in kbps')
)
direction_group = parser.add_mutually_exclusive_group()
direction_group.add_argument(
'--ingress',
action='store_true',
help=_("Ingress traffic direction from the project point of view")
)
direction_group.add_argument(
'--egress',
action='store_true',
help=_("Egress traffic direction from the project point of view")
)
class CreateNetworkQosRule(command.ShowOne):
_description = _("Create new Network QoS rule")
def get_parser(self, prog_name):
parser = super(CreateNetworkQosRule, self).get_parser(
prog_name)
parser.add_argument(
'qos_policy',
metavar='<qos-policy>',
help=_('QoS policy that contains the rule (name or ID)')
)
parser.add_argument(
'--type',
metavar='<type>',
choices=[RULE_TYPE_MINIMUM_BANDWIDTH,
RULE_TYPE_DSCP_MARKING,
RULE_TYPE_BANDWIDTH_LIMIT],
help=(_('QoS rule type (%s)') %
", ".join(six.iterkeys(REQUIRED_PARAMETERS)))
)
_add_rule_arguments(parser)
return parser
def take_action(self, parsed_args):
network_client = self.app.client_manager.network
attrs = _get_attrs(network_client, parsed_args, is_create=True)
try:
obj = _rule_action_call(
network_client, ACTION_CREATE, parsed_args.type)(
attrs.pop('qos_policy_id'), **attrs)
except Exception as e:
msg = (_('Failed to create Network QoS rule: %(e)s') % {'e': e})
raise exceptions.CommandError(msg)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return display_columns, data
class DeleteNetworkQosRule(command.Command):
_description = _("Delete Network QoS rule")
def get_parser(self, prog_name):
parser = super(DeleteNetworkQosRule, self).get_parser(prog_name)
parser.add_argument(
'qos_policy',
metavar='<qos-policy>',
help=_('QoS policy that contains the rule (name or ID)')
)
parser.add_argument(
'id',
metavar='<rule-id>',
help=_('Network QoS rule to delete (ID)')
)
return parser
def take_action(self, parsed_args):
network_client = self.app.client_manager.network
rule_id = parsed_args.id
try:
qos = network_client.find_qos_policy(parsed_args.qos_policy,
ignore_missing=False)
rule_type = _find_rule_type(qos, rule_id)
if not rule_type:
raise Exception('Rule %s not found' % rule_id)
_rule_action_call(network_client, ACTION_DELETE, rule_type)(
rule_id, qos.id)
except Exception as e:
msg = (_('Failed to delete Network QoS rule ID "%(rule)s": %(e)s')
% {'rule': rule_id, 'e': e})
raise exceptions.CommandError(msg)
class ListNetworkQosRule(command.Lister):
_description = _("List Network QoS rules")
def get_parser(self, prog_name):
parser = super(ListNetworkQosRule, self).get_parser(prog_name)
parser.add_argument(
'qos_policy',
metavar='<qos-policy>',
help=_('QoS policy that contains the rule (name or ID)')
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
columns = (
'id',
'qos_policy_id',
'type',
'max_kbps',
'max_burst_kbps',
'min_kbps',
'dscp_mark',
'direction',
)
column_headers = (
'ID',
'QoS Policy ID',
'Type',
'Max Kbps',
'Max Burst Kbits',
'Min Kbps',
'DSCP mark',
'Direction',
)
qos = client.find_qos_policy(parsed_args.qos_policy,
ignore_missing=False)
data = qos.rules
return (column_headers,
(_get_item_properties(s, columns) for s in data))
class SetNetworkQosRule(command.Command):
_description = _("Set Network QoS rule properties")
def get_parser(self, prog_name):
parser = super(SetNetworkQosRule, self).get_parser(prog_name)
parser.add_argument(
'qos_policy',
metavar='<qos-policy>',
help=_('QoS policy that contains the rule (name or ID)')
)
parser.add_argument(
'id',
metavar='<rule-id>',
help=_('Network QoS rule to delete (ID)')
)
_add_rule_arguments(parser)
return parser
def take_action(self, parsed_args):
network_client = self.app.client_manager.network
try:
qos = network_client.find_qos_policy(parsed_args.qos_policy,
ignore_missing=False)
rule_type = _find_rule_type(qos, parsed_args.id)
if not rule_type:
raise Exception('Rule not found')
attrs = _get_attrs(network_client, parsed_args)
qos_id = attrs.pop('qos_policy_id')
qos_rule = _rule_action_call(network_client, ACTION_FIND,
rule_type)(attrs.pop('id'), qos_id)
_rule_action_call(network_client, ACTION_SET, rule_type)(
qos_rule, qos_id, **attrs)
except Exception as e:
msg = (_('Failed to set Network QoS rule ID "%(rule)s": %(e)s') %
{'rule': parsed_args.id, 'e': e})
raise exceptions.CommandError(msg)
class ShowNetworkQosRule(command.ShowOne):
_description = _("Display Network QoS rule details")
def get_parser(self, prog_name):
parser = super(ShowNetworkQosRule, self).get_parser(prog_name)
parser.add_argument(
'qos_policy',
metavar='<qos-policy>',
help=_('QoS policy that contains the rule (name or ID)')
)
parser.add_argument(
'id',
metavar='<rule-id>',
help=_('Network QoS rule to delete (ID)')
)
return parser
def take_action(self, parsed_args):
network_client = self.app.client_manager.network
rule_id = parsed_args.id
try:
qos = network_client.find_qos_policy(parsed_args.qos_policy,
ignore_missing=False)
rule_type = _find_rule_type(qos, rule_id)
if not rule_type:
raise Exception('Rule not found')
obj = _rule_action_call(network_client, ACTION_SHOW, rule_type)(
rule_id, qos.id)
except Exception as e:
msg = (_('Failed to set Network QoS rule ID "%(rule)s": %(e)s') %
{'rule': rule_id, 'e': e})
raise exceptions.CommandError(msg)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(obj, columns)
return display_columns, data

View File

@ -77,6 +77,11 @@ class TestCase(testtools.TestCase):
if expected not in actual:
raise Exception(expected + ' not in ' + actual)
@classmethod
def assertsOutputNotNone(cls, observed):
if observed is None:
raise Exception('No output observed')
def assert_table_structure(self, items, field_names):
"""Verify that all items have keys listed in field_names."""
for item in items:

View File

@ -0,0 +1,181 @@
# Copyright (c) 2016, Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from openstackclient.tests.functional import base
class NetworkQosRuleTestsMinimumBandwidth(base.TestCase):
"""Functional tests for QoS minimum bandwidth rule."""
RULE_ID = None
QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
MIN_KBPS = 2800
MIN_KBPS_MODIFIED = 7500
DIRECTION = '--egress'
HEADERS = ['ID']
FIELDS = ['id']
TYPE = 'minimum-bandwidth'
@classmethod
def setUpClass(cls):
opts = cls.get_opts(cls.FIELDS)
cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
cls.RULE_ID = cls.openstack('network qos rule create --type ' +
cls.TYPE + ' --min-kbps ' +
str(cls.MIN_KBPS) + ' ' + cls.DIRECTION +
' ' + cls.QOS_POLICY_NAME + opts)
cls.assertsOutputNotNone(cls.RULE_ID)
@classmethod
def tearDownClass(cls):
raw_output = cls.openstack('network qos rule delete ' +
cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
cls.assertOutput('', raw_output)
def test_qos_policy_list(self):
opts = self.get_opts(self.HEADERS)
raw_output = self.openstack('network qos rule list '
+ self.QOS_POLICY_NAME + opts)
self.assertIn(self.RULE_ID, raw_output)
def test_qos_policy_show(self):
opts = self.get_opts(self.FIELDS)
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(self.RULE_ID, raw_output)
def test_qos_policy_set(self):
self.openstack('network qos rule set --min-kbps ' +
str(self.MIN_KBPS_MODIFIED) + ' ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
opts = self.get_opts(['min_kbps'])
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
class NetworkQosRuleTestsDSCPMarking(base.TestCase):
"""Functional tests for QoS DSCP marking rule."""
RULE_ID = None
QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
DSCP_MARK = 8
DSCP_MARK_MODIFIED = 32
HEADERS = ['ID']
FIELDS = ['id']
TYPE = 'dscp-marking'
@classmethod
def setUpClass(cls):
opts = cls.get_opts(cls.FIELDS)
cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
cls.RULE_ID = cls.openstack('network qos rule create --type ' +
cls.TYPE + ' --dscp-mark ' +
str(cls.DSCP_MARK) + ' ' +
cls.QOS_POLICY_NAME + opts)
cls.assertsOutputNotNone(cls.RULE_ID)
@classmethod
def tearDownClass(cls):
raw_output = cls.openstack('network qos rule delete ' +
cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
cls.assertOutput('', raw_output)
def test_qos_policy_list(self):
opts = self.get_opts(self.HEADERS)
raw_output = self.openstack('network qos rule list '
+ self.QOS_POLICY_NAME + opts)
self.assertIn(self.RULE_ID, raw_output)
def test_qos_policy_show(self):
opts = self.get_opts(self.FIELDS)
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(self.RULE_ID, raw_output)
def test_qos_policy_set(self):
self.openstack('network qos rule set --dscp-mark ' +
str(self.DSCP_MARK_MODIFIED) + ' ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
opts = self.get_opts(['dscp_mark'])
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
class NetworkQosRuleTestsBandwidthLimit(base.TestCase):
"""Functional tests for QoS bandwidth limit rule."""
RULE_ID = None
QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
MAX_KBPS = 10000
MAX_KBPS_MODIFIED = 15000
MAX_BURST_KBITS = 1400
MAX_BURST_KBITS_MODIFIED = 1800
HEADERS = ['ID']
FIELDS = ['id']
TYPE = 'bandwidth-limit'
@classmethod
def setUpClass(cls):
opts = cls.get_opts(cls.FIELDS)
cls.openstack('network qos policy create ' + cls.QOS_POLICY_NAME)
cls.RULE_ID = cls.openstack('network qos rule create --type ' +
cls.TYPE + ' --max-kbps ' +
str(cls.MAX_KBPS) + ' --max-burst-kbits ' +
str(cls.MAX_BURST_KBITS) + ' ' +
cls.QOS_POLICY_NAME + opts)
cls.assertsOutputNotNone(cls.RULE_ID)
@classmethod
def tearDownClass(cls):
raw_output = cls.openstack('network qos rule delete ' +
cls.QOS_POLICY_NAME + ' ' + cls.RULE_ID)
cls.openstack('network qos policy delete ' + cls.QOS_POLICY_NAME)
cls.assertOutput('', raw_output)
def test_qos_policy_list(self):
opts = self.get_opts(self.HEADERS)
raw_output = self.openstack('network qos rule list '
+ self.QOS_POLICY_NAME + opts)
self.assertIn(self.RULE_ID, raw_output)
def test_qos_policy_show(self):
opts = self.get_opts(self.FIELDS)
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(self.RULE_ID, raw_output)
def test_qos_policy_set(self):
self.openstack('network qos rule set --max-kbps ' +
str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
opts = self.get_opts(['max_kbps'])
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(str(self.MAX_KBPS_MODIFIED) + "\n", raw_output)
opts = self.get_opts(['max_burst_kbps'])
raw_output = self.openstack('network qos rule show ' +
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
opts)
self.assertEqual(str(self.MAX_BURST_KBITS_MODIFIED) + "\n", raw_output)

View File

@ -216,6 +216,12 @@ class FakeResource(object):
def info(self):
return self._info
def __getitem__(self, item):
return self._info.get(item)
def get(self, item, default=None):
return self._info.get(item, default)
class FakeResponse(requests.Response):

View File

@ -14,6 +14,8 @@
import argparse
import copy
import mock
from random import choice
from random import randint
import uuid
from openstackclient.tests.unit import fakes
@ -37,6 +39,15 @@ QUOTA = {
"l7policy": 5,
}
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
RULE_TYPE_DSCP_MARKING = 'dscp-marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
VALID_QOS_RULES = [RULE_TYPE_BANDWIDTH_LIMIT,
RULE_TYPE_DSCP_MARKING,
RULE_TYPE_MINIMUM_BANDWIDTH]
VALID_DSCP_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 46, 48, 56]
class FakeNetworkV2Client(object):
@ -662,77 +673,6 @@ class FakeNetworkRBAC(object):
return mock.Mock(side_effect=rbac_policies)
class FakeNetworkQosBandwidthLimitRule(object):
"""Fake one or more QoS bandwidth limit rules."""
@staticmethod
def create_one_qos_bandwidth_limit_rule(attrs=None):
"""Create a fake QoS bandwidth limit rule.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object with id, qos_policy_id, max_kbps and
max_burst_kbps attributes.
"""
attrs = attrs or {}
# Set default attributes.
qos_bandwidth_limit_rule_attrs = {
'id': 'qos-bandwidth-limit-rule-id-' + uuid.uuid4().hex,
'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
'max_kbps': 1500,
'max_burst_kbps': 1200,
}
# Overwrite default attributes.
qos_bandwidth_limit_rule_attrs.update(attrs)
qos_bandwidth_limit_rule = fakes.FakeResource(
info=copy.deepcopy(qos_bandwidth_limit_rule_attrs),
loaded=True)
return qos_bandwidth_limit_rule
@staticmethod
def create_qos_bandwidth_limit_rules(attrs=None, count=2):
"""Create multiple fake QoS bandwidth limit rules.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of QoS bandwidth limit rules to fake
:return:
A list of FakeResource objects faking the QoS bandwidth limit rules
"""
qos_policies = []
for i in range(0, count):
qos_policies.append(FakeNetworkQosBandwidthLimitRule.
create_one_qos_bandwidth_limit_rule(attrs))
return qos_policies
@staticmethod
def get_qos_bandwidth_limit_rules(qos_rules=None, count=2):
"""Get a list of faked QoS bandwidth limit rules.
If QoS bandwidth limit rules list is provided, then initialize the
Mock object with the list. Otherwise create one.
:param List address scopes:
A list of FakeResource objects faking QoS bandwidth limit rules
:param int count:
The number of QoS bandwidth limit rules to fake
:return:
An iterable Mock object with side_effect set to a list of faked
qos bandwidth limit rules
"""
if qos_rules is None:
qos_rules = (FakeNetworkQosBandwidthLimitRule.
create_qos_bandwidth_limit_rules(count))
return mock.Mock(side_effect=qos_rules)
class FakeNetworkQosPolicy(object):
"""Fake one or more QoS policies."""
@ -748,9 +688,7 @@ class FakeNetworkQosPolicy(object):
attrs = attrs or {}
qos_id = attrs.get('id') or 'qos-policy-id-' + uuid.uuid4().hex
rule_attrs = {'qos_policy_id': qos_id}
rules = [
FakeNetworkQosBandwidthLimitRule.
create_one_qos_bandwidth_limit_rule(rule_attrs)]
rules = [FakeNetworkQosRule.create_one_qos_rule(rule_attrs)]
# Set default attributes.
qos_policy_attrs = {
@ -813,6 +751,84 @@ class FakeNetworkQosPolicy(object):
return mock.Mock(side_effect=qos_policies)
class FakeNetworkQosRule(object):
"""Fake one or more Network QoS rules."""
@staticmethod
def create_one_qos_rule(attrs=None):
"""Create a fake Network QoS rule.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeResource object with name, id, etc.
"""
attrs = attrs or {}
# Set default attributes.
type = attrs.get('type') or choice(VALID_QOS_RULES)
qos_rule_attrs = {
'id': 'qos-rule-id-' + uuid.uuid4().hex,
'qos_policy_id': 'qos-policy-id-' + uuid.uuid4().hex,
'tenant_id': 'project-id-' + uuid.uuid4().hex,
'type': type,
}
if type == RULE_TYPE_BANDWIDTH_LIMIT:
qos_rule_attrs['max_kbps'] = randint(1, 10000)
qos_rule_attrs['max_burst_kbits'] = randint(1, 10000)
elif type == RULE_TYPE_DSCP_MARKING:
qos_rule_attrs['dscp_mark'] = choice(VALID_DSCP_MARKS)
elif type == RULE_TYPE_MINIMUM_BANDWIDTH:
qos_rule_attrs['min_kbps'] = randint(1, 10000)
qos_rule_attrs['direction'] = 'egress'
# Overwrite default attributes.
qos_rule_attrs.update(attrs)
qos_rule = fakes.FakeResource(info=copy.deepcopy(qos_rule_attrs),
loaded=True)
# Set attributes with special mapping in OpenStack SDK.
qos_rule.project_id = qos_rule['tenant_id']
return qos_rule
@staticmethod
def create_qos_rules(attrs=None, count=2):
"""Create multiple fake Network QoS rules.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of Network QoS rule to fake
:return:
A list of FakeResource objects faking the Network QoS rules
"""
qos_rules = []
for i in range(0, count):
qos_rules.append(FakeNetworkQosRule.create_one_qos_rule(attrs))
return qos_rules
@staticmethod
def get_qos_rules(qos_rules=None, count=2):
"""Get a list of faked Network QoS rules.
If Network QoS rules list is provided, then initialize the Mock
object with the list. Otherwise create one.
:param List address scopes:
A list of FakeResource objects faking Network QoS rules
:param int count:
The number of QoS minimum bandwidth rules to fake
:return:
An iterable Mock object with side_effect set to a list of faked
qos minimum bandwidth rules
"""
if qos_rules is None:
qos_rules = (FakeNetworkQosRule.create_qos_rules(count))
return mock.Mock(side_effect=qos_rules)
class FakeNetworkQosRuleType(object):
"""Fake one or more Network QoS rule types."""

View File

@ -0,0 +1,997 @@
# Copyright (c) 2016, Intel Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from osc_lib import exceptions
from openstackclient.network.v2 import network_qos_rule
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
from openstackclient.tests.unit import utils as tests_utils
RULE_TYPE_BANDWIDTH_LIMIT = 'bandwidth-limit'
RULE_TYPE_DSCP_MARKING = 'dscp-marking'
RULE_TYPE_MINIMUM_BANDWIDTH = 'minimum-bandwidth'
DSCP_VALID_MARKS = [0, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
34, 36, 38, 40, 46, 48, 56]
class TestNetworkQosRule(network_fakes.TestNetworkV2):
def setUp(self):
super(TestNetworkQosRule, self).setUp()
# Get a shortcut to the network client
self.network = self.app.client_manager.network
self.qos_policy = (network_fakes.FakeNetworkQosPolicy.
create_one_qos_policy())
self.network.find_qos_policy = mock.Mock(return_value=self.qos_policy)
class TestCreateNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
def test_check_type_parameters(self):
pass
def setUp(self):
super(TestCreateNetworkQosRuleMinimumBandwidth, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.columns = (
'direction',
'id',
'min_kbps',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.direction,
self.new_rule.id,
self.new_rule.min_kbps,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.create_qos_minimum_bandwidth_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
self.namespace)
def test_create_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
'--type', RULE_TYPE_MINIMUM_BANDWIDTH,
'--min-kbps', str(self.new_rule.min_kbps),
'--egress',
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_MINIMUM_BANDWIDTH),
('min_kbps', self.new_rule.min_kbps),
('egress', True),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.network.create_qos_minimum_bandwidth_rule.assert_called_once_with(
self.qos_policy.id,
**{'min_kbps': self.new_rule.min_kbps,
'direction': self.new_rule.direction}
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_wrong_options(self):
arglist = [
'--type', RULE_TYPE_MINIMUM_BANDWIDTH,
'--max-kbps', '10000',
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_MINIMUM_BANDWIDTH),
('max_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('"Create" rule command for type "minimum-bandwidth" '
'requires arguments min_kbps, direction')
self.assertEqual(msg, str(e))
class TestCreateNetworkQosRuleDSCPMarking(TestNetworkQosRule):
def test_check_type_parameters(self):
pass
def setUp(self):
super(TestCreateNetworkQosRuleDSCPMarking, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_DSCP_MARKING}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.columns = (
'dscp_mark',
'id',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.dscp_mark,
self.new_rule.id,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.create_qos_dscp_marking_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
self.namespace)
def test_create_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
'--type', RULE_TYPE_DSCP_MARKING,
'--dscp-mark', str(self.new_rule.dscp_mark),
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_DSCP_MARKING),
('dscp_mark', self.new_rule.dscp_mark),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.create_qos_dscp_marking_rule.assert_called_once_with(
self.qos_policy.id,
**{'dscp_mark': self.new_rule.dscp_mark}
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_wrong_options(self):
arglist = [
'--type', RULE_TYPE_DSCP_MARKING,
'--max-kbps', '10000',
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_DSCP_MARKING),
('max_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('"Create" rule command for type "dscp-marking" '
'requires arguments dscp_mark')
self.assertEqual(msg, str(e))
class TestCreateNetworkQosRuleBandwidtLimit(TestNetworkQosRule):
def test_check_type_parameters(self):
pass
def setUp(self):
super(TestCreateNetworkQosRuleBandwidtLimit, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_BANDWIDTH_LIMIT}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.columns = (
'id',
'max_burst_kbits',
'max_kbps',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.id,
self.new_rule.max_burst_kbits,
self.new_rule.max_kbps,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.create_qos_bandwidth_limit_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.CreateNetworkQosRule(self.app,
self.namespace)
def test_create_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
'--type', RULE_TYPE_BANDWIDTH_LIMIT,
'--max-kbps', str(self.new_rule.max_kbps),
'--max-burst-kbits', str(self.new_rule.max_burst_kbits),
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_BANDWIDTH_LIMIT),
('max_kbps', self.new_rule.max_kbps),
('max_burst_kbits', self.new_rule.max_burst_kbits),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.network.create_qos_bandwidth_limit_rule.assert_called_once_with(
self.qos_policy.id,
**{'max_kbps': self.new_rule.max_kbps,
'max_burst_kbps': self.new_rule.max_burst_kbits}
)
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_wrong_options(self):
arglist = [
'--type', RULE_TYPE_BANDWIDTH_LIMIT,
'--min-kbps', '10000',
self.new_rule.qos_policy_id,
]
verifylist = [
('type', RULE_TYPE_BANDWIDTH_LIMIT),
('min_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('"Create" rule command for type "bandwidth-limit" '
'requires arguments max_kbps, max_burst_kbps')
self.assertEqual(msg, str(e))
class TestDeleteNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
def setUp(self):
super(TestDeleteNetworkQosRuleMinimumBandwidth, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.network.delete_qos_minimum_bandwidth_rule = mock.Mock(
return_value=None)
self.network.find_qos_minimum_bandwidth_rule = (
network_fakes.FakeNetworkQosRule.get_qos_rules(
qos_rules=self.new_rule)
)
# Get the command object to test
self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
self.namespace)
def test_qos_policy_delete(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.find_qos_policy.assert_called_once_with(
self.qos_policy.id, ignore_missing=False)
self.network.delete_qos_minimum_bandwidth_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertIsNone(result)
def test_qos_policy_delete_error(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
self.network.delete_qos_minimum_bandwidth_rule.side_effect = \
Exception('Error message')
try:
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
{'rule': self.new_rule.id, 'e': 'Error message'})
self.assertEqual(msg, str(e))
class TestDeleteNetworkQosRuleDSCPMarking(TestNetworkQosRule):
def setUp(self):
super(TestDeleteNetworkQosRuleDSCPMarking, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_DSCP_MARKING}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.network.delete_qos_dscp_marking_rule = mock.Mock(
return_value=None)
self.network.find_qos_dscp_marking_rule = (
network_fakes.FakeNetworkQosRule.get_qos_rules(
qos_rules=self.new_rule)
)
# Get the command object to test
self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
self.namespace)
def test_qos_policy_delete(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.find_qos_policy.assert_called_once_with(
self.qos_policy.id, ignore_missing=False)
self.network.delete_qos_dscp_marking_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertIsNone(result)
def test_qos_policy_delete_error(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
self.network.delete_qos_dscp_marking_rule.side_effect = \
Exception('Error message')
try:
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
{'rule': self.new_rule.id, 'e': 'Error message'})
self.assertEqual(msg, str(e))
class TestDeleteNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
def setUp(self):
super(TestDeleteNetworkQosRuleBandwidthLimit, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_BANDWIDTH_LIMIT}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.network.delete_qos_bandwidth_limit_rule = mock.Mock(
return_value=None)
self.network.find_qos_bandwidth_limit_rule = (
network_fakes.FakeNetworkQosRule.get_qos_rules(
qos_rules=self.new_rule)
)
# Get the command object to test
self.cmd = network_qos_rule.DeleteNetworkQosRule(self.app,
self.namespace)
def test_qos_policy_delete(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.find_qos_policy.assert_called_once_with(
self.qos_policy.id, ignore_missing=False)
self.network.delete_qos_bandwidth_limit_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertIsNone(result)
def test_qos_policy_delete_error(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
self.network.delete_qos_bandwidth_limit_rule.side_effect = \
Exception('Error message')
try:
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to delete Network QoS rule ID "%(rule)s": %(e)s' %
{'rule': self.new_rule.id, 'e': 'Error message'})
self.assertEqual(msg, str(e))
class TestSetNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
def setUp(self):
super(TestSetNetworkQosRuleMinimumBandwidth, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs=attrs)
self.qos_policy.rules = [self.new_rule]
self.network.update_qos_minimum_bandwidth_rule = mock.Mock(
return_value=None)
self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
return_value=self.new_rule)
self.network.find_qos_policy = mock.Mock(
return_value=self.qos_policy)
# Get the command object to test
self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
self.namespace))
def test_set_nothing(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
self.new_rule, self.qos_policy.id)
self.assertIsNone(result)
def test_set_min_kbps(self):
arglist = [
'--min-kbps', str(self.new_rule.min_kbps),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('min_kbps', self.new_rule.min_kbps),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'min_kbps': self.new_rule.min_kbps,
}
self.network.update_qos_minimum_bandwidth_rule.assert_called_with(
self.new_rule, self.qos_policy.id, **attrs)
self.assertIsNone(result)
def test_set_wrong_options(self):
arglist = [
'--max-kbps', str(10000),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('max_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
'"minimum-bandwidth" only requires arguments min_kbps, '
'direction' % {'rule': self.new_rule.id})
self.assertEqual(msg, str(e))
class TestSetNetworkQosRuleDSCPMarking(TestNetworkQosRule):
def setUp(self):
super(TestSetNetworkQosRuleDSCPMarking, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_DSCP_MARKING}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs=attrs)
self.qos_policy.rules = [self.new_rule]
self.network.update_qos_dscp_marking_rule = mock.Mock(
return_value=None)
self.network.find_qos_dscp_marking_rule = mock.Mock(
return_value=self.new_rule)
self.network.find_qos_policy = mock.Mock(
return_value=self.qos_policy)
# Get the command object to test
self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
self.namespace))
def test_set_nothing(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.update_qos_dscp_marking_rule.assert_called_with(
self.new_rule, self.qos_policy.id)
self.assertIsNone(result)
def test_set_dscp_mark(self):
arglist = [
'--dscp-mark', str(self.new_rule.dscp_mark),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('dscp_mark', self.new_rule.dscp_mark),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'dscp_mark': self.new_rule.dscp_mark,
}
self.network.update_qos_dscp_marking_rule.assert_called_with(
self.new_rule, self.qos_policy.id, **attrs)
self.assertIsNone(result)
def test_set_wrong_options(self):
arglist = [
'--max-kbps', str(10000),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('max_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
'"dscp-marking" only requires arguments dscp_mark' %
{'rule': self.new_rule.id})
self.assertEqual(msg, str(e))
class TestSetNetworkQosRuleBandwidthLimit(TestNetworkQosRule):
def setUp(self):
super(TestSetNetworkQosRuleBandwidthLimit, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_BANDWIDTH_LIMIT}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs=attrs)
self.qos_policy.rules = [self.new_rule]
self.network.update_qos_bandwidth_limit_rule = mock.Mock(
return_value=None)
self.network.find_qos_bandwidth_limit_rule = mock.Mock(
return_value=self.new_rule)
self.network.find_qos_policy = mock.Mock(
return_value=self.qos_policy)
# Get the command object to test
self.cmd = (network_qos_rule.SetNetworkQosRule(self.app,
self.namespace))
def test_set_nothing(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.network.update_qos_bandwidth_limit_rule.assert_called_with(
self.new_rule, self.qos_policy.id)
self.assertIsNone(result)
def test_set_max_kbps(self):
arglist = [
'--max-kbps', str(self.new_rule.max_kbps),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('max_kbps', self.new_rule.max_kbps),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'max_kbps': self.new_rule.max_kbps,
}
self.network.update_qos_bandwidth_limit_rule.assert_called_with(
self.new_rule, self.qos_policy.id, **attrs)
self.assertIsNone(result)
def test_set_max_burst_kbits(self):
arglist = [
'--max-burst-kbits', str(self.new_rule.max_burst_kbits),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('max_burst_kbits', self.new_rule.max_burst_kbits),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'max_burst_kbps': self.new_rule.max_burst_kbits,
}
self.network.update_qos_bandwidth_limit_rule.assert_called_with(
self.new_rule, self.qos_policy.id, **attrs)
self.assertIsNone(result)
def test_set_wrong_options(self):
arglist = [
'--min-kbps', str(10000),
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('min_kbps', 10000),
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
except exceptions.CommandError as e:
msg = ('Failed to set Network QoS rule ID "%(rule)s": Rule type '
'"bandwidth-limit" only requires arguments max_kbps, '
'max_burst_kbps' % {'rule': self.new_rule.id})
self.assertEqual(msg, str(e))
class TestListNetworkQosRule(TestNetworkQosRule):
def setUp(self):
super(TestListNetworkQosRule, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule_min_bw = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
attrs['type'] = RULE_TYPE_DSCP_MARKING
self.new_rule_dscp_mark = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
attrs['type'] = RULE_TYPE_BANDWIDTH_LIMIT
self.new_rule_max_bw = (network_fakes.FakeNetworkQosRule.
create_one_qos_rule(attrs=attrs))
self.qos_policy.rules = [self.new_rule_min_bw,
self.new_rule_dscp_mark,
self.new_rule_max_bw]
self.network.find_qos_minimum_bandwidth_rule = mock.Mock(
return_value=self.new_rule_min_bw)
self.network.find_qos_dscp_marking_rule = mock.Mock(
return_value=self.new_rule_dscp_mark)
self.network.find_qos_bandwidth_limit_rule = mock.Mock(
return_value=self.new_rule_max_bw)
self.columns = (
'ID',
'QoS Policy ID',
'Type',
'Max Kbps',
'Max Burst Kbits',
'Min Kbps',
'DSCP mark',
'Direction',
)
self.data = []
for index in range(len(self.qos_policy.rules)):
self.data.append((
self.qos_policy.rules[index].id,
self.qos_policy.rules[index].qos_policy_id,
self.qos_policy.rules[index].type,
getattr(self.qos_policy.rules[index], 'max_kbps', ''),
getattr(self.qos_policy.rules[index], 'max_burst_kbps', ''),
getattr(self.qos_policy.rules[index], 'min_kbps', ''),
getattr(self.qos_policy.rules[index], 'dscp_mark', ''),
getattr(self.qos_policy.rules[index], 'direction', ''),
))
# Get the command object to test
self.cmd = network_qos_rule.ListNetworkQosRule(self.app,
self.namespace)
def test_qos_rule_list(self):
arglist = [
self.qos_policy.id
]
verifylist = [
('qos_policy', self.qos_policy.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.find_qos_policy.assert_called_once_with(
self.qos_policy.id, ignore_missing=False)
self.assertEqual(self.columns, columns)
list_data = list(data)
self.assertEqual(len(self.data), len(list_data))
for index in range(len(list_data)):
self.assertEqual(self.data[index], list_data[index])
class TestShowNetworkQosRuleMinimumBandwidth(TestNetworkQosRule):
def setUp(self):
super(TestShowNetworkQosRuleMinimumBandwidth, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_MINIMUM_BANDWIDTH}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.columns = (
'direction',
'id',
'min_kbps',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.direction,
self.new_rule.id,
self.new_rule.min_kbps,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.get_qos_minimum_bandwidth_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.get_qos_minimum_bandwidth_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.data), list(data))
class TestShowNetworkQosDSCPMarking(TestNetworkQosRule):
def setUp(self):
super(TestShowNetworkQosDSCPMarking, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_DSCP_MARKING}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.columns = (
'dscp_mark',
'id',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.dscp_mark,
self.new_rule.id,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.get_qos_dscp_marking_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.get_qos_dscp_marking_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.data), list(data))
class TestShowNetworkQosBandwidthLimit(TestNetworkQosRule):
def setUp(self):
super(TestShowNetworkQosBandwidthLimit, self).setUp()
attrs = {'qos_policy_id': self.qos_policy.id,
'type': RULE_TYPE_BANDWIDTH_LIMIT}
self.new_rule = network_fakes.FakeNetworkQosRule.create_one_qos_rule(
attrs)
self.qos_policy.rules = [self.new_rule]
self.columns = (
'id',
'max_burst_kbits',
'max_kbps',
'project_id',
'qos_policy_id',
'type'
)
self.data = (
self.new_rule.id,
self.new_rule.max_burst_kbits,
self.new_rule.max_kbps,
self.new_rule.project_id,
self.new_rule.qos_policy_id,
self.new_rule.type,
)
self.network.get_qos_bandwidth_limit_rule = mock.Mock(
return_value=self.new_rule)
# Get the command object to test
self.cmd = network_qos_rule.ShowNetworkQosRule(self.app,
self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
# Missing required args should bail here
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self.new_rule.qos_policy_id,
self.new_rule.id,
]
verifylist = [
('qos_policy', self.new_rule.qos_policy_id),
('id', self.new_rule.id),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.network.get_qos_bandwidth_limit_rule.assert_called_once_with(
self.new_rule.id, self.qos_policy.id)
self.assertEqual(self.columns, columns)
self.assertEqual(list(self.data), list(data))

View File

@ -0,0 +1,8 @@
---
features:
- |
Add support for Network QoS rule commands:
``network qos rule create``, ``network qos rule delete``,
``network qos rule list``, ``network qos rule show`` and
``network qos rule set``
[Bug `1609472 <https://bugs.launchpad.net/bugs/1609472>`_]

View File

@ -377,6 +377,12 @@ openstack.network.v2 =
network_qos_policy_set = openstackclient.network.v2.network_qos_policy:SetNetworkQosPolicy
network_qos_policy_show = openstackclient.network.v2.network_qos_policy:ShowNetworkQosPolicy
network_qos_rule_create = openstackclient.network.v2.network_qos_rule:CreateNetworkQosRule
network_qos_rule_delete = openstackclient.network.v2.network_qos_rule:DeleteNetworkQosRule
network_qos_rule_list = openstackclient.network.v2.network_qos_rule:ListNetworkQosRule
network_qos_rule_set = openstackclient.network.v2.network_qos_rule:SetNetworkQosRule
network_qos_rule_show = openstackclient.network.v2.network_qos_rule:ShowNetworkQosRule
network_qos_rule_type_list = openstackclient.network.v2.network_qos_rule_type:ListNetworkQosRuleType
network_rbac_create = openstackclient.network.v2.network_rbac:CreateNetworkRBAC