Delete 118~ API tests from Neutron

Deleted tests that duplicate tests from Tempest, according to the
new criteria specified in TESTING.rst.

Follow up steps are detailed here:
https://etherpad.openstack.org/p/neutron-tempest-defork

For reviewers, here's how to get a complete list of network
tests from Tempest. From a Tempest directory, execute:
testr list-tests tempest.api.network | cut -d"[" -f1 | cut -d"." -f4-
I verified that every test removed here actually exists in that list.

Here's a list of patches that modified tests after the fork:
https://etherpad.openstack.org/p/neutron-tempest-defork-patches-since-initial-sync

And the list of tests we care about:
https://etherpad.openstack.org/p/neutron-tempest-defork, Ctrl-F for:
'Tests that should be synced from Neutron'

Related-bug: #1552960

Change-Id: I685291058f221a7ef0b5b7485280cdabceaa4af3
This commit is contained in:
Assaf Muller 2016-03-03 17:19:13 -05:00
parent a91eee74f4
commit ddd5df9526
17 changed files with 29 additions and 2517 deletions

View File

@ -287,8 +287,32 @@ be made about implementation. Only the contract defined by Neutron's REST API
should be validated, and all interaction with the daemon should be via should be validated, and all interaction with the daemon should be via
a REST client. a REST client.
neutron/tests/api was copied from the Tempest project. The Tempest networking neutron/tests/api was copied from the Tempest project. At the time, there was
API directory was frozen and any new tests belong to the Neutron repository. an overlap of tests between the Tempest and Neutron repositories. This overlap
was then eliminated by carving out a subset of resources that belong to
Tempest, with the rest in Neutron.
API tests that belong to Tempest deal with a subset of Neutron's resources:
* Port
* Network
* Subnet
* Security Group
* Router
* Floating IP
These resources were chosen for their ubiquitously. They are found in most
Neutron depoloyments regardless of plugin, and are directly involved in the
networking and security of an instance. Together, they form the bare minimum
needed by Neutron.
This is excluding extensions to these resources (For example: Extra DHCP
options to subnets, or snat_gateway mode to routers) that are not mandatory
in the majority of cases.
Tests for other resources should be contributed to the Neutron repository.
Scenario tests should be similarly split up between Tempest and Neutron
according to the API they're targeting.
Development Process Development Process
------------------- -------------------

View File

@ -19,117 +19,6 @@ import testtools
from neutron.tests.api import base from neutron.tests.api import base
class ExternalNetworksTestJSON(base.BaseAdminNetworkTest):
@classmethod
def resource_setup(cls):
super(ExternalNetworksTestJSON, cls).resource_setup()
cls.network = cls.create_network()
def _create_network(self, external=True):
post_body = {'name': data_utils.rand_name('network-')}
if external:
post_body['router:external'] = external
body = self.admin_client.create_network(**post_body)
network = body['network']
self.addCleanup(self.admin_client.delete_network, network['id'])
return network
@test.idempotent_id('462be770-b310-4df9-9c42-773217e4c8b1')
def test_create_external_network(self):
# Create a network as an admin user specifying the
# external network extension attribute
ext_network = self._create_network()
# Verifies router:external parameter
self.assertIsNotNone(ext_network['id'])
self.assertTrue(ext_network['router:external'])
@test.idempotent_id('4db5417a-e11c-474d-a361-af00ebef57c5')
def test_update_external_network(self):
# Update a network as an admin user specifying the
# external network extension attribute
network = self._create_network(external=False)
self.assertFalse(network.get('router:external', False))
update_body = {'router:external': True}
body = self.admin_client.update_network(network['id'],
**update_body)
updated_network = body['network']
# Verify that router:external parameter was updated
self.assertTrue(updated_network['router:external'])
@test.idempotent_id('39be4c9b-a57e-4ff9-b7c7-b218e209dfcc')
def test_list_external_networks(self):
# Create external_net
external_network = self._create_network()
# List networks as a normal user and confirm the external
# network extension attribute is returned for those networks
# that were created as external
body = self.client.list_networks()
networks_list = [net['id'] for net in body['networks']]
self.assertIn(external_network['id'], networks_list)
self.assertIn(self.network['id'], networks_list)
for net in body['networks']:
if net['id'] == self.network['id']:
self.assertFalse(net['router:external'])
elif net['id'] == external_network['id']:
self.assertTrue(net['router:external'])
@test.idempotent_id('2ac50ab2-7ebd-4e27-b3ce-a9e399faaea2')
def test_show_external_networks_attribute(self):
# Create external_net
external_network = self._create_network()
# Show an external network as a normal user and confirm the
# external network extension attribute is returned.
body = self.client.show_network(external_network['id'])
show_ext_net = body['network']
self.assertEqual(external_network['name'], show_ext_net['name'])
self.assertEqual(external_network['id'], show_ext_net['id'])
self.assertTrue(show_ext_net['router:external'])
body = self.client.show_network(self.network['id'])
show_net = body['network']
# Verify with show that router:external is False for network
self.assertEqual(self.network['name'], show_net['name'])
self.assertEqual(self.network['id'], show_net['id'])
self.assertFalse(show_net['router:external'])
@test.idempotent_id('82068503-2cf2-4ed4-b3be-ecb89432e4bb')
def test_delete_external_networks_with_floating_ip(self):
"""Verifies external network can be deleted while still holding
(unassociated) floating IPs
"""
# Set cls.client to admin to use base.create_subnet()
client = self.admin_client
body = client.create_network(**{'router:external': True})
external_network = body['network']
self.addCleanup(self._try_delete_resource,
client.delete_network,
external_network['id'])
subnet = self.create_subnet(external_network, client=client,
enable_dhcp=False)
body = client.create_floatingip(
floating_network_id=external_network['id'])
created_floating_ip = body['floatingip']
self.addCleanup(self._try_delete_resource,
client.delete_floatingip,
created_floating_ip['id'])
floatingip_list = client.list_floatingips(
network=external_network['id'])
self.assertIn(created_floating_ip['id'],
(f['id'] for f in floatingip_list['floatingips']))
client.delete_network(external_network['id'])
# Verifies floating ip is deleted
floatingip_list = client.list_floatingips()
self.assertNotIn(created_floating_ip['id'],
(f['id'] for f in floatingip_list['floatingips']))
# Verifies subnet is deleted
subnet_list = client.list_subnets()
self.assertNotIn(subnet['id'],
(s['id'] for s in subnet_list))
# Removes subnet from the cleanup list
self.subnets.remove(subnet)
class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest): class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
credentials = ['primary', 'alt', 'admin'] credentials = ['primary', 'alt', 'admin']

View File

@ -1,54 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron.tests.api import base
from neutron.tests.tempest import config
CONF = config.CONF
class ExternalNetworksAdminNegativeTestJSON(base.BaseAdminNetworkTest):
@test.attr(type=['negative'])
@test.idempotent_id('d402ae6c-0be0-4d8e-833b-a738895d98d0')
def test_create_port_with_precreated_floatingip_as_fixed_ip(self):
"""
External networks can be used to create both floating-ip as well
as instance-ip. So, creating an instance-ip with a value of a
pre-created floating-ip should be denied.
"""
# create a floating ip
client = self.admin_client
body = client.create_floatingip(
floating_network_id=CONF.network.public_network_id)
created_floating_ip = body['floatingip']
self.addCleanup(self._try_delete_resource,
client.delete_floatingip,
created_floating_ip['id'])
floating_ip_address = created_floating_ip['floating_ip_address']
self.assertIsNotNone(floating_ip_address)
# use the same value of floatingip as fixed-ip to create_port()
fixed_ips = [{'ip_address': floating_ip_address}]
# create a port which will internally create an instance-ip
self.assertRaises(lib_exc.Conflict,
client.create_port,
network_id=CONF.network.public_network_id,
fixed_ips=fixed_ips)

View File

@ -41,75 +41,6 @@ class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest):
cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.port = cls.create_port(cls.network) cls.port = cls.create_port(cls.network)
@test.attr(type='smoke')
@test.idempotent_id('64f2100b-5471-4ded-b46c-ddeeeb4f231b')
def test_list_floating_ips_from_admin_and_nonadmin(self):
# Create floating ip from admin user
floating_ip_admin = self.admin_client.create_floatingip(
floating_network_id=self.ext_net_id)
self.addCleanup(self.admin_client.delete_floatingip,
floating_ip_admin['floatingip']['id'])
# Create floating ip from alt user
body = self.alt_client.create_floatingip(
floating_network_id=self.ext_net_id)
floating_ip_alt = body['floatingip']
self.addCleanup(self.alt_client.delete_floatingip,
floating_ip_alt['id'])
# List floating ips from admin
body = self.admin_client.list_floatingips()
floating_ip_ids_admin = [f['id'] for f in body['floatingips']]
# Check that admin sees all floating ips
self.assertIn(self.floating_ip['id'], floating_ip_ids_admin)
self.assertIn(floating_ip_admin['floatingip']['id'],
floating_ip_ids_admin)
self.assertIn(floating_ip_alt['id'], floating_ip_ids_admin)
# List floating ips from nonadmin
body = self.client.list_floatingips()
floating_ip_ids = [f['id'] for f in body['floatingips']]
# Check that nonadmin user doesn't see floating ip created from admin
# and floating ip that is created in another tenant (alt user)
self.assertIn(self.floating_ip['id'], floating_ip_ids)
self.assertNotIn(floating_ip_admin['floatingip']['id'],
floating_ip_ids)
self.assertNotIn(floating_ip_alt['id'], floating_ip_ids)
@test.attr(type='smoke')
@test.idempotent_id('32727cc3-abe2-4485-a16e-48f2d54c14f2')
def test_create_list_show_floating_ip_with_tenant_id_by_admin(self):
# Creates a floating IP
body = self.admin_client.create_floatingip(
floating_network_id=self.ext_net_id,
tenant_id=self.network['tenant_id'],
port_id=self.port['id'])
created_floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip,
created_floating_ip['id'])
self.assertIsNotNone(created_floating_ip['id'])
self.assertIsNotNone(created_floating_ip['tenant_id'])
self.assertIsNotNone(created_floating_ip['floating_ip_address'])
self.assertEqual(created_floating_ip['port_id'], self.port['id'])
self.assertEqual(created_floating_ip['floating_network_id'],
self.ext_net_id)
port = self.port['fixed_ips']
self.assertEqual(created_floating_ip['fixed_ip_address'],
port[0]['ip_address'])
# Verifies the details of a floating_ip
floating_ip = self.admin_client.show_floatingip(
created_floating_ip['id'])
shown_floating_ip = floating_ip['floatingip']
self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
self.assertEqual(shown_floating_ip['floating_network_id'],
self.ext_net_id)
self.assertEqual(shown_floating_ip['tenant_id'],
self.network['tenant_id'])
self.assertEqual(shown_floating_ip['floating_ip_address'],
created_floating_ip['floating_ip_address'])
self.assertEqual(shown_floating_ip['port_id'], self.port['id'])
# Verify the floating ip exists in the list of all floating_ips
floating_ips = self.admin_client.list_floatingips()
floatingip_id_list = [f['id'] for f in floating_ips['floatingips']]
self.assertIn(created_floating_ip['id'], floatingip_id_list)
@test.attr(type=['negative', 'smoke']) @test.attr(type=['negative', 'smoke'])
@test.idempotent_id('11116ee9-4e99-5b15-b8e1-aa7df92ca589') @test.idempotent_id('11116ee9-4e99-5b15-b8e1-aa7df92ca589')
def test_associate_floating_ip_with_port_from_another_tenant(self): def test_associate_floating_ip_with_port_from_another_tenant(self):

View File

@ -43,21 +43,3 @@ class BaseRouterTest(base.BaseAdminNetworkTest):
for router in list_body['routers']: for router in list_body['routers']:
routers_list.append(router['id']) routers_list.append(router['id'])
self.assertNotIn(router_id, routers_list) self.assertNotIn(router_id, routers_list)
def _add_router_interface_with_subnet_id(self, router_id, subnet_id):
interface = self.client.add_router_interface_with_subnet_id(
router_id, subnet_id)
self.addCleanup(self._remove_router_interface_with_subnet_id,
router_id, subnet_id)
self.assertEqual(subnet_id, interface['subnet_id'])
return interface
def _remove_router_interface_with_subnet_id(self, router_id, subnet_id):
body = self.client.remove_router_interface_with_subnet_id(
router_id, subnet_id)
self.assertEqual(subnet_id, body['subnet_id'])
def _remove_router_interface_with_port_id(self, router_id, port_id):
body = self.client.remove_router_interface_with_port_id(router_id,
port_id)
self.assertEqual(port_id, body['port_id'])

View File

@ -43,13 +43,3 @@ class BaseSecGroupTest(base.BaseNetworkTest):
for secgroup in list_body['security_groups']: for secgroup in list_body['security_groups']:
secgroup_list.append(secgroup['id']) secgroup_list.append(secgroup['id'])
self.assertNotIn(secgroup_id, secgroup_list) self.assertNotIn(secgroup_id, secgroup_list)
def _delete_security_group_rule(self, rule_id):
self.client.delete_security_group_rule(rule_id)
# Asserting that the security group is not found in the list
# after deletion
list_body = self.client.list_security_group_rules()
rules_list = list()
for rule in list_body['security_group_rules']:
rules_list.append(rule['id'])
self.assertNotIn(rule_id, rules_list)

View File

