b80f1d0be3
Change-Id: I0a625019ab7495a71125edbd37d9005a4675b86b Partial-Bug: #1862968 Depends-On: https://review.opendev.org/709122
1446 lines
56 KiB
Python
1446 lines
56 KiB
Python
# Copyright 2012 OpenStack Foundation
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import functools
|
|
import math
|
|
import time
|
|
|
|
import netaddr
|
|
from neutron_lib import constants as const
|
|
from oslo_log import log
|
|
from tempest.common import utils as tutils
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib import exceptions as lib_exc
|
|
from tempest import test
|
|
|
|
from neutron_tempest_plugin.api import clients
|
|
from neutron_tempest_plugin.common import constants
|
|
from neutron_tempest_plugin.common import utils
|
|
from neutron_tempest_plugin import config
|
|
from neutron_tempest_plugin import exceptions
|
|
|
|
CONF = config.CONF
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class BaseNetworkTest(test.BaseTestCase):
|
|
|
|
"""Base class for Neutron tests that use the Tempest Neutron REST client
|
|
|
|
Per the Neutron API Guide, API v1.x was removed from the source code tree
|
|
(docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
|
|
Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
|
|
following options are defined in the [network] section of etc/tempest.conf:
|
|
|
|
project_network_cidr with a block of cidr's from which smaller blocks
|
|
can be allocated for tenant networks
|
|
|
|
project_network_mask_bits with the mask bits to be used to partition
|
|
the block defined by project_network_cidr
|
|
|
|
Finally, it is assumed that the following option is defined in the
|
|
[service_available] section of etc/tempest.conf
|
|
|
|
neutron as True
|
|
"""
|
|
|
|
# Only the primary credentials are requested by default.
credentials = ['primary']
force_tenant_isolation = False

# IP version assumed when a test does not request one: IPv4.
_ip_version = const.IP_VERSION_4

# Left as None here; derive from BaseAdminNetworkTest class to have this
# initialized.
admin_client = None

# External network used by default, taken from CONF.network.
external_network_id = CONF.network.public_network_id
|
|
|
|
@classmethod
|
|
def get_client_manager(cls, credential_type=None, roles=None,
                       force_new=None):
    """Return a Neutron clients manager for the requested credentials.

    The manager built by the parent class is only used as a source of
    credentials for ``clients.Manager``. When the primary credentials are
    requested, the parent's manager is kept in ``cls.os_tempest`` in case
    mixed tests need it.

    :param credential_type: forwarded to the parent implementation
    :param roles: forwarded to the parent implementation
    :param force_new: forwarded to the parent implementation
    :return: ``clients.Manager`` built from the parent manager credentials
    """
    tempest_manager = super(BaseNetworkTest, cls).get_client_manager(
        credential_type=credential_type, roles=roles, force_new=force_new)

    # Neutron uses a different clients manager than the Tempest one, so
    # save the original in case mixed tests need it.
    if credential_type == 'primary':
        cls.os_tempest = tempest_manager

    neutron_manager = clients.Manager(tempest_manager.credentials)
    return neutron_manager
|
|
|
|
@classmethod
|
|
def skip_checks(cls):
    """Skip the test class when its Neutron requirements are not met.

    ``cls.skipException`` is raised when Neutron is not available, when the
    class targets IPv6 while IPv6 is disabled in the configuration, or when
    an extension listed in the optional ``required_extensions`` class
    attribute is not enabled.
    """
    super(BaseNetworkTest, cls).skip_checks()

    if not CONF.service_available.neutron:
        raise cls.skipException("Neutron support is required")

    ipv6_requested = cls._ip_version == const.IP_VERSION_6
    if ipv6_requested and not CONF.network_feature_enabled.ipv6:
        raise cls.skipException("IPv6 Tests are disabled.")

    for extension in getattr(cls, 'required_extensions', []):
        if tutils.is_extension_enabled(extension, 'network'):
            continue
        raise cls.skipException("%s extension not enabled." % extension)
|
|
|
|
@classmethod
|
|
def setup_credentials(cls):
    """Set up credentials without creating any network resources."""
    # Called without arguments: no network resources are created for
    # these tests.
    cls.set_network_resources()
    super(BaseNetworkTest, cls).setup_credentials()
|
|
|
|
@classmethod
|
|
def setup_clients(cls):
    """Expose the primary manager's network client as ``cls.client``."""
    super(BaseNetworkTest, cls).setup_clients()
    primary_manager = cls.os_primary
    cls.client = primary_manager.network_client
|
|
|
|
@classmethod
|
|
def resource_setup(cls):
    """Initialize the per-class registries of created resources.

    Every registry starts empty; resource_cleanup walks them to delete
    whatever the tests left behind.
    """
    super(BaseNetworkTest, cls).resource_setup()

    cls.ethertype = "IPv" + str(cls._ip_version)

    # CIDRs already handed out by create_subnet / reserve_subnet_cidr
    cls.reserved_subnet_cidrs = set()

    # Networks, subnets and ports
    cls.networks = []
    cls.admin_networks = []
    cls.subnets = []
    cls.admin_subnets = []
    cls.ports = []
    cls.trunks = []
    cls.network_segment_ranges = []

    # Routing and floating IPs
    cls.routers = []
    cls.floating_ips = []
    cls.port_forwardings = []
    cls.conntrack_helpers = []

    # Address scopes and subnet pools
    cls.address_scopes = []
    cls.admin_address_scopes = []
    cls.subnetpools = []
    cls.admin_subnetpools = []

    # Security groups
    cls.security_groups = []
    cls.admin_security_groups = []

    # QoS
    cls.qos_rules = []
    cls.qos_policies = []

    # Metering, flavors and service profiles
    cls.metering_labels = []
    cls.metering_label_rules = []
    cls.flavors = []
    cls.service_profiles = []

    # Everything else
    cls.projects = []
    cls.log_objects = []
    cls.keypairs = []
