bp/api-filters

This changeset implements filters for core Quantum API and provides unit tests

Change-Id: I8247b3587c2cc8e53785781a45d1e457980261d2
This commit is contained in:
Salvatore Orlando 2012-01-11 17:26:47 +00:00
parent 7bb25c5112
commit b12f8559e3
11 changed files with 564 additions and 67 deletions

View File

@ -9,4 +9,4 @@ ChangeLog
*.pid *.pid
*.log *.log
quantum/vcsversion.py quantum/vcsversion.py
.ropeproject .ropeproject

View File

@ -20,6 +20,7 @@ from webob import exc
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.api import faults from quantum.api import faults
from quantum.api.views import networks as networks_view from quantum.api.views import networks as networks_view
from quantum.api.views import filters
from quantum.common import exceptions as exception from quantum.common import exceptions as exception
LOG = logging.getLogger('quantum.api.networks') LOG = logging.getLogger('quantum.api.networks')
@ -53,8 +54,10 @@ class Controller(common.QuantumController):
# concerning logical ports as well. # concerning logical ports as well.
network = self._plugin.get_network_details( network = self._plugin.get_network_details(
tenant_id, network_id) tenant_id, network_id)
port_list = self._plugin.get_all_ports( # Doing this in the API is inefficient
tenant_id, network_id) # TODO(salvatore-orlando): This should be fixed with Bug #834012
# Don't pass filter options
port_list = self._plugin.get_all_ports(tenant_id, network_id)
ports_data = [self._plugin.get_port_details( ports_data = [self._plugin.get_port_details(
tenant_id, network_id, port['port-id']) tenant_id, network_id, port['port-id'])
for port in port_list] for port in port_list]
@ -64,8 +67,27 @@ class Controller(common.QuantumController):
return dict(network=result) return dict(network=result)
def _items(self, request, tenant_id, net_details=False): def _items(self, request, tenant_id, net_details=False):
""" Returns a list of networks. """ """ Returns a list of networks.
networks = self._plugin.get_all_networks(tenant_id) Ideally, the plugin would perform filtering,
returning only the items matching filters specified
on the request query string.
However, plugins are not required to support filtering.
In this case, this function will filter the complete list
of networks returned by the plugin
"""
filter_opts = {}
filter_opts.update(request.str_GET)
networks = self._plugin.get_all_networks(tenant_id,
filter_opts=filter_opts)
# Inefficient, API-layer filtering
# will be performed only for the filters not implemented by the plugin
# NOTE(salvatore-orlando): the plugin is supposed to leave only filters
# it does not implement in filter_opts
networks = filters.filter_networks(networks,
self._plugin,
tenant_id,
filter_opts)
builder = networks_view.get_view_builder(request, self.version) builder = networks_view.get_view_builder(request, self.version)
result = [builder.build(network, net_details)['network'] result = [builder.build(network, net_details)['network']
for network in networks] for network in networks]

View File

