Tempest: Add dvs specific test cases

Change-Id: Icc77ce2ee1b3e55d8eb64551833bab38a8af5e68
This commit is contained in:
Zhenmei 2016-07-26 06:17:19 -04:00 committed by Kobi Samoray
parent 9f3bdbb46d
commit 290d75a34a
8 changed files with 592 additions and 0 deletions

View File

@ -0,0 +1,6 @@
Placeholder for dvs plugin specific automated tests
directory:
dvs/
api/
scenario/

View File

View File

@ -0,0 +1,157 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest.api.network import base
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
import tempest.test
CONF = config.CONF
class BaseDvsAdminNetworkTest(base.BaseAdminNetworkTest):
@classmethod
def resource_cleanup(cls):
for port in cls.ports:
cls.admin_ports_client.delete_port(port['id'])
for subnet in cls.subnets:
cls.admin_subnets_client.delete_subnet(subnet['id'])
for network in cls.networks:
cls.admin_networks_client.delete_network(network['id'])
# clean up ports, subnets and networks
cls.ports = []
cls.subnets = []
cls.networks = []
@classmethod
def create_network(cls, **kwargs):
"""Wrapper utility that returns a test admin provider network."""
network_name = (kwargs.get('net_name')
or data_utils.rand_name('test-adm-net-'))
net_type = kwargs.get('net_type', "flat")
if tempest.test.is_extension_enabled('provider', 'network'):
body = {'name': network_name}
body.update({'provider:network_type': net_type,
'provider:physical_network': 'dvs'})
if net_type == 'vlan':
_vlanid = kwargs.get('seg_id')
body.update({'provider:segmentation_id': _vlanid})
body = cls.admin_networks_client.create_network(**body)
network = body['network']
cls.networks.append(network)
return network
@classmethod
def create_subnet(cls, network):
"""Wrapper utility that returns a test subnet."""
# The cidr and mask_bits depend on the ip version.
if cls._ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr
or "192.168.101.0/24")
mask_bits = CONF.network.tenant_network_mask_bits or 24
elif cls._ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
mask_bits = CONF.network.tenant_network_v6_mask_bits
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
try:
body = cls.admin_subnets_client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=cls._ip_version)
break
except exceptions.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = 'Available CIDR for subnet creation could not be found'
raise exceptions.BuildErrorException(message)
subnet = body['subnet']
cls.subnets.append(subnet)
return subnet
@classmethod
def create_port(cls, network_id, **kwargs):
"""Wrapper utility that returns a test port."""
body = cls.admin_ports_client.create_port(network_id=network_id,
**kwargs)
port = body['port']
cls.ports.append(port)
return port
@classmethod
def update_network(cls, network_id, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.update_network(network_id, **kwargs)
@classmethod
def delete_network(cls, network_id, client=None):
net_client = client if client else cls.admin_networks_client
return net_client.delete_network(network_id)
@classmethod
def show_network(cls, network_id, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.show_network(network_id, **kwargs)
@classmethod
def list_networks(cls, client=None, **kwargs):
net_client = client if client else cls.admin_networks_client
return net_client.list_networks(**kwargs)
@classmethod
def update_subnet(cls, subnet_id, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.update_subnet(subnet_id, **kwargs)
@classmethod
def delete_subnet(cls, subnet_id, client=None):
net_client = client if client else cls.admin_subnets_client
return net_client.delete_subnet(subnet_id)
@classmethod
def show_subnet(cls, subnet_id, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.show_subnet(subnet_id, **kwargs)
@classmethod
def list_subnets(cls, client=None, **kwargs):
net_client = client if client else cls.admin_subnets_client
return net_client.list_subnets(**kwargs)
@classmethod
def delete_port(cls, port_id, client=None):
net_client = client if client else cls.admin_ports_client
return net_client.delete_port(port_id)
@classmethod
def show_port(cls, port_id, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.show_port(port_id, **kwargs)
@classmethod
def list_ports(cls, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.list_ports(**kwargs)
@classmethod
def update_port(cls, port_id, client=None, **kwargs):
net_client = client if client else cls.admin_ports_client
return net_client.update_port(port_id, **kwargs)

View File

@ -0,0 +1,131 @@
# Copyright 2014 VMware.inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base_dvs as base
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
from tempest import test
class AdminNetworksTestJSON(base.BaseDvsAdminNetworkTest):
_interface = 'json'
"""
Test admin actions for networks, subnets.
create/update/delete an admin network
create/update/delete an admin subnets
"""
@classmethod
def resource_setup(cls):
super(AdminNetworksTestJSON, cls).resource_setup()
name = data_utils.rand_name('admin-network-')
cls.network = cls.create_network(net_name=name)
cls.name = cls.network['name']
cls.subnet = cls.create_subnet(cls.network)
cls.cidr = cls.subnet['cidr']
@test.attr(type='smoke')
@test.idempotent_id('1dcead1d-d773-4da1-9534-0b984ca684b3')
def test_create_update_delete_flat_network_subnet(self):
# Create an admin network
name = data_utils.rand_name('admin-network-')
network = self.create_network(net_name=name, net_type='flat')
net_id = network['id']
# Verify an exception thrown when updating network
new_name = "New_network"
self.assertRaises(exceptions.ServerFault,
self.update_network,
net_id,
name=new_name)
# create a subnet and verify it is an admin tenant subnet
subnet = self.create_subnet(network)
subnet_id = subnet['id']
self.assertEqual(network['tenant_id'], subnet['tenant_id'])
# Verify subnet update
new_name = "New_subnet"
body = self.update_subnet(subnet_id, name=new_name)
updated_subnet = body['subnet']
self.assertEqual(updated_subnet['name'], new_name)
# Delete subnet and network
body = self.delete_subnet(subnet_id)
# Remove subnet from cleanup list
self.subnets.pop()
body = self.delete_network(net_id)
self.networks.pop()
@test.attr(type='smoke')
@test.idempotent_id('15d3d53c-3328-401f-b8f5-3a29aee2ea3a')
def test_create_update_delete_vlan_network_subnet(self):
# Create an admin network
name = data_utils.rand_name('admin-network-')
network = self.create_network(net_name=name,
net_type='vlan',
seg_id=1000)
net_id = network['id']
# Verify an exception thrown when updating network
new_name = "New_network"
self.assertRaises(exceptions.ServerFault,
self.update_network,
net_id,
name=new_name)
# create a subnet and verify it is an admin tenant subnet
subnet = self.create_subnet(network)
subnet_id = subnet['id']
self.assertEqual(network['tenant_id'], subnet['tenant_id'])
# Verify subnet update
new_name = "New_subnet"
body = self.update_subnet(subnet_id, name=new_name)
updated_subnet = body['subnet']
self.assertEqual(updated_subnet['name'], new_name)
# Delete subnet and network
body = self.delete_subnet(subnet_id)
# Remove subnet from cleanup list
self.subnets.pop()
body = self.delete_network(net_id)
self.networks.pop()
@test.attr(type='smoke')
@test.idempotent_id('838aee5f-92f2-47b9-86c6-629a04aa6269')
def test_show_network(self):
# Verify the details of a network
body = self.show_network(self.network['id'])
network = body['network']
for key in ['id', 'name', 'provider:network_type',
'provider:physical_network']:
self.assertEqual(network[key], self.network[key])
@test.attr(type='smoke')
@test.idempotent_id('b86d50ef-39a7-4136-8c89-e5e534fe92aa')
def test_list_networks(self):
# Verify the network exists in the list of all networks
body = self.list_networks()
networks = [network['id'] for network in body['networks']
if network['id'] == self.network['id']]
self.assertNotEmpty(networks, "Created network not found in the list")
@test.attr(type='smoke')
@test.idempotent_id('ee3f8b79-da3f-4394-9bea-012488202257')
def test_show_subnet(self):
# Verify the details of a subnet
body = self.show_subnet(self.subnet['id'])
subnet = body['subnet']
self.assertNotEmpty(subnet, "Subnet returned has no fields")
for key in ['id', 'cidr']:
self.assertIn(key, subnet)
self.assertEqual(subnet[key], self.subnet[key])

View File

@ -0,0 +1,99 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base_dvs as base
from tempest.lib.common.utils import data_utils
from tempest import test
class AdminPortsTestJSON(base.BaseDvsAdminNetworkTest):
_interface = 'json'
"""
Test the following operations for ports:
port create
port delete
port list
port show
port update
"""
@classmethod
def resource_setup(cls):
super(AdminPortsTestJSON, cls).resource_setup()
name = data_utils.rand_name('admin-ports-')
cls.network = cls.create_network(net_name=name)
cls.port = cls.create_port(cls.network['id'])
@test.idempotent_id('c3f751d4-e358-44b9-bfd2-3d563c4a2d04')
def test_create_update_delete_port(self):
# Verify port creation
network_id = self.network['id']
port = self.create_port(network_id)
self.assertTrue(port['admin_state_up'])
# Verify port update
new_name = "New_Port"
body = self.update_port(
port['id'],
name=new_name,
admin_state_up=False)
updated_port = body['port']
self.assertEqual(updated_port['name'], new_name)
self.assertFalse(updated_port['admin_state_up'])
@test.attr(type='smoke')
@test.idempotent_id('d3dcd23b-7d5a-4720-8d88-473fb154d609')
def test_show_port(self):
# Verify the details of port
body = self.show_port(self.port['id'])
port = body['port']
self.assertIn('id', port)
self.assertEqual(port['id'], self.port['id'])
self.assertEqual(self.port['admin_state_up'], port['admin_state_up'])
self.assertEqual(self.port['device_id'], port['device_id'])
self.assertEqual(self.port['device_owner'], port['device_owner'])
self.assertEqual(self.port['mac_address'], port['mac_address'])
self.assertEqual(self.port['name'], port['name'])
self.assertEqual(self.port['security_groups'],
port['security_groups'])
self.assertEqual(self.port['network_id'], port['network_id'])
self.assertEqual(self.port['security_groups'],
port['security_groups'])
self.assertEqual(port['fixed_ips'], [])
@test.attr(type='smoke')
@test.idempotent_id('c5f74042-c512-4569-b9b9-bc2bf46e77e1')
def test_list_ports(self):
# Verify the port exists in the list of all ports
body = self.list_ports()
ports = [port['id'] for port in body['ports']
if port['id'] == self.port['id']]
self.assertNotEmpty(ports, "Created port not found in the list")
@test.attr(type='smoke')
@test.idempotent_id('2775f96c-a09b-49e1-a5a4-adb83a3e91c7')
def test_list_ports_fields(self):
# Verify specific fields of ports
fields = ['binding:vif_type', 'id', 'mac_address']
body = self.list_ports(fields=fields)
ports = body['ports']
self.assertNotEmpty(ports, "Port list returned is empty")
# Asserting the fields returned are correct
# Verify binding:vif_type is dvs
for port in ports:
self.assertEqual(sorted(fields), sorted(port.keys()))
self.assertEqual(port.get(fields[0]), 'dvs')

View File

@ -0,0 +1,199 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_log import log as logging
from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import data_utils
from tempest.scenario import manager
from tempest import test
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestDvsNetworkBasicOps(manager.NetworkScenarioTest):
"""
This smoke test suite assumes that Nova has been configured to
boot VM's with Neutron-managed VDS networking, and attempts to
verify network connectivity as follows:
"""
def setUp(self):
super(TestDvsNetworkBasicOps, self).setUp()
self._ip_version = 4
self.keypairs = {}
self.servers = []
self.admin_net_client = self.admin_manager.networks_client
self.admin_subnet_client = self.admin_manager.subnets_client
def _setup_network(self):
self.network = self._create_network()
self.subnet = self._create_subnet(self.network)
def _create_network(self, network_name=None):
"""Wrapper utility that returns a test admin provider network."""
network_name = network_name or data_utils.rand_name('test-adm-net-')
if test.is_extension_enabled('provider', 'network'):
body = {'name': network_name}
body.update({'provider:network_type': 'flat',
'provider:physical_network': 'dvs',
'shared': True})
body = self.admin_net_client.create_network(**body)
self.addCleanup(self.delete_wrapper,
self.admin_net_client.delete_network,
body['network']['id'])
return body['network']
def _create_subnet(self, network):
# The cidr and mask_bits depend on the ip version.
if self._ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr
or "192.168.101.0/24")
mask_bits = CONF.network.tenant_network_mask_bits or 24
elif self._ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
mask_bits = CONF.network.tenant_network_v6_mask_bits
# Find a cidr that is not in use yet and create a subnet with it
for subnet_cidr in cidr.subnet(mask_bits):
try:
body = self.admin_subnet_client.create_subnet(
network_id=network['id'],
cidr=str(subnet_cidr),
ip_version=self._ip_version)
break
except exceptions.BadRequest as e:
is_overlapping_cidr = 'overlaps with another subnet' in str(e)
if not is_overlapping_cidr:
raise
else:
message = ('Available CIDR for subnet creation '
'could not be found')
raise exceptions.BuildErrorException(message)
return body['subnet']
def _check_networks(self):
"""
Checks that we see the newly created network/subnet via
checking the result of list_[networks,subnets]
"""
seen_nets = self._list_networks()
seen_names = [n['name'] for n in seen_nets]
seen_ids = [n['id'] for n in seen_nets]
self.assertIn(self.network['name'], seen_names)
self.assertIn(self.network['id'], seen_ids)
seen_subnets = self._list_subnets()
seen_net_ids = [n['network_id'] for n in seen_subnets]
seen_subnet_ids = [n['id'] for n in seen_subnets]
self.assertIn(self.network['id'], seen_net_ids)
self.assertIn(self.subnet['id'], seen_subnet_ids)
def _create_server(self):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
networks = [{'uuid': self.network['id']}]
name = data_utils.rand_name('server-smoke')
server = self.create_server(name=name,
networks=networks,
key_name=keypair['name'],
wait_until='ACTIVE')
self.servers.append(server)
return server
def _get_server_key(self, server):
return self.keypairs[server['key_name']]['private_key']
def _check_tenant_network_connectivity(self):
ssh_login = CONF.compute.image_ssh_user
for server in self.servers:
# call the common method in the parent class
(super(TestDvsNetworkBasicOps, self).
_check_tenant_network_connectivity(
server, ssh_login, self._get_server_key(server),
servers_for_debug=self.servers))
def _check_server_connectivity(self, address_list,
should_connect=True):
private_key = self._get_server_key(self.servers[0])
ip_address = address_list[0]
ssh_source = self._ssh_to_server(ip_address, private_key)
for remote_ip in address_list:
if should_connect:
msg = "Timed out waiting for "
"%s to become reachable" % remote_ip
else:
msg = "ip address %s is reachable" % remote_ip
try:
self.assertTrue(self._check_remote_connectivity
(ssh_source, remote_ip, should_connect),
msg)
except Exception:
LOG.exception("Unable to access {dest} via ssh to "
"fix-ip {src}".format(dest=remote_ip,
src=ip_address))
raise
def _check_network_internal_connectivity(self, network,
should_connect=True):
"""
via ssh check VM internal connectivity:
- ping internal gateway and DHCP port, implying in-tenant connectivity
pinging both, because L3 and DHCP agents might be on different nodes
"""
server = self.servers[0]
self._create_server()
# get internal ports' ips:
# get all network ports in the new network
internal_ips = ([p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=network['id'])
if p['device_owner'].startswith('compute')])
self._check_server_connectivity(internal_ips,
should_connect)
@test.attr(type='smoke')
@test.idempotent_id('b977dce6-6527-4676-9b66-862b22058f0f')
@test.services('compute', 'network')
def test_network_basic_ops(self):
"""
For a freshly-booted VM with an IP address ("port") on a given
network:
- the Tempest host can ping the IP address. This implies, but
does not guarantee (see the ssh check that follows), that the
VM has been assigned the correct IP address and has
connectivity to the Tempest host.
- the Tempest host can perform key-based authentication to an
ssh server hosted at the IP address. This check guarantees
that the IP address is associated with the target VM.
- the Tempest host can ssh into the VM via the IP address and
successfully execute the following:
- ping an internal IP address, implying connectivity to another
VM on the same network.
"""
self._setup_network()
self._check_networks()
self._create_server()
self._check_network_internal_connectivity(self.network)