|
|
|
|
@classmethod
|
|
def resource_cleanup(cls):
    """Delete every resource tracked by this class, then run parent cleanup.

    Nothing is deleted here when Neutron is not available. Each deletion
    goes through ``cls._try_delete_resource`` so that resources already
    removed by the tests themselves are skipped. The order of the loops
    matters: dependent resources are removed before the ones they use.
    """
    if CONF.service_available.neutron:
        try_delete = cls._try_delete_resource

        # Trunks
        for trunk in cls.trunks:
            try_delete(cls.delete_trunk, trunk)

        # Port forwardings
        for port_forwarding in cls.port_forwardings:
            try_delete(cls.delete_port_forwarding, port_forwarding)

        # Floating IPs
        for fip in cls.floating_ips:
            try_delete(cls.delete_floatingip, fip)

        # Conntrack helpers
        for conntrack_helper in cls.conntrack_helpers:
            try_delete(cls.delete_conntrack_helper, conntrack_helper)

        # Routers
        for router in cls.routers:
            try_delete(cls.delete_router, router)

        # Metering label rules
        for label_rule in cls.metering_label_rules:
            try_delete(cls.admin_client.delete_metering_label_rule,
                       label_rule['id'])

        # Metering labels
        for label in cls.metering_labels:
            try_delete(cls.admin_client.delete_metering_label, label['id'])

        # Flavors
        for flavor in cls.flavors:
            try_delete(cls.admin_client.delete_flavor, flavor['id'])

        # Service profiles
        for profile in cls.service_profiles:
            try_delete(cls.admin_client.delete_service_profile,
                       profile['id'])

        # Ports
        for port in cls.ports:
            try_delete(cls.client.delete_port, port['id'])

        # Subnets
        for subnet in cls.subnets:
            try_delete(cls.client.delete_subnet, subnet['id'])

        # Admin subnets
        for admin_subnet in cls.admin_subnets:
            try_delete(cls.admin_client.delete_subnet, admin_subnet['id'])

        # Networks
        for network in cls.networks:
            try_delete(cls.delete_network, network)

        # Admin networks
        for admin_network in cls.admin_networks:
            try_delete(cls.admin_client.delete_network,
                       admin_network['id'])

        # Security groups
        for group in cls.security_groups:
            try_delete(cls.delete_security_group, group)

        # Admin security groups
        for admin_group in cls.admin_security_groups:
            try_delete(cls.delete_security_group, admin_group,
                       client=cls.admin_client)

        # Subnet pools
        for pool in cls.subnetpools:
            try_delete(cls.client.delete_subnetpool, pool['id'])

        # Admin subnet pools
        for admin_pool in cls.admin_subnetpools:
            try_delete(cls.admin_client.delete_subnetpool,
                       admin_pool['id'])

        # Address scopes
        for scope in cls.address_scopes:
            try_delete(cls.client.delete_address_scope, scope['id'])

        # Admin address scopes
        for admin_scope in cls.admin_address_scopes:
            try_delete(cls.admin_client.delete_address_scope,
                       admin_scope['id'])

        # Projects
        for project in cls.projects:
            try_delete(cls.identity_admin_client.delete_project,
                       project['id'])

        # QoS rules
        for rule in cls.qos_rules:
            try_delete(cls.admin_client.delete_qos_rule, rule['id'])

        # QoS policies: as all networks and ports are already removed,
        # QoS policies shouldn't be "in use"
        for policy in cls.qos_policies:
            try_delete(cls.admin_client.delete_qos_policy, policy['id'])

        # Log objects
        for log_object in cls.log_objects:
            try_delete(cls.admin_client.delete_log, log_object['id'])

        # Keypairs
        for keypair in cls.keypairs:
            try_delete(cls.delete_keypair, keypair)

        # Network segment ranges
        for segment_range in cls.network_segment_ranges:
            try_delete(cls.admin_client.delete_network_segment_range,
                       segment_range['id'])

    super(BaseNetworkTest, cls).resource_cleanup()
|
|
|
|
@classmethod
|
|
def _try_delete_resource(cls, delete_callable, *args, **kwargs):
    """Delete a resource, tolerating the case where it is already gone.

    Tests delete some resources explicitly. For those the delete call
    raises NotFound, which is ignored; resources the test did not manage
    to delete are removed here.

    :param delete_callable: delete method
    :param args: arguments for delete method
    :param kwargs: keyword arguments for delete method
    """
    try:
        delete_callable(*args, **kwargs)
    except lib_exc.NotFound:
        # Not found means the resource was deleted in the test itself.
        return
|
|
|
|
@classmethod
|
|
def create_network(cls, network_name=None, client=None, external=None,
                   shared=None, provider_network_type=None,
                   provider_physical_network=None,
                   provider_segmentation_id=None, **kwargs):
    """Create a network and register it for cleanup.

    When no client is provided, the admin_client attribute is not None
    (for example when using the BaseAdminNetworkTest base class) and any
    of the convenience parameters (external, shared,
    provider_network_type, provider_physical_network,
    provider_segmentation_id) is used, admin_client is silently used
    instead of the regular client. In that case, unless the network is
    shared, it is created under the project of the regular client.

    :param network_name: Human-readable name of the network. When
        missing, kwargs['name'] is used, or else a random name.

    :param client: client to be used for connecting to network service

    :param external: whether the network has an external routing facility
        that's not managed by the networking service
        (sent as 'router:external')

    :param shared: whether this resource is shared across all projects.
        If True and admin_client is not None, the network is created
        under the administrative project.

    :param provider_network_type: type of physical network this network
        should be mapped to, for example 'flat', 'vlan', 'vxlan' or 'gre'
        (sent as 'provider:network_type')

    :param provider_physical_network: physical network where this network
        should be implemented (sent as 'provider:physical_network')

    :param provider_segmentation_id: ID of the isolated segment on the
        physical network, for example a VLAN identifier or a GRE key
        (sent as 'provider:segmentation_id')

    :param **kwargs: extra parameters to be forwarded to network service

    :return: dictionary describing the created network, with the client
        used to create it stored under the 'client' key
    """
    name = (network_name or kwargs.pop('name', None) or
            data_utils.rand_name('test-network-'))

    # Translate the convenience parameters into API attributes. Any of
    # them being set means the request needs the admin client.
    provider_options = (
        ('provider:network_type', provider_network_type),
        ('provider:physical_network', provider_physical_network),
        ('provider:segmentation_id', provider_segmentation_id),
    )
    admin_only = {key: value for key, value in provider_options if value}
    for key, flag in (('router:external', external), ('shared', shared)):
        if flag is not None:
            admin_only[key] = bool(flag)
    admin_client_required = bool(admin_only)
    kwargs.update(admin_only)

    if not client:
        if not (admin_client_required and cls.admin_client):
            # Use default client
            client = cls.client
        else:
            # For convenience silently switch to admin client
            client = cls.admin_client
            if not shared:
                # Keep this network visible from current project
                owner_id = (kwargs.get('project_id') or
                            kwargs.get('tenant_id') or
                            cls.client.tenant_id)
                kwargs.update(project_id=owner_id, tenant_id=owner_id)

    created = client.create_network(name=name, **kwargs)['network']
    # Remembered so that delete_network uses the same client
    created['client'] = client
    cls.networks.append(created)
    return created
|
|
|
|
@classmethod
|
|
def delete_network(cls, network, client=None):
    """Delete a network.

    :param network: dictionary holding the network ID (network['id'])
    :param client: client to be used. When not given, the client stored
        in network['client'] is used, or cls.client if there is none.
    """
    if not client:
        client = network.get('client') or cls.client
    client.delete_network(network['id'])
|
|
|
|
@classmethod
|
|
def create_shared_network(cls, network_name=None, **kwargs):
    """Create a shared network through cls.create_network.

    :param network_name: name of the network, passed on as ``name``
    :param **kwargs: extra parameters forwarded to cls.create_network
    """
    shared_network = cls.create_network(
        name=network_name, shared=True, **kwargs)
    return shared_network
|
|
|
|
@classmethod
|
|
def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
                  ip_version=None, client=None, reserve_cidr=True,
                  **kwargs):
    """Create a test subnet on a CIDR that is not in use yet.

    Convenient wrapper for client.create_subnet method. Candidate CIDRs
    come from cls.get_subnet_cidrs; a candidate the server rejects as
    overlapping with another subnet is skipped and the next one is tried.

    :param network: network where to create the subnet
        network['id'] must contain the ID of the network

    :param gateway: gateway IP address
        It can be a str or a netaddr.IPAddress
        If gateway is not given, then the first address after the
        network address of the chosen CIDR is used, like "192.168.0.1"
        for "192.168.0.0/24"
        If gateway is given as None then no gateway will be assigned

    :param cidr: CIDR of the subnet to create
        It can be either None, a str or a netaddr.IPNetwork instance

    :param mask_bits: CIDR prefix length
        It can be either None or a numeric value.
        If cidr parameter is given then mask_bits is used to determine
        the sequence of valid CIDRs to try.
        Please see netaddr.IPNetwork.subnet method documentation[1]

    :param ip_version: ip version of generated subnet CIDRs
        It can be None, IP_VERSION_4 or IP_VERSION_6
        It has to match the IP version of the gateway when both are given

    :param client: client to be used to connect to network service
        (cls.client when not given)

    :param reserve_cidr: if True then it reserves assigned CIDR to avoid
        using the same CIDR for further subnets in the scope of the same
        test case class

    :param **kwargs: optional parameters to be forwarded to wrapped method

    :raises ValueError: when the gateway IP version doesn't match
        ip_version, or when no usable CIDR is left

    [1] http://netaddr.readthedocs.io/en/latest/tutorial_01.html#supernets-and-subnets  # noqa
    """
    # allow tests to use admin client
    client = client or cls.client

    if not gateway:
        ip_version = ip_version or cls._ip_version
    else:
        gateway_version = netaddr.IPAddress(gateway).version
        if not ip_version:
            ip_version = gateway_version
        elif ip_version != gateway_version:
            raise ValueError(
                "Gateway IP version doesn't match IP version")

    candidates = cls.get_subnet_cidrs(
        ip_version=ip_version, cidr=cidr, mask_bits=mask_bits)
    for subnet_cidr in candidates:
        if gateway is None:
            kwargs['gateway_ip'] = None
        else:
            # An empty gateway means "first address of the candidate CIDR"
            kwargs['gateway_ip'] = str(gateway or (subnet_cidr.ip + 1))
        try:
            body = client.create_subnet(
                network_id=network['id'],
                cidr=str(subnet_cidr),
                ip_version=subnet_cidr.version,
                **kwargs)
        except lib_exc.BadRequest as error:
            # Only an overlap is worth retrying with the next candidate
            if 'overlaps with another subnet' not in str(error):
                raise
        else:
            break
    else:
        message = 'Available CIDR for subnet creation could not be found'
        raise ValueError(message)

    subnet = body['subnet']
    # Subnets created with any other client are cleaned up as admin ones
    registry = cls.subnets if client is cls.client else cls.admin_subnets
    registry.append(subnet)
    if reserve_cidr:
        cls.reserve_subnet_cidr(subnet_cidr)
    return subnet