@ -16,6 +16,7 @@
import logging import logging
from quantum.api import api_common as common from quantum.api import api_common as common
from quantum.api.views import filters
from quantum.api.views import ports as ports_view from quantum.api.views import ports as ports_view
from quantum.common import exceptions as exception from quantum.common import exceptions as exception
@ -48,11 +49,25 @@ class Controller(common.QuantumController):
def _items(self, request, tenant_id, network_id, def _items(self, request, tenant_id, network_id,
port_details=False): port_details=False):
""" Returns a list of ports. """ """ Returns a list of ports.
port_list = self._plugin.get_all_ports(tenant_id, network_id) Ideally, the plugin would perform filtering,
returning only the items matching filters specified
on the request query string.
However, plugins are not required to support filtering.
In this case, this function will filter the complete list
of ports returned by the plugin
"""
filter_opts = {}
filter_opts.update(request.str_GET)
port_list = self._plugin.get_all_ports(tenant_id,
network_id,
filter_opts=filter_opts)
builder = ports_view.get_view_builder(request, self.version) builder = ports_view.get_view_builder(request, self.version)
# Load extra data for ports if required. # Load extra data for ports if required.
# This can be inefficient.
# TODO(salvatore-orlando): the fix for bug #834012 should deal with it
if port_details: if port_details:
port_list_detail = \ port_list_detail = \
[self._plugin.get_port_details( [self._plugin.get_port_details(
@ -60,6 +75,16 @@ class Controller(common.QuantumController):
for port in port_list] for port in port_list]
port_list = port_list_detail port_list = port_list_detail
# Perform manual filtering if not supported by plugin
# Inefficient, API-layer filtering
# will be performed only if the plugin does
# not support filtering
# NOTE(salvatore-orlando): the plugin is supposed to leave only filters
# it does not implement in filter_opts
port_list = filters.filter_ports(port_list, self._plugin,
tenant_id, network_id,
filter_opts)
result = [builder.build(port, port_details)['port'] result = [builder.build(port, port_details)['port']
for port in port_list] for port in port_list]
return dict(ports=result) return dict(ports=result)

View File

@ -0,0 +1,160 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Citrix Systems
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
LOG = logging.getLogger('quantum.api.views.filters')
def _load_network_ports_details(network, **kwargs):
plugin = kwargs.get('plugin', None)
tenant_id = kwargs.get('tenant_id', None)
#load network details only if required
if not 'net-ports' in network:
# Don't pass filter options, don't care about unused filters
port_list = plugin.get_all_ports(tenant_id, network['net-id'])
ports_data = [plugin.get_port_details(
tenant_id, network['net-id'],
port['port-id'])
for port in port_list]
network['net-ports'] = ports_data
def _filter_network_by_name(network, name, **kwargs):
return network.get('net-name', None) == name
def _filter_network_with_operational_port(network, port_op_status,
**kwargs):
_load_network_ports_details(network, **kwargs)
return any([port['port-op-status'] == port_op_status
for port in network['net-ports']])
def _filter_network_with_active_port(network, port_state, **kwargs):
_load_network_ports_details(network, **kwargs)
return any([port['port-state'] == port_state
for port in network['net-ports']])
def _filter_network_has_interface(network, has_interface, **kwargs):
_load_network_ports_details(network, **kwargs)
# convert to bool
match_has_interface = has_interface.lower() == 'true'
really_has_interface = any([port['attachment'] is not None
for port in network['net-ports']])
return match_has_interface == really_has_interface
def _filter_network_by_port(network, port_id, **kwargs):
_load_network_ports_details(network, **kwargs)
return any([port['port-id'] == port_id
for port in network['net-ports']])
def _filter_network_by_interface(network, interface_id, **kwargs):
_load_network_ports_details(network, **kwargs)
return any([port.get('attachment', None) == interface_id
for port in network['net-ports']])
def _filter_port_by_state(port, state, **kwargs):
return port.get('port-state', None) == state
def _filter_network_by_op_status(network, op_status, **kwargs):
return network.get('net-op-status', None) == op_status
def _filter_port_by_op_status(port, op_status, **kwargs):
return port.get('port-op-status', None) == op_status
def _filter_port_by_interface(port, interface_id, **kwargs):
return port.get('attachment', None) == interface_id
def _filter_port_has_interface(port, has_interface, **kwargs):
# convert to bool
match_has_interface = has_interface.lower() == 'true'
really_has_interface = 'attachment' in port and port['attachment'] != None
return match_has_interface == really_has_interface
def _do_filtering(items, filters, filter_opts, plugin,
tenant_id, network_id=None):
filtered_items = []
for item in items:
is_filter_match = False
for flt in filters:
if flt in filter_opts:
is_filter_match = filters[flt](item,
filter_opts[flt],
plugin=plugin,
tenant_id=tenant_id,
network_id=network_id)
if not is_filter_match:
break
if is_filter_match:
filtered_items.append(item)
return filtered_items
def filter_networks(networks, plugin, tenant_id, filter_opts):
# Do filtering only if the plugin supports it
# and if filtering options have been specific
if len(filter_opts) == 0:
return networks
# load filter functions
filters = {
'name': _filter_network_by_name,
'op-status': _filter_network_by_op_status,
'port-op-status': _filter_network_with_operational_port,
'port-state': _filter_network_with_active_port,
'has-attachment': _filter_network_has_interface,
'attachment': _filter_network_by_interface,
'port': _filter_network_by_port}
# filter networks
return _do_filtering(networks, filters, filter_opts, plugin, tenant_id)
def filter_ports(ports, plugin, tenant_id, network_id, filter_opts):
# Do filtering only if the plugin supports it
# and if filtering options have been specific
if len(filter_opts) == 0:
return ports
# load filter functions
filters = {
'state': _filter_port_by_state,
'op-status': _filter_port_by_op_status,
'has-attachment': _filter_port_has_interface,
'attachment': _filter_port_by_interface}
# port details are need for filtering
ports = [plugin.get_port_details(tenant_id, network_id,
port['port-id'])
for port in ports]
# filter ports
return _do_filtering(ports,
filters,
filter_opts,
plugin,
tenant_id,
network_id)

View File

@ -54,7 +54,7 @@ class L2Network(QuantumPluginBase):
""" """
Core API implementation Core API implementation
""" """
def get_all_networks(self, tenant_id): def get_all_networks(self, tenant_id, **kwargs):
""" """
Returns a dictionary containing all Returns a dictionary containing all
<network_uuid, network_name> for <network_uuid, network_name> for
@ -154,7 +154,7 @@ class L2Network(QuantumPluginBase):
[]) [])
return net_dict return net_dict
def get_all_ports(self, tenant_id, net_id): def get_all_ports(self, tenant_id, net_id, **kwargs):
""" """
Retrieves all port identifiers belonging to the Retrieves all port identifiers belonging to the
specified Virtual Network. specified Virtual Network.