@ -13,11 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import random
import netaddr import netaddr
import six
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
@ -31,16 +27,6 @@ CONF = config.CONF
class NetworksTestDHCPv6(base.BaseNetworkTest): class NetworksTestDHCPv6(base.BaseNetworkTest):
_ip_version = 6 _ip_version = 6
""" Test DHCPv6 specific features using SLAAC, stateless and
stateful settings for subnets. Also it shall check dual-stack
functionality (IPv4 + IPv6 together).
The tests include:
generating of SLAAC EUI-64 address in subnets with various settings
receiving SLAAC addresses in combinations of various subnets
receiving stateful IPv6 addresses
addressing in subnets with router
"""
@classmethod @classmethod
def skip_checks(cls): def skip_checks(cls):
msg = None msg = None
@ -89,249 +75,6 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
self.client.delete_router(router['id']) self.client.delete_router(router['id'])
self._remove_from_list_by_index(self.routers, router) self._remove_from_list_by_index(self.routers, router)
def _get_ips_from_subnet(self, **kwargs):
subnet = self.create_subnet(self.network, **kwargs)
port_mac = data_utils.rand_mac_address()
port = self.create_port(self.network, mac_address=port_mac)
real_ip = next(iter(port['fixed_ips']), None)['ip_address']
eui_ip = data_utils.get_ipv6_addr_by_EUI64(subnet['cidr'],
port_mac).format()
return real_ip, eui_ip
@test.idempotent_id('e5517e62-6f16-430d-a672-f80875493d4c')
def test_dhcpv6_stateless_eui64(self):
"""When subnets configured with IPv6 SLAAC (AOM=100) and DHCPv6
stateless (AOM=110) both for radvd and dnsmasq, port shall receive IP
address calculated from its MAC.
"""
for ra_mode, add_mode in (
('slaac', 'slaac'),
('dhcpv6-stateless', 'dhcpv6-stateless'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
real_ip, eui_ip = self._get_ips_from_subnet(**kwargs)
self._clean_network()
self.assertEqual(eui_ip, real_ip,
('Real port IP is %s, but shall be %s when '
'ipv6_ra_mode=%s and ipv6_address_mode=%s') % (
real_ip, eui_ip, ra_mode, add_mode))
@test.idempotent_id('ae2f4a5d-03ff-4c42-a3b0-ce2fcb7ea832')
def test_dhcpv6_stateless_no_ra(self):
"""When subnets configured with IPv6 SLAAC and DHCPv6 stateless
and there is no radvd, port shall receive IP address calculated
from its MAC and mask of subnet.
"""
for ra_mode, add_mode in (
(None, 'slaac'),
(None, 'dhcpv6-stateless'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
real_ip, eui_ip = self._get_ips_from_subnet(**kwargs)
self._clean_network()
self.assertEqual(eui_ip, real_ip,
('Real port IP %s shall be equal to EUI-64 %s'
'when ipv6_ra_mode=%s,ipv6_address_mode=%s') % (
real_ip, eui_ip,
ra_mode if ra_mode else "Off",
add_mode if add_mode else "Off"))
@test.idempotent_id('81f18ef6-95b5-4584-9966-10d480b7496a')
def test_dhcpv6_invalid_options(self):
"""Different configurations for radvd and dnsmasq are not allowed"""
for ra_mode, add_mode in (
('dhcpv6-stateless', 'dhcpv6-stateful'),
('dhcpv6-stateless', 'slaac'),
('slaac', 'dhcpv6-stateful'),
('dhcpv6-stateful', 'dhcpv6-stateless'),
('dhcpv6-stateful', 'slaac'),
('slaac', 'dhcpv6-stateless'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
self.assertRaises(lib_exc.BadRequest,
self.create_subnet,
self.network,
**kwargs)
@test.idempotent_id('21635b6f-165a-4d42-bf49-7d195e47342f')
def test_dhcpv6_stateless_no_ra_no_dhcp(self):
"""If no radvd option and no dnsmasq option is configured
port shall receive IP from fixed IPs list of subnet.
"""
real_ip, eui_ip = self._get_ips_from_subnet()
self._clean_network()
self.assertNotEqual(eui_ip, real_ip,
('Real port IP %s equal to EUI-64 %s when '
'ipv6_ra_mode=Off and ipv6_address_mode=Off,'
'but shall be taken from fixed IPs') % (
real_ip, eui_ip))
@test.idempotent_id('4544adf7-bb5f-4bdc-b769-b3e77026cef2')
def test_dhcpv6_two_subnets(self):
"""When one IPv6 subnet configured with IPv6 SLAAC or DHCPv6 stateless
and other IPv6 is with DHCPv6 stateful, port shall receive EUI-64 IP
addresses from first subnet and DHCPv6 address from second one.
Order of subnet creating should be unimportant.
"""
for order in ("slaac_first", "dhcp_first"):
for ra_mode, add_mode in (
('slaac', 'slaac'),
('dhcpv6-stateless', 'dhcpv6-stateless'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
kwargs_dhcp = {'ipv6_address_mode': 'dhcpv6-stateful'}
if order == "slaac_first":
subnet_slaac = self.create_subnet(self.network, **kwargs)
subnet_dhcp = self.create_subnet(
self.network, **kwargs_dhcp)
else:
subnet_dhcp = self.create_subnet(
self.network, **kwargs_dhcp)
subnet_slaac = self.create_subnet(self.network, **kwargs)
port_mac = data_utils.rand_mac_address()
dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"]
eui_ip = data_utils.get_ipv6_addr_by_EUI64(
subnet_slaac['cidr'],
port_mac
).format()
# TODO(sergsh): remove this when 1219795 is fixed
dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
port = self.create_port(self.network, mac_address=port_mac)
real_ips = dict([(k['subnet_id'], k['ip_address'])
for k in port['fixed_ips']])
real_dhcp_ip, real_eui_ip = [real_ips[sub['id']]
for sub in subnet_dhcp,
subnet_slaac]
self.client.delete_port(port['id'])
self.ports.pop()
body = self.client.list_ports()
ports_id_list = [i['id'] for i in body['ports']]
self.assertNotIn(port['id'], ports_id_list)
self._clean_network()
self.assertEqual(real_eui_ip,
eui_ip,
'Real IP is {0}, but shall be {1}'.format(
real_eui_ip,
eui_ip))
self.assertIn(
real_dhcp_ip, dhcp_ip,
'Real IP is {0}, but shall be one from {1}'.format(
real_dhcp_ip,
str(dhcp_ip)))
@test.idempotent_id('4256c61d-c538-41ea-9147-3c450c36669e')
def test_dhcpv6_64_subnets(self):
"""When a Network contains two subnets, one being an IPv6 subnet
configured with ipv6_ra_mode either as slaac or dhcpv6-stateless,
and the other subnet being an IPv4 subnet, a port attached to the
network shall receive IP addresses from the subnets as follows: An
IPv6 address calculated using EUI-64 from the first subnet, and an
IPv4 address from the second subnet. The ordering of the subnets
that the port is associated with should not affect this behavior.
"""
for order in ("slaac_first", "dhcp_first"):
for ra_mode, add_mode in (
('slaac', 'slaac'),
('dhcpv6-stateless', 'dhcpv6-stateless'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
if order == "slaac_first":
subnet_slaac = self.create_subnet(self.network, **kwargs)
subnet_dhcp = self.create_subnet(
self.network, ip_version=4)
else:
subnet_dhcp = self.create_subnet(
self.network, ip_version=4)
subnet_slaac = self.create_subnet(self.network, **kwargs)
port_mac = data_utils.rand_mac_address()
dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"]
eui_ip = data_utils.get_ipv6_addr_by_EUI64(
subnet_slaac['cidr'],
port_mac
).format()
# TODO(sergsh): remove this when 1219795 is fixed
dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
port = self.create_port(self.network, mac_address=port_mac)
real_ips = dict([(k['subnet_id'], k['ip_address'])
for k in port['fixed_ips']])
real_dhcp_ip, real_eui_ip = [real_ips[sub['id']]
for sub in subnet_dhcp,
subnet_slaac]
self._clean_network()
self.assertTrue({real_eui_ip,
real_dhcp_ip}.issubset([eui_ip] + dhcp_ip))
self.assertEqual(real_eui_ip,
eui_ip,
'Real IP is {0}, but shall be {1}'.format(
real_eui_ip,
eui_ip))
self.assertIn(
real_dhcp_ip, dhcp_ip,
'Real IP is {0}, but shall be one from {1}'.format(
real_dhcp_ip,
str(dhcp_ip)))
@test.idempotent_id('4ab211a0-276f-4552-9070-51e27f58fecf')
def test_dhcp_stateful(self):
"""With all options below, DHCPv6 shall allocate first
address from subnet pool to port.
"""
for ra_mode, add_mode in (
('dhcpv6-stateful', 'dhcpv6-stateful'),
('dhcpv6-stateful', None),
(None, 'dhcpv6-stateful'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet = self.create_subnet(self.network, **kwargs)
port = self.create_port(self.network)
port_ip = next(iter(port['fixed_ips']), None)['ip_address']
dhcp_ip = subnet["allocation_pools"][0]["start"]
# TODO(sergsh): remove this when 1219795 is fixed
dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
self._clean_network()
self.assertIn(
port_ip, dhcp_ip,
'Real IP is {0}, but shall be one from {1}'.format(
port_ip,
str(dhcp_ip)))
@test.idempotent_id('51a5e97f-f02e-4e4e-9a17-a69811d300e3')
def test_dhcp_stateful_fixedips(self):
"""With all options below, port shall be able to get
requested IP from fixed IP range not depending on
DHCPv6 stateful (not SLAAC!) settings configured.
"""
for ra_mode, add_mode in (
('dhcpv6-stateful', 'dhcpv6-stateful'),
('dhcpv6-stateful', None),
(None, 'dhcpv6-stateful'),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet = self.create_subnet(self.network, **kwargs)
ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
subnet["allocation_pools"][0]["end"])
ip = netaddr.IPAddress(random.randrange(ip_range.first,
ip_range.last)).format()
port = self.create_port(self.network,
fixed_ips=[{'subnet_id': subnet['id'],
'ip_address': ip}])
port_ip = next(iter(port['fixed_ips']), None)['ip_address']
self._clean_network()
self.assertEqual(port_ip, ip,
("Port IP %s is not as fixed IP from "
"port create request: %s") % (
port_ip, ip))
@test.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af') @test.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af')
def test_dhcp_stateful_fixedips_outrange(self): def test_dhcp_stateful_fixedips_outrange(self):
"""When port gets IP address from fixed IP range it """When port gets IP address from fixed IP range it
@ -350,59 +93,6 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
fixed_ips=[{'subnet_id': subnet['id'], fixed_ips=[{'subnet_id': subnet['id'],
'ip_address': ip}]) 'ip_address': ip}])
@test.idempotent_id('57b8302b-cba9-4fbb-8835-9168df029051')
def test_dhcp_stateful_fixedips_duplicate(self):
"""When port gets IP address from fixed IP range it
shall be checked if it's not duplicate.
"""
kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful',
'ipv6_address_mode': 'dhcpv6-stateful'}
subnet = self.create_subnet(self.network, **kwargs)
ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
subnet["allocation_pools"][0]["end"])
ip = netaddr.IPAddress(random.randrange(
ip_range.first, ip_range.last)).format()
self.create_port(self.network,
fixed_ips=[
{'subnet_id': subnet['id'],
'ip_address': ip}])
self.assertRaisesRegexp(lib_exc.Conflict,
"object with that identifier already exists",
self.create_port,
self.network,
fixed_ips=[{'subnet_id': subnet['id'],
'ip_address': ip}])
def _create_subnet_router(self, kwargs):
subnet = self.create_subnet(self.network, **kwargs)
router = self.create_router(
router_name=data_utils.rand_name("routerv6-"),
admin_state_up=True)
port = self.create_router_interface(router['id'],
subnet['id'])
body = self.client.show_port(port['port_id'])
return subnet, body['port']
@test.idempotent_id('e98f65db-68f4-4330-9fea-abd8c5192d4d')
def test_dhcp_stateful_router(self):
"""With all options below the router interface shall
receive DHCPv6 IP address from allocation pool.
"""
for ra_mode, add_mode in (
('dhcpv6-stateful', 'dhcpv6-stateful'),
('dhcpv6-stateful', None),
):
kwargs = {'ipv6_ra_mode': ra_mode,
'ipv6_address_mode': add_mode}
kwargs = {k: v for k, v in six.iteritems(kwargs) if v}
subnet, port = self._create_subnet_router(kwargs)
port_ip = next(iter(port['fixed_ips']), None)['ip_address']
self._clean_network()
self.assertEqual(port_ip, subnet['gateway_ip'],
("Port IP %s is not as first IP from "
"subnets allocation pool: %s") % (
port_ip, subnet['gateway_ip']))
def tearDown(self): def tearDown(self):
self._clean_network() self._clean_network()
super(NetworksTestDHCPv6, self).tearDown() super(NetworksTestDHCPv6, self).tearDown()

View File

@ -1,74 +0,0 @@
# Copyright 2013 OpenStack, Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import test
from neutron.tests.api import base
class ExtensionsTestJSON(base.BaseNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
List all available extensions
v2.0 of the Neutron API is assumed. It is also assumed that the following
options are defined in the [network] section of etc/tempest.conf:
"""
@classmethod
def resource_setup(cls):
super(ExtensionsTestJSON, cls).resource_setup()
@test.attr(type='smoke')
@test.idempotent_id('ef28c7e6-e646-4979-9d67-deb207bc5564')
def test_list_show_extensions(self):
# List available extensions for the tenant
expected_alias = ['security-group', 'l3_agent_scheduler',
'ext-gw-mode', 'binding', 'quotas',
'agent', 'dhcp_agent_scheduler', 'provider',
'router', 'extraroute', 'external-net',
'allowed-address-pairs', 'extra_dhcp_opt']
expected_alias = [ext for ext in expected_alias if
test.is_extension_enabled(ext, 'network')]
actual_alias = list()
extensions = self.client.list_extensions()
list_extensions = extensions['extensions']
# Show and verify the details of the available extensions
for ext in list_extensions:
ext_name = ext['name']
ext_alias = ext['alias']
actual_alias.append(ext['alias'])
ext_details = self.client.show_extension(ext_alias)
ext_details = ext_details['extension']
self.assertIsNotNone(ext_details)
self.assertIn('updated', ext_details.keys())
self.assertIn('name', ext_details.keys())
self.assertIn('description', ext_details.keys())
self.assertIn('links', ext_details.keys())
self.assertIn('alias', ext_details.keys())
self.assertEqual(ext_details['name'], ext_name)
self.assertEqual(ext_details['alias'], ext_alias)
self.assertEqual(ext_details, ext)
# Verify if expected extensions are present in the actual list
# of extensions returned, but only for those that have been
# enabled via configuration
for e in expected_alias:
if test.is_extension_enabled(e, 'network'):
self.assertIn(e, actual_alias)

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import netaddr
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest import test from tempest import test
@ -25,25 +24,6 @@ CONF = config.CONF
class FloatingIPTestJSON(base.BaseNetworkTest): class FloatingIPTestJSON(base.BaseNetworkTest):
"""
Tests the following operations in the Quantum API using the REST client for
Neutron:
Create a Floating IP
Update a Floating IP
Delete a Floating IP
List all Floating IPs
Show Floating IP details
Associate a Floating IP with a port and then delete that port
Associate a Floating IP with a port and then with a port on another
router
v2.0 of the Neutron API is assumed. It is also assumed that the following
options are defined in the [network] section of etc/tempest.conf:
public_network_id which is the id for the external network present
"""
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(FloatingIPTestJSON, cls).resource_setup() super(FloatingIPTestJSON, cls).resource_setup()
@ -63,61 +43,6 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
for i in range(2): for i in range(2):
cls.create_port(cls.network) cls.create_port(cls.network)
@test.attr(type='smoke')
@test.idempotent_id('62595970-ab1c-4b7f-8fcc-fddfe55e8718')
def test_create_list_show_update_delete_floating_ip(self):
# Creates a floating IP
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id,
port_id=self.ports[0]['id'])
created_floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip,
created_floating_ip['id'])
self.assertIsNotNone(created_floating_ip['id'])
self.assertIsNotNone(created_floating_ip['tenant_id'])
self.assertIsNotNone(created_floating_ip['floating_ip_address'])
self.assertEqual(created_floating_ip['port_id'], self.ports[0]['id'])
self.assertEqual(created_floating_ip['floating_network_id'],
self.ext_net_id)
self.assertIn(created_floating_ip['fixed_ip_address'],
[ip['ip_address'] for ip in self.ports[0]['fixed_ips']])
# Verifies the details of a floating_ip
floating_ip = self.client.show_floatingip(created_floating_ip['id'])
shown_floating_ip = floating_ip['floatingip']
self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
self.assertEqual(shown_floating_ip['floating_network_id'],
self.ext_net_id)
self.assertEqual(shown_floating_ip['tenant_id'],
created_floating_ip['tenant_id'])
self.assertEqual(shown_floating_ip['floating_ip_address'],
created_floating_ip['floating_ip_address'])
self.assertEqual(shown_floating_ip['port_id'], self.ports[0]['id'])
# Verify the floating ip exists in the list of all floating_ips
floating_ips = self.client.list_floatingips()
floatingip_id_list = list()
for f in floating_ips['floatingips']:
floatingip_id_list.append(f['id'])
self.assertIn(created_floating_ip['id'], floatingip_id_list)
# Associate floating IP to the other port
floating_ip = self.client.update_floatingip(
created_floating_ip['id'],
port_id=self.ports[1]['id'])
updated_floating_ip = floating_ip['floatingip']
self.assertEqual(updated_floating_ip['port_id'], self.ports[1]['id'])
self.assertEqual(updated_floating_ip['fixed_ip_address'],
self.ports[1]['fixed_ips'][0]['ip_address'])
self.assertEqual(updated_floating_ip['router_id'], self.router['id'])
# Disassociate floating IP from the port
floating_ip = self.client.update_floatingip(
created_floating_ip['id'],
port_id=None)
updated_floating_ip = floating_ip['floatingip']
self.assertIsNone(updated_floating_ip['port_id'])
self.assertIsNone(updated_floating_ip['fixed_ip_address'])
self.assertIsNone(updated_floating_ip['router_id'])
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff') @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff')
def test_create_update_floatingip_description(self): def test_create_update_floatingip_description(self):
@ -137,103 +62,3 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
self.assertEqual('d2', body['floatingip']['description']) self.assertEqual('d2', body['floatingip']['description'])
body = self.client.show_floatingip(body['floatingip']['id']) body = self.client.show_floatingip(body['floatingip']['id'])
self.assertEqual('d2', body['floatingip']['description']) self.assertEqual('d2', body['floatingip']['description'])
@test.attr(type='smoke')
@test.idempotent_id('e1f6bffd-442f-4668-b30e-df13f2705e77')
def test_floating_ip_delete_port(self):
# Create a floating IP
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id)
created_floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip,
created_floating_ip['id'])
# Create a port
port = self.client.create_port(network_id=self.network['id'])
created_port = port['port']
floating_ip = self.client.update_floatingip(
created_floating_ip['id'],
port_id=created_port['id'])
# Delete port
self.client.delete_port(created_port['id'])
# Verifies the details of the floating_ip
floating_ip = self.client.show_floatingip(created_floating_ip['id'])
shown_floating_ip = floating_ip['floatingip']
# Confirm the fields are back to None
self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
self.assertIsNone(shown_floating_ip['port_id'])
self.assertIsNone(shown_floating_ip['fixed_ip_address'])
self.assertIsNone(shown_floating_ip['router_id'])
@test.attr(type='smoke')
@test.idempotent_id('1bb2f731-fe5a-4b8c-8409-799ade1bed4d')
def test_floating_ip_update_different_router(self):
# Associate a floating IP to a port on a router
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id,
port_id=self.ports[1]['id'])
created_floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip,
created_floating_ip['id'])
self.assertEqual(created_floating_ip['router_id'], self.router['id'])
network2 = self.create_network()
subnet2 = self.create_subnet(network2)
router2 = self.create_router(data_utils.rand_name('router-'),
external_network_id=self.ext_net_id)
self.create_router_interface(router2['id'], subnet2['id'])
port_other_router = self.create_port(network2)
# Associate floating IP to the other port on another router
floating_ip = self.client.update_floatingip(
created_floating_ip['id'],
port_id=port_other_router['id'])
updated_floating_ip = floating_ip['floatingip']
self.assertEqual(updated_floating_ip['router_id'], router2['id'])
self.assertEqual(updated_floating_ip['port_id'],
port_other_router['id'])
self.assertIsNotNone(updated_floating_ip['fixed_ip_address'])
@test.attr(type='smoke')
@test.idempotent_id('36de4bd0-f09c-43e3-a8e1-1decc1ffd3a5')
def test_create_floating_ip_specifying_a_fixed_ip_address(self):
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id,
port_id=self.ports[1]['id'],
fixed_ip_address=self.ports[1]['fixed_ips'][0]['ip_address'])
created_floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip,
created_floating_ip['id'])
self.assertIsNotNone(created_floating_ip['id'])
self.assertEqual(created_floating_ip['fixed_ip_address'],
self.ports[1]['fixed_ips'][0]['ip_address'])
floating_ip = self.client.update_floatingip(
created_floating_ip['id'],
port_id=None)
self.assertIsNone(floating_ip['floatingip']['port_id'])
@test.attr(type='smoke')
@test.idempotent_id('45c4c683-ea97-41ef-9c51-5e9802f2f3d7')
def test_create_update_floatingip_with_port_multiple_ip_address(self):
# Find out ips that can be used for tests
ips = list(netaddr.IPNetwork(self.subnet['cidr']))
list_ips = [str(ip) for ip in ips[-3:-1]]
fixed_ips = [{'ip_address': list_ips[0]}, {'ip_address': list_ips[1]}]
# Create port
body = self.client.create_port(network_id=self.network['id'],
fixed_ips=fixed_ips)
port = body['port']
self.addCleanup(self.client.delete_port, port['id'])
# Create floating ip
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id,
port_id=port['id'],
fixed_ip_address=list_ips[0])
floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip, floating_ip['id'])
self.assertIsNotNone(floating_ip['id'])
self.assertEqual(floating_ip['fixed_ip_address'], list_ips[0])
# Update floating ip
body = self.client.update_floatingip(floating_ip['id'],
port_id=port['id'],
fixed_ip_address=list_ips[1])
update_floating_ip = body['floatingip']
self.assertEqual(update_floating_ip['fixed_ip_address'],
list_ips[1])

View File

@ -26,16 +26,6 @@ CONF = config.CONF
class FloatingIPNegativeTestJSON(base.BaseNetworkTest): class FloatingIPNegativeTestJSON(base.BaseNetworkTest):
"""
Test the following negative operations for floating ips:
Create floatingip with a port that is unreachable to external network
Create floatingip in private network
Associate floatingip with port that is unreachable to external network
Associate floating ip to port that has already another floating ip
Associate floating ip with port from another tenant
"""
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(FloatingIPNegativeTestJSON, cls).resource_setup() super(FloatingIPNegativeTestJSON, cls).resource_setup()
@ -50,39 +40,6 @@ class FloatingIPNegativeTestJSON(base.BaseNetworkTest):
cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.port = cls.create_port(cls.network) cls.port = cls.create_port(cls.network)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('22996ea8-4a81-4b27-b6e1-fa5df92fa5e8')
def test_create_floatingip_with_port_ext_net_unreachable(self):
self.assertRaises(lib_exc.NotFound, self.client.create_floatingip,
floating_network_id=self.ext_net_id,
port_id=self.port['id'],
fixed_ip_address=self.port['fixed_ips'][0]
['ip_address'])
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('50b9aeb4-9f0b-48ee-aa31-fa955a48ff54')
def test_create_floatingip_in_private_network(self):
self.assertRaises(lib_exc.BadRequest,
self.client.create_floatingip,
floating_network_id=self.network['id'],
port_id=self.port['id'],
fixed_ip_address=self.port['fixed_ips'][0]
['ip_address'])
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('6b3b8797-6d43-4191-985c-c48b773eb429')
def test_associate_floatingip_port_ext_net_unreachable(self):
# Create floating ip
body = self.client.create_floatingip(
floating_network_id=self.ext_net_id)
floating_ip = body['floatingip']
self.addCleanup(self.client.delete_floatingip, floating_ip['id'])
# Associate floating IP to the other port
self.assertRaises(lib_exc.NotFound, self.client.update_floatingip,
floating_ip['id'], port_id=self.port['id'],
fixed_ip_address=self.port['fixed_ips'][0]
['ip_address'])
@test.attr(type=['negative', 'smoke']) @test.attr(type=['negative', 'smoke'])
@test.idempotent_id('0b5b8797-6de7-4191-905c-a48b888eb429') @test.idempotent_id('0b5b8797-6de7-4191-905c-a48b888eb429')
def test_associate_floatingip_with_port_with_floatingip(self): def test_associate_floatingip_with_port_with_floatingip(self):

View File

@ -15,11 +15,6 @@
import itertools import itertools
import netaddr
import six
from tempest.common import custom_matchers
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
from neutron.tests.api import base from neutron.tests.api import base
@ -34,172 +29,17 @@ class NetworksTestJSON(base.BaseNetworkTest):
Tests the following operations in the Neutron API using the REST client for Tests the following operations in the Neutron API using the REST client for
Neutron: Neutron:
create a network for a tenant
list tenant's networks list tenant's networks
show a network
show a tenant network details show a tenant network details
create a subnet for a tenant
list tenant's subnets
show a tenant subnet details
network update
subnet update
delete a network also deletes its subnets
list external networks
All subnet tests are run once with ipv4 and once with ipv6. v2.0 of the Neutron API is assumed.
v2.0 of the Neutron API is assumed. It is also assumed that the following
options are defined in the [network] section of etc/tempest.conf:
tenant_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant ipv4 subnets
tenant_network_v6_cidr is the equivalent for ipv6 subnets
tenant_network_mask_bits with the mask bits to be used to partition the
block defined by tenant_network_cidr
tenant_network_v6_mask_bits is the equivalent for ipv6 subnets
""" """
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(NetworksTestJSON, cls).resource_setup() super(NetworksTestJSON, cls).resource_setup()
cls.network = cls.create_network() cls.network = cls.create_network()
cls.name = cls.network['name']
cls.subnet = cls._create_subnet_with_last_subnet_block(cls.network,
cls._ip_version)
cls.cidr = cls.subnet['cidr']
cls._subnet_data = {6: {'gateway':
str(cls._get_gateway_from_tempest_conf(6)),
'allocation_pools':
cls._get_allocation_pools_from_gateway(6),
'dns_nameservers': ['2001:4860:4860::8844',
'2001:4860:4860::8888'],
'host_routes': [{'destination': '2001::/64',
'nexthop': '2003::1'}],
'new_host_routes': [{'destination':
'2001::/64',
'nexthop': '2005::1'}],
'new_dns_nameservers':
['2001:4860:4860::7744',
'2001:4860:4860::7888']},
4: {'gateway':
str(cls._get_gateway_from_tempest_conf(4)),
'allocation_pools':
cls._get_allocation_pools_from_gateway(4),
'dns_nameservers': ['8.8.4.4', '8.8.8.8'],
'host_routes': [{'destination': '10.20.0.0/32',
'nexthop': '10.100.1.1'}],
'new_host_routes': [{'destination':
'10.20.0.0/32',
'nexthop':
'10.100.1.2'}],
'new_dns_nameservers': ['7.8.8.8', '7.8.4.4']}}
@classmethod
def _create_subnet_with_last_subnet_block(cls, network, ip_version):
"""Derive last subnet CIDR block from tenant CIDR and
create the subnet with that derived CIDR
"""
if ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
mask_bits = CONF.network.tenant_network_mask_bits
elif ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
mask_bits = CONF.network.tenant_network_v6_mask_bits
subnet_cidr = list(cidr.subnet(mask_bits))[-1]
gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
return cls.create_subnet(network, gateway=gateway_ip,
cidr=subnet_cidr, mask_bits=mask_bits)
@classmethod
def _get_gateway_from_tempest_conf(cls, ip_version):
"""Return first subnet gateway for configured CIDR """
if ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
mask_bits = CONF.network.tenant_network_mask_bits
elif ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
mask_bits = CONF.network.tenant_network_v6_mask_bits
if mask_bits >= cidr.prefixlen:
return netaddr.IPAddress(cidr) + 1
else:
for subnet in cidr.subnet(mask_bits):
return netaddr.IPAddress(subnet) + 1
@classmethod
def _get_allocation_pools_from_gateway(cls, ip_version):
"""Return allocation range for subnet of given gateway"""
gateway = cls._get_gateway_from_tempest_conf(ip_version)
return [{'start': str(gateway + 2), 'end': str(gateway + 3)}]
def subnet_dict(self, include_keys):
"""Return a subnet dict which has include_keys and their corresponding
value from self._subnet_data
"""
return dict((key, self._subnet_data[self._ip_version][key])
for key in include_keys)
def _compare_resource_attrs(self, actual, expected):
exclude_keys = set(actual).symmetric_difference(expected)
self.assertThat(actual, custom_matchers.MatchesDictExceptForKeys(
expected, exclude_keys))
def _delete_network(self, network):
# Deleting network also deletes its subnets if exists
self.client.delete_network(network['id'])
if network in self.networks:
self.networks.remove(network)
for subnet in self.subnets:
if subnet['network_id'] == network['id']:
self.subnets.remove(subnet)
def _create_verify_delete_subnet(self, cidr=None, mask_bits=None,
**kwargs):
network = self.create_network()
net_id = network['id']
gateway = kwargs.pop('gateway', None)
subnet = self.create_subnet(network, gateway, cidr, mask_bits,
**kwargs)
compare_args_full = dict(gateway_ip=gateway, cidr=cidr,
mask_bits=mask_bits, **kwargs)
compare_args = dict((k, v) for k, v in six.iteritems(compare_args_full)
if v is not None)
if 'dns_nameservers' in set(subnet).intersection(compare_args):
self.assertEqual(sorted(compare_args['dns_nameservers']),
sorted(subnet['dns_nameservers']))
del subnet['dns_nameservers'], compare_args['dns_nameservers']
self._compare_resource_attrs(subnet, compare_args)
self.client.delete_network(net_id)
self.networks.pop()
self.subnets.pop()
@test.attr(type='smoke')
@test.idempotent_id('0e269138-0da6-4efc-a46d-578161e7b221')
def test_create_update_delete_network_subnet(self):
# Create a network
name = data_utils.rand_name('network-')
network = self.create_network(network_name=name)
self.addCleanup(self._delete_network, network)
net_id = network['id']
self.assertEqual('ACTIVE', network['status'])
# Verify network update
new_name = "New_network"
body = self.client.update_network(net_id, name=new_name)
updated_net = body['network']
self.assertEqual(updated_net['name'], new_name)
# Find a cidr that is not in use yet and create a subnet with it
subnet = self.create_subnet(network)
subnet_id = subnet['id']
# Verify subnet update
new_name = "New_subnet"
body = self.client.update_subnet(subnet_id, name=new_name)
updated_subnet = body['subnet']
self.assertEqual(updated_subnet['name'], new_name)
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e') @test.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e')
@ -227,15 +67,6 @@ class NetworksTestJSON(base.BaseNetworkTest):
for field_name in fields: for field_name in fields:
self.assertEqual(network[field_name], self.network[field_name]) self.assertEqual(network[field_name], self.network[field_name])
@test.attr(type='smoke')
@test.idempotent_id('f7ffdeda-e200-4a7a-bcbe-05716e86bf43')
def test_list_networks(self):
# Verify the network exists in the list of all networks
body = self.client.list_networks()
networks = [network['id'] for network in body['networks']
if network['id'] == self.network['id']]
self.assertNotEmpty(networks, "Created network not found in the list")
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-ccc4-b1442640bbbb') @test.idempotent_id('c72c1c0c-2193-4aca-ccc4-b1442640bbbb')
def test_create_update_network_description(self): def test_create_update_network_description(self):
@ -267,176 +98,6 @@ class NetworksTestJSON(base.BaseNetworkTest):
for network in networks: for network in networks:
self.assertEqual(sorted(network.keys()), sorted(fields)) self.assertEqual(sorted(network.keys()), sorted(fields))
@test.attr(type='smoke')
@test.idempotent_id('bd635d81-6030-4dd1-b3b9-31ba0cfdf6cc')
def test_show_subnet(self):
# Verify the details of a subnet
body = self.client.show_subnet(self.subnet['id'])
subnet = body['subnet']
self.assertNotEmpty(subnet, "Subnet returned has no fields")
for key in ['id', 'cidr']:
self.assertIn(key, subnet)
self.assertEqual(subnet[key], self.subnet[key])
@test.attr(type='smoke')
@test.idempotent_id('270fff0b-8bfc-411f-a184-1e8fd35286f0')
def test_show_subnet_fields(self):
# Verify specific fields of a subnet
fields = ['id', 'network_id']
body = self.client.show_subnet(self.subnet['id'],
fields=fields)
subnet = body['subnet']
self.assertEqual(sorted(subnet.keys()), sorted(fields))
for field_name in fields:
self.assertEqual(subnet[field_name], self.subnet[field_name])
@test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640bbbb')
def test_create_update_subnet_description(self):
if not test.is_extension_enabled('standard-attr-description',
'network'):
msg = "standard-attr-description not enabled."
raise self.skipException(msg)
body = self.create_subnet(self.network, description='d1')
self.assertEqual('d1', body['description'])
sub_id = body['id']
body = self.client.list_subnets(id=sub_id)['subnets'][0]
self.assertEqual('d1', body['description'])
body = self.client.update_subnet(body['id'],
description='d2')
self.assertEqual('d2', body['subnet']['description'])
body = self.client.list_subnets(id=sub_id)['subnets'][0]
self.assertEqual('d2', body['description'])
@test.attr(type='smoke')
@test.idempotent_id('db68ba48-f4ea-49e9-81d1-e367f6d0b20a')
def test_list_subnets(self):
# Verify the subnet exists in the list of all subnets
body = self.client.list_subnets()
subnets = [subnet['id'] for subnet in body['subnets']
if subnet['id'] == self.subnet['id']]
self.assertNotEmpty(subnets, "Created subnet not found in the list")
@test.attr(type='smoke')
@test.idempotent_id('842589e3-9663-46b0-85e4-7f01273b0412')
def test_list_subnets_fields(self):
# Verify specific fields of subnets
fields = ['id', 'network_id']
body = self.client.list_subnets(fields=fields)
subnets = body['subnets']
self.assertNotEmpty(subnets, "Subnet list returned is empty")
for subnet in subnets:
self.assertEqual(sorted(subnet.keys()), sorted(fields))
def _try_delete_network(self, net_id):
# delete network, if it exists
try:
self.client.delete_network(net_id)
# if network is not found, this means it was deleted in the test
except lib_exc.NotFound:
pass
@test.attr(type='smoke')
@test.idempotent_id('f04f61a9-b7f3-4194-90b2-9bcf660d1bfe')
def test_delete_network_with_subnet(self):
# Creates a network
name = data_utils.rand_name('network-')
body = self.client.create_network(name=name)
network = body['network']
net_id = network['id']
self.addCleanup(self._try_delete_network, net_id)
# Find a cidr that is not in use yet and create a subnet with it
subnet = self.create_subnet(network)
subnet_id = subnet['id']
# Delete network while the subnet still exists
body = self.client.delete_network(net_id)
# Verify that the subnet got automatically deleted.
self.assertRaises(lib_exc.NotFound, self.client.show_subnet,
subnet_id)
# Since create_subnet adds the subnet to the delete list, and it is
# is actually deleted here - this will create and issue, hence remove
# it from the list.
self.subnets.pop()
@test.attr(type='smoke')
@test.idempotent_id('d2d596e2-8e76-47a9-ac51-d4648009f4d3')
def test_create_delete_subnet_without_gateway(self):
self._create_verify_delete_subnet()
@test.attr(type='smoke')
@test.idempotent_id('9393b468-186d-496d-aa36-732348cd76e7')
def test_create_delete_subnet_with_gw(self):
self._create_verify_delete_subnet(
**self.subnet_dict(['gateway']))
@test.attr(type='smoke')
@test.idempotent_id('bec949c4-3147-4ba6-af5f-cd2306118404')
def test_create_delete_subnet_with_allocation_pools(self):
self._create_verify_delete_subnet(
**self.subnet_dict(['allocation_pools']))
@test.attr(type='smoke')
@test.idempotent_id('8217a149-0c6c-4cfb-93db-0486f707d13f')
def test_create_delete_subnet_with_gw_and_allocation_pools(self):
self._create_verify_delete_subnet(**self.subnet_dict(
['gateway', 'allocation_pools']))
@test.attr(type='smoke')
@test.idempotent_id('d830de0a-be47-468f-8f02-1fd996118289')
def test_create_delete_subnet_with_host_routes_and_dns_nameservers(self):
self._create_verify_delete_subnet(
**self.subnet_dict(['host_routes', 'dns_nameservers']))
@test.attr(type='smoke')
@test.idempotent_id('94ce038d-ff0a-4a4c-a56b-09da3ca0b55d')
def test_create_delete_subnet_with_dhcp_enabled(self):
self._create_verify_delete_subnet(enable_dhcp=True)
@test.attr(type='smoke')
@test.idempotent_id('3d3852eb-3009-49ec-97ac-5ce83b73010a')
def test_update_subnet_gw_dns_host_routes_dhcp(self):
network = self.create_network()
self.addCleanup(self._delete_network, network)
subnet = self.create_subnet(
network, **self.subnet_dict(['gateway', 'host_routes',
'dns_nameservers',
'allocation_pools']))
subnet_id = subnet['id']
new_gateway = str(netaddr.IPAddress(
self._subnet_data[self._ip_version]['gateway']) + 1)
# Verify subnet update
new_host_routes = self._subnet_data[self._ip_version][
'new_host_routes']
new_dns_nameservers = self._subnet_data[self._ip_version][
'new_dns_nameservers']
kwargs = {'host_routes': new_host_routes,
'dns_nameservers': new_dns_nameservers,
'gateway_ip': new_gateway, 'enable_dhcp': True}
new_name = "New_subnet"
body = self.client.update_subnet(subnet_id, name=new_name,
**kwargs)
updated_subnet = body['subnet']
kwargs['name'] = new_name
self.assertEqual(sorted(updated_subnet['dns_nameservers']),
sorted(kwargs['dns_nameservers']))
del subnet['dns_nameservers'], kwargs['dns_nameservers']
self._compare_resource_attrs(updated_subnet, kwargs)
@test.attr(type='smoke')
@test.idempotent_id('a4d9ec4c-0306-4111-a75c-db01a709030b')
def test_create_delete_subnet_all_attributes(self):
self._create_verify_delete_subnet(
enable_dhcp=True,
**self.subnet_dict(['gateway', 'host_routes', 'dns_nameservers']))
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('af774677-42a9-4e4b-bb58-16fe6a5bc1ec') @test.idempotent_id('af774677-42a9-4e4b-bb58-16fe6a5bc1ec')
def test_external_network_visibility(self): def test_external_network_visibility(self):
@ -462,262 +123,3 @@ class NetworksTestJSON(base.BaseNetworkTest):
subnets = [sub['id'] for sub in body['subnets'] subnets = [sub['id'] for sub in body['subnets']
if sub['id'] in public_subnets_iter] if sub['id'] in public_subnets_iter]
self.assertEmpty(subnets, "Public subnets visible") self.assertEmpty(subnets, "Public subnets visible")
class BulkNetworkOpsTestJSON(base.BaseNetworkTest):
"""
Tests the following operations in the Neutron API using the REST client for
Neutron:
bulk network creation
bulk subnet creation
bulk port creation
list tenant's networks
v2.0 of the Neutron API is assumed. It is also assumed that the following
options are defined in the [network] section of etc/tempest.conf:
tenant_network_cidr with a block of cidr's from which smaller blocks
can be allocated for tenant networks
tenant_network_mask_bits with the mask bits to be used to partition the
block defined by tenant-network_cidr
"""
def _delete_networks(self, created_networks):
for n in created_networks:
self.client.delete_network(n['id'])
# Asserting that the networks are not found in the list after deletion
body = self.client.list_networks()
networks_list = [network['id'] for network in body['networks']]
for n in created_networks:
self.assertNotIn(n['id'], networks_list)
def _delete_subnets(self, created_subnets):
for n in created_subnets:
self.client.delete_subnet(n['id'])
# Asserting that the subnets are not found in the list after deletion
body = self.client.list_subnets()
subnets_list = [subnet['id'] for subnet in body['subnets']]
for n in created_subnets:
self.assertNotIn(n['id'], subnets_list)
def _delete_ports(self, created_ports):
for n in created_ports:
self.client.delete_port(n['id'])
# Asserting that the ports are not found in the list after deletion
body = self.client.list_ports()
ports_list = [port['id'] for port in body['ports']]
for n in created_ports:
self.assertNotIn(n['id'], ports_list)
@test.attr(type='smoke')
@test.idempotent_id('d4f9024d-1e28-4fc1-a6b1-25dbc6fa11e2')
def test_bulk_create_delete_network(self):
# Creates 2 networks in one request
network_names = [data_utils.rand_name('network-'),
data_utils.rand_name('network-')]
body = self.client.create_bulk_network(network_names)
created_networks = body['networks']
self.addCleanup(self._delete_networks, created_networks)
# Asserting that the networks are found in the list after creation
body = self.client.list_networks()
networks_list = [network['id'] for network in body['networks']]
for n in created_networks:
self.assertIsNotNone(n['id'])
self.assertIn(n['id'], networks_list)
@test.attr(type='smoke')
@test.idempotent_id('8936533b-c0aa-4f29-8e53-6cc873aec489')
def test_bulk_create_delete_subnet(self):
networks = [self.create_network(), self.create_network()]
# Creates 2 subnets in one request
if self._ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
mask_bits = CONF.network.tenant_network_mask_bits
else:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
mask_bits = CONF.network.tenant_network_v6_mask_bits
cidrs = [subnet_cidr for subnet_cidr in cidr.subnet(mask_bits)]
names = [data_utils.rand_name('subnet-') for i in range(len(networks))]
subnets_list = []
for i in range(len(names)):
p1 = {
'network_id': networks[i]['id'],
'cidr': str(cidrs[(i)]),
'name': names[i],
'ip_version': self._ip_version
}
subnets_list.append(p1)
del subnets_list[1]['name']
body = self.client.create_bulk_subnet(subnets_list)
created_subnets = body['subnets']
self.addCleanup(self._delete_subnets, created_subnets)
# Asserting that the subnets are found in the list after creation
body = self.client.list_subnets()
subnets_list = [subnet['id'] for subnet in body['subnets']]
for n in created_subnets:
self.assertIsNotNone(n['id'])
self.assertIn(n['id'], subnets_list)
@test.attr(type='smoke')
@test.idempotent_id('48037ff2-e889-4c3b-b86a-8e3f34d2d060')
def test_bulk_create_delete_port(self):
networks = [self.create_network(), self.create_network()]
# Creates 2 ports in one request
names = [data_utils.rand_name('port-') for i in range(len(networks))]
port_list = []
state = [True, False]
for i in range(len(names)):
p1 = {
'network_id': networks[i]['id'],
'name': names[i],
'admin_state_up': state[i],
}
port_list.append(p1)
del port_list[1]['name']
body = self.client.create_bulk_port(port_list)
created_ports = body['ports']
self.addCleanup(self._delete_ports, created_ports)
# Asserting that the ports are found in the list after creation
body = self.client.list_ports()
ports_list = [port['id'] for port in body['ports']]
for n in created_ports:
self.assertIsNotNone(n['id'])
self.assertIn(n['id'], ports_list)
class BulkNetworkOpsIpV6TestJSON(BulkNetworkOpsTestJSON):
_ip_version = 6
class NetworksIpV6TestJSON(NetworksTestJSON):
_ip_version = 6
@test.attr(type='smoke')
@test.idempotent_id('e41a4888-65a6-418c-a095-f7c2ef4ad59a')
def test_create_delete_subnet_with_gw(self):
net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
gateway = str(netaddr.IPAddress(net.first + 2))
name = data_utils.rand_name('network-')
network = self.create_network(network_name=name)
subnet = self.create_subnet(network, gateway)
# Verifies Subnet GW in IPv6
self.assertEqual(subnet['gateway_ip'], gateway)
@test.attr(type='smoke')
@test.idempotent_id('ebb4fd95-524f-46af-83c1-0305b239338f')
def test_create_delete_subnet_with_default_gw(self):
net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
gateway_ip = str(netaddr.IPAddress(net.first + 1))
name = data_utils.rand_name('network-')
network = self.create_network(network_name=name)
subnet = self.create_subnet(network)
# Verifies Subnet GW in IPv6
self.assertEqual(subnet['gateway_ip'], gateway_ip)
@test.attr(type='smoke')
@test.idempotent_id('a9653883-b2a4-469b-8c3c-4518430a7e55')
def test_create_list_subnet_with_no_gw64_one_network(self):
name = data_utils.rand_name('network-')
network = self.create_network(name)
ipv6_gateway = self.subnet_dict(['gateway'])['gateway']
subnet1 = self.create_subnet(network,
ip_version=6,
gateway=ipv6_gateway)
self.assertEqual(netaddr.IPNetwork(subnet1['cidr']).version, 6,
'The created subnet is not IPv6')
subnet2 = self.create_subnet(network,
gateway=None,
ip_version=4)
self.assertEqual(netaddr.IPNetwork(subnet2['cidr']).version, 4,
'The created subnet is not IPv4')
# Verifies Subnet GW is set in IPv6
self.assertEqual(subnet1['gateway_ip'], ipv6_gateway)
# Verifies Subnet GW is None in IPv4
self.assertIsNone(subnet2['gateway_ip'])
# Verifies all 2 subnets in the same network
body = self.client.list_subnets()
subnets = [sub['id'] for sub in body['subnets']
if sub['network_id'] == network['id']]
test_subnet_ids = [sub['id'] for sub in (subnet1, subnet2)]
self.assertItemsEqual(subnets,
test_subnet_ids,
'Subnet are not in the same network')
class NetworksIpV6TestAttrs(NetworksIpV6TestJSON):
@classmethod
def resource_setup(cls):
if not CONF.network_feature_enabled.ipv6_subnet_attributes:
raise cls.skipException("IPv6 extended attributes for "
"subnets not available")
super(NetworksIpV6TestAttrs, cls).resource_setup()
@test.attr(type='smoke')
@test.idempotent_id('da40cd1b-a833-4354-9a85-cd9b8a3b74ca')
def test_create_delete_subnet_with_v6_attributes_stateful(self):
self._create_verify_delete_subnet(
gateway=self._subnet_data[self._ip_version]['gateway'],
ipv6_ra_mode='dhcpv6-stateful',
ipv6_address_mode='dhcpv6-stateful')
@test.attr(type='smoke')
@test.idempotent_id('176b030f-a923-4040-a755-9dc94329e60c')
def test_create_delete_subnet_with_v6_attributes_slaac(self):
self._create_verify_delete_subnet(
ipv6_ra_mode='slaac',
ipv6_address_mode='slaac')
@test.attr(type='smoke')
@test.idempotent_id('7d410310-8c86-4902-adf9-865d08e31adb')
def test_create_delete_subnet_with_v6_attributes_stateless(self):
self._create_verify_delete_subnet(
ipv6_ra_mode='dhcpv6-stateless',
ipv6_address_mode='dhcpv6-stateless')
def _test_delete_subnet_with_ports(self, mode):
"""Create subnet and delete it with existing ports"""
slaac_network = self.create_network()
subnet_slaac = self.create_subnet(slaac_network,
**{'ipv6_ra_mode': mode,
'ipv6_address_mode': mode})
port = self.create_port(slaac_network)
self.assertIsNotNone(port['fixed_ips'][0]['ip_address'])
self.client.delete_subnet(subnet_slaac['id'])
self.subnets.pop()
subnets = self.client.list_subnets()
subnet_ids = [subnet['id'] for subnet in subnets['subnets']]
self.assertNotIn(subnet_slaac['id'], subnet_ids,
"Subnet wasn't deleted")
self.assertRaisesRegexp(
lib_exc.Conflict,
"There are one or more ports still in use on the network",
self.client.delete_network,
slaac_network['id'])
@test.attr(type='smoke')
@test.idempotent_id('88554555-ebf8-41ef-9300-4926d45e06e9')
def test_create_delete_slaac_subnet_with_ports(self):
"""Test deleting subnet with SLAAC ports
Create subnet with SLAAC, create ports in network
and then you shall be able to delete subnet without port
deletion. But you still can not delete the network.
"""
self._test_delete_subnet_with_ports("slaac")
@test.attr(type='smoke')
@test.idempotent_id('2de6ab5a-fcf0-4144-9813-f91a940291f1')
def test_create_delete_stateless_subnet_with_ports(self):
"""Test deleting subnet with DHCPv6 stateless ports
Create subnet with DHCPv6 stateless, create ports in network
and then you shall be able to delete subnet without port
deletion. But you still can not delete the network.
"""
self._test_delete_subnet_with_ports("dhcpv6-stateless")

View File

@ -1,59 +0,0 @@
# Copyright 2013 Huawei Technologies Co.,LTD.
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from tempest import test
from neutron.tests.api import base
class NetworksNegativeTestJSON(base.BaseNetworkTest):
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('9293e937-824d-42d2-8d5b-e985ea67002a')
def test_show_non_existent_network(self):
non_exist_id = data_utils.rand_name('network')
self.assertRaises(lib_exc.NotFound, self.client.show_network,
non_exist_id)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('d746b40c-5e09-4043-99f7-cba1be8b70df')
def test_show_non_existent_subnet(self):
non_exist_id = data_utils.rand_name('subnet')
self.assertRaises(lib_exc.NotFound, self.client.show_subnet,
non_exist_id)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('a954861d-cbfd-44e8-b0a9-7fab111f235d')
def test_show_non_existent_port(self):
non_exist_id = data_utils.rand_name('port')
self.assertRaises(lib_exc.NotFound, self.client.show_port,
non_exist_id)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('98bfe4e3-574e-4012-8b17-b2647063de87')
def test_update_non_existent_network(self):
non_exist_id = data_utils.rand_name('network')
self.assertRaises(lib_exc.NotFound, self.client.update_network,
non_exist_id, name="new_name")
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('03795047-4a94-4120-a0a1-bd376e36fd4e')
def test_delete_non_existent_network(self):
non_exist_id = data_utils.rand_name('network')
self.assertRaises(lib_exc.NotFound, self.client.delete_network,
non_exist_id)

View File

@ -13,61 +13,17 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import socket
import netaddr
from tempest.common import custom_matchers
from tempest.lib.common.utils import data_utils
from tempest import test from tempest import test
from neutron.tests.api import base from neutron.tests.api import base
from neutron.tests.api import base_security_groups as sec_base
from neutron.tests.tempest import config
CONF = config.CONF
class PortsTestJSON(sec_base.BaseSecGroupTest): class PortsTestJSON(base.BaseNetworkTest):
"""
Test the following operations for ports:
port create
port delete
port list
port show
port update
"""
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(PortsTestJSON, cls).resource_setup() super(PortsTestJSON, cls).resource_setup()
cls.network = cls.create_network() cls.network = cls.create_network()
cls.port = cls.create_port(cls.network)
def _delete_port(self, port_id):
self.client.delete_port(port_id)
body = self.client.list_ports()
ports_list = body['ports']
self.assertNotIn(port_id, [n['id'] for n in ports_list])
@test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-aaa4-b1442640f51c')
def test_create_update_delete_port(self):
# Verify port creation
body = self.client.create_port(network_id=self.network['id'])
port = body['port']
# Schedule port deletion with verification upon test completion
self.addCleanup(self._delete_port, port['id'])
self.assertTrue(port['admin_state_up'])
# Verify port update
new_name = "New_Port"
body = self.client.update_port(port['id'],
name=new_name,
admin_state_up=False)
updated_port = body['port']
self.assertEqual(updated_port['name'], new_name)
self.assertFalse(updated_port['admin_state_up'])
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-bbb4-b1442640bbbb') @test.idempotent_id('c72c1c0c-2193-4aca-bbb4-b1442640bbbb')
@ -86,333 +42,3 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
self.assertEqual('d2', body['port']['description']) self.assertEqual('d2', body['port']['description'])
body = self.client.list_ports(id=body['port']['id'])['ports'][0] body = self.client.list_ports(id=body['port']['id'])['ports'][0]
self.assertEqual('d2', body['description']) self.assertEqual('d2', body['description'])
@test.idempotent_id('67f1b811-f8db-43e2-86bd-72c074d4a42c')
def test_create_bulk_port(self):
network1 = self.network
name = data_utils.rand_name('network-')
network2 = self.create_network(network_name=name)
network_list = [network1['id'], network2['id']]
port_list = [{'network_id': net_id} for net_id in network_list]
body = self.client.create_bulk_port(port_list)
created_ports = body['ports']
port1 = created_ports[0]
port2 = created_ports[1]
self.addCleanup(self._delete_port, port1['id'])
self.addCleanup(self._delete_port, port2['id'])
self.assertEqual(port1['network_id'], network1['id'])
self.assertEqual(port2['network_id'], network2['id'])
self.assertTrue(port1['admin_state_up'])
self.assertTrue(port2['admin_state_up'])
@classmethod
def _get_ipaddress_from_tempest_conf(cls):
"""Return first subnet gateway for configured CIDR """
if cls._ip_version == 4:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
elif cls._ip_version == 6:
cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
return netaddr.IPAddress(cidr)
@test.attr(type='smoke')
@test.idempotent_id('0435f278-40ae-48cb-a404-b8a087bc09b1')
def test_create_port_in_allowed_allocation_pools(self):
network = self.create_network()
net_id = network['id']
address = self._get_ipaddress_from_tempest_conf()
allocation_pools = {'allocation_pools': [{'start': str(address + 4),
'end': str(address + 6)}]}
subnet = self.create_subnet(network, **allocation_pools)
self.addCleanup(self.client.delete_subnet, subnet['id'])
body = self.client.create_port(network_id=net_id)
self.addCleanup(self.client.delete_port, body['port']['id'])
port = body['port']
ip_address = port['fixed_ips'][0]['ip_address']
start_ip_address = allocation_pools['allocation_pools'][0]['start']
end_ip_address = allocation_pools['allocation_pools'][0]['end']
ip_range = netaddr.IPRange(start_ip_address, end_ip_address)
self.assertIn(ip_address, ip_range)
@test.attr(type='smoke')
@test.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f')
def test_show_port(self):
# Verify the details of port
body = self.client.show_port(self.port['id'])
port = body['port']
self.assertIn('id', port)
# TODO(Santosh)- This is a temporary workaround to compare create_port
# and show_port dict elements.Remove this once extra_dhcp_opts issue
# gets fixed in neutron.( bug - 1365341.)
self.assertThat(self.port,
custom_matchers.MatchesDictExceptForKeys
(port, excluded_keys=['extra_dhcp_opts']))
@test.attr(type='smoke')
@test.idempotent_id('45fcdaf2-dab0-4c13-ac6c-fcddfb579dbd')
def test_show_port_fields(self):
# Verify specific fields of a port
fields = ['id', 'mac_address']
body = self.client.show_port(self.port['id'],
fields=fields)
port = body['port']
self.assertEqual(sorted(port.keys()), sorted(fields))
for field_name in fields:
self.assertEqual(port[field_name], self.port[field_name])
@test.attr(type='smoke')
@test.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e')
def test_list_ports(self):
# Verify the port exists in the list of all ports
body = self.client.list_ports()
ports = [port['id'] for port in body['ports']
if port['id'] == self.port['id']]
self.assertNotEmpty(ports, "Created port not found in the list")
@test.attr(type='smoke')
@test.idempotent_id('5ad01ed0-0e6e-4c5d-8194-232801b15c72')
def test_port_list_filter_by_router_id(self):
# Create a router
network = self.create_network()
self.addCleanup(self.client.delete_network, network['id'])
subnet = self.create_subnet(network)
self.addCleanup(self.client.delete_subnet, subnet['id'])
router = self.create_router(data_utils.rand_name('router-'))
self.addCleanup(self.client.delete_router, router['id'])
port = self.client.create_port(network_id=network['id'])
# Add router interface to port created above
self.client.add_router_interface_with_port_id(
router['id'], port['port']['id'])
self.addCleanup(self.client.remove_router_interface_with_port_id,
router['id'], port['port']['id'])
# List ports filtered by router_id
port_list = self.client.list_ports(device_id=router['id'])
ports = port_list['ports']
self.assertEqual(len(ports), 1)
self.assertEqual(ports[0]['id'], port['port']['id'])
self.assertEqual(ports[0]['device_id'], router['id'])
@test.attr(type='smoke')
@test.idempotent_id('ff7f117f-f034-4e0e-abff-ccef05c454b4')
def test_list_ports_fields(self):
# Verify specific fields of ports
fields = ['id', 'mac_address']
body = self.client.list_ports(fields=fields)
ports = body['ports']
self.assertNotEmpty(ports, "Port list returned is empty")
# Asserting the fields returned are correct
for port in ports:
self.assertEqual(sorted(fields), sorted(port.keys()))
@test.attr(type='smoke')
@test.idempotent_id('63aeadd4-3b49-427f-a3b1-19ca81f06270')
def test_create_update_port_with_second_ip(self):
# Create a network with two subnets
network = self.create_network()
self.addCleanup(self.client.delete_network, network['id'])
subnet_1 = self.create_subnet(network)
self.addCleanup(self.client.delete_subnet, subnet_1['id'])
subnet_2 = self.create_subnet(network)
self.addCleanup(self.client.delete_subnet, subnet_2['id'])
fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
fixed_ips = fixed_ip_1 + fixed_ip_2
# Create a port with multiple IP addresses
port = self.create_port(network,
fixed_ips=fixed_ips)
self.addCleanup(self.client.delete_port, port['id'])
self.assertEqual(2, len(port['fixed_ips']))
check_fixed_ips = [subnet_1['id'], subnet_2['id']]
for item in port['fixed_ips']:
self.assertIn(item['subnet_id'], check_fixed_ips)
# Update the port to return to a single IP address
port = self.update_port(port, fixed_ips=fixed_ip_1)
self.assertEqual(1, len(port['fixed_ips']))
# Update the port with a second IP address from second subnet
port = self.update_port(port, fixed_ips=fixed_ips)
self.assertEqual(2, len(port['fixed_ips']))
def _update_port_with_security_groups(self, security_groups_names):
subnet_1 = self.create_subnet(self.network)
self.addCleanup(self.client.delete_subnet, subnet_1['id'])
fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
security_groups_list = list()
for name in security_groups_names:
group_create_body = self.client.create_security_group(
name=name)
self.addCleanup(self.client.delete_security_group,
group_create_body['security_group']['id'])
security_groups_list.append(group_create_body['security_group']
['id'])
# Create a port
sec_grp_name = data_utils.rand_name('secgroup')
security_group = self.client.create_security_group(name=sec_grp_name)
self.addCleanup(self.client.delete_security_group,
security_group['security_group']['id'])
post_body = {
"name": data_utils.rand_name('port-'),
"security_groups": [security_group['security_group']['id']],
"network_id": self.network['id'],
"admin_state_up": True,
"fixed_ips": fixed_ip_1}
body = self.client.create_port(**post_body)
self.addCleanup(self.client.delete_port, body['port']['id'])
port = body['port']
# Update the port with security groups
subnet_2 = self.create_subnet(self.network)
fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
update_body = {"name": data_utils.rand_name('port-'),
"admin_state_up": False,
"fixed_ips": fixed_ip_2,
"security_groups": security_groups_list}
body = self.client.update_port(port['id'], **update_body)
port_show = body['port']
# Verify the security groups and other attributes updated to port
exclude_keys = set(port_show).symmetric_difference(update_body)
exclude_keys.add('fixed_ips')
exclude_keys.add('security_groups')
self.assertThat(port_show, custom_matchers.MatchesDictExceptForKeys(
update_body, exclude_keys))
self.assertEqual(fixed_ip_2[0]['subnet_id'],
port_show['fixed_ips'][0]['subnet_id'])
for security_group in security_groups_list:
self.assertIn(security_group, port_show['security_groups'])
@test.attr(type='smoke')
@test.idempotent_id('58091b66-4ff4-4cc1-a549-05d60c7acd1a')
def test_update_port_with_security_group_and_extra_attributes(self):
self._update_port_with_security_groups(
[data_utils.rand_name('secgroup')])
@test.attr(type='smoke')
@test.idempotent_id('edf6766d-3d40-4621-bc6e-2521a44c257d')
def test_update_port_with_two_security_groups_and_extra_attributes(self):
self._update_port_with_security_groups(
[data_utils.rand_name('secgroup'),
data_utils.rand_name('secgroup')])
@test.attr(type='smoke')
@test.idempotent_id('13e95171-6cbd-489c-9d7c-3f9c58215c18')
def test_create_show_delete_port_user_defined_mac(self):
# Create a port for a legal mac
body = self.client.create_port(network_id=self.network['id'])
old_port = body['port']
free_mac_address = old_port['mac_address']
self.client.delete_port(old_port['id'])
# Create a new port with user defined mac
body = self.client.create_port(network_id=self.network['id'],
mac_address=free_mac_address)
self.addCleanup(self.client.delete_port, body['port']['id'])
port = body['port']
body = self.client.show_port(port['id'])
show_port = body['port']
self.assertEqual(free_mac_address,
show_port['mac_address'])
@test.attr(type='smoke')
@test.idempotent_id('4179dcb9-1382-4ced-84fe-1b91c54f5735')
def test_create_port_with_no_securitygroups(self):
network = self.create_network()
self.addCleanup(self.client.delete_network, network['id'])
subnet = self.create_subnet(network)
self.addCleanup(self.client.delete_subnet, subnet['id'])
port = self.create_port(network, security_groups=[])
self.addCleanup(self.client.delete_port, port['id'])
self.assertIsNotNone(port['security_groups'])
self.assertEmpty(port['security_groups'])
class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest):
@classmethod
def resource_setup(cls):
super(PortsAdminExtendedAttrsTestJSON, cls).resource_setup()
cls.network = cls.create_network()
cls.host_id = socket.gethostname()
@test.attr(type='smoke')
@test.idempotent_id('8e8569c1-9ac7-44db-8bc1-f5fb2814f29b')
def test_create_port_binding_ext_attr(self):
post_body = {"network_id": self.network['id'],
"binding:host_id": self.host_id}
body = self.admin_client.create_port(**post_body)
port = body['port']
self.addCleanup(self.admin_client.delete_port, port['id'])
host_id = port['binding:host_id']
self.assertIsNotNone(host_id)
self.assertEqual(self.host_id, host_id)
@test.attr(type='smoke')
@test.idempotent_id('6f6c412c-711f-444d-8502-0ac30fbf5dd5')
def test_update_port_binding_ext_attr(self):
post_body = {"network_id": self.network['id']}
body = self.admin_client.create_port(**post_body)
port = body['port']
self.addCleanup(self.admin_client.delete_port, port['id'])
update_body = {"binding:host_id": self.host_id}
body = self.admin_client.update_port(port['id'], **update_body)
updated_port = body['port']
host_id = updated_port['binding:host_id']
self.assertIsNotNone(host_id)
self.assertEqual(self.host_id, host_id)
@test.attr(type='smoke')
@test.idempotent_id('1c82a44a-6c6e-48ff-89e1-abe7eaf8f9f8')
def test_list_ports_binding_ext_attr(self):
# Create a new port
post_body = {"network_id": self.network['id']}
body = self.admin_client.create_port(**post_body)
port = body['port']
self.addCleanup(self.admin_client.delete_port, port['id'])
# Update the port's binding attributes so that is now 'bound'
# to a host
update_body = {"binding:host_id": self.host_id}
self.admin_client.update_port(port['id'], **update_body)
# List all ports, ensure new port is part of list and its binding
# attributes are set and accurate
body = self.admin_client.list_ports()
ports_list = body['ports']
pids_list = [p['id'] for p in ports_list]
self.assertIn(port['id'], pids_list)
listed_port = [p for p in ports_list if p['id'] == port['id']]
self.assertEqual(1, len(listed_port),
'Multiple ports listed with id %s in ports listing: '
'%s' % (port['id'], ports_list))
self.assertEqual(self.host_id, listed_port[0]['binding:host_id'])
@test.attr(type='smoke')
@test.idempotent_id('b54ac0ff-35fc-4c79-9ca3-c7dbd4ea4f13')
def test_show_port_binding_ext_attr(self):
body = self.admin_client.create_port(network_id=self.network['id'])
port = body['port']
self.addCleanup(self.admin_client.delete_port, port['id'])
body = self.admin_client.show_port(port['id'])
show_port = body['port']
self.assertEqual(port['binding:host_id'],
show_port['binding:host_id'])
self.assertEqual(port['binding:vif_type'],
show_port['binding:vif_type'])
self.assertEqual(port['binding:vif_details'],
show_port['binding:vif_details'])
class PortsIpV6TestJSON(PortsTestJSON):
_ip_version = 6
_tenant_network_cidr = CONF.network.tenant_network_v6_cidr
_tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON):
_ip_version = 6
_tenant_network_cidr = CONF.network.tenant_network_v6_cidr
_tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits

View File

@ -40,45 +40,6 @@ class RoutersTest(base.BaseRouterTest):
if cls._ip_version == 4 else if cls._ip_version == 4 else
CONF.network.tenant_network_v6_cidr) CONF.network.tenant_network_v6_cidr)
@test.attr(type='smoke')
@test.idempotent_id('f64403e2-8483-4b34-8ccd-b09a87bcc68c')
def test_create_show_list_update_delete_router(self):
# Create a router
# NOTE(salv-orlando): Do not invoke self.create_router
# as we need to check the response code
name = data_utils.rand_name('router-')
create_body = self.client.create_router(
name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False)
self.addCleanup(self._delete_router, create_body['router']['id'])
self.assertEqual(create_body['router']['name'], name)
self.assertEqual(
create_body['router']['external_gateway_info']['network_id'],
CONF.network.public_network_id)
self.assertFalse(create_body['router']['admin_state_up'])
# Show details of the created router
show_body = self.client.show_router(create_body['router']['id'])
self.assertEqual(show_body['router']['name'], name)
self.assertEqual(
show_body['router']['external_gateway_info']['network_id'],
CONF.network.public_network_id)
self.assertFalse(show_body['router']['admin_state_up'])
# List routers and verify if created router is there in response
list_body = self.client.list_routers()
routers_list = list()
for router in list_body['routers']:
routers_list.append(router['id'])
self.assertIn(create_body['router']['id'], routers_list)
# Update the name of router and verify if it is updated
updated_name = 'updated ' + name
update_body = self.client.update_router(create_body['router']['id'],
name=updated_name)
self.assertEqual(update_body['router']['name'], updated_name)
show_body = self.client.show_router(
create_body['router']['id'])
self.assertEqual(show_body['router']['name'], updated_name)
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640eeee') @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640eeee')
def test_create_update_router_description(self): def test_create_update_router_description(self):
@ -95,24 +56,6 @@ class RoutersTest(base.BaseRouterTest):
body = self.client.show_router(body['router']['id'])['router'] body = self.client.show_router(body['router']['id'])['router']
self.assertEqual('d2', body['description']) self.assertEqual('d2', body['description'])
@test.attr(type='smoke')
@test.idempotent_id('e54dd3a3-4352-4921-b09d-44369ae17397')
def test_create_router_setting_tenant_id(self):
# Test creating router from admin user setting tenant_id.
test_tenant = data_utils.rand_name('test_tenant_')
test_description = data_utils.rand_name('desc_')
tenant = self.identity_admin_client.create_tenant(
name=test_tenant, description=test_description)['tenant']
tenant_id = tenant['id']
self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id)
name = data_utils.rand_name('router-')
create_body = self.admin_client.create_router(name,
tenant_id=tenant_id)
self.addCleanup(self.admin_client.delete_router,
create_body['router']['id'])
self.assertEqual(tenant_id, create_body['router']['tenant_id'])
@test.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8') @test.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8')
@test.requires_ext(extension='ext-gw-mode', service='network') @test.requires_ext(extension='ext-gw-mode', service='network')
@test.attr(type='smoke') @test.attr(type='smoke')
@ -144,46 +87,6 @@ class RoutersTest(base.BaseRouterTest):
self._verify_router_gateway(create_body['router']['id'], self._verify_router_gateway(create_body['router']['id'],
exp_ext_gw_info=external_gateway_info) exp_ext_gw_info=external_gateway_info)
@test.attr(type='smoke')
@test.idempotent_id('b42e6e39-2e37-49cc-a6f4-8467e940900a')
def test_add_remove_router_interface_with_subnet_id(self):
network = self.create_network()
subnet = self.create_subnet(network)
router = self._create_router(data_utils.rand_name('router-'))
# Add router interface with subnet id
interface = self.client.add_router_interface_with_subnet_id(
router['id'], subnet['id'])
self.addCleanup(self._remove_router_interface_with_subnet_id,
router['id'], subnet['id'])
self.assertIn('subnet_id', interface.keys())
self.assertIn('port_id', interface.keys())
# Verify router id is equal to device id in port details
show_port_body = self.client.show_port(
interface['port_id'])
self.assertEqual(show_port_body['port']['device_id'],
router['id'])
@test.attr(type='smoke')
@test.idempotent_id('2b7d2f37-6748-4d78-92e5-1d590234f0d5')
def test_add_remove_router_interface_with_port_id(self):
network = self.create_network()
self.create_subnet(network)
router = self._create_router(data_utils.rand_name('router-'))
port_body = self.client.create_port(
network_id=network['id'])
# add router interface to port created above
interface = self.client.add_router_interface_with_port_id(
router['id'], port_body['port']['id'])
self.addCleanup(self._remove_router_interface_with_port_id,
router['id'], port_body['port']['id'])
self.assertIn('subnet_id', interface.keys())
self.assertIn('port_id', interface.keys())
# Verify router id is equal to device id in port details
show_port_body = self.client.show_port(
interface['port_id'])
self.assertEqual(show_port_body['port']['device_id'],
router['id'])
def _verify_router_gateway(self, router_id, exp_ext_gw_info=None): def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
show_body = self.admin_client.show_router(router_id) show_body = self.admin_client.show_router(router_id)
actual_ext_gw_info = show_body['router']['external_gateway_info'] actual_ext_gw_info = show_body['router']['external_gateway_info']
@ -208,20 +111,6 @@ class RoutersTest(base.BaseRouterTest):
self.assertIn(public_subnet_id, self.assertIn(public_subnet_id,
[x['subnet_id'] for x in fixed_ips]) [x['subnet_id'] for x in fixed_ips])
@test.attr(type='smoke')
@test.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79')
def test_update_router_set_gateway(self):
router = self._create_router(data_utils.rand_name('router-'))
self.client.update_router(
router['id'],
external_gateway_info={
'network_id': CONF.network.public_network_id})
# Verify operation - router
self._verify_router_gateway(
router['id'],
{'network_id': CONF.network.public_network_id})
self._verify_gateway_port(router['id'])
@test.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33') @test.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33')
@test.requires_ext(extension='ext-gw-mode', service='network') @test.requires_ext(extension='ext-gw-mode', service='network')
@test.attr(type='smoke') @test.attr(type='smoke')
@ -254,20 +143,6 @@ class RoutersTest(base.BaseRouterTest):
'enable_snat': False}) 'enable_snat': False})
self._verify_gateway_port(router['id']) self._verify_gateway_port(router['id'])
@test.attr(type='smoke')
@test.idempotent_id('ad81b7ee-4f81-407b-a19c-17e623f763e8')
def test_update_router_unset_gateway(self):
router = self._create_router(
data_utils.rand_name('router-'),
external_network_id=CONF.network.public_network_id)
self.client.update_router(router['id'], external_gateway_info={})
self._verify_router_gateway(router['id'])
# No gateway port expected
list_body = self.admin_client.list_ports(
network_id=CONF.network.public_network_id,
device_id=router['id'])
self.assertFalse(list_body['ports'])
@test.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374') @test.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374')
@test.requires_ext(extension='ext-gw-mode', service='network') @test.requires_ext(extension='ext-gw-mode', service='network')
@test.attr(type='smoke') @test.attr(type='smoke')
@ -321,45 +196,6 @@ class RoutersTest(base.BaseRouterTest):
def _delete_extra_routes(self, router_id): def _delete_extra_routes(self, router_id):
self.client.delete_extra_routes(router_id) self.client.delete_extra_routes(router_id)
@test.attr(type='smoke')
@test.idempotent_id('a8902683-c788-4246-95c7-ad9c6d63a4d9')
def test_update_router_admin_state(self):
self.router = self._create_router(data_utils.rand_name('router-'))
self.assertFalse(self.router['admin_state_up'])
# Update router admin state
update_body = self.client.update_router(self.router['id'],
admin_state_up=True)
self.assertTrue(update_body['router']['admin_state_up'])
show_body = self.client.show_router(self.router['id'])
self.assertTrue(show_body['router']['admin_state_up'])
@test.attr(type='smoke')
@test.idempotent_id('802c73c9-c937-4cef-824b-2191e24a6aab')
def test_add_multiple_router_interfaces(self):
network01 = self.create_network(
network_name=data_utils.rand_name('router-network01-'))
network02 = self.create_network(
network_name=data_utils.rand_name('router-network02-'))
subnet01 = self.create_subnet(network01)
sub02_cidr = netaddr.IPNetwork(self.tenant_cidr).next()
subnet02 = self.create_subnet(network02, cidr=sub02_cidr)
router = self._create_router(data_utils.rand_name('router-'))
interface01 = self._add_router_interface_with_subnet_id(router['id'],
subnet01['id'])
self._verify_router_interface(router['id'], subnet01['id'],
interface01['port_id'])
interface02 = self._add_router_interface_with_subnet_id(router['id'],
subnet02['id'])
self._verify_router_interface(router['id'], subnet02['id'],
interface02['port_id'])
def _verify_router_interface(self, router_id, subnet_id, port_id):
show_port_body = self.client.show_port(port_id)
interface_port = show_port_body['port']
self.assertEqual(router_id, interface_port['device_id'])
self.assertEqual(subnet_id,
interface_port['fixed_ips'][0]['subnet_id'])
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('01f185d1-d1a6-4cf9-abf7-e0e1384c169c') @test.idempotent_id('01f185d1-d1a6-4cf9-abf7-e0e1384c169c')
def test_network_attached_with_two_routers(self): def test_network_attached_with_two_routers(self):

View File

@ -13,104 +13,12 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import netaddr
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
import testtools import testtools
from neutron.tests.api import base_routers as base from neutron.tests.api import base_routers as base
from neutron.tests.tempest import config
CONF = config.CONF
class RoutersNegativeTest(base.BaseRouterTest):
@classmethod
def resource_setup(cls):
super(RoutersNegativeTest, cls).resource_setup()
if not test.is_extension_enabled('router', 'network'):
msg = "router extension not enabled."
raise cls.skipException(msg)
cls.router = cls.create_router(data_utils.rand_name('router-'))
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.tenant_cidr = (CONF.network.tenant_network_cidr
if cls._ip_version == 4 else
CONF.network.tenant_network_v6_cidr)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('37a94fc0-a834-45b9-bd23-9a81d2fd1e22')
def test_router_add_gateway_invalid_network_returns_404(self):
self.assertRaises(lib_exc.NotFound,
self.client.update_router,
self.router['id'],
external_gateway_info={
'network_id': self.router['id']})
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('11836a18-0b15-4327-a50b-f0d9dc66bddd')
def test_router_add_gateway_net_not_external_returns_400(self):
alt_network = self.create_network(
network_name=data_utils.rand_name('router-negative-'))
sub_cidr = netaddr.IPNetwork(self.tenant_cidr).next()
self.create_subnet(alt_network, cidr=sub_cidr)
self.assertRaises(lib_exc.BadRequest,
self.client.update_router,
self.router['id'],
external_gateway_info={
'network_id': alt_network['id']})
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('957751a3-3c68-4fa2-93b6-eb52ea10db6e')
def test_add_router_interfaces_on_overlapping_subnets_returns_400(self):
network01 = self.create_network(
network_name=data_utils.rand_name('router-network01-'))
network02 = self.create_network(
network_name=data_utils.rand_name('router-network02-'))
subnet01 = self.create_subnet(network01)
subnet02 = self.create_subnet(network02)
self._add_router_interface_with_subnet_id(self.router['id'],
subnet01['id'])
self.assertRaises(lib_exc.BadRequest,
self._add_router_interface_with_subnet_id,
self.router['id'],
subnet02['id'])
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('04df80f9-224d-47f5-837a-bf23e33d1c20')
def test_router_remove_interface_in_use_returns_409(self):
self.client.add_router_interface_with_subnet_id(
self.router['id'], self.subnet['id'])
self.assertRaises(lib_exc.Conflict,
self.client.delete_router,
self.router['id'])
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('c2a70d72-8826-43a7-8208-0209e6360c47')
def test_show_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.show_router,
router)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('b23d1569-8b0c-4169-8d4b-6abd34fad5c7')
def test_update_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.update_router,
router, name="new_name")
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('c7edc5ad-d09d-41e6-a344-5c0c31e2e3e4')
def test_delete_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.delete_router,
router)
class RoutersNegativeIpV6Test(RoutersNegativeTest):
_ip_version = 6
class DvrRoutersNegativeTest(base.BaseRouterTest): class DvrRoutersNegativeTest(base.BaseRouterTest):

View File

@ -13,20 +13,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import six
from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import data_utils
from tempest import test from tempest import test
from neutron.tests.api import base_security_groups as base from neutron.tests.api import base_security_groups as base
from neutron.tests.tempest import config
CONF = config.CONF
class SecGroupTest(base.BaseSecGroupTest): class SecGroupTest(base.BaseSecGroupTest):
_tenant_network_cidr = CONF.network.tenant_network_cidr
@classmethod @classmethod
def resource_setup(cls): def resource_setup(cls):
super(SecGroupTest, cls).resource_setup() super(SecGroupTest, cls).resource_setup()
@ -34,53 +28,6 @@ class SecGroupTest(base.BaseSecGroupTest):
msg = "security-group extension not enabled." msg = "security-group extension not enabled."
raise cls.skipException(msg) raise cls.skipException(msg)
def _create_verify_security_group_rule(self, sg_id, direction,
ethertype, protocol,
port_range_min,
port_range_max,
remote_group_id=None,
remote_ip_prefix=None):
# Create Security Group rule with the input params and validate
# that SG rule is created with the same parameters.
rule_create_body = self.client.create_security_group_rule(
security_group_id=sg_id,
direction=direction,
ethertype=ethertype,
protocol=protocol,
port_range_min=port_range_min,
port_range_max=port_range_max,
remote_group_id=remote_group_id,
remote_ip_prefix=remote_ip_prefix
)
sec_group_rule = rule_create_body['security_group_rule']
self.addCleanup(self._delete_security_group_rule,
sec_group_rule['id'])
expected = {'direction': direction, 'protocol': protocol,
'ethertype': ethertype, 'port_range_min': port_range_min,
'port_range_max': port_range_max,
'remote_group_id': remote_group_id,
'remote_ip_prefix': remote_ip_prefix}
for key, value in six.iteritems(expected):
self.assertEqual(value, sec_group_rule[key],
"Field %s of the created security group "
"rule does not match with %s." %
(key, value))
@test.attr(type='smoke')
@test.idempotent_id('e30abd17-fef9-4739-8617-dc26da88e686')
def test_list_security_groups(self):
# Verify the that security group belonging to tenant exist in list
body = self.client.list_security_groups()
security_groups = body['security_groups']
found = None
for n in security_groups:
if (n['name'] == 'default'):
found = n['id']
msg = "Security-group list doesn't contain default security-group"
self.assertIsNotNone(found, msg)
@test.attr(type='smoke') @test.attr(type='smoke')
@test.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802') @test.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
def test_create_list_update_show_delete_security_group(self): def test_create_list_update_show_delete_security_group(self):
@ -109,153 +56,3 @@ class SecGroupTest(base.BaseSecGroupTest):
self.assertEqual(show_body['security_group']['name'], new_name) self.assertEqual(show_body['security_group']['name'], new_name)
self.assertEqual(show_body['security_group']['description'], self.assertEqual(show_body['security_group']['description'],
new_description) new_description)
@test.attr(type='smoke')
@test.idempotent_id('cfb99e0e-7410-4a3d-8a0c-959a63ee77e9')
def test_create_show_delete_security_group_rule(self):
group_create_body, _ = self._create_security_group()
# Create rules for each protocol
protocols = ['tcp', 'udp', 'icmp']
for protocol in protocols:
rule_create_body = self.client.create_security_group_rule(
security_group_id=group_create_body['security_group']['id'],
protocol=protocol,
direction='ingress',
ethertype=self.ethertype
)
# Show details of the created security rule
show_rule_body = self.client.show_security_group_rule(
rule_create_body['security_group_rule']['id']
)
create_dict = rule_create_body['security_group_rule']
for key, value in six.iteritems(create_dict):
self.assertEqual(value,
show_rule_body['security_group_rule'][key],
"%s does not match." % key)
# List rules and verify created rule is in response
rule_list_body = self.client.list_security_group_rules()
rule_list = [rule['id']
for rule in rule_list_body['security_group_rules']]
self.assertIn(rule_create_body['security_group_rule']['id'],
rule_list)
@test.attr(type='smoke')
@test.idempotent_id('c72c1c0c-2193-4aca-fff2-b1442640bbbb')
def test_create_security_group_rule_description(self):
if not test.is_extension_enabled('standard-attr-description',
'network'):
msg = "standard-attr-description not enabled."
raise self.skipException(msg)
sg = self._create_security_group()[0]['security_group']
rule = self.client.create_security_group_rule(
security_group_id=sg['id'], protocol='tcp',
direction='ingress', ethertype=self.ethertype,
description='d1'
)['security_group_rule']
self.assertEqual('d1', rule['description'])
body = self.client.show_security_group_rule(rule['id'])
self.assertEqual('d1', body['security_group_rule']['description'])
@test.attr(type='smoke')
@test.idempotent_id('87dfbcf9-1849-43ea-b1e4-efa3eeae9f71')
def test_create_security_group_rule_with_additional_args(self):
"""Verify security group rule with additional arguments works.
direction:ingress, ethertype:[IPv4/IPv6],
protocol:tcp, port_range_min:77, port_range_max:77
"""
group_create_body, _ = self._create_security_group()
sg_id = group_create_body['security_group']['id']
direction = 'ingress'
protocol = 'tcp'
port_range_min = 77
port_range_max = 77
self._create_verify_security_group_rule(sg_id, direction,
self.ethertype, protocol,
port_range_min,
port_range_max)
@test.attr(type='smoke')
@test.idempotent_id('c9463db8-b44d-4f52-b6c0-8dbda99f26ce')
def test_create_security_group_rule_with_icmp_type_code(self):
"""Verify security group rule for icmp protocol works.
Specify icmp type (port_range_min) and icmp code
(port_range_max) with different values. A separate testcase
is added for icmp protocol as icmp validation would be
different from tcp/udp.
"""
group_create_body, _ = self._create_security_group()
sg_id = group_create_body['security_group']['id']
direction = 'ingress'
protocol = 'icmp'
icmp_type_codes = [(3, 2), (3, 0), (8, 0), (0, 0), (11, None)]
for icmp_type, icmp_code in icmp_type_codes:
self._create_verify_security_group_rule(sg_id, direction,
self.ethertype, protocol,
icmp_type, icmp_code)
@test.attr(type='smoke')
@test.idempotent_id('c2ed2deb-7a0c-44d8-8b4c-a5825b5c310b')
def test_create_security_group_rule_with_remote_group_id(self):
# Verify creating security group rule with remote_group_id works
sg1_body, _ = self._create_security_group()
sg2_body, _ = self._create_security_group()
sg_id = sg1_body['security_group']['id']
direction = 'ingress'
protocol = 'udp'
port_range_min = 50
port_range_max = 55
remote_id = sg2_body['security_group']['id']
self._create_verify_security_group_rule(sg_id, direction,
self.ethertype, protocol,
port_range_min,
port_range_max,
remote_group_id=remote_id)
@test.attr(type='smoke')
@test.idempotent_id('16459776-5da2-4634-bce4-4b55ee3ec188')
def test_create_security_group_rule_with_remote_ip_prefix(self):
# Verify creating security group rule with remote_ip_prefix works
sg1_body, _ = self._create_security_group()
sg_id = sg1_body['security_group']['id']
direction = 'ingress'
protocol = 'tcp'
port_range_min = 76
port_range_max = 77
ip_prefix = self._tenant_network_cidr
self._create_verify_security_group_rule(sg_id, direction,
self.ethertype, protocol,
port_range_min,
port_range_max,
remote_ip_prefix=ip_prefix)
@test.attr(type='smoke')
@test.idempotent_id('0a307599-6655-4220-bebc-fd70c64f2290')
def test_create_security_group_rule_with_protocol_integer_value(self):
# Verify creating security group rule with the
# protocol as integer value
# arguments : "protocol": 17
group_create_body, _ = self._create_security_group()
direction = 'ingress'
protocol = 17
security_group_id = group_create_body['security_group']['id']
rule_create_body = self.client.create_security_group_rule(
security_group_id=security_group_id,
direction=direction,
protocol=protocol
)
sec_group_rule = rule_create_body['security_group_rule']
self.assertEqual(sec_group_rule['direction'], direction)
self.assertEqual(int(sec_group_rule['protocol']), protocol)
class SecGroupIPv6Test(SecGroupTest):
_ip_version = 6
_tenant_network_cidr = CONF.network.tenant_network_v6_cidr

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import uuid
from tempest.lib import exceptions as lib_exc from tempest.lib import exceptions as lib_exc
from tempest import test from tempest import test
@ -35,98 +33,6 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
msg = "security-group extension not enabled." msg = "security-group extension not enabled."
raise cls.skipException(msg) raise cls.skipException(msg)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('424fd5c3-9ddc-486a-b45f-39bf0c820fc6')
def test_show_non_existent_security_group(self):
non_exist_id = str(uuid.uuid4())
self.assertRaises(lib_exc.NotFound, self.client.show_security_group,
non_exist_id)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('4c094c09-000b-4e41-8100-9617600c02a6')
def test_show_non_existent_security_group_rule(self):
non_exist_id = str(uuid.uuid4())
self.assertRaises(lib_exc.NotFound,
self.client.show_security_group_rule,
non_exist_id)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('1f1bb89d-5664-4956-9fcd-83ee0fa603df')
def test_delete_non_existent_security_group(self):
non_exist_id = str(uuid.uuid4())
self.assertRaises(lib_exc.NotFound,
self.client.delete_security_group,
non_exist_id
)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('981bdc22-ce48-41ed-900a-73148b583958')
def test_create_security_group_rule_with_bad_protocol(self):
group_create_body, _ = self._create_security_group()
# Create rule with bad protocol name
pname = 'bad_protocol_name'
self.assertRaises(
lib_exc.BadRequest, self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol=pname, direction='ingress', ethertype=self.ethertype)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('5f8daf69-3c5f-4aaa-88c9-db1d66f68679')
def test_create_security_group_rule_with_bad_remote_ip_prefix(self):
group_create_body, _ = self._create_security_group()
# Create rule with bad remote_ip_prefix
prefix = ['192.168.1./24', '192.168.1.1/33', 'bad_prefix', '256']
for remote_ip_prefix in prefix:
self.assertRaises(
lib_exc.BadRequest, self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol='tcp', direction='ingress', ethertype=self.ethertype,
remote_ip_prefix=remote_ip_prefix)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('4bf786fd-2f02-443c-9716-5b98e159a49a')
def test_create_security_group_rule_with_non_existent_remote_groupid(self):
group_create_body, _ = self._create_security_group()
non_exist_id = str(uuid.uuid4())
# Create rule with non existent remote_group_id
group_ids = ['bad_group_id', non_exist_id]
for remote_group_id in group_ids:
self.assertRaises(
lib_exc.NotFound, self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol='tcp', direction='ingress', ethertype=self.ethertype,
remote_group_id=remote_group_id)
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('b5c4b247-6b02-435b-b088-d10d45650881')
def test_create_security_group_rule_with_remote_ip_and_group(self):
sg1_body, _ = self._create_security_group()
sg2_body, _ = self._create_security_group()
# Create rule specifying both remote_ip_prefix and remote_group_id
prefix = self._tenant_network_cidr
self.assertRaises(
lib_exc.BadRequest, self.client.create_security_group_rule,
security_group_id=sg1_body['security_group']['id'],
protocol='tcp', direction='ingress',
ethertype=self.ethertype, remote_ip_prefix=prefix,
remote_group_id=sg2_body['security_group']['id'])
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('5666968c-fff3-40d6-9efc-df1c8bd01abb')
def test_create_security_group_rule_with_bad_ethertype(self):
group_create_body, _ = self._create_security_group()
# Create rule with bad ethertype
ethertype = 'bad_ethertype'
self.assertRaises(
lib_exc.BadRequest, self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol='udp', direction='ingress', ethertype=ethertype)
@test.attr(type=['negative', 'gate']) @test.attr(type=['negative', 'gate'])
@test.idempotent_id('0d9c7791-f2ad-4e2f-ac73-abf2373b0d2d') @test.idempotent_id('0d9c7791-f2ad-4e2f-ac73-abf2373b0d2d')
def test_create_security_group_rule_with_invalid_ports(self): def test_create_security_group_rule_with_invalid_ports(self):
@ -159,50 +65,6 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
direction='ingress', ethertype=self.ethertype) direction='ingress', ethertype=self.ethertype)
self.assertIn(msg, str(ex)) self.assertIn(msg, str(ex))
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('2323061e-9fbf-4eb0-b547-7e8fafc90849')
def test_create_additional_default_security_group_fails(self):
# Create security group named 'default', it should be failed.
name = 'default'
self.assertRaises(lib_exc.Conflict,
self.client.create_security_group,
name=name)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('8fde898f-ce88-493b-adc9-4e4692879fc5')
def test_create_duplicate_security_group_rule_fails(self):
# Create duplicate security group rule, it should fail.
body, _ = self._create_security_group()
min_port = 66
max_port = 67
# Create a rule with valid params
self.client.create_security_group_rule(
security_group_id=body['security_group']['id'],
direction='ingress',
ethertype=self.ethertype,
protocol='tcp',
port_range_min=min_port,
port_range_max=max_port
)
# Try creating the same security group rule, it should fail
self.assertRaises(
lib_exc.Conflict, self.client.create_security_group_rule,
security_group_id=body['security_group']['id'],
protocol='tcp', direction='ingress', ethertype=self.ethertype,
port_range_min=min_port, port_range_max=max_port)
@test.attr(type=['negative', 'smoke'])
@test.idempotent_id('be308db6-a7cf-4d5c-9baf-71bafd73f35e')
def test_create_security_group_rule_with_non_existent_security_group(self):
# Create security group rules with not existing security group.
non_existent_sg = str(uuid.uuid4())
self.assertRaises(lib_exc.NotFound,
self.client.create_security_group_rule,
security_group_id=non_existent_sg,
direction='ingress', ethertype=self.ethertype)
@test.attr(type=['negative', 'smoke']) @test.attr(type=['negative', 'smoke'])
@test.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c') @test.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c')
def test_update_default_security_group_name(self): def test_update_default_security_group_name(self):
@ -215,23 +77,3 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
class NegativeSecGroupIPv6Test(NegativeSecGroupTest): class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
_ip_version = 6 _ip_version = 6
_tenant_network_cidr = CONF.network.tenant_network_v6_cidr _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
@test.attr(type=['negative', 'gate'])
@test.idempotent_id('7607439c-af73-499e-bf64-f687fd12a842')
def test_create_security_group_rule_wrong_ip_prefix_version(self):
group_create_body, _ = self._create_security_group()
# Create rule with bad remote_ip_prefix
pairs = ({'ethertype': 'IPv6',
'ip_prefix': CONF.network.tenant_network_cidr},
{'ethertype': 'IPv4',
'ip_prefix': CONF.network.tenant_network_v6_cidr})
for pair in pairs:
self.assertRaisesRegexp(
lib_exc.BadRequest,
"Conflicting value ethertype",
self.client.create_security_group_rule,
security_group_id=group_create_body['security_group']['id'],
protocol='tcp', direction='ingress',
ethertype=pair['ethertype'],
remote_ip_prefix=pair['ip_prefix'])