|
|
|
|
@classmethod
|
|
def reserve_subnet_cidr(cls, addr, **ipnetwork_kwargs):
    """Reserve given subnet CIDR making sure it is not used by create_subnet

    :param addr: the CIDR address to be reserved
        It can be a str or netaddr.IPNetwork instance

    :param **ipnetwork_kwargs: optional netaddr.IPNetwork constructor
        parameters

    :raises ValueError: if the CIDR has already been reserved
    """
    if not cls.try_reserve_subnet_cidr(addr, **ipnetwork_kwargs):
        # Use a str.format placeholder: '%r' is not interpolated by
        # str.format, so the address would be missing from the message.
        raise ValueError(
            'Subnet CIDR already reserved: {!r}'.format(addr))
|
|
|
|
@classmethod
|
|
def try_reserve_subnet_cidr(cls, addr, **ipnetwork_kwargs):
    """Reserve given subnet CIDR if it hasn't been reserved before

    :param addr: the CIDR address to be reserved
        It can be a str or netaddr.IPNetwork instance

    :param **ipnetwork_kwargs: optional netaddr.IPNetwork constructor
        parameters

    :return: True if it wasn't reserved before, False otherwise.
    """
    network = netaddr.IPNetwork(addr, **ipnetwork_kwargs)
    already_reserved = network in cls.reserved_subnet_cidrs
    if not already_reserved:
        cls.reserved_subnet_cidrs.add(network)
    return not already_reserved
|
|
|
|
@classmethod
|
|
def get_subnet_cidrs(
        cls, cidr=None, mask_bits=None, ip_version=None):
    """Iterate over a sequence of unused subnet CIDR for IP version

    :param cidr: CIDR of the subnet to create
        It can be either None, a str or a netaddr.IPNetwork instance.
        When not given, the CIDR and prefix length configured in the
        'network' section for the IP version are used.

    :param mask_bits: CIDR prefix length
        It can be either None or a numeric value.
        If cidr parameter is given then mask_bits is used to determine
        the sequence of valid CIDRs to generate.
        Please see netaddr.IPNetwork.subnet method documentation[1]

    :param ip_version: ip version of generated subnet CIDRs
        It can be None, IP_VERSION_4 or IP_VERSION_6
        It has to match given CIDR if given

    :return: iterator over CIDRs of type netaddr.IPNetwork that are not
        in cls.reserved_subnet_cidrs

    :raises ValueError: on first iteration, when no cidr is given and the
        IP version is neither IP_VERSION_4 nor IP_VERSION_6

    [1] http://netaddr.readthedocs.io/en/latest/tutorial_01.html#supernets-and-subnets  # noqa
    """
    if not cidr:
        # Generate subnet CIDRs starting from configured values
        ip_version = ip_version or cls._ip_version
        if ip_version == const.IP_VERSION_4:
            mask_bits = mask_bits or config.safe_get_config_value(
                'network', 'project_network_mask_bits')
            cidr = netaddr.IPNetwork(config.safe_get_config_value(
                'network', 'project_network_cidr'))
        elif ip_version == const.IP_VERSION_6:
            # NOTE(review): unlike the IPv4 branch, a mask_bits value
            # given by the caller is ignored here - confirm it is wanted.
            mask_bits = config.safe_get_config_value(
                'network', 'project_network_v6_mask_bits')
            cidr = netaddr.IPNetwork(config.safe_get_config_value(
                'network', 'project_network_v6_cidr'))
        else:
            raise ValueError('Invalid IP version: {!r}'.format(ip_version))
    else:
        # Generate subnet CIDRs starting from given CIDR
        # checking it is of requested IP version
        cidr = netaddr.IPNetwork(cidr, version=ip_version)

    # Without a prefix length the CIDR itself is the only candidate
    candidates = cidr.subnet(mask_bits) if mask_bits else iter([cidr])

    for candidate in candidates:
        if candidate in cls.reserved_subnet_cidrs:
            continue
        yield candidate
|
|
|
|
@classmethod
|
|
def create_port(cls, network, **kwargs):
    """Create a test port on the given network and register it for cleanup.

    When CONF.network.port_vnic_type is set and the caller did not pass
    'binding:vnic_type', the configured value is used for it.

    :param network: dictionary holding the network ID (network['id'])
    :param **kwargs: extra parameters forwarded to cls.client.create_port
    :return: dictionary describing the created port
    """
    configured_vnic_type = CONF.network.port_vnic_type
    if configured_vnic_type and 'binding:vnic_type' not in kwargs:
        kwargs['binding:vnic_type'] = configured_vnic_type

    created = cls.client.create_port(
        network_id=network['id'], **kwargs)['port']
    cls.ports.append(created)
    return created
|
|
|
|
@classmethod
|
|
def update_port(cls, port, **kwargs):
    """Update a test port and return its new description.

    :param port: dictionary holding the port ID (port['id'])
    :param **kwargs: attributes forwarded to cls.client.update_port
    """
    updated = cls.client.update_port(port['id'], **kwargs)['port']
    return updated
|
|
|
|
@classmethod
|
|
def _create_router_with_client(
        cls, client, router_name=None, admin_state_up=False,
        external_network_id=None, enable_snat=None, **kwargs):
    """Create a router with the given client and register it for cleanup.

    :param client: network client used to create the router
    :param router_name: name of the router
    :param admin_state_up: administrative state of the router
    :param external_network_id: when set, sent as the 'network_id' of the
        external gateway info
    :param enable_snat: when not None, sent as 'enable_snat' of the
        external gateway info
    :param **kwargs: extra parameters forwarded to client.create_router
    :return: dictionary describing the created router
    """
    gateway_info = {}
    if external_network_id:
        gateway_info['network_id'] = external_network_id
    if enable_snat is not None:
        gateway_info['enable_snat'] = enable_snat

    created = client.create_router(
        router_name,
        external_gateway_info=gateway_info,
        admin_state_up=admin_state_up,
        **kwargs)['router']
    cls.routers.append(created)
    return created
|
|
|
|
@classmethod
|
|
def create_router(cls, *args, **kwargs):
    """Create a router with cls.client.

    Arguments are forwarded to cls._create_router_with_client.
    """
    network_client = cls.client
    return cls._create_router_with_client(network_client, *args, **kwargs)