View File

@ -106,7 +106,7 @@ class OVSQuantumPlugin(QuantumPluginBase):
# % (vlan_id, network_id)) # % (vlan_id, network_id))
self.vmap.set_vlan(vlan_id, network_id) self.vmap.set_vlan(vlan_id, network_id)
def get_all_networks(self, tenant_id): def get_all_networks(self, tenant_id, **kwargs):
nets = [] nets = []
for x in db.network_list(tenant_id): for x in db.network_list(tenant_id):
LOG.debug("Adding network: %s" % x.uuid) LOG.debug("Adding network: %s" % x.uuid)
@ -167,9 +167,10 @@ class OVSQuantumPlugin(QuantumPluginBase):
'net-id': port.network_id, 'net-id': port.network_id,
'attachment': port.interface_id} 'attachment': port.interface_id}
def get_all_ports(self, tenant_id, net_id): def get_all_ports(self, tenant_id, net_id, **kwargs):
ids = [] ids = []
ports = db.port_list(net_id) ports = db.port_list(net_id)
# This plugin does not perform filtering at the moment
return [{'port-id': str(p.uuid)} for p in ports] return [{'port-id': str(p.uuid)} for p in ports]
def create_port(self, tenant_id, net_id, port_state=None, **kwargs): def create_port(self, tenant_id, net_id, port_state=None, **kwargs):

View File

