be38fa94b2
The scenario tests base class from Tempest is not a stable interface and it's going to be refactored on Tempest side, as notified in http://lists.openstack.org/pipermail/openstack-dev/2017-February/112938.html Maintain a local copy of the base class, taken from Tempest with head of master at c5f1064759fe6c75a4bc5dc251ed1661845936cb. Change-Id: I39cd4db393d590a2fb7074003448209de6a9a7fb
235 lines
10 KiB
Python
235 lines
10 KiB
Python
# Copyright 2017 VMware Inc
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import collections
|
|
|
|
import netaddr
|
|
from oslo_log import log as logging
|
|
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
from tempest.lib.common.utils import test_utils
|
|
from tempest.lib import exceptions as lib_exc
|
|
|
|
from vmware_nsx_tempest.common import constants
|
|
from vmware_nsx_tempest.tests.scenario import manager
|
|
|
|
CONF = config.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class ApplianceManager(manager.NetworkScenarioTest):
|
|
server_details = collections.namedtuple('server_details',
|
|
['server', 'floating_ip',
|
|
'networks'])
|
|
|
|
def setUp(self):
|
|
super(ApplianceManager, self).setUp()
|
|
self.topology_routers = {}
|
|
self.topology_networks = {}
|
|
self.topology_subnets = {}
|
|
self.topology_servers = {}
|
|
self.topology_servers_floating_ip = []
|
|
self.topology_public_network_id = CONF.network.public_network_id
|
|
self.topology_config_drive = CONF.compute_feature_enabled.config_drive
|
|
self.topology_keypairs = {}
|
|
self.servers_details = {}
|
|
|
|
def get_internal_ips(self, server, network, device="network"):
|
|
internal_ips = [p['fixed_ips'][0]['ip_address'] for p in
|
|
self.os_admin.ports_client.list_ports(
|
|
tenant_id=server['tenant_id'],
|
|
network_id=network['id'])['ports'] if
|
|
p['device_owner'].startswith(device)]
|
|
return internal_ips
|
|
|
|
def _verify_empty_security_group_status(self, security_group):
|
|
ip_protocols = ["IPV6", "IPV4"]
|
|
nsx_fw_section, nsx_fw_section_rules = \
|
|
self.nsx_client.get_firewall_section_and_rules(
|
|
security_group['name'], security_group['id'])
|
|
msg = "Newly created empty security group does not meet criteria !!!"
|
|
self.assertEqual(nsx_fw_section["rule_count"], 2, msg)
|
|
self.assertEqual(nsx_fw_section_rules[0]["action"], "ALLOW", msg)
|
|
self.assertEqual(nsx_fw_section_rules[1]["action"], "ALLOW", msg)
|
|
self.assertEqual(nsx_fw_section_rules[0]["direction"], "OUT", msg)
|
|
self.assertEqual(nsx_fw_section_rules[1]["direction"], "OUT", msg)
|
|
self.assertIn(nsx_fw_section_rules[0]["ip_protocol"], ip_protocols,
|
|
msg)
|
|
self.assertIn(nsx_fw_section_rules[1]["ip_protocol"], ip_protocols,
|
|
msg)
|
|
|
|
def create_topology_empty_security_group(self, namestart="vmw_"):
|
|
security_group = self._create_empty_security_group(namestart=namestart)
|
|
self._verify_empty_security_group_status(security_group)
|
|
return security_group
|
|
|
|
def add_security_group_rule(self, security_group, rule):
|
|
return self._create_security_group_rule(secgroup=security_group,
|
|
**rule)
|
|
|
|
def get_server_key(self, server):
|
|
return self.topology_keypairs[server['key_name']]['private_key']
|
|
|
|
def create_topology_router(self, router_name, routers_client=None,
|
|
**kwargs):
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
router_name_ = constants.APPLIANCE_NAME_STARTS_WITH + router_name
|
|
router = self._create_router(namestart=router_name_, **kwargs)
|
|
public_network_info = {"external_gateway_info": dict(
|
|
network_id=self.topology_public_network_id)}
|
|
routers_client.update_router(router['id'], **public_network_info)
|
|
self.topology_routers[router_name] = router
|
|
return router
|
|
|
|
def create_topology_network(self, network_name, networks_client=None,
|
|
tenant_id=None, port_security_enabled=True, **kwargs):
|
|
if not networks_client:
|
|
networks_client = self.networks_client
|
|
if not tenant_id:
|
|
tenant_id = networks_client.tenant_id
|
|
network_name_ = constants.APPLIANCE_NAME_STARTS_WITH + network_name
|
|
name = data_utils.rand_name(network_name_)
|
|
# Neutron disables port security by default so we have to check the
|
|
# config before trying to create the network with port_security_enabled
|
|
if CONF.network_feature_enabled.port_security:
|
|
port_security_enabled = True
|
|
result = networks_client.create_network(
|
|
name=name, tenant_id=tenant_id,
|
|
port_security_enabled=port_security_enabled, **kwargs)
|
|
network = result['network']
|
|
self.assertEqual(network['name'], name)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
networks_client.delete_network, network['id'])
|
|
self.topology_networks[network_name] = network
|
|
return network
|
|
|
|
def create_topology_subnet(
|
|
self, subnet_name, network, routers_client=None,
|
|
subnets_client=None, router_id=None, ip_version=4, cidr=None,
|
|
mask_bits=None, **kwargs):
|
|
subnet_name_ = constants.APPLIANCE_NAME_STARTS_WITH + subnet_name
|
|
if not subnets_client:
|
|
subnets_client = self.subnets_client
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
|
|
def cidr_in_use(cidr, tenant_id):
|
|
"""Check cidr existence
|
|
:returns: True if subnet with cidr already exist in tenant
|
|
False else
|
|
"""
|
|
cidr_in_use = \
|
|
self.os_admin.subnets_client.list_subnets(tenant_id=tenant_id,
|
|
cidr=cidr)['subnets']
|
|
return len(cidr_in_use) != 0
|
|
|
|
if ip_version == 6:
|
|
tenant_cidr = (cidr or netaddr.IPNetwork(
|
|
CONF.network.project_network_v6_cidr))
|
|
mask_bits = mask_bits or CONF.network.project_network_v6_mask_bits
|
|
else:
|
|
tenant_cidr = cidr or netaddr.IPNetwork(
|
|
CONF.network.project_network_cidr)
|
|
mask_bits = mask_bits or CONF.network.project_network_mask_bits
|
|
str_cidr = str(tenant_cidr)
|
|
if not cidr:
|
|
# Repeatedly attempt subnet creation with sequential cidr
|
|
# blocks until an unallocated block is found.
|
|
for subnet_cidr in tenant_cidr.subnet(mask_bits):
|
|
str_cidr = str(subnet_cidr)
|
|
if not cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
|
|
break
|
|
else:
|
|
if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
|
|
LOG.error("Specified subnet %r is in use" % str_cidr)
|
|
raise
|
|
subnet = dict(name=data_utils.rand_name(subnet_name_),
|
|
network_id=network['id'], tenant_id=network['tenant_id'],
|
|
cidr=str_cidr, ip_version=ip_version, **kwargs)
|
|
try:
|
|
result = None
|
|
result = subnets_client.create_subnet(**subnet)
|
|
except lib_exc.Conflict as e:
|
|
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
|
|
if not is_overlapping_cidr:
|
|
raise
|
|
self.assertIsNotNone(result, 'Unable to allocate tenant network')
|
|
subnet = result['subnet']
|
|
self.assertEqual(subnet['cidr'], str_cidr)
|
|
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
subnets_client.delete_subnet, subnet['id'])
|
|
self.topology_subnets[subnet_name] = subnet
|
|
if router_id:
|
|
if not routers_client:
|
|
routers_client = self.routers_client
|
|
routers_client.add_router_interface(
|
|
router_id, subnet_id=subnet["id"])
|
|
self.addCleanup(
|
|
test_utils.call_and_ignore_notfound_exc,
|
|
routers_client.remove_router_interface, router_id,
|
|
subnet_id=subnet["id"])
|
|
return subnet
|
|
|
|
def create_topology_security_group(self, **kwargs):
|
|
return self._create_security_group(**kwargs)
|
|
|
|
def create_topology_instance(
|
|
self, server_name, networks, security_groups=None,
|
|
config_drive=None, keypair=None, image_id=None,
|
|
clients=None, create_floating_ip=True, **kwargs):
|
|
# Define security group for server.
|
|
if security_groups:
|
|
kwargs["security_groups"] = security_groups
|
|
else:
|
|
_sg = self.create_topology_security_group()
|
|
_security_groups = [{'name': _sg['name']}]
|
|
kwargs["security_groups"] = _security_groups
|
|
# Define config drive for server.
|
|
if not config_drive:
|
|
kwargs["config_drive"] = self.topology_config_drive
|
|
else:
|
|
kwargs["config_drive"] = config_drive
|
|
if not keypair:
|
|
keypair = self.create_keypair()
|
|
self.topology_keypairs[keypair['name']] = keypair
|
|
kwargs["key_name"] = keypair['name']
|
|
else:
|
|
kwargs["key_name"] = keypair['name']
|
|
# Define image id for server.
|
|
if image_id:
|
|
kwargs["image_id"] = image_id
|
|
server_name_ = constants.APPLIANCE_NAME_STARTS_WITH + server_name
|
|
# Collect all the networks for server.
|
|
networks_ = []
|
|
for net in networks:
|
|
net_ = {"uuid": net["id"]}
|
|
networks_.append(net_)
|
|
# Deploy server with all teh args.
|
|
server = self.create_server(
|
|
name=server_name_, networks=networks_, clients=clients, **kwargs)
|
|
if create_floating_ip:
|
|
floating_ip = self.create_floating_ip(server)
|
|
server["floating_ip"] = floating_ip
|
|
self.topology_servers_floating_ip.append(floating_ip)
|
|
else:
|
|
floating_ip = None
|
|
server_details = self.server_details(server=server,
|
|
floating_ip=floating_ip,
|
|
networks=networks)
|
|
self.servers_details[server_name] = server_details
|
|
self.topology_servers[server_name] = server
|
|
return server
|