|
|
|
|
@classmethod
|
|
def create_admin_router(cls, *args, **kwargs):
    """Create a router with the network client of cls.os_admin.

    Arguments are forwarded to cls._create_router_with_client.
    """
    admin_network_client = cls.os_admin.network_client
    return cls._create_router_with_client(
        admin_network_client, *args, **kwargs)
|
|
|
|
@classmethod
|
|
def create_floatingip(cls, external_network_id=None, port=None,
                      client=None, **kwargs):
    """Create a floating IP and schedule it for later deletion.

    If a client is passed, then it is used for deleting the IP too.

    :param external_network_id: network ID where to create
        By default this is cls.external_network_id.

    :param port: port to bind floating IP to
        This is translated to 'port_id=port['id']'
        By default it is None.

    :param client: network client to be used for creating and cleaning up
        the floating IP (cls.client when not given).

    :param **kwargs: additional creation parameters to be forwarded to
        networking server.

    :raises ValueError: when kwargs carries a 'port_id' different from
        port['id']
    """
    if not client:
        client = cls.client
    if not external_network_id:
        external_network_id = cls.external_network_id

    if port:
        requested_port_id = kwargs.setdefault('port_id', port['id'])
        if requested_port_id != port['id']:
            raise ValueError(
                "Port ID specified twice: {!s} != {!s}".format(
                    requested_port_id, port['id']))

    created = client.create_floatingip(external_network_id,
                                       **kwargs)['floatingip']

    # save client to be used later in cls.delete_floatingip
    # for final cleanup
    created['client'] = client
    cls.floating_ips.append(created)
    return created
|
|
|
|
@classmethod
|
|
def delete_floatingip(cls, floating_ip, client=None):
    """Delete floating IP

    :param floating_ip: dictionary holding the floating IP ID
        (floating_ip['id'])

    :param client: Client to be used
        If client is not given it will use the client used to create
        the floating IP, or cls.client if unknown.
    """
    if not client:
        client = floating_ip.get('client') or cls.client
    client.delete_floatingip(floating_ip['id'])
|
|
|
|
@classmethod
|
|
def create_port_forwarding(cls, fip_id, internal_port_id,
                           internal_port, external_port,
                           internal_ip_address=None, protocol="tcp",
                           client=None):
    """Create a port forwarding and schedule it for later deletion.

    If a client is passed, then it is used for deleting the PF too.

    :param fip_id: The ID of the floating IP address.

    :param internal_port_id: The ID of the Neutron port associated to
        the floating IP port forwarding.

    :param internal_port: The TCP/UDP/other protocol port number of the
        Neutron port fixed IP address associated to the floating ip
        port forwarding.

    :param external_port: The TCP/UDP/other protocol port number of
        the port forwarding floating IP address.

    :param internal_ip_address: The fixed IPv4 address of the Neutron
        port associated to the floating IP port forwarding.

    :param protocol: The IP protocol used in the floating IP port
        forwarding.

    :param client: network client to be used for creating and cleaning up
        the floating IP port forwarding (cls.client when not given).

    :return: dictionary describing the port forwarding, extended with
        the 'floatingip_id' and 'client' keys used by the final cleanup
    """
    if not client:
        client = cls.client

    response = client.create_port_forwarding(
        fip_id, internal_port_id, internal_port, external_port,
        internal_ip_address, protocol)
    port_forwarding = response['port_forwarding']

    # Both values are needed by cls.delete_port_forwarding for the final
    # cleanup: the floating IP the forwarding belongs to, and the client.
    port_forwarding['floatingip_id'] = fip_id
    port_forwarding['client'] = client
    cls.port_forwardings.append(port_forwarding)
    return port_forwarding
|
|
|
|
@classmethod
|
|
def delete_port_forwarding(cls, pf, client=None):
    """Delete port forwarding

    :param pf: dictionary holding the port forwarding ID (pf['id']) and
        the ID of its floating IP (pf['floatingip_id'])

    :param client: Client to be used
        If client is not given it will use the client used to create
        the port forwarding, or cls.client if unknown.
    """
    if not client:
        client = pf.get('client') or cls.client
    client.delete_port_forwarding(pf['floatingip_id'], pf['id'])
|
|
|
|
@classmethod
|
|
def create_router_interface(cls, router_id, subnet_id):
    """Attach a subnet to a router and return the resulting interface.

    :param router_id: ID of the router
    :param subnet_id: ID of the subnet to attach
    """
    return cls.client.add_router_interface_with_subnet_id(
        router_id, subnet_id)
|
|
|
|
@classmethod
|
|
def add_extra_routes_atomic(cls, *args, **kwargs):
    """Forward the call to cls.client.add_extra_routes_atomic."""
    network_client = cls.client
    return network_client.add_extra_routes_atomic(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def remove_extra_routes_atomic(cls, *args, **kwargs):
    """Forward the call to cls.client.remove_extra_routes_atomic."""
    network_client = cls.client
    return network_client.remove_extra_routes_atomic(*args, **kwargs)
|
|
|
|
@classmethod
|
|
def get_supported_qos_rule_types(cls):
    """Return the 'type' of every rule type listed by cls.client."""
    listed = cls.client.list_qos_rule_types()['rule_types']
    return [entry['type'] for entry in listed]
|
|
|
|
@classmethod
|
|
def create_qos_policy(cls, name, description=None, shared=False,
                      project_id=None, is_default=False):
    """Create a test QoS policy with the admin client.

    The policy is registered in cls.qos_policies for cleanup.

    :param name: name of the policy
    :param description: description of the policy
    :param shared: whether the policy is shared
    :param project_id: ID of the project owning the policy
    :param is_default: whether the policy is a default one
    :return: dictionary describing the created policy
    """
    policy = cls.admin_client.create_qos_policy(
        name, description, shared, project_id, is_default)['policy']
    cls.qos_policies.append(policy)
    return policy
|
|
|
|
@classmethod
|
|
def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
                                    max_burst_kbps,
                                    direction=const.EGRESS_DIRECTION):
    """Create a test QoS bandwidth limit rule with the admin client.

    The rule is registered in cls.qos_rules for cleanup.

    :param policy_id: ID of the QoS policy the rule belongs to
    :param max_kbps: value passed as the rule's max_kbps
    :param max_burst_kbps: value passed as the rule's max_burst_kbps
    :param direction: traffic direction (egress by default)
    :return: dictionary describing the created rule
    """
    rule = cls.admin_client.create_bandwidth_limit_rule(
        policy_id, max_kbps, max_burst_kbps,
        direction)['bandwidth_limit_rule']
    cls.qos_rules.append(rule)
    return rule
|
|
|
|
@classmethod
|
|
def create_qos_minimum_bandwidth_rule(cls, policy_id, min_kbps,
                                      direction=const.EGRESS_DIRECTION):
    """Create a QoS minimum bandwidth rule with the admin client.

    The rule is registered in cls.qos_rules for cleanup.

    :param policy_id: ID of the QoS policy the rule belongs to
    :param min_kbps: value passed as the rule's min_kbps
    :param direction: traffic direction (egress by default)
    :return: dictionary describing the created rule
    """
    # The client takes the direction before the bandwidth value.
    rule = cls.admin_client.create_minimum_bandwidth_rule(
        policy_id, direction, min_kbps)['minimum_bandwidth_rule']
    cls.qos_rules.append(rule)
    return rule