@ -165,13 +165,17 @@ class FakePlugin(object):
att_id=port['interface_id'], att_id=port['interface_id'],
att_port_id=port['uuid']) att_port_id=port['uuid'])
def get_all_networks(self, tenant_id): def get_all_networks(self, tenant_id, **kwargs):
""" """
Returns a dictionary containing all Returns a dictionary containing all
<network_uuid, network_name> for <network_uuid, network_name> for
the specified tenant. the specified tenant.
""" """
LOG.debug("FakePlugin.get_all_networks() called") LOG.debug("FakePlugin.get_all_networks() called")
filter_opts = kwargs.get('filter_opts', None)
if not filter_opts is None and len(filter_opts) > 0:
LOG.debug("filtering options were passed to the plugin"
"but the Fake plugin does not support them")
nets = [] nets = []
for net in db.network_list(tenant_id): for net in db.network_list(tenant_id):
net_item = {'net-id': str(net.uuid), net_item = {'net-id': str(net.uuid),
@ -232,12 +236,16 @@ class FakePlugin(object):
net = db.network_update(net_id, tenant_id, **kwargs) net = db.network_update(net_id, tenant_id, **kwargs)
return net return net
def get_all_ports(self, tenant_id, net_id): def get_all_ports(self, tenant_id, net_id, **kwargs):
""" """
Retrieves all port identifiers belonging to the Retrieves all port identifiers belonging to the
specified Virtual Network. specified Virtual Network.
""" """
LOG.debug("FakePlugin.get_all_ports() called") LOG.debug("FakePlugin.get_all_ports() called")
filter_opts = kwargs.get('filter_opts', None)
if not filter_opts is None and len(filter_opts) > 0:
LOG.debug("filtering options were passed to the plugin"
"but the Fake plugin does not support them")
port_ids = [] port_ids = []
ports = db.port_list(net_id) ports = db.port_list(net_id)
for x in ports: for x in ports:

View File

@ -31,11 +31,16 @@ class QuantumPluginBase(object):
__metaclass__ = ABCMeta __metaclass__ = ABCMeta
@abstractmethod @abstractmethod
def get_all_networks(self, tenant_id): def get_all_networks(self, tenant_id, **kwargs):
""" """
Returns a dictionary containing all Returns a dictionary containing all
<network_uuid, network_name> for <network_uuid, network_name> for
the specified tenant. the specified tenant.
:param tenant_id: unique identifier for the tenant whose networks
are being retrieved by this method
:param **kwargs: options to be passed to the plugin. The following
keywork based-options can be specified:
filter_opts - options for filtering network list
:returns: a list of mapping sequences with the following signature: :returns: a list of mapping sequences with the following signature:
[ {'net-id': uuid that uniquely identifies [ {'net-id': uuid that uniquely identifies
the particular quantum network, the particular quantum network,
@ -119,11 +124,17 @@ class QuantumPluginBase(object):
pass pass
@abstractmethod @abstractmethod
def get_all_ports(self, tenant_id, net_id): def get_all_ports(self, tenant_id, net_id, **kwargs):
""" """
Retrieves all port identifiers belonging to the Retrieves all port identifiers belonging to the
specified Virtual Network. specified Virtual Network.
:param tenant_id: unique identifier for the tenant for which this
method is going to retrieve ports
:param net_id: unique identifiers for the network whose ports are
about to be retrieved
:param **kwargs: options to be passed to the plugin. The following
keywork based-options can be specified:
filter_opts - options for filtering network list
:returns: a list of mapping sequences with the following signature: :returns: a list of mapping sequences with the following signature:
[ {'port-id': uuid representing a particular port [ {'port-id': uuid representing a particular port
on the specified quantum network on the specified quantum network

View File

@ -34,10 +34,7 @@ ATTS = "attachments"
class AbstractAPITest(unittest.TestCase): class AbstractAPITest(unittest.TestCase):
"""Abstract base class for Quantum API unit tests """ Base class definiting some methods for API tests """
Defined according to operations defined for Quantum API v1.0
"""
def _deserialize_net_response(self, content_type, response): def _deserialize_net_response(self, content_type, response):
network_data = self._net_deserializers[content_type].\ network_data = self._net_deserializers[content_type].\
@ -85,10 +82,57 @@ class AbstractAPITest(unittest.TestCase):
if expected_res_status in (200, 202): if expected_res_status in (200, 202):
port_data = self._deserialize_port_response(content_type, port_data = self._deserialize_port_response(content_type,
port_res) port_res)
LOG.debug("PORT RESPONSE:%s", port_res.body)
LOG.debug("PORT DATA:%s", port_data)
return port_data['port']['id'] return port_data['port']['id']
def _set_attachment(self, network_id, port_id, interface_id, fmt,
expected_res_status=204):
put_attachment_req = testlib.put_attachment_request(self.tenant_id,
network_id,
port_id,
interface_id,
fmt)
put_attachment_res = put_attachment_req.get_response(self.api)
self.assertEqual(put_attachment_res.status_int, expected_res_status)
def setUp(self, api_router_klass, xml_metadata_dict):
options = {}
options['plugin_provider'] = test_config['plugin_name']
api_router_cls = utils.import_class(api_router_klass)
self.api = api_router_cls(options)
self.tenant_id = "test_tenant"
self.network_name = "test_network"
# Prepare XML & JSON deserializers
net_xml_deserializer = XMLDeserializer(xml_metadata_dict[NETS])
port_xml_deserializer = XMLDeserializer(xml_metadata_dict[PORTS])
att_xml_deserializer = XMLDeserializer(xml_metadata_dict[ATTS])
json_deserializer = JSONDeserializer()
self._net_deserializers = {
'application/xml': net_xml_deserializer,
'application/json': json_deserializer,
}
self._port_deserializers = {
'application/xml': port_xml_deserializer,
'application/json': json_deserializer,
}
self._att_deserializers = {
'application/xml': att_xml_deserializer,
'application/json': json_deserializer,
}
def tearDown(self):
"""Clear the test environment"""
# Remove database contents
db.clear_db()
class BaseAPIOperationsTest(AbstractAPITest):
"""Abstract base class for Quantum API unit tests
Defined according to operations defined for Quantum API v1.0
"""
def _test_create_network(self, fmt): def _test_create_network(self, fmt):
LOG.debug("_test_create_network - fmt:%s - START", fmt) LOG.debug("_test_create_network - fmt:%s - START", fmt)
content_type = "application/%s" % fmt content_type = "application/%s" % fmt
@ -844,39 +888,6 @@ class AbstractAPITest(unittest.TestCase):
LOG.debug("_test_unparsable_data - " \ LOG.debug("_test_unparsable_data - " \
"fmt:%s - END", fmt) "fmt:%s - END", fmt)
def setUp(self, api_router_klass, xml_metadata_dict):
options = {}
options['plugin_provider'] = test_config['plugin_name']
api_router_cls = utils.import_class(api_router_klass)
self.api = api_router_cls(options)
self.tenant_id = "test_tenant"
self.network_name = "test_network"
# Prepare XML & JSON deserializers
net_xml_deserializer = XMLDeserializer(xml_metadata_dict[NETS])
port_xml_deserializer = XMLDeserializer(xml_metadata_dict[PORTS])
att_xml_deserializer = XMLDeserializer(xml_metadata_dict[ATTS])
json_deserializer = JSONDeserializer()
self._net_deserializers = {
'application/xml': net_xml_deserializer,
'application/json': json_deserializer,
}
self._port_deserializers = {
'application/xml': port_xml_deserializer,
'application/json': json_deserializer,
}
self._att_deserializers = {
'application/xml': att_xml_deserializer,
'application/json': json_deserializer,
}
def tearDown(self):
"""Clear the test environment"""
# Remove database contents
db.clear_db()
def test_list_networks_json(self): def test_list_networks_json(self):
self._test_list_networks('json') self._test_list_networks('json')

View File

@ -16,17 +16,23 @@
# under the License. # under the License.
# @author: Salvatore Orlando, Citrix Systems # @author: Salvatore Orlando, Citrix Systems
import logging
from webob import exc from webob import exc
import quantum.api.attachments as atts import quantum.api.attachments as atts
import quantum.api.networks as nets import quantum.api.networks as nets
import quantum.api.ports as ports import quantum.api.ports as ports
import quantum.tests.unit._test_api as test_api import quantum.tests.unit._test_api as test_api
import quantum.tests.unit.testlib_api as testlib
from quantum.common.test_lib import test_config from quantum.common.test_lib import test_config
class APITestV10(test_api.AbstractAPITest): LOG = logging.getLogger('quantum.tests.test_api')
class APITestV10(test_api.BaseAPIOperationsTest):
def assert_network(self, **kwargs): def assert_network(self, **kwargs):
self.assertEqual({'id': kwargs['id'], self.assertEqual({'id': kwargs['id'],
@ -63,7 +69,7 @@ class APITestV10(test_api.AbstractAPITest):
self._already_attached_code = 440 self._already_attached_code = 440
class APITestV11(test_api.AbstractAPITest): class APITestV11(test_api.BaseAPIOperationsTest):
def assert_network(self, **kwargs): def assert_network(self, **kwargs):
self.assertEqual({'id': kwargs['id'], self.assertEqual({'id': kwargs['id'],
@ -107,3 +113,247 @@ class APITestV11(test_api.AbstractAPITest):
self._port_state_invalid_code = exc.HTTPBadRequest.code self._port_state_invalid_code = exc.HTTPBadRequest.code
self._port_in_use_code = exc.HTTPConflict.code self._port_in_use_code = exc.HTTPConflict.code
self._already_attached_code = exc.HTTPConflict.code self._already_attached_code = exc.HTTPConflict.code
class APIFiltersTest(test_api.AbstractAPITest):
""" Test case for API filters.
Uses controller for API v1.1
"""
def _do_filtered_network_list_request(self, flt):
list_network_req = testlib.network_list_request(self.tenant_id,
self.fmt,
query_string=flt)
list_network_res = list_network_req.get_response(self.api)
self.assertEqual(list_network_res.status_int, 200)
network_data = self._net_deserializers[self.content_type].\
deserialize(list_network_res.body)['body']
return network_data
def _do_filtered_port_list_request(self, flt, network_id):
list_port_req = testlib.port_list_request(self.tenant_id,
network_id,
self.fmt,
query_string=flt)
list_port_res = list_port_req.get_response(self.api)
self.assertEqual(list_port_res.status_int, 200)
port_data = self._port_deserializers[self.content_type].\
deserialize(list_port_res.body)['body']
return port_data
def setUp(self):
super(APIFiltersTest, self).setUp('quantum.api.APIRouterV11',
{test_api.NETS: nets.ControllerV11._serialization_metadata,
test_api.PORTS: ports.ControllerV11._serialization_metadata,
test_api.ATTS: atts.ControllerV11._serialization_metadata})
self.net_op_status = test_config.get('default_net_op_status',
'UNKNOWN')
self.port_op_status = test_config.get('default_port_op_status',
'UNKNOWN')
self.fmt = "xml"
self.content_type = "application/%s" % self.fmt
# create data for validating filters
# Create network "test-1"
self.net1_id = self._create_network(self.fmt, name="test-1")
# Add 2 ports, 1 ACTIVE, 1 DOWN
self.port11_id = self._create_port(self.net1_id, "ACTIVE", self.fmt)
self.port12_id = self._create_port(self.net1_id, "DOWN", self.fmt)
# Put attachment "test-1-att" in active port
self._set_attachment(self.net1_id,
self.port11_id,
"test-1-att",
self.fmt)
# Create network "test-2"
# Add 2 ports, 2 ACTIVE, 0 DOWN
self.net2_id = self._create_network(self.fmt, name="test-2")
self.port21_id = self._create_port(self.net2_id, "ACTIVE", self.fmt)
self.port22_id = self._create_port(self.net2_id, "ACTIVE", self.fmt)
def test_network_name_filter(self):
LOG.debug("test_network_name_filter - START")
flt = "name=test-1"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
flt = "name=non-existent"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 0
self.assertEqual(len(network_data['networks']), 0)
LOG.debug("test_network_name_filter - END")
def test_network_op_status_filter(self):
LOG.debug("test_network_op_status_filter - START")
# First filter for networks in default status
flt = "op-status=%s" % self.net_op_status
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 2
self.assertEqual(len(network_data['networks']), 2)
# And then for networks in 'DOWN' status
flt = "op-status=DOWN"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 0
self.assertEqual(len(network_data['networks']), 0)
LOG.debug("test_network_op_status_filter - END")
def test_network_port_op_status_filter(self):
LOG.debug("test_network_port_op_status_filter - START")
# First filter for networks with ports in default op status
flt = "port-op-status=%s" % self.port_op_status
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 2
self.assertEqual(len(network_data['networks']), 2)
LOG.debug("test_network_port_op_status_filter - END")
def test_network_port_state_filter(self):
LOG.debug("test_network_port_state_filter - START")
# First filter for networks with ports 'ACTIVE'
flt = "port-state=ACTIVE"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 2
self.assertEqual(len(network_data['networks']), 2)
# And then for networks with ports in 'DOWN' admin state
flt = "port-state=DOWN"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
LOG.debug("test_network_port_state_filter - END")
def test_network_has_attachment_filter(self):
LOG.debug("test_network_has_attachment_filter - START")
# First filter for networks with ports 'ACTIVE'
flt = "has-attachment=True"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
# And then for networks with ports in 'DOWN' admin state
flt = "has-attachment=False"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
LOG.debug("test_network_has_attachment_filter - END")
def test_network_port_filter(self):
LOG.debug("test_network_port_filter - START")
flt = "port=%s" % self.port11_id
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
flt = "port=%s" % self.port21_id
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
self.assertEqual(network_data['networks'][0]['id'], self.net2_id)
LOG.debug("test_network_port_filter - END")
def test_network_attachment_filter(self):
LOG.debug("test_network_attachment_filter - START")
flt = "attachment=test-1-att"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
flt = "attachment=non-existent"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 0
self.assertEqual(len(network_data['networks']), 0)
LOG.debug("test_network_attachment_filter - END")
def test_network_multiple_filters(self):
LOG.debug("test_network_multiple_filters - START")
# Add some data for having more fun
another_net_id = self._create_network(self.fmt, name="test-1")
# Add 1 ACTIVE port
self._create_port(another_net_id, "ACTIVE", self.fmt)
# Do the filtering
flt = "name=test-1&port-state=ACTIVE&attachment=test-1-att"
network_data = self._do_filtered_network_list_request(flt)
# Check network count: should return 1
self.assertEqual(len(network_data['networks']), 1)
self.assertEqual(network_data['networks'][0]['id'], self.net1_id)
LOG.debug("test_network_multiple_filters - END")
def test_port_state_filter(self):
LOG.debug("test_port_state_filter - START")
# First filter for 'ACTIVE' ports in 1st network
flt = "state=ACTIVE"
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 1
self.assertEqual(len(port_data['ports']), 1)
# And then in 2nd network
port_data = self._do_filtered_port_list_request(flt, self.net2_id)
# Check port count: should return 2
self.assertEqual(len(port_data['ports']), 2)
LOG.debug("test_port_state_filter - END")
def test_port_op_status_filter(self):
LOG.debug("test_port_op_status_filter - START")
# First filter for 'UP' ports in 1st network
flt = "op-status=%s" % self.port_op_status
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 2
self.assertEqual(len(port_data['ports']), 2)
LOG.debug("test_port_op_status_filter - END")
def test_port_has_attachment_filter(self):
LOG.debug("test_port_has_attachment_filter - START")
# First search for ports with attachments in 1st network
flt = "has-attachment=True"
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 1
self.assertEqual(len(port_data['ports']), 1)
self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
# And then for ports without attachment in 2nd network
flt = "has-attachment=False"
port_data = self._do_filtered_port_list_request(flt, self.net2_id)
# Check port count: should return 2
self.assertEqual(len(port_data['ports']), 2)
LOG.debug("test_port_has_attachment_filter - END")
def test_port_attachment_filter(self):
LOG.debug("test_port_attachment_filter - START")
# First search for ports with attachments in 1st network
flt = "attachment=test-1-att"
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 1
self.assertEqual(len(port_data['ports']), 1)
self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
# And then for a non-existent attachment in 2nd network
flt = "attachment=non-existent"
port_data = self._do_filtered_port_list_request(flt, self.net2_id)
# Check port count: should return 0
self.assertEqual(len(port_data['ports']), 0)
LOG.debug("test_port_has_attachment_filter - END")
def test_port_multiple_filters(self):
LOG.debug("test_port_multiple_filters - START")
flt = "op-status=%s&state=DOWN" % self.port_op_status
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 1
self.assertEqual(len(port_data['ports']), 1)
self.assertEqual(port_data['ports'][0]['id'], self.port12_id)
flt = "state=ACTIVE&attachment=test-1-att"
port_data = self._do_filtered_port_list_request(flt, self.net1_id)
# Check port count: should return 1
self.assertEqual(len(port_data['ports']), 1)
self.assertEqual(port_data['ports'][0]['id'], self.port11_id)
flt = "state=ACTIVE&has-attachment=False"
port_data = self._do_filtered_port_list_request(flt, self.net2_id)
# Check port count: should return 2
self.assertEqual(len(port_data['ports']), 2)
LOG.debug("test_port_multiple_filters - END")

View File

@ -3,8 +3,12 @@ import webob
from quantum.common.serializer import Serializer from quantum.common.serializer import Serializer
def create_request(path, body, content_type, method='GET'): def create_request(path, body, content_type, method='GET', query_string=None):
req = webob.Request.blank(path) if query_string:
url = "%s?%s" % (path, query_string)
else:
url = path
req = webob.Request.blank(url)
req.method = method req.method = method
req.headers = {} req.headers = {}
req.headers['Accept'] = content_type req.headers['Accept'] = content_type
@ -12,17 +16,18 @@ def create_request(path, body, content_type, method='GET'):
return req return req
def _network_list_request(tenant_id, format='xml', detail=False): def _network_list_request(tenant_id, format='xml', detail=False,
query_string=None):
method = 'GET' method = 'GET'
detail_str = detail and '/detail' or '' detail_str = detail and '/detail' or ''
path = "/tenants/%(tenant_id)s/networks" \ path = "/tenants/%(tenant_id)s/networks" \
"%(detail_str)s.%(format)s" % locals() "%(detail_str)s.%(format)s" % locals()
content_type = "application/%s" % format content_type = "application/%s" % format
return create_request(path, None, content_type, method) return create_request(path, None, content_type, method, query_string)
def network_list_request(tenant_id, format='xml'): def network_list_request(tenant_id, format='xml', query_string=None):
return _network_list_request(tenant_id, format) return _network_list_request(tenant_id, format, query_string=query_string)
def network_list_detail_request(tenant_id, format='xml'): def network_list_detail_request(tenant_id, format='xml'):
@ -75,17 +80,21 @@ def network_delete_request(tenant_id, network_id, format='xml'):
return create_request(path, None, content_type, method) return create_request(path, None, content_type, method)
def _port_list_request(tenant_id, network_id, format='xml', detail=False): def _port_list_request(tenant_id, network_id, format='xml',
detail=False, query_string=None):
method = 'GET' method = 'GET'
detail_str = detail and '/detail' or '' detail_str = detail and '/detail' or ''
path = "/tenants/%(tenant_id)s/networks/" \ path = "/tenants/%(tenant_id)s/networks/" \
"%(network_id)s/ports%(detail_str)s.%(format)s" % locals() "%(network_id)s/ports%(detail_str)s.%(format)s" % locals()
content_type = "application/%s" % format content_type = "application/%s" % format
return create_request(path, None, content_type, method) return create_request(path, None, content_type, method, query_string)
def port_list_request(tenant_id, network_id, format='xml'): def port_list_request(tenant_id, network_id, format='xml', query_string=None):
return _port_list_request(tenant_id, network_id, format) return _port_list_request(tenant_id,
network_id,
format,
query_string=query_string)
def port_list_detail_request(tenant_id, network_id, format='xml'): def port_list_detail_request(tenant_id, network_id, format='xml'):