|
|
|
|
@classmethod
|
|
def delete_router(cls, router, client=None):
    """Delete a router after removing its extra routes and interfaces.

    :param router: dictionary holding the router ID (router['id']); extra
        routes are removed only when it has a 'routes' key
    :param client: client to be used (cls.client when not given)
    """
    if not client:
        client = cls.client
    router_id = router['id']

    if 'routes' in router:
        client.remove_router_extra_routes(router_id)

    # Only ports owned by a router interface are detached here.
    router_ports = client.list_router_interfaces(router_id)['ports']
    interface_ports = [
        candidate for candidate in router_ports
        if candidate['device_owner'] in const.ROUTER_INTERFACE_OWNERS]
    for interface_port in interface_ports:
        subnet_id = interface_port['fixed_ips'][0]['subnet_id']
        try:
            client.remove_router_interface_with_subnet_id(
                router_id, subnet_id)
        except lib_exc.NotFound:
            # The interface is gone already: nothing left to detach
            pass

    client.delete_router(router_id)
|
|
|
|
@classmethod
|
|
def create_address_scope(cls, name, is_admin=False, **kwargs):
    """Create an address scope and register it for cleanup.

    :param name: name of the address scope
    :param is_admin: when True the admin client is used and the scope is
        tracked in cls.admin_address_scopes; otherwise cls.client is used
        and it is tracked in cls.address_scopes
    :param **kwargs: extra parameters forwarded to the client
    :return: dictionary describing the created address scope
    """
    client = cls.admin_client if is_admin else cls.client
    body = client.create_address_scope(name=name, **kwargs)
    registry = cls.admin_address_scopes if is_admin else cls.address_scopes
    registry.append(body['address_scope'])
    return body['address_scope']
|
|
|
|
@classmethod
|
|
def create_subnetpool(cls, name, is_admin=False, client=None, **kwargs):
    """Create a subnet pool and register it for cleanup.

    :param name: name of the subnet pool
    :param is_admin: selects the registry the pool is tracked in
        (cls.admin_subnetpools when True, cls.subnetpools otherwise) and,
        when no client is given, the client used to create it
    :param client: client to be used; defaults to cls.admin_client when
        is_admin is True, to cls.client otherwise
    :param **kwargs: extra parameters forwarded to the client
    :return: dictionary describing the created subnet pool
    """
    if client is None:
        client = cls.admin_client if is_admin else cls.client

    body = client.create_subnetpool(name, **kwargs)
    registry = cls.admin_subnetpools if is_admin else cls.subnetpools
    registry.append(body['subnetpool'])
    return body['subnetpool']
|
|
|
|
@classmethod
|
|
def create_project(cls, name=None, description=None):
    """Create a project and register it for cleanup.

    The security groups listed for the new project right after its
    creation are registered for cleanup too.

    :param name: project name (random when not given)
    :param description: project description (random when not given)
    :return: dictionary describing the created project
    """
    project_name = name or data_utils.rand_name('test_project_')
    project_description = description or data_utils.rand_name('desc_')
    created = cls.identity_admin_client.create_project(
        name=project_name,
        description=project_description)['project']
    cls.projects.append(created)

    # Create a project will create a default security group.
    project_groups = cls.admin_client.list_security_groups(
        tenant_id=created['id'])['security_groups']
    for group in project_groups:
        # Make sure delete_security_group method will use
        # the admin client for this group
        group['client'] = cls.admin_client
        cls.security_groups.append(group)
    return created
|
|
|
|
@classmethod
|
|
def create_security_group(cls, name=None, project=None, client=None,
                          **kwargs):
    """Create a security group and register it for cleanup.

    :param name: name of the group (random, based on the class name,
        when not given)
    :param project: dictionary holding the ID of the project that should
        own the group (project['id']). When given, the admin client is
        the default client.
    :param client: client to be used
    :param **kwargs: extra parameters forwarded to the client
    :return: dictionary describing the created group, with the client
        used to create it stored under the 'client' key
    :raises ValueError: when kwargs carries a 'project_id' or 'tenant_id'
        different from project['id']
    """
    if not project:
        client = client or cls.client
    else:
        owner_id = project['id']
        client = client or cls.admin_client
        project_id = kwargs.setdefault('project_id', owner_id)
        tenant_id = kwargs.setdefault('tenant_id', owner_id)
        if not (project_id == owner_id and tenant_id == owner_id):
            raise ValueError('Project ID specified multiple times')

    name = name or data_utils.rand_name(cls.__name__)
    created = client.create_security_group(
        name=name, **kwargs)['security_group']
    # Remembered so that delete_security_group uses the same client
    created['client'] = client
    cls.security_groups.append(created)
    return created
|
|
|
|
@classmethod
|
|
def delete_security_group(cls, security_group, client=None):
    """Delete a security group.

    :param security_group: dictionary holding the group ID
        (security_group['id'])
    :param client: client to be used. When not given, the client stored
        in security_group['client'] is used, or cls.client if there is
        none.
    """
    if not client:
        client = security_group.get('client') or cls.client
    client.delete_security_group(security_group['id'])
|
|
|
|
@classmethod
|
|
def create_security_group_rule(cls, security_group=None, project=None,
                               client=None, ip_version=None, **kwargs):
    """Create a security group rule.

    :param security_group: dictionary holding the ID of the group the
        rule belongs to. When it is missing and kwargs has no
        'security_group_id', the group returned by
        cls.get_security_group is used.
    :param project: dictionary holding the ID of the project that should
        own the rule (project['id']). When given, the admin client is the
        default client.
    :param client: client to be used. Defaults, in this order, to the
        admin client (only when project is given), the client stored in
        the security group, and cls.client.
    :param ip_version: selects the default rule parameters
        (cls._ip_version when not given)
    :param **kwargs: rule parameters forwarded to the client; they take
        precedence over the default ones
    :return: dictionary describing the created rule
    :raises ValueError: when kwargs carries a project or security group
        ID conflicting with the project / security_group parameters
    """
    if project:
        owner_id = project['id']
        client = client or cls.admin_client
        project_id = kwargs.setdefault('project_id', owner_id)
        tenant_id = kwargs.setdefault('tenant_id', owner_id)
        if not (project_id == owner_id and tenant_id == owner_id):
            raise ValueError('Project ID specified multiple times')

    if not security_group and 'security_group_id' not in kwargs:
        security_group = cls.get_security_group(client=client)

    if security_group:
        client = client or security_group.get('client')
        group_id = security_group['id']
        if kwargs.setdefault('security_group_id', group_id) != group_id:
            raise ValueError('Security group ID specified multiple times.')

    # Fill in whatever the caller left out with the per-version defaults
    rule_defaults = constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[
        ip_version or cls._ip_version]
    for key, value in rule_defaults.items():
        kwargs.setdefault(key, value)

    client = client or cls.client
    response = client.create_security_group_rule(**kwargs)
    return response['security_group_rule']
|
|
|
|
@classmethod
|
|
def get_security_group(cls, name='default', client=None):
    """Return the first listed security group having the given name.

    :param name: name of the security group to look for
    :param client: client to be used (cls.client when not given)
    :raises ValueError: when no listed security group has that name
    """
    if not client:
        client = cls.client
    listed = client.list_security_groups()['security_groups']
    found = next(
        (group for group in listed if group['name'] == name), None)
    if found is None:
        raise ValueError("No such security group named {!r}".format(name))
    return found
|
|
|
|
@classmethod
|
|
def create_keypair(cls, client=None, name=None, **kwargs):
    """Create a keypair and register it for cleanup.

    :param client: keypairs client to be used
        (cls.os_primary.keypairs_client when not given)
    :param name: name of the keypair (random when not given)
    :param **kwargs: extra parameters forwarded to the client
    :return: dictionary describing the created keypair, with the client
        used to create it stored under the 'client' key
    """
    if not client:
        client = cls.os_primary.keypairs_client
    if not name:
        name = data_utils.rand_name('keypair-test')

    created = client.create_keypair(name=name, **kwargs)['keypair']

    # save client for later cleanup
    created['client'] = client
    cls.keypairs.append(created)
    return created
|
|
|
|
@classmethod
def delete_keypair(cls, keypair, client=None):
    """Delete a keypair by name

    :param keypair: dictionary containing the keypair name
        (keypair['name'])

    :param client: keypairs client to be used. When not given, the client
        stored in keypair['client'] is used, or
        cls.os_primary.keypairs_client if there is none.
    """
    keypairs_client = client or keypair.get('client')
    if not keypairs_client:
        keypairs_client = cls.os_primary.keypairs_client
    keypairs_client.delete_keypair(keypair_name=keypair['name'])
|
|
|
|
@classmethod
def create_trunk(cls, port=None, subports=None, client=None, **kwargs):
    """Create network trunk

    :param port: dictionary containing parent port ID (port['id']); when
        given, its ID is sent as 'port_id', replacing any 'port_id' found
        in kwargs

    :param subports: value forwarded as the 'subports' argument of the
        create request

    :param client: client to be used for connecting to networking service
        (cls.client when not given)

    :param **kwargs: extra parameters to be forwarded to network service

    :returns: dictionary containing created trunk details, with the
        client used stored under the 'client' key
    """
    networking = client or cls.client

    if port:
        kwargs['port_id'] = port['id']

    response = networking.create_trunk(subports=subports, **kwargs)
    trunk = response['trunk']
    # the client is kept next to the trunk so that delete_trunk can pick
    # it up later
    trunk['client'] = networking
    cls.trunks.append(trunk)
    return trunk
|
|
|
|
@classmethod
def delete_trunk(cls, trunk, client=None, detach_parent_port=True):
    """Delete network trunk

    The given dictionary is refreshed in place from the networking
    service before anything else, then the trunk is enabled if needed,
    its sub-ports are removed, the parent port is optionally detached
    and finally the trunk is deleted.

    :param trunk: dictionary containing trunk ID (trunk['id'])

    :param client: client to be used for connecting to networking service.
        When not given, the client stored in trunk['client'] is used, or
        cls.client if there is none.

    :param detach_parent_port: when True (the default) and the parent
        port has a non-empty device_id, the port is detached through the
        compute interfaces client and the method waits until device_id
        gets empty before deleting the trunk
    """
    client = client or trunk.get('client') or cls.client
    # refresh the caller's dictionary with the current trunk details
    trunk.update(client.show_trunk(trunk['id'])['trunk'])

    if not trunk['admin_state_up']:
        # Cannot touch trunk before admin_state_up is True
        client.update_trunk(trunk['id'], admin_state_up=True)
    if trunk['sub_ports']:
        # Removes trunk ports before deleting it
        cls._try_delete_resource(client.remove_subports, trunk['id'],
                                 trunk['sub_ports'])

    # we have to detach the interface from the server before
    # the trunk can be deleted.
    parent_port = {'id': trunk['port_id']}

    def is_parent_port_detached():
        # re-reads the port on every call, so it can be used for polling
        parent_port.update(client.show_port(parent_port['id'])['port'])
        return not parent_port['device_id']

    if detach_parent_port and not is_parent_port_detached():
        # this could probably happen when trunk is deleted and parent port
        # has been assigned to a VM that is still running. Here we are
        # assuming that device_id points to such VM.
        cls.os_primary.compute.InterfacesClient().delete_interface(
            parent_port['device_id'], parent_port['id'])
        utils.wait_until_true(is_parent_port_detached)

    client.delete_trunk(trunk['id'])
|
|
|
|
@classmethod
def create_conntrack_helper(cls, router_id, helper, protocol, port,
                            client=None):
    """Create a conntrack helper

    Create a conntrack helper and record it in cls.conntrack_helpers for
    later deletion. The client used for the creation is stored with the
    helper, so it can be used for deleting the CTH too.

    :param router_id: The ID of the Neutron router associated to the
        conntrack helper.

    :param helper: The conntrack helper module alias

    :param protocol: The conntrack helper IP protocol used in the
        conntrack helper.

    :param port: The conntrack helper IP protocol port number for the
        conntrack helper.

    :param client: network client to be used for creating and cleaning up
        the conntrack helper (cls.client when not given).

    :returns: dictionary containing created conntrack helper details plus
        the 'router_id' and 'client' keys added here
    """
    networking = client or cls.client

    response = networking.create_conntrack_helper(router_id, helper,
                                                  protocol, port)
    cth = response['conntrack_helper']

    # cls.delete_conntrack_helper reads both the router ID and the client
    # back from the dictionary during the final cleanup
    cth.update(router_id=router_id, client=networking)
    cls.conntrack_helpers.append(cth)
    return cth
|
|
|
|
@classmethod
def delete_conntrack_helper(cls, cth, client=None):
    """Delete conntrack helper

    :param cth: dictionary containing the conntrack helper ID (cth['id'])
        and the ID of its router (cth['router_id'])

    :param client: Client to be used
        If client is not given it will use the client used to create the
        conntrack helper, or cls.client if unknown.
    """
    networking = client or cth.get('client')
    if not networking:
        networking = cls.client
    networking.delete_conntrack_helper(cth['router_id'], cth['id'])
|
|
|
|
|
|
class BaseAdminNetworkTest(BaseNetworkTest):
    """Base class for network tests that also need admin credentials

    On top of BaseNetworkTest it sets up ``admin_client`` and
    ``identity_admin_client`` and offers helpers creating resources
    through the admin client. Every create helper appends the created
    resource to the matching class-level list (e.g. ``cls.flavors``).
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Set up the clients of the base class plus the admin ones"""
        super(BaseAdminNetworkTest, cls).setup_clients()
        cls.admin_client = cls.os_admin.network_client
        cls.identity_admin_client = cls.os_admin.projects_client

    @classmethod
    def create_metering_label(cls, name, description):
        """Wrapper utility that returns a test metering label.

        :param name: name of the label; a random 'metering-label' name is
            generated when it is empty
        :param description: description of the label
        :returns: dictionary containing created metering label details
        """
        body = cls.admin_client.create_metering_label(
            description=description,
            # honour the requested name instead of silently discarding it
            name=name or data_utils.rand_name("metering-label"))
        metering_label = body['metering_label']
        cls.metering_labels.append(metering_label)
        return metering_label

    @classmethod
    def create_metering_label_rule(cls, remote_ip_prefix, direction,
                                   metering_label_id):
        """Wrapper utility that returns a test metering label rule.

        :param remote_ip_prefix: forwarded as 'remote_ip_prefix'
        :param direction: forwarded as 'direction'
        :param metering_label_id: ID of the label the rule belongs to
        :returns: dictionary containing created rule details
        """
        body = cls.admin_client.create_metering_label_rule(
            remote_ip_prefix=remote_ip_prefix, direction=direction,
            metering_label_id=metering_label_id)
        metering_label_rule = body['metering_label_rule']
        cls.metering_label_rules.append(metering_label_rule)
        return metering_label_rule

    @classmethod
    def create_network_segment_range(cls, name, shared,
                                     project_id, network_type,
                                     physical_network, minimum,
                                     maximum):
        """Wrapper utility that returns a test network segment range.

        All the parameters are forwarded unchanged, under the same names,
        to the create request.

        :returns: dictionary containing created segment range details
        """
        network_segment_range_args = {'name': name,
                                      'shared': shared,
                                      'project_id': project_id,
                                      'network_type': network_type,
                                      'physical_network': physical_network,
                                      'minimum': minimum,
                                      'maximum': maximum}
        body = cls.admin_client.create_network_segment_range(
            **network_segment_range_args)
        network_segment_range = body['network_segment_range']
        cls.network_segment_ranges.append(network_segment_range)
        return network_segment_range

    @classmethod
    def create_flavor(cls, name, description, service_type):
        """Wrapper utility that returns a test flavor.

        :param name: name of the flavor
        :param description: description of the flavor
        :param service_type: forwarded as 'service_type'
        :returns: dictionary containing created flavor details
        """
        body = cls.admin_client.create_flavor(
            description=description, service_type=service_type,
            name=name)
        flavor = body['flavor']
        cls.flavors.append(flavor)
        return flavor

    @classmethod
    def create_service_profile(cls, description, metainfo, driver):
        """Wrapper utility that returns a test service profile.

        :param description: description of the service profile
        :param metainfo: forwarded as 'metainfo'
        :param driver: forwarded as 'driver'
        :returns: dictionary containing created service profile details
        """
        body = cls.admin_client.create_service_profile(
            driver=driver, metainfo=metainfo, description=description)
        service_profile = body['service_profile']
        cls.service_profiles.append(service_profile)
        return service_profile

    @classmethod
    def create_log(cls, name, description=None,
                   resource_type='security_group', resource_id=None,
                   target_id=None, event='ALL', enabled=True):
        """Wrapper utility that returns a test log object.

        All the parameters are forwarded unchanged, under the same names,
        to the create request (including the ones left to None).

        :returns: dictionary containing created log object details
        """
        log_args = {'name': name,
                    'description': description,
                    'resource_type': resource_type,
                    'resource_id': resource_id,
                    'target_id': target_id,
                    'event': event,
                    'enabled': enabled}
        body = cls.admin_client.create_log(**log_args)
        log_object = body['log']
        cls.log_objects.append(log_object)
        return log_object

    @classmethod
    def get_unused_ip(cls, net_id, ip_version=None):
        """Get an unused ip address in a allocation pool of net

        :param net_id: ID of the network to look into
        :param ip_version: when given, only subnets with this IP version
            are considered
        :returns: the first address, as a string, that is not a fixed IP
            of any port of the network
        :raises exceptions.InvalidConfiguration: if no such address exists
        """
        ports = cls.admin_client.list_ports(network_id=net_id)['ports']
        # a set keeps the membership test cheap while walking whole pools
        used_ips = {fixed_ip['ip_address']
                    for port in ports
                    for fixed_ip in port['fixed_ips']}
        subnets = cls.admin_client.list_subnets(network_id=net_id)['subnets']

        for subnet in subnets:
            if ip_version and subnet['ip_version'] != ip_version:
                continue
            cidr = subnet['cidr']
            allocation_pools = subnet['allocation_pools']
            iterators = []
            if allocation_pools:
                for allocation_pool in allocation_pools:
                    iterators.append(netaddr.iter_iprange(
                        allocation_pool['start'], allocation_pool['end']))
            else:
                # without allocation pools fall back to the whole CIDR,
                # leaving out the network and broadcast addresses
                net = netaddr.IPNetwork(cidr)

                def _iterip():
                    for ip in net:
                        if ip not in (net.network, net.broadcast):
                            yield ip
                iterators.append(iter(_iterip()))

            # the iterators are consumed before moving to the next subnet,
            # so _iterip always sees the 'net' it was created for
            for iterator in iterators:
                for ip in iterator:
                    if str(ip) not in used_ips:
                        return str(ip)

        message = (
            "net(%s) has no usable IP address in allocation pools" % net_id)
        raise exceptions.InvalidConfiguration(message)

    @classmethod
    def create_provider_network(cls, physnet_name, start_segmentation_id,
                                max_attempts=30):
        """Create a shared VLAN provider network on a physical network

        start_segmentation_id is tried first. Every time the request ends
        with a Conflict, the next segmentation ID is tried after sleeping
        CONF.network.build_interval seconds.

        :param physnet_name: name of the physical network
        :param start_segmentation_id: first segmentation ID to try
        :param max_attempts: maximum number of create requests
        :returns: the result of cls.create_network for the created network
        :raises lib_exc.TempestException: if the next ID to try would be
            above the highest valid VLAN ID
        :raises lib_exc.TimeoutException: if every attempt ended with a
            Conflict
        """
        segmentation_id = start_segmentation_id
        for _ in range(max_attempts):
            try:
                return cls.create_network(
                    name=data_utils.rand_name('test_net'),
                    shared=True,
                    provider_network_type='vlan',
                    provider_physical_network=physnet_name,
                    provider_segmentation_id=segmentation_id)
            except lib_exc.Conflict:
                segmentation_id += 1
                # 4095 is reserved: valid VLAN IDs stop at MAX_VLAN_TAG
                if segmentation_id > const.MAX_VLAN_TAG:
                    raise lib_exc.TempestException(
                        "No free segmentation id was found for provider "
                        "network creation!")
                time.sleep(CONF.network.build_interval)
        # no exception is being handled here, so LOG.exception would only
        # attach a meaningless traceback to the message
        LOG.error("Failed to create provider network after "
                  "%d attempts", max_attempts)
        raise lib_exc.TimeoutException
|
|
|
|
|
|
def require_qos_rule_type(rule_type):
    """Return a decorator skipping a test lacking a QoS rule type

    The decorated method runs only when rule_type is among the values
    returned by self.get_supported_qos_rule_types(); otherwise
    self.skipException is raised.

    :param rule_type: QoS rule type the decorated test depends on
    """
    def decorator(f):
        @functools.wraps(f)
        def wrapper(self, *func_args, **func_kwargs):
            supported = self.get_supported_qos_rule_types()
            if rule_type in supported:
                return f(self, *func_args, **func_kwargs)
            raise self.skipException(
                "%s rule type is required." % rule_type)
        return wrapper
    return decorator
|
|
|
|
|
|
def _require_sorting(f):
    """Skip the decorated test unless the 'sorting' extension is enabled

    The check is done at call time through tutils.is_extension_enabled
    for the 'network' service.
    """
    @functools.wraps(f)
    def inner(self, *args, **kwargs):
        sorting_enabled = tutils.is_extension_enabled("sorting", "network")
        if not sorting_enabled:
            self.skipTest('Sorting feature is required')
        return f(self, *args, **kwargs)
    return inner
|
|
|
|
|
|
def _require_pagination(f):
    """Skip the decorated test unless the 'pagination' extension is enabled

    The check is done at call time through tutils.is_extension_enabled
    for the 'network' service.
    """
    @functools.wraps(f)
    def inner(self, *args, **kwargs):
        pagination_enabled = tutils.is_extension_enabled(
            "pagination", "network")
        if not pagination_enabled:
            self.skipTest('Pagination feature is required')
        return f(self, *args, **kwargs)
    return inner
|
|
|
|
|
|
class BaseSearchCriteriaTest(BaseNetworkTest):
    """Shared checks for sorting, pagination and filter validation

    Subclasses set ``resource`` to the singular name of the resource under
    test. The name of the client list method and the key of the resources
    in the response body are both derived from it (see ``plural_name``).
    """

    # This should be defined by subclasses to reflect resource name to test
    resource = None

    # Attribute the resources are sorted by and compared on
    field = 'name'

    # NOTE(ihrachys): some names, like those starting with an underscore (_)
    # are sorted differently depending on whether the plugin implements native
    # sorting support, or not. So we avoid any such cases here, sticking to
    # alphanumeric. Also test a case when there are multiple resources with the
    # same name
    resource_names = ('test1', 'abc1', 'test10', '123test') + ('test1',)

    force_tenant_isolation = True

    # Extra keyword arguments merged into every list request
    list_kwargs = {}

    # When True the list requests go through the admin client
    list_as_admin = False

    def assertSameOrder(self, original, actual):
        """Assert both iterables hold the same ``field`` values, in order"""
        # gracefully handle iterators passed
        original = list(original)
        actual = list(actual)
        self.assertEqual(len(original), len(actual))
        for expected, res in zip(original, actual):
            self.assertEqual(expected[self.field], res[self.field])

    @utils.classproperty
    def plural_name(self):
        """Resource name with an 's' appended"""
        return '%ss' % self.resource

    @property
    def list_client(self):
        """Client used for listing: admin one if list_as_admin is set"""
        return self.admin_client if self.list_as_admin else self.client

    def list_method(self, *args, **kwargs):
        """Call list_<plural_name> on list_client

        list_kwargs are merged into the keyword arguments and win over
        the values given by the caller.
        """
        method = getattr(self.list_client, 'list_%s' % self.plural_name)
        kwargs.update(self.list_kwargs)
        return method(*args, **kwargs)

    def get_bare_url(self, url):
        """Return the normalized url with the client base URL stripped

        Fails the test if url does not start with the base URL.
        """
        base_url = self.client.base_url
        base_url_normalized = utils.normalize_url(base_url)
        url_normalized = utils.normalize_url(url)
        self.assertTrue(url_normalized.startswith(base_url_normalized))
        return url_normalized[len(base_url_normalized):]

    @classmethod
    def _extract_resources(cls, body):
        """Return the resources stored in body under plural_name"""
        return body[cls.plural_name]

    def _test_list_sorts(self, direction):
        """Check a sorted listing is ordered by ``field`` in direction

        The expectation is built by sorting the retrieved values locally.
        """
        sort_args = {
            'sort_dir': direction,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        resources = self._extract_resources(body)
        self.assertNotEmpty(
            resources, "%s list returned is empty" % self.resource)
        retrieved_names = [res[self.field] for res in resources]
        expected = sorted(retrieved_names)
        if direction == constants.SORT_DIRECTION_DESC:
            expected = list(reversed(expected))
        self.assertEqual(expected, retrieved_names)

    @_require_sorting
    def _test_list_sorts_asc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_ASC)

    @_require_sorting
    def _test_list_sorts_desc(self):
        self._test_list_sorts(constants.SORT_DIRECTION_DESC)

    @_require_pagination
    def _test_list_pagination(self):
        """Check every limit up to len(resource_names) is honoured"""
        for limit in range(1, len(self.resource_names) + 1):
            pagination_args = {
                'limit': limit,
            }
            body = self.list_method(**pagination_args)
            resources = self._extract_resources(body)
            self.assertEqual(limit, len(resources))

    @_require_pagination
    def _test_list_no_pagination_limit_0(self):
        """Check limit=0 returns at least len(resource_names) resources"""
        pagination_args = {
            'limit': 0,
        }
        body = self.list_method(**pagination_args)
        resources = self._extract_resources(body)
        self.assertGreaterEqual(len(resources), len(self.resource_names))

    def _test_list_pagination_iteratively(self, lister):
        """Compare a one-shot sorted listing with a paginated one

        :param lister: callable taking the number of expected resources
            and the sort arguments, returning the resources collected
            page by page
        """
        # first, collect all resources for later comparison
        sort_args = {
            'sort_dir': constants.SORT_DIRECTION_ASC,
            'sort_key': self.field
        }
        body = self.list_method(**sort_args)
        expected_resources = self._extract_resources(body)
        self.assertNotEmpty(expected_resources)

        resources = lister(
            len(expected_resources), sort_args
        )

        # finally, compare that the list retrieved in one go is identical to
        # the one containing pagination results
        self.assertSameOrder(expected_resources, resources)

    def _list_all_with_marker(self, niterations, sort_args):
        """Collect niterations resources one per page, using markers"""
        # paginate resources one by one, using last fetched resource as a
        # marker
        resources = []
        for _ in range(niterations):
            pagination_args = sort_args.copy()
            pagination_args['limit'] = 1
            if resources:
                pagination_args['marker'] = resources[-1]['id']
            body = self.list_method(**pagination_args)
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)
        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_marker(self):
        self._test_list_pagination_iteratively(self._list_all_with_marker)

    def _list_all_with_hrefs(self, niterations, sort_args):
        """Collect niterations resources one per page, using href links

        The pages are walked forward through the 'next' links, then
        backwards through the 'previous' links, checking both walks give
        the same resources.

        :returns: the resources collected by the forward walk
        """
        # paginate resources one by one, using next href links
        resources = []
        prev_links = {}

        # work on a copy: the dictionary of the caller is left untouched
        query_args = sort_args.copy()
        query_args.update(self.list_kwargs)

        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['next'])
            else:
                uri = self.list_client.build_uri(
                    self.plural_name, limit=1, **query_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources.extend(resources_)

        # The last element is empty and does not contain 'next' link
        uri = self.get_bare_url(prev_links['next'])
        # same client as for the other pages, so that list_as_admin is
        # honoured for the whole walk
        prev_links, body = self.list_client.get_uri_with_links(
            self.plural_name, uri
        )
        self.assertNotIn('next', prev_links)

        # Now walk backwards and compare results
        resources2 = []
        for _ in range(niterations):
            uri = self.get_bare_url(prev_links['previous'])
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            self.assertEqual(1, len(resources_))
            resources2.extend(resources_)

        self.assertSameOrder(resources, reversed(resources2))

        return resources

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_with_href_links(self):
        self._test_list_pagination_iteratively(self._list_all_with_hrefs)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_with_href_links(
            self, direction=constants.SORT_DIRECTION_ASC):
        """Walk the pages backwards through the 'previous' href links

        Pages of two resources are requested with page_reverse=True and
        the collected resources are compared with a one-shot listing.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        page_size = 2
        pagination_args['limit'] = page_size

        prev_links = {}
        resources = []
        num_resources = len(expected_resources)
        niterations = int(math.ceil(float(num_resources) / page_size))
        for _ in range(niterations):
            if prev_links:
                uri = self.get_bare_url(prev_links['previous'])
            else:
                pagination_args.update(self.list_kwargs)
                uri = self.list_client.build_uri(
                    self.plural_name, page_reverse=True, **pagination_args)
            prev_links, body = self.list_client.get_uri_with_links(
                self.plural_name, uri
            )
            resources_ = self._extract_resources(body)
            # the last page fetched may hold fewer than page_size items
            self.assertGreaterEqual(page_size, len(resources_))
            resources.extend(reversed(resources_))

        self.assertSameOrder(expected_resources, reversed(resources))

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_asc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_ASC)

    @_require_pagination
    @_require_sorting
    def _test_list_pagination_page_reverse_desc(self):
        self._test_list_pagination_page_reverse(
            direction=constants.SORT_DIRECTION_DESC)

    def _test_list_pagination_page_reverse(self, direction):
        """Check page_reverse with a marker returns the preceding page

        The first three resources are listed, then the last of them is
        used as the marker of a reversed request with limit 2, which is
        expected to return the other two.
        """
        pagination_args = {
            'sort_dir': direction,
            'sort_key': self.field,
            'limit': 3,
        }
        body = self.list_method(**pagination_args)
        expected_resources = self._extract_resources(body)

        pagination_args['limit'] -= 1
        pagination_args['marker'] = expected_resources[-1]['id']
        pagination_args['page_reverse'] = True
        body = self.list_method(**pagination_args)

        self.assertSameOrder(
            # the last entry is not included in 2nd result when used as a
            # marker
            expected_resources[:-1],
            self._extract_resources(body))

    @tutils.requires_ext(extension="filter-validation", service="network")
    def _test_list_validation_filters(
            self, validation_args, filter_is_valid=True):
        """Check how the list request reacts to the given filters

        :param validation_args: filters passed to the list request
        :param filter_is_valid: when False the request is expected to
            fail with BadRequest; otherwise every returned resource must
            have one of the names in resource_names
        """
        if not filter_is_valid:
            self.assertRaises(lib_exc.BadRequest, self.list_method,
                              **validation_args)
        else:
            body = self.list_method(**validation_args)
            resources = self._extract_resources(body)
            for resource in resources:
                self.assertIn(resource['name'], self.resource_names)
|