From ddd5df95266cf872a852577d16d9c5d188b4e13a Mon Sep 17 00:00:00 2001 From: Assaf Muller Date: Thu, 3 Mar 2016 17:19:13 -0500 Subject: [PATCH] Delete 118~ API tests from Neutron Deleted tests that duplicate tests from Tempest, according to the new criteria specified in TESTING.rst. Follow up steps are detailed here: https://etherpad.openstack.org/p/neutron-tempest-defork For reviewers, here's how to get a complete list of network tests from Tempest. From a Tempest directory, execute: testr list-tests tempest.api.network | cut -d"[" -f1 | cut -d"." -f4- I verified that every test removed here actually exists in that list. Here's a list of patches that modified tests after the fork: https://etherpad.openstack.org/p/neutron-tempest-defork-patches-since-initial-sync And the list of tests we care about: https://etherpad.openstack.org/p/neutron-tempest-defork, Ctrl-F for: 'Tests that should be synced from Neutron' Related-bug: #1552960 Change-Id: I685291058f221a7ef0b5b7485280cdabceaa4af3 --- TESTING.rst | 28 +- .../admin/test_external_network_extension.py | 111 ---- .../admin/test_external_networks_negative.py | 54 -- .../admin/test_floating_ips_admin_actions.py | 69 -- neutron/tests/api/base_routers.py | 18 - neutron/tests/api/base_security_groups.py | 10 - neutron/tests/api/test_dhcp_ipv6.py | 310 --------- neutron/tests/api/test_extensions.py | 74 --- neutron/tests/api/test_floating_ips.py | 175 ----- .../tests/api/test_floating_ips_negative.py | 43 -- neutron/tests/api/test_networks.py | 602 +----------------- neutron/tests/api/test_networks_negative.py | 59 -- neutron/tests/api/test_ports.py | 376 +---------- neutron/tests/api/test_routers.py | 164 ----- neutron/tests/api/test_routers_negative.py | 92 --- neutron/tests/api/test_security_groups.py | 203 ------ .../api/test_security_groups_negative.py | 158 ----- 17 files changed, 29 insertions(+), 2517 deletions(-) delete mode 100644 neutron/tests/api/admin/test_external_networks_negative.py delete mode 100644 neutron/tests/api/test_extensions.py delete mode 100644 neutron/tests/api/test_networks_negative.py diff --git a/TESTING.rst b/TESTING.rst index 25f5d53e596..06a16a0c00f 100644 --- a/TESTING.rst +++ b/TESTING.rst @@ -287,8 +287,32 @@ be made about implementation. Only the contract defined by Neutron's REST API should be validated, and all interaction with the daemon should be via a REST client. -neutron/tests/api was copied from the Tempest project. The Tempest networking -API directory was frozen and any new tests belong to the Neutron repository. +neutron/tests/api was copied from the Tempest project. At the time, there was +an overlap of tests between the Tempest and Neutron repositories. This overlap +was then eliminated by carving out a subset of resources that belong to +Tempest, with the rest in Neutron. + +API tests that belong to Tempest deal with a subset of Neutron's resources: + +* Port +* Network +* Subnet +* Security Group +* Router +* Floating IP + +These resources were chosen for their ubiquitously. They are found in most +Neutron depoloyments regardless of plugin, and are directly involved in the +networking and security of an instance. Together, they form the bare minimum +needed by Neutron. + +This is excluding extensions to these resources (For example: Extra DHCP +options to subnets, or snat_gateway mode to routers) that are not mandatory +in the majority of cases. + +Tests for other resources should be contributed to the Neutron repository. +Scenario tests should be similarly split up between Tempest and Neutron +according to the API they're targeting. Development Process ------------------- diff --git a/neutron/tests/api/admin/test_external_network_extension.py b/neutron/tests/api/admin/test_external_network_extension.py index 87fa50982d9..dab05a33e86 100644 --- a/neutron/tests/api/admin/test_external_network_extension.py +++ b/neutron/tests/api/admin/test_external_network_extension.py @@ -19,117 +19,6 @@ import testtools from neutron.tests.api import base -class ExternalNetworksTestJSON(base.BaseAdminNetworkTest): - - @classmethod - def resource_setup(cls): - super(ExternalNetworksTestJSON, cls).resource_setup() - cls.network = cls.create_network() - - def _create_network(self, external=True): - post_body = {'name': data_utils.rand_name('network-')} - if external: - post_body['router:external'] = external - body = self.admin_client.create_network(**post_body) - network = body['network'] - self.addCleanup(self.admin_client.delete_network, network['id']) - return network - - @test.idempotent_id('462be770-b310-4df9-9c42-773217e4c8b1') - def test_create_external_network(self): - # Create a network as an admin user specifying the - # external network extension attribute - ext_network = self._create_network() - # Verifies router:external parameter - self.assertIsNotNone(ext_network['id']) - self.assertTrue(ext_network['router:external']) - - @test.idempotent_id('4db5417a-e11c-474d-a361-af00ebef57c5') - def test_update_external_network(self): - # Update a network as an admin user specifying the - # external network extension attribute - network = self._create_network(external=False) - self.assertFalse(network.get('router:external', False)) - update_body = {'router:external': True} - body = self.admin_client.update_network(network['id'], - **update_body) - updated_network = body['network'] - # Verify that router:external parameter was updated - self.assertTrue(updated_network['router:external']) - - @test.idempotent_id('39be4c9b-a57e-4ff9-b7c7-b218e209dfcc') - def test_list_external_networks(self): - # Create external_net - external_network = self._create_network() - # List networks as a normal user and confirm the external - # network extension attribute is returned for those networks - # that were created as external - body = self.client.list_networks() - networks_list = [net['id'] for net in body['networks']] - self.assertIn(external_network['id'], networks_list) - self.assertIn(self.network['id'], networks_list) - for net in body['networks']: - if net['id'] == self.network['id']: - self.assertFalse(net['router:external']) - elif net['id'] == external_network['id']: - self.assertTrue(net['router:external']) - - @test.idempotent_id('2ac50ab2-7ebd-4e27-b3ce-a9e399faaea2') - def test_show_external_networks_attribute(self): - # Create external_net - external_network = self._create_network() - # Show an external network as a normal user and confirm the - # external network extension attribute is returned. - body = self.client.show_network(external_network['id']) - show_ext_net = body['network'] - self.assertEqual(external_network['name'], show_ext_net['name']) - self.assertEqual(external_network['id'], show_ext_net['id']) - self.assertTrue(show_ext_net['router:external']) - body = self.client.show_network(self.network['id']) - show_net = body['network'] - # Verify with show that router:external is False for network - self.assertEqual(self.network['name'], show_net['name']) - self.assertEqual(self.network['id'], show_net['id']) - self.assertFalse(show_net['router:external']) - - @test.idempotent_id('82068503-2cf2-4ed4-b3be-ecb89432e4bb') - def test_delete_external_networks_with_floating_ip(self): - """Verifies external network can be deleted while still holding - (unassociated) floating IPs - - """ - # Set cls.client to admin to use base.create_subnet() - client = self.admin_client - body = client.create_network(**{'router:external': True}) - external_network = body['network'] - self.addCleanup(self._try_delete_resource, - client.delete_network, - external_network['id']) - subnet = self.create_subnet(external_network, client=client, - enable_dhcp=False) - body = client.create_floatingip( - floating_network_id=external_network['id']) - created_floating_ip = body['floatingip'] - self.addCleanup(self._try_delete_resource, - client.delete_floatingip, - created_floating_ip['id']) - floatingip_list = client.list_floatingips( - network=external_network['id']) - self.assertIn(created_floating_ip['id'], - (f['id'] for f in floatingip_list['floatingips'])) - client.delete_network(external_network['id']) - # Verifies floating ip is deleted - floatingip_list = client.list_floatingips() - self.assertNotIn(created_floating_ip['id'], - (f['id'] for f in floatingip_list['floatingips'])) - # Verifies subnet is deleted - subnet_list = client.list_subnets() - self.assertNotIn(subnet['id'], - (s['id'] for s in subnet_list)) - # Removes subnet from the cleanup list - self.subnets.remove(subnet) - - class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest): credentials = ['primary', 'alt', 'admin'] diff --git a/neutron/tests/api/admin/test_external_networks_negative.py b/neutron/tests/api/admin/test_external_networks_negative.py deleted file mode 100644 index 81f9b0e0c69..00000000000 --- a/neutron/tests/api/admin/test_external_networks_negative.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2014 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest.lib import exceptions as lib_exc -from tempest import test - -from neutron.tests.api import base -from neutron.tests.tempest import config - -CONF = config.CONF - - -class ExternalNetworksAdminNegativeTestJSON(base.BaseAdminNetworkTest): - - @test.attr(type=['negative']) - @test.idempotent_id('d402ae6c-0be0-4d8e-833b-a738895d98d0') - def test_create_port_with_precreated_floatingip_as_fixed_ip(self): - """ - External networks can be used to create both floating-ip as well - as instance-ip. So, creating an instance-ip with a value of a - pre-created floating-ip should be denied. - """ - - # create a floating ip - client = self.admin_client - body = client.create_floatingip( - floating_network_id=CONF.network.public_network_id) - created_floating_ip = body['floatingip'] - self.addCleanup(self._try_delete_resource, - client.delete_floatingip, - created_floating_ip['id']) - floating_ip_address = created_floating_ip['floating_ip_address'] - self.assertIsNotNone(floating_ip_address) - - # use the same value of floatingip as fixed-ip to create_port() - fixed_ips = [{'ip_address': floating_ip_address}] - - # create a port which will internally create an instance-ip - self.assertRaises(lib_exc.Conflict, - client.create_port, - network_id=CONF.network.public_network_id, - fixed_ips=fixed_ips) diff --git a/neutron/tests/api/admin/test_floating_ips_admin_actions.py b/neutron/tests/api/admin/test_floating_ips_admin_actions.py index 533ef3614a1..d9043517a83 100644 --- a/neutron/tests/api/admin/test_floating_ips_admin_actions.py +++ b/neutron/tests/api/admin/test_floating_ips_admin_actions.py @@ -41,75 +41,6 @@ class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest): cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.port = cls.create_port(cls.network) - @test.attr(type='smoke') - @test.idempotent_id('64f2100b-5471-4ded-b46c-ddeeeb4f231b') - def test_list_floating_ips_from_admin_and_nonadmin(self): - # Create floating ip from admin user - floating_ip_admin = self.admin_client.create_floatingip( - floating_network_id=self.ext_net_id) - self.addCleanup(self.admin_client.delete_floatingip, - floating_ip_admin['floatingip']['id']) - # Create floating ip from alt user - body = self.alt_client.create_floatingip( - floating_network_id=self.ext_net_id) - floating_ip_alt = body['floatingip'] - self.addCleanup(self.alt_client.delete_floatingip, - floating_ip_alt['id']) - # List floating ips from admin - body = self.admin_client.list_floatingips() - floating_ip_ids_admin = [f['id'] for f in body['floatingips']] - # Check that admin sees all floating ips - self.assertIn(self.floating_ip['id'], floating_ip_ids_admin) - self.assertIn(floating_ip_admin['floatingip']['id'], - floating_ip_ids_admin) - self.assertIn(floating_ip_alt['id'], floating_ip_ids_admin) - # List floating ips from nonadmin - body = self.client.list_floatingips() - floating_ip_ids = [f['id'] for f in body['floatingips']] - # Check that nonadmin user doesn't see floating ip created from admin - # and floating ip that is created in another tenant (alt user) - self.assertIn(self.floating_ip['id'], floating_ip_ids) - self.assertNotIn(floating_ip_admin['floatingip']['id'], - floating_ip_ids) - self.assertNotIn(floating_ip_alt['id'], floating_ip_ids) - - @test.attr(type='smoke') - @test.idempotent_id('32727cc3-abe2-4485-a16e-48f2d54c14f2') - def test_create_list_show_floating_ip_with_tenant_id_by_admin(self): - # Creates a floating IP - body = self.admin_client.create_floatingip( - floating_network_id=self.ext_net_id, - tenant_id=self.network['tenant_id'], - port_id=self.port['id']) - created_floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, - created_floating_ip['id']) - self.assertIsNotNone(created_floating_ip['id']) - self.assertIsNotNone(created_floating_ip['tenant_id']) - self.assertIsNotNone(created_floating_ip['floating_ip_address']) - self.assertEqual(created_floating_ip['port_id'], self.port['id']) - self.assertEqual(created_floating_ip['floating_network_id'], - self.ext_net_id) - port = self.port['fixed_ips'] - self.assertEqual(created_floating_ip['fixed_ip_address'], - port[0]['ip_address']) - # Verifies the details of a floating_ip - floating_ip = self.admin_client.show_floatingip( - created_floating_ip['id']) - shown_floating_ip = floating_ip['floatingip'] - self.assertEqual(shown_floating_ip['id'], created_floating_ip['id']) - self.assertEqual(shown_floating_ip['floating_network_id'], - self.ext_net_id) - self.assertEqual(shown_floating_ip['tenant_id'], - self.network['tenant_id']) - self.assertEqual(shown_floating_ip['floating_ip_address'], - created_floating_ip['floating_ip_address']) - self.assertEqual(shown_floating_ip['port_id'], self.port['id']) - # Verify the floating ip exists in the list of all floating_ips - floating_ips = self.admin_client.list_floatingips() - floatingip_id_list = [f['id'] for f in floating_ips['floatingips']] - self.assertIn(created_floating_ip['id'], floatingip_id_list) - @test.attr(type=['negative', 'smoke']) @test.idempotent_id('11116ee9-4e99-5b15-b8e1-aa7df92ca589') def test_associate_floating_ip_with_port_from_another_tenant(self): diff --git a/neutron/tests/api/base_routers.py b/neutron/tests/api/base_routers.py index 045f8835eaf..95ffa1411f5 100644 --- a/neutron/tests/api/base_routers.py +++ b/neutron/tests/api/base_routers.py @@ -43,21 +43,3 @@ class BaseRouterTest(base.BaseAdminNetworkTest): for router in list_body['routers']: routers_list.append(router['id']) self.assertNotIn(router_id, routers_list) - - def _add_router_interface_with_subnet_id(self, router_id, subnet_id): - interface = self.client.add_router_interface_with_subnet_id( - router_id, subnet_id) - self.addCleanup(self._remove_router_interface_with_subnet_id, - router_id, subnet_id) - self.assertEqual(subnet_id, interface['subnet_id']) - return interface - - def _remove_router_interface_with_subnet_id(self, router_id, subnet_id): - body = self.client.remove_router_interface_with_subnet_id( - router_id, subnet_id) - self.assertEqual(subnet_id, body['subnet_id']) - - def _remove_router_interface_with_port_id(self, router_id, port_id): - body = self.client.remove_router_interface_with_port_id(router_id, - port_id) - self.assertEqual(port_id, body['port_id']) diff --git a/neutron/tests/api/base_security_groups.py b/neutron/tests/api/base_security_groups.py index e3c165113c5..c6fb42dbe86 100644 --- a/neutron/tests/api/base_security_groups.py +++ b/neutron/tests/api/base_security_groups.py @@ -43,13 +43,3 @@ class BaseSecGroupTest(base.BaseNetworkTest): for secgroup in list_body['security_groups']: secgroup_list.append(secgroup['id']) self.assertNotIn(secgroup_id, secgroup_list) - - def _delete_security_group_rule(self, rule_id): - self.client.delete_security_group_rule(rule_id) - # Asserting that the security group is not found in the list - # after deletion - list_body = self.client.list_security_group_rules() - rules_list = list() - for rule in list_body['security_group_rules']: - rules_list.append(rule['id']) - self.assertNotIn(rule_id, rules_list) diff --git a/neutron/tests/api/test_dhcp_ipv6.py b/neutron/tests/api/test_dhcp_ipv6.py index 3b9f2e4199a..9682a633d25 100644 --- a/neutron/tests/api/test_dhcp_ipv6.py +++ b/neutron/tests/api/test_dhcp_ipv6.py @@ -13,11 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -import random - import netaddr -import six -from tempest.lib.common.utils import data_utils from tempest.lib import exceptions as lib_exc from tempest import test @@ -31,16 +27,6 @@ CONF = config.CONF class NetworksTestDHCPv6(base.BaseNetworkTest): _ip_version = 6 - """ Test DHCPv6 specific features using SLAAC, stateless and - stateful settings for subnets. Also it shall check dual-stack - functionality (IPv4 + IPv6 together). - The tests include: - generating of SLAAC EUI-64 address in subnets with various settings - receiving SLAAC addresses in combinations of various subnets - receiving stateful IPv6 addresses - addressing in subnets with router - """ - @classmethod def skip_checks(cls): msg = None @@ -89,249 +75,6 @@ class NetworksTestDHCPv6(base.BaseNetworkTest): self.client.delete_router(router['id']) self._remove_from_list_by_index(self.routers, router) - def _get_ips_from_subnet(self, **kwargs): - subnet = self.create_subnet(self.network, **kwargs) - port_mac = data_utils.rand_mac_address() - port = self.create_port(self.network, mac_address=port_mac) - real_ip = next(iter(port['fixed_ips']), None)['ip_address'] - eui_ip = data_utils.get_ipv6_addr_by_EUI64(subnet['cidr'], - port_mac).format() - return real_ip, eui_ip - - @test.idempotent_id('e5517e62-6f16-430d-a672-f80875493d4c') - def test_dhcpv6_stateless_eui64(self): - """When subnets configured with IPv6 SLAAC (AOM=100) and DHCPv6 - stateless (AOM=110) both for radvd and dnsmasq, port shall receive IP - address calculated from its MAC. - """ - for ra_mode, add_mode in ( - ('slaac', 'slaac'), - ('dhcpv6-stateless', 'dhcpv6-stateless'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - real_ip, eui_ip = self._get_ips_from_subnet(**kwargs) - self._clean_network() - self.assertEqual(eui_ip, real_ip, - ('Real port IP is %s, but shall be %s when ' - 'ipv6_ra_mode=%s and ipv6_address_mode=%s') % ( - real_ip, eui_ip, ra_mode, add_mode)) - - @test.idempotent_id('ae2f4a5d-03ff-4c42-a3b0-ce2fcb7ea832') - def test_dhcpv6_stateless_no_ra(self): - """When subnets configured with IPv6 SLAAC and DHCPv6 stateless - and there is no radvd, port shall receive IP address calculated - from its MAC and mask of subnet. - """ - for ra_mode, add_mode in ( - (None, 'slaac'), - (None, 'dhcpv6-stateless'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - kwargs = {k: v for k, v in six.iteritems(kwargs) if v} - real_ip, eui_ip = self._get_ips_from_subnet(**kwargs) - self._clean_network() - self.assertEqual(eui_ip, real_ip, - ('Real port IP %s shall be equal to EUI-64 %s' - 'when ipv6_ra_mode=%s,ipv6_address_mode=%s') % ( - real_ip, eui_ip, - ra_mode if ra_mode else "Off", - add_mode if add_mode else "Off")) - - @test.idempotent_id('81f18ef6-95b5-4584-9966-10d480b7496a') - def test_dhcpv6_invalid_options(self): - """Different configurations for radvd and dnsmasq are not allowed""" - for ra_mode, add_mode in ( - ('dhcpv6-stateless', 'dhcpv6-stateful'), - ('dhcpv6-stateless', 'slaac'), - ('slaac', 'dhcpv6-stateful'), - ('dhcpv6-stateful', 'dhcpv6-stateless'), - ('dhcpv6-stateful', 'slaac'), - ('slaac', 'dhcpv6-stateless'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - self.assertRaises(lib_exc.BadRequest, - self.create_subnet, - self.network, - **kwargs) - - @test.idempotent_id('21635b6f-165a-4d42-bf49-7d195e47342f') - def test_dhcpv6_stateless_no_ra_no_dhcp(self): - """If no radvd option and no dnsmasq option is configured - port shall receive IP from fixed IPs list of subnet. - """ - real_ip, eui_ip = self._get_ips_from_subnet() - self._clean_network() - self.assertNotEqual(eui_ip, real_ip, - ('Real port IP %s equal to EUI-64 %s when ' - 'ipv6_ra_mode=Off and ipv6_address_mode=Off,' - 'but shall be taken from fixed IPs') % ( - real_ip, eui_ip)) - - @test.idempotent_id('4544adf7-bb5f-4bdc-b769-b3e77026cef2') - def test_dhcpv6_two_subnets(self): - """When one IPv6 subnet configured with IPv6 SLAAC or DHCPv6 stateless - and other IPv6 is with DHCPv6 stateful, port shall receive EUI-64 IP - addresses from first subnet and DHCPv6 address from second one. - Order of subnet creating should be unimportant. - """ - for order in ("slaac_first", "dhcp_first"): - for ra_mode, add_mode in ( - ('slaac', 'slaac'), - ('dhcpv6-stateless', 'dhcpv6-stateless'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - kwargs_dhcp = {'ipv6_address_mode': 'dhcpv6-stateful'} - if order == "slaac_first": - subnet_slaac = self.create_subnet(self.network, **kwargs) - subnet_dhcp = self.create_subnet( - self.network, **kwargs_dhcp) - else: - subnet_dhcp = self.create_subnet( - self.network, **kwargs_dhcp) - subnet_slaac = self.create_subnet(self.network, **kwargs) - port_mac = data_utils.rand_mac_address() - dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"] - eui_ip = data_utils.get_ipv6_addr_by_EUI64( - subnet_slaac['cidr'], - port_mac - ).format() - # TODO(sergsh): remove this when 1219795 is fixed - dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()] - port = self.create_port(self.network, mac_address=port_mac) - real_ips = dict([(k['subnet_id'], k['ip_address']) - for k in port['fixed_ips']]) - real_dhcp_ip, real_eui_ip = [real_ips[sub['id']] - for sub in subnet_dhcp, - subnet_slaac] - self.client.delete_port(port['id']) - self.ports.pop() - body = self.client.list_ports() - ports_id_list = [i['id'] for i in body['ports']] - self.assertNotIn(port['id'], ports_id_list) - self._clean_network() - self.assertEqual(real_eui_ip, - eui_ip, - 'Real IP is {0}, but shall be {1}'.format( - real_eui_ip, - eui_ip)) - self.assertIn( - real_dhcp_ip, dhcp_ip, - 'Real IP is {0}, but shall be one from {1}'.format( - real_dhcp_ip, - str(dhcp_ip))) - - @test.idempotent_id('4256c61d-c538-41ea-9147-3c450c36669e') - def test_dhcpv6_64_subnets(self): - """When a Network contains two subnets, one being an IPv6 subnet - configured with ipv6_ra_mode either as slaac or dhcpv6-stateless, - and the other subnet being an IPv4 subnet, a port attached to the - network shall receive IP addresses from the subnets as follows: An - IPv6 address calculated using EUI-64 from the first subnet, and an - IPv4 address from the second subnet. The ordering of the subnets - that the port is associated with should not affect this behavior. - """ - for order in ("slaac_first", "dhcp_first"): - for ra_mode, add_mode in ( - ('slaac', 'slaac'), - ('dhcpv6-stateless', 'dhcpv6-stateless'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - if order == "slaac_first": - subnet_slaac = self.create_subnet(self.network, **kwargs) - subnet_dhcp = self.create_subnet( - self.network, ip_version=4) - else: - subnet_dhcp = self.create_subnet( - self.network, ip_version=4) - subnet_slaac = self.create_subnet(self.network, **kwargs) - port_mac = data_utils.rand_mac_address() - dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"] - eui_ip = data_utils.get_ipv6_addr_by_EUI64( - subnet_slaac['cidr'], - port_mac - ).format() - # TODO(sergsh): remove this when 1219795 is fixed - dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()] - port = self.create_port(self.network, mac_address=port_mac) - real_ips = dict([(k['subnet_id'], k['ip_address']) - for k in port['fixed_ips']]) - real_dhcp_ip, real_eui_ip = [real_ips[sub['id']] - for sub in subnet_dhcp, - subnet_slaac] - self._clean_network() - self.assertTrue({real_eui_ip, - real_dhcp_ip}.issubset([eui_ip] + dhcp_ip)) - self.assertEqual(real_eui_ip, - eui_ip, - 'Real IP is {0}, but shall be {1}'.format( - real_eui_ip, - eui_ip)) - self.assertIn( - real_dhcp_ip, dhcp_ip, - 'Real IP is {0}, but shall be one from {1}'.format( - real_dhcp_ip, - str(dhcp_ip))) - - @test.idempotent_id('4ab211a0-276f-4552-9070-51e27f58fecf') - def test_dhcp_stateful(self): - """With all options below, DHCPv6 shall allocate first - address from subnet pool to port. - """ - for ra_mode, add_mode in ( - ('dhcpv6-stateful', 'dhcpv6-stateful'), - ('dhcpv6-stateful', None), - (None, 'dhcpv6-stateful'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - kwargs = {k: v for k, v in six.iteritems(kwargs) if v} - subnet = self.create_subnet(self.network, **kwargs) - port = self.create_port(self.network) - port_ip = next(iter(port['fixed_ips']), None)['ip_address'] - dhcp_ip = subnet["allocation_pools"][0]["start"] - # TODO(sergsh): remove this when 1219795 is fixed - dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()] - self._clean_network() - self.assertIn( - port_ip, dhcp_ip, - 'Real IP is {0}, but shall be one from {1}'.format( - port_ip, - str(dhcp_ip))) - - @test.idempotent_id('51a5e97f-f02e-4e4e-9a17-a69811d300e3') - def test_dhcp_stateful_fixedips(self): - """With all options below, port shall be able to get - requested IP from fixed IP range not depending on - DHCPv6 stateful (not SLAAC!) settings configured. - """ - for ra_mode, add_mode in ( - ('dhcpv6-stateful', 'dhcpv6-stateful'), - ('dhcpv6-stateful', None), - (None, 'dhcpv6-stateful'), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - kwargs = {k: v for k, v in six.iteritems(kwargs) if v} - subnet = self.create_subnet(self.network, **kwargs) - ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"], - subnet["allocation_pools"][0]["end"]) - ip = netaddr.IPAddress(random.randrange(ip_range.first, - ip_range.last)).format() - port = self.create_port(self.network, - fixed_ips=[{'subnet_id': subnet['id'], - 'ip_address': ip}]) - port_ip = next(iter(port['fixed_ips']), None)['ip_address'] - self._clean_network() - self.assertEqual(port_ip, ip, - ("Port IP %s is not as fixed IP from " - "port create request: %s") % ( - port_ip, ip)) - @test.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af') def test_dhcp_stateful_fixedips_outrange(self): """When port gets IP address from fixed IP range it @@ -350,59 +93,6 @@ class NetworksTestDHCPv6(base.BaseNetworkTest): fixed_ips=[{'subnet_id': subnet['id'], 'ip_address': ip}]) - @test.idempotent_id('57b8302b-cba9-4fbb-8835-9168df029051') - def test_dhcp_stateful_fixedips_duplicate(self): - """When port gets IP address from fixed IP range it - shall be checked if it's not duplicate. - """ - kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful', - 'ipv6_address_mode': 'dhcpv6-stateful'} - subnet = self.create_subnet(self.network, **kwargs) - ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"], - subnet["allocation_pools"][0]["end"]) - ip = netaddr.IPAddress(random.randrange( - ip_range.first, ip_range.last)).format() - self.create_port(self.network, - fixed_ips=[ - {'subnet_id': subnet['id'], - 'ip_address': ip}]) - self.assertRaisesRegexp(lib_exc.Conflict, - "object with that identifier already exists", - self.create_port, - self.network, - fixed_ips=[{'subnet_id': subnet['id'], - 'ip_address': ip}]) - - def _create_subnet_router(self, kwargs): - subnet = self.create_subnet(self.network, **kwargs) - router = self.create_router( - router_name=data_utils.rand_name("routerv6-"), - admin_state_up=True) - port = self.create_router_interface(router['id'], - subnet['id']) - body = self.client.show_port(port['port_id']) - return subnet, body['port'] - - @test.idempotent_id('e98f65db-68f4-4330-9fea-abd8c5192d4d') - def test_dhcp_stateful_router(self): - """With all options below the router interface shall - receive DHCPv6 IP address from allocation pool. - """ - for ra_mode, add_mode in ( - ('dhcpv6-stateful', 'dhcpv6-stateful'), - ('dhcpv6-stateful', None), - ): - kwargs = {'ipv6_ra_mode': ra_mode, - 'ipv6_address_mode': add_mode} - kwargs = {k: v for k, v in six.iteritems(kwargs) if v} - subnet, port = self._create_subnet_router(kwargs) - port_ip = next(iter(port['fixed_ips']), None)['ip_address'] - self._clean_network() - self.assertEqual(port_ip, subnet['gateway_ip'], - ("Port IP %s is not as first IP from " - "subnets allocation pool: %s") % ( - port_ip, subnet['gateway_ip'])) - def tearDown(self): self._clean_network() super(NetworksTestDHCPv6, self).tearDown() diff --git a/neutron/tests/api/test_extensions.py b/neutron/tests/api/test_extensions.py deleted file mode 100644 index 71eb98d2696..00000000000 --- a/neutron/tests/api/test_extensions.py +++ /dev/null @@ -1,74 +0,0 @@ -# Copyright 2013 OpenStack, Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest import test - -from neutron.tests.api import base - - -class ExtensionsTestJSON(base.BaseNetworkTest): - - """ - Tests the following operations in the Neutron API using the REST client for - Neutron: - - List all available extensions - - v2.0 of the Neutron API is assumed. It is also assumed that the following - options are defined in the [network] section of etc/tempest.conf: - - """ - - @classmethod - def resource_setup(cls): - super(ExtensionsTestJSON, cls).resource_setup() - - @test.attr(type='smoke') - @test.idempotent_id('ef28c7e6-e646-4979-9d67-deb207bc5564') - def test_list_show_extensions(self): - # List available extensions for the tenant - expected_alias = ['security-group', 'l3_agent_scheduler', - 'ext-gw-mode', 'binding', 'quotas', - 'agent', 'dhcp_agent_scheduler', 'provider', - 'router', 'extraroute', 'external-net', - 'allowed-address-pairs', 'extra_dhcp_opt'] - expected_alias = [ext for ext in expected_alias if - test.is_extension_enabled(ext, 'network')] - actual_alias = list() - extensions = self.client.list_extensions() - list_extensions = extensions['extensions'] - # Show and verify the details of the available extensions - for ext in list_extensions: - ext_name = ext['name'] - ext_alias = ext['alias'] - actual_alias.append(ext['alias']) - ext_details = self.client.show_extension(ext_alias) - ext_details = ext_details['extension'] - - self.assertIsNotNone(ext_details) - self.assertIn('updated', ext_details.keys()) - self.assertIn('name', ext_details.keys()) - self.assertIn('description', ext_details.keys()) - self.assertIn('links', ext_details.keys()) - self.assertIn('alias', ext_details.keys()) - self.assertEqual(ext_details['name'], ext_name) - self.assertEqual(ext_details['alias'], ext_alias) - self.assertEqual(ext_details, ext) - # Verify if expected extensions are present in the actual list - # of extensions returned, but only for those that have been - # enabled via configuration - for e in expected_alias: - if test.is_extension_enabled(e, 'network'): - self.assertIn(e, actual_alias) diff --git a/neutron/tests/api/test_floating_ips.py b/neutron/tests/api/test_floating_ips.py index 472b3e05506..793b4ed9d2a 100644 --- a/neutron/tests/api/test_floating_ips.py +++ b/neutron/tests/api/test_floating_ips.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import netaddr from tempest.lib.common.utils import data_utils from tempest import test @@ -25,25 +24,6 @@ CONF = config.CONF class FloatingIPTestJSON(base.BaseNetworkTest): - """ - Tests the following operations in the Quantum API using the REST client for - Neutron: - - Create a Floating IP - Update a Floating IP - Delete a Floating IP - List all Floating IPs - Show Floating IP details - Associate a Floating IP with a port and then delete that port - Associate a Floating IP with a port and then with a port on another - router - - v2.0 of the Neutron API is assumed. It is also assumed that the following - options are defined in the [network] section of etc/tempest.conf: - - public_network_id which is the id for the external network present - """ - @classmethod def resource_setup(cls): super(FloatingIPTestJSON, cls).resource_setup() @@ -63,61 +43,6 @@ class FloatingIPTestJSON(base.BaseNetworkTest): for i in range(2): cls.create_port(cls.network) - @test.attr(type='smoke') - @test.idempotent_id('62595970-ab1c-4b7f-8fcc-fddfe55e8718') - def test_create_list_show_update_delete_floating_ip(self): - # Creates a floating IP - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id, - port_id=self.ports[0]['id']) - created_floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, - created_floating_ip['id']) - self.assertIsNotNone(created_floating_ip['id']) - self.assertIsNotNone(created_floating_ip['tenant_id']) - self.assertIsNotNone(created_floating_ip['floating_ip_address']) - self.assertEqual(created_floating_ip['port_id'], self.ports[0]['id']) - self.assertEqual(created_floating_ip['floating_network_id'], - self.ext_net_id) - self.assertIn(created_floating_ip['fixed_ip_address'], - [ip['ip_address'] for ip in self.ports[0]['fixed_ips']]) - # Verifies the details of a floating_ip - floating_ip = self.client.show_floatingip(created_floating_ip['id']) - shown_floating_ip = floating_ip['floatingip'] - self.assertEqual(shown_floating_ip['id'], created_floating_ip['id']) - self.assertEqual(shown_floating_ip['floating_network_id'], - self.ext_net_id) - self.assertEqual(shown_floating_ip['tenant_id'], - created_floating_ip['tenant_id']) - self.assertEqual(shown_floating_ip['floating_ip_address'], - created_floating_ip['floating_ip_address']) - self.assertEqual(shown_floating_ip['port_id'], self.ports[0]['id']) - - # Verify the floating ip exists in the list of all floating_ips - floating_ips = self.client.list_floatingips() - floatingip_id_list = list() - for f in floating_ips['floatingips']: - floatingip_id_list.append(f['id']) - self.assertIn(created_floating_ip['id'], floatingip_id_list) - # Associate floating IP to the other port - floating_ip = self.client.update_floatingip( - created_floating_ip['id'], - port_id=self.ports[1]['id']) - updated_floating_ip = floating_ip['floatingip'] - self.assertEqual(updated_floating_ip['port_id'], self.ports[1]['id']) - self.assertEqual(updated_floating_ip['fixed_ip_address'], - self.ports[1]['fixed_ips'][0]['ip_address']) - self.assertEqual(updated_floating_ip['router_id'], self.router['id']) - - # Disassociate floating IP from the port - floating_ip = self.client.update_floatingip( - created_floating_ip['id'], - port_id=None) - updated_floating_ip = floating_ip['floatingip'] - self.assertIsNone(updated_floating_ip['port_id']) - self.assertIsNone(updated_floating_ip['fixed_ip_address']) - self.assertIsNone(updated_floating_ip['router_id']) - @test.attr(type='smoke') @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff') def test_create_update_floatingip_description(self): @@ -137,103 +62,3 @@ class FloatingIPTestJSON(base.BaseNetworkTest): self.assertEqual('d2', body['floatingip']['description']) body = self.client.show_floatingip(body['floatingip']['id']) self.assertEqual('d2', body['floatingip']['description']) - - @test.attr(type='smoke') - @test.idempotent_id('e1f6bffd-442f-4668-b30e-df13f2705e77') - def test_floating_ip_delete_port(self): - # Create a floating IP - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id) - created_floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, - created_floating_ip['id']) - # Create a port - port = self.client.create_port(network_id=self.network['id']) - created_port = port['port'] - floating_ip = self.client.update_floatingip( - created_floating_ip['id'], - port_id=created_port['id']) - # Delete port - self.client.delete_port(created_port['id']) - # Verifies the details of the floating_ip - floating_ip = self.client.show_floatingip(created_floating_ip['id']) - shown_floating_ip = floating_ip['floatingip'] - # Confirm the fields are back to None - self.assertEqual(shown_floating_ip['id'], created_floating_ip['id']) - self.assertIsNone(shown_floating_ip['port_id']) - self.assertIsNone(shown_floating_ip['fixed_ip_address']) - self.assertIsNone(shown_floating_ip['router_id']) - - @test.attr(type='smoke') - @test.idempotent_id('1bb2f731-fe5a-4b8c-8409-799ade1bed4d') - def test_floating_ip_update_different_router(self): - # Associate a floating IP to a port on a router - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id, - port_id=self.ports[1]['id']) - created_floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, - created_floating_ip['id']) - self.assertEqual(created_floating_ip['router_id'], self.router['id']) - network2 = self.create_network() - subnet2 = self.create_subnet(network2) - router2 = self.create_router(data_utils.rand_name('router-'), - external_network_id=self.ext_net_id) - self.create_router_interface(router2['id'], subnet2['id']) - port_other_router = self.create_port(network2) - # Associate floating IP to the other port on another router - floating_ip = self.client.update_floatingip( - created_floating_ip['id'], - port_id=port_other_router['id']) - updated_floating_ip = floating_ip['floatingip'] - self.assertEqual(updated_floating_ip['router_id'], router2['id']) - self.assertEqual(updated_floating_ip['port_id'], - port_other_router['id']) - self.assertIsNotNone(updated_floating_ip['fixed_ip_address']) - - @test.attr(type='smoke') - @test.idempotent_id('36de4bd0-f09c-43e3-a8e1-1decc1ffd3a5') - def test_create_floating_ip_specifying_a_fixed_ip_address(self): - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id, - port_id=self.ports[1]['id'], - fixed_ip_address=self.ports[1]['fixed_ips'][0]['ip_address']) - created_floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, - created_floating_ip['id']) - self.assertIsNotNone(created_floating_ip['id']) - self.assertEqual(created_floating_ip['fixed_ip_address'], - self.ports[1]['fixed_ips'][0]['ip_address']) - floating_ip = self.client.update_floatingip( - created_floating_ip['id'], - port_id=None) - self.assertIsNone(floating_ip['floatingip']['port_id']) - - @test.attr(type='smoke') - @test.idempotent_id('45c4c683-ea97-41ef-9c51-5e9802f2f3d7') - def test_create_update_floatingip_with_port_multiple_ip_address(self): - # Find out ips that can be used for tests - ips = list(netaddr.IPNetwork(self.subnet['cidr'])) - list_ips = [str(ip) for ip in ips[-3:-1]] - fixed_ips = [{'ip_address': list_ips[0]}, {'ip_address': list_ips[1]}] - # Create port - body = self.client.create_port(network_id=self.network['id'], - fixed_ips=fixed_ips) - port = body['port'] - self.addCleanup(self.client.delete_port, port['id']) - # Create floating ip - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id, - port_id=port['id'], - fixed_ip_address=list_ips[0]) - floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, floating_ip['id']) - self.assertIsNotNone(floating_ip['id']) - self.assertEqual(floating_ip['fixed_ip_address'], list_ips[0]) - # Update floating ip - body = self.client.update_floatingip(floating_ip['id'], - port_id=port['id'], - fixed_ip_address=list_ips[1]) - update_floating_ip = body['floatingip'] - self.assertEqual(update_floating_ip['fixed_ip_address'], - list_ips[1]) diff --git a/neutron/tests/api/test_floating_ips_negative.py b/neutron/tests/api/test_floating_ips_negative.py index e66997c76a3..04d04bcdc7d 100644 --- a/neutron/tests/api/test_floating_ips_negative.py +++ b/neutron/tests/api/test_floating_ips_negative.py @@ -26,16 +26,6 @@ CONF = config.CONF class FloatingIPNegativeTestJSON(base.BaseNetworkTest): - """ - Test the following negative operations for floating ips: - - Create floatingip with a port that is unreachable to external network - Create floatingip in private network - Associate floatingip with port that is unreachable to external network - Associate floating ip to port that has already another floating ip - Associate floating ip with port from another tenant - """ - @classmethod def resource_setup(cls): super(FloatingIPNegativeTestJSON, cls).resource_setup() @@ -50,39 +40,6 @@ class FloatingIPNegativeTestJSON(base.BaseNetworkTest): cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.port = cls.create_port(cls.network) - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('22996ea8-4a81-4b27-b6e1-fa5df92fa5e8') - def test_create_floatingip_with_port_ext_net_unreachable(self): - self.assertRaises(lib_exc.NotFound, self.client.create_floatingip, - floating_network_id=self.ext_net_id, - port_id=self.port['id'], - fixed_ip_address=self.port['fixed_ips'][0] - ['ip_address']) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('50b9aeb4-9f0b-48ee-aa31-fa955a48ff54') - def test_create_floatingip_in_private_network(self): - self.assertRaises(lib_exc.BadRequest, - self.client.create_floatingip, - floating_network_id=self.network['id'], - port_id=self.port['id'], - fixed_ip_address=self.port['fixed_ips'][0] - ['ip_address']) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('6b3b8797-6d43-4191-985c-c48b773eb429') - def test_associate_floatingip_port_ext_net_unreachable(self): - # Create floating ip - body = self.client.create_floatingip( - floating_network_id=self.ext_net_id) - floating_ip = body['floatingip'] - self.addCleanup(self.client.delete_floatingip, floating_ip['id']) - # Associate floating IP to the other port - self.assertRaises(lib_exc.NotFound, self.client.update_floatingip, - floating_ip['id'], port_id=self.port['id'], - fixed_ip_address=self.port['fixed_ips'][0] - ['ip_address']) - @test.attr(type=['negative', 'smoke']) @test.idempotent_id('0b5b8797-6de7-4191-905c-a48b888eb429') def test_associate_floatingip_with_port_with_floatingip(self): diff --git a/neutron/tests/api/test_networks.py b/neutron/tests/api/test_networks.py index eb33f7a8360..37bd47a683a 100644 --- a/neutron/tests/api/test_networks.py +++ b/neutron/tests/api/test_networks.py @@ -15,11 +15,6 @@ import itertools -import netaddr -import six -from tempest.common import custom_matchers -from tempest.lib.common.utils import data_utils -from tempest.lib import exceptions as lib_exc from tempest import test from neutron.tests.api import base @@ -34,172 +29,17 @@ class NetworksTestJSON(base.BaseNetworkTest): Tests the following operations in the Neutron API using the REST client for Neutron: - create a network for a tenant list tenant's networks + show a network show a tenant network details - create a subnet for a tenant - list tenant's subnets - show a tenant subnet details - network update - subnet update - delete a network also deletes its subnets - list external networks - All subnet tests are run once with ipv4 and once with ipv6. - - v2.0 of the Neutron API is assumed. It is also assumed that the following - options are defined in the [network] section of etc/tempest.conf: - - tenant_network_cidr with a block of cidr's from which smaller blocks - can be allocated for tenant ipv4 subnets - - tenant_network_v6_cidr is the equivalent for ipv6 subnets - - tenant_network_mask_bits with the mask bits to be used to partition the - block defined by tenant_network_cidr - - tenant_network_v6_mask_bits is the equivalent for ipv6 subnets + v2.0 of the Neutron API is assumed. """ @classmethod def resource_setup(cls): super(NetworksTestJSON, cls).resource_setup() cls.network = cls.create_network() - cls.name = cls.network['name'] - cls.subnet = cls._create_subnet_with_last_subnet_block(cls.network, - cls._ip_version) - cls.cidr = cls.subnet['cidr'] - cls._subnet_data = {6: {'gateway': - str(cls._get_gateway_from_tempest_conf(6)), - 'allocation_pools': - cls._get_allocation_pools_from_gateway(6), - 'dns_nameservers': ['2001:4860:4860::8844', - '2001:4860:4860::8888'], - 'host_routes': [{'destination': '2001::/64', - 'nexthop': '2003::1'}], - 'new_host_routes': [{'destination': - '2001::/64', - 'nexthop': '2005::1'}], - 'new_dns_nameservers': - ['2001:4860:4860::7744', - '2001:4860:4860::7888']}, - 4: {'gateway': - str(cls._get_gateway_from_tempest_conf(4)), - 'allocation_pools': - cls._get_allocation_pools_from_gateway(4), - 'dns_nameservers': ['8.8.4.4', '8.8.8.8'], - 'host_routes': [{'destination': '10.20.0.0/32', - 'nexthop': '10.100.1.1'}], - 'new_host_routes': [{'destination': - '10.20.0.0/32', - 'nexthop': - '10.100.1.2'}], - 'new_dns_nameservers': ['7.8.8.8', '7.8.4.4']}} - - @classmethod - def _create_subnet_with_last_subnet_block(cls, network, ip_version): - """Derive last subnet CIDR block from tenant CIDR and - create the subnet with that derived CIDR - """ - if ip_version == 4: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr) - mask_bits = CONF.network.tenant_network_mask_bits - elif ip_version == 6: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - mask_bits = CONF.network.tenant_network_v6_mask_bits - - subnet_cidr = list(cidr.subnet(mask_bits))[-1] - gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1) - return cls.create_subnet(network, gateway=gateway_ip, - cidr=subnet_cidr, mask_bits=mask_bits) - - @classmethod - def _get_gateway_from_tempest_conf(cls, ip_version): - """Return first subnet gateway for configured CIDR """ - if ip_version == 4: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr) - mask_bits = CONF.network.tenant_network_mask_bits - elif ip_version == 6: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - mask_bits = CONF.network.tenant_network_v6_mask_bits - - if mask_bits >= cidr.prefixlen: - return netaddr.IPAddress(cidr) + 1 - else: - for subnet in cidr.subnet(mask_bits): - return netaddr.IPAddress(subnet) + 1 - - @classmethod - def _get_allocation_pools_from_gateway(cls, ip_version): - """Return allocation range for subnet of given gateway""" - gateway = cls._get_gateway_from_tempest_conf(ip_version) - return [{'start': str(gateway + 2), 'end': str(gateway + 3)}] - - def subnet_dict(self, include_keys): - """Return a subnet dict which has include_keys and their corresponding - value from self._subnet_data - """ - return dict((key, self._subnet_data[self._ip_version][key]) - for key in include_keys) - - def _compare_resource_attrs(self, actual, expected): - exclude_keys = set(actual).symmetric_difference(expected) - self.assertThat(actual, custom_matchers.MatchesDictExceptForKeys( - expected, exclude_keys)) - - def _delete_network(self, network): - # Deleting network also deletes its subnets if exists - self.client.delete_network(network['id']) - if network in self.networks: - self.networks.remove(network) - for subnet in self.subnets: - if subnet['network_id'] == network['id']: - self.subnets.remove(subnet) - - def _create_verify_delete_subnet(self, cidr=None, mask_bits=None, - **kwargs): - network = self.create_network() - net_id = network['id'] - gateway = kwargs.pop('gateway', None) - subnet = self.create_subnet(network, gateway, cidr, mask_bits, - **kwargs) - compare_args_full = dict(gateway_ip=gateway, cidr=cidr, - mask_bits=mask_bits, **kwargs) - compare_args = dict((k, v) for k, v in six.iteritems(compare_args_full) - if v is not None) - - if 'dns_nameservers' in set(subnet).intersection(compare_args): - self.assertEqual(sorted(compare_args['dns_nameservers']), - sorted(subnet['dns_nameservers'])) - del subnet['dns_nameservers'], compare_args['dns_nameservers'] - - self._compare_resource_attrs(subnet, compare_args) - self.client.delete_network(net_id) - self.networks.pop() - self.subnets.pop() - - @test.attr(type='smoke') - @test.idempotent_id('0e269138-0da6-4efc-a46d-578161e7b221') - def test_create_update_delete_network_subnet(self): - # Create a network - name = data_utils.rand_name('network-') - network = self.create_network(network_name=name) - self.addCleanup(self._delete_network, network) - net_id = network['id'] - self.assertEqual('ACTIVE', network['status']) - # Verify network update - new_name = "New_network" - body = self.client.update_network(net_id, name=new_name) - updated_net = body['network'] - self.assertEqual(updated_net['name'], new_name) - # Find a cidr that is not in use yet and create a subnet with it - subnet = self.create_subnet(network) - subnet_id = subnet['id'] - # Verify subnet update - new_name = "New_subnet" - body = self.client.update_subnet(subnet_id, name=new_name) - updated_subnet = body['subnet'] - self.assertEqual(updated_subnet['name'], new_name) @test.attr(type='smoke') @test.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e') @@ -227,15 +67,6 @@ class NetworksTestJSON(base.BaseNetworkTest): for field_name in fields: self.assertEqual(network[field_name], self.network[field_name]) - @test.attr(type='smoke') - @test.idempotent_id('f7ffdeda-e200-4a7a-bcbe-05716e86bf43') - def test_list_networks(self): - # Verify the network exists in the list of all networks - body = self.client.list_networks() - networks = [network['id'] for network in body['networks'] - if network['id'] == self.network['id']] - self.assertNotEmpty(networks, "Created network not found in the list") - @test.attr(type='smoke') @test.idempotent_id('c72c1c0c-2193-4aca-ccc4-b1442640bbbb') def test_create_update_network_description(self): @@ -267,176 +98,6 @@ class NetworksTestJSON(base.BaseNetworkTest): for network in networks: self.assertEqual(sorted(network.keys()), sorted(fields)) - @test.attr(type='smoke') - @test.idempotent_id('bd635d81-6030-4dd1-b3b9-31ba0cfdf6cc') - def test_show_subnet(self): - # Verify the details of a subnet - body = self.client.show_subnet(self.subnet['id']) - subnet = body['subnet'] - self.assertNotEmpty(subnet, "Subnet returned has no fields") - for key in ['id', 'cidr']: - self.assertIn(key, subnet) - self.assertEqual(subnet[key], self.subnet[key]) - - @test.attr(type='smoke') - @test.idempotent_id('270fff0b-8bfc-411f-a184-1e8fd35286f0') - def test_show_subnet_fields(self): - # Verify specific fields of a subnet - fields = ['id', 'network_id'] - body = self.client.show_subnet(self.subnet['id'], - fields=fields) - subnet = body['subnet'] - self.assertEqual(sorted(subnet.keys()), sorted(fields)) - for field_name in fields: - self.assertEqual(subnet[field_name], self.subnet[field_name]) - - @test.attr(type='smoke') - @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640bbbb') - def test_create_update_subnet_description(self): - if not test.is_extension_enabled('standard-attr-description', - 'network'): - msg = "standard-attr-description not enabled." - raise self.skipException(msg) - body = self.create_subnet(self.network, description='d1') - self.assertEqual('d1', body['description']) - sub_id = body['id'] - body = self.client.list_subnets(id=sub_id)['subnets'][0] - self.assertEqual('d1', body['description']) - body = self.client.update_subnet(body['id'], - description='d2') - self.assertEqual('d2', body['subnet']['description']) - body = self.client.list_subnets(id=sub_id)['subnets'][0] - self.assertEqual('d2', body['description']) - - @test.attr(type='smoke') - @test.idempotent_id('db68ba48-f4ea-49e9-81d1-e367f6d0b20a') - def test_list_subnets(self): - # Verify the subnet exists in the list of all subnets - body = self.client.list_subnets() - subnets = [subnet['id'] for subnet in body['subnets'] - if subnet['id'] == self.subnet['id']] - self.assertNotEmpty(subnets, "Created subnet not found in the list") - - @test.attr(type='smoke') - @test.idempotent_id('842589e3-9663-46b0-85e4-7f01273b0412') - def test_list_subnets_fields(self): - # Verify specific fields of subnets - fields = ['id', 'network_id'] - body = self.client.list_subnets(fields=fields) - subnets = body['subnets'] - self.assertNotEmpty(subnets, "Subnet list returned is empty") - for subnet in subnets: - self.assertEqual(sorted(subnet.keys()), sorted(fields)) - - def _try_delete_network(self, net_id): - # delete network, if it exists - try: - self.client.delete_network(net_id) - # if network is not found, this means it was deleted in the test - except lib_exc.NotFound: - pass - - @test.attr(type='smoke') - @test.idempotent_id('f04f61a9-b7f3-4194-90b2-9bcf660d1bfe') - def test_delete_network_with_subnet(self): - # Creates a network - name = data_utils.rand_name('network-') - body = self.client.create_network(name=name) - network = body['network'] - net_id = network['id'] - self.addCleanup(self._try_delete_network, net_id) - - # Find a cidr that is not in use yet and create a subnet with it - subnet = self.create_subnet(network) - subnet_id = subnet['id'] - - # Delete network while the subnet still exists - body = self.client.delete_network(net_id) - - # Verify that the subnet got automatically deleted. - self.assertRaises(lib_exc.NotFound, self.client.show_subnet, - subnet_id) - - # Since create_subnet adds the subnet to the delete list, and it is - # is actually deleted here - this will create and issue, hence remove - # it from the list. - self.subnets.pop() - - @test.attr(type='smoke') - @test.idempotent_id('d2d596e2-8e76-47a9-ac51-d4648009f4d3') - def test_create_delete_subnet_without_gateway(self): - self._create_verify_delete_subnet() - - @test.attr(type='smoke') - @test.idempotent_id('9393b468-186d-496d-aa36-732348cd76e7') - def test_create_delete_subnet_with_gw(self): - self._create_verify_delete_subnet( - **self.subnet_dict(['gateway'])) - - @test.attr(type='smoke') - @test.idempotent_id('bec949c4-3147-4ba6-af5f-cd2306118404') - def test_create_delete_subnet_with_allocation_pools(self): - self._create_verify_delete_subnet( - **self.subnet_dict(['allocation_pools'])) - - @test.attr(type='smoke') - @test.idempotent_id('8217a149-0c6c-4cfb-93db-0486f707d13f') - def test_create_delete_subnet_with_gw_and_allocation_pools(self): - self._create_verify_delete_subnet(**self.subnet_dict( - ['gateway', 'allocation_pools'])) - - @test.attr(type='smoke') - @test.idempotent_id('d830de0a-be47-468f-8f02-1fd996118289') - def test_create_delete_subnet_with_host_routes_and_dns_nameservers(self): - self._create_verify_delete_subnet( - **self.subnet_dict(['host_routes', 'dns_nameservers'])) - - @test.attr(type='smoke') - @test.idempotent_id('94ce038d-ff0a-4a4c-a56b-09da3ca0b55d') - def test_create_delete_subnet_with_dhcp_enabled(self): - self._create_verify_delete_subnet(enable_dhcp=True) - - @test.attr(type='smoke') - @test.idempotent_id('3d3852eb-3009-49ec-97ac-5ce83b73010a') - def test_update_subnet_gw_dns_host_routes_dhcp(self): - network = self.create_network() - self.addCleanup(self._delete_network, network) - - subnet = self.create_subnet( - network, **self.subnet_dict(['gateway', 'host_routes', - 'dns_nameservers', - 'allocation_pools'])) - subnet_id = subnet['id'] - new_gateway = str(netaddr.IPAddress( - self._subnet_data[self._ip_version]['gateway']) + 1) - # Verify subnet update - new_host_routes = self._subnet_data[self._ip_version][ - 'new_host_routes'] - - new_dns_nameservers = self._subnet_data[self._ip_version][ - 'new_dns_nameservers'] - kwargs = {'host_routes': new_host_routes, - 'dns_nameservers': new_dns_nameservers, - 'gateway_ip': new_gateway, 'enable_dhcp': True} - - new_name = "New_subnet" - body = self.client.update_subnet(subnet_id, name=new_name, - **kwargs) - updated_subnet = body['subnet'] - kwargs['name'] = new_name - self.assertEqual(sorted(updated_subnet['dns_nameservers']), - sorted(kwargs['dns_nameservers'])) - del subnet['dns_nameservers'], kwargs['dns_nameservers'] - - self._compare_resource_attrs(updated_subnet, kwargs) - - @test.attr(type='smoke') - @test.idempotent_id('a4d9ec4c-0306-4111-a75c-db01a709030b') - def test_create_delete_subnet_all_attributes(self): - self._create_verify_delete_subnet( - enable_dhcp=True, - **self.subnet_dict(['gateway', 'host_routes', 'dns_nameservers'])) - @test.attr(type='smoke') @test.idempotent_id('af774677-42a9-4e4b-bb58-16fe6a5bc1ec') def test_external_network_visibility(self): @@ -462,262 +123,3 @@ class NetworksTestJSON(base.BaseNetworkTest): subnets = [sub['id'] for sub in body['subnets'] if sub['id'] in public_subnets_iter] self.assertEmpty(subnets, "Public subnets visible") - - -class BulkNetworkOpsTestJSON(base.BaseNetworkTest): - - """ - Tests the following operations in the Neutron API using the REST client for - Neutron: - - bulk network creation - bulk subnet creation - bulk port creation - list tenant's networks - - v2.0 of the Neutron API is assumed. It is also assumed that the following - options are defined in the [network] section of etc/tempest.conf: - - tenant_network_cidr with a block of cidr's from which smaller blocks - can be allocated for tenant networks - - tenant_network_mask_bits with the mask bits to be used to partition the - block defined by tenant-network_cidr - """ - - def _delete_networks(self, created_networks): - for n in created_networks: - self.client.delete_network(n['id']) - # Asserting that the networks are not found in the list after deletion - body = self.client.list_networks() - networks_list = [network['id'] for network in body['networks']] - for n in created_networks: - self.assertNotIn(n['id'], networks_list) - - def _delete_subnets(self, created_subnets): - for n in created_subnets: - self.client.delete_subnet(n['id']) - # Asserting that the subnets are not found in the list after deletion - body = self.client.list_subnets() - subnets_list = [subnet['id'] for subnet in body['subnets']] - for n in created_subnets: - self.assertNotIn(n['id'], subnets_list) - - def _delete_ports(self, created_ports): - for n in created_ports: - self.client.delete_port(n['id']) - # Asserting that the ports are not found in the list after deletion - body = self.client.list_ports() - ports_list = [port['id'] for port in body['ports']] - for n in created_ports: - self.assertNotIn(n['id'], ports_list) - - @test.attr(type='smoke') - @test.idempotent_id('d4f9024d-1e28-4fc1-a6b1-25dbc6fa11e2') - def test_bulk_create_delete_network(self): - # Creates 2 networks in one request - network_names = [data_utils.rand_name('network-'), - data_utils.rand_name('network-')] - body = self.client.create_bulk_network(network_names) - created_networks = body['networks'] - self.addCleanup(self._delete_networks, created_networks) - # Asserting that the networks are found in the list after creation - body = self.client.list_networks() - networks_list = [network['id'] for network in body['networks']] - for n in created_networks: - self.assertIsNotNone(n['id']) - self.assertIn(n['id'], networks_list) - - @test.attr(type='smoke') - @test.idempotent_id('8936533b-c0aa-4f29-8e53-6cc873aec489') - def test_bulk_create_delete_subnet(self): - networks = [self.create_network(), self.create_network()] - # Creates 2 subnets in one request - if self._ip_version == 4: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr) - mask_bits = CONF.network.tenant_network_mask_bits - else: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - mask_bits = CONF.network.tenant_network_v6_mask_bits - - cidrs = [subnet_cidr for subnet_cidr in cidr.subnet(mask_bits)] - - names = [data_utils.rand_name('subnet-') for i in range(len(networks))] - subnets_list = [] - for i in range(len(names)): - p1 = { - 'network_id': networks[i]['id'], - 'cidr': str(cidrs[(i)]), - 'name': names[i], - 'ip_version': self._ip_version - } - subnets_list.append(p1) - del subnets_list[1]['name'] - body = self.client.create_bulk_subnet(subnets_list) - created_subnets = body['subnets'] - self.addCleanup(self._delete_subnets, created_subnets) - # Asserting that the subnets are found in the list after creation - body = self.client.list_subnets() - subnets_list = [subnet['id'] for subnet in body['subnets']] - for n in created_subnets: - self.assertIsNotNone(n['id']) - self.assertIn(n['id'], subnets_list) - - @test.attr(type='smoke') - @test.idempotent_id('48037ff2-e889-4c3b-b86a-8e3f34d2d060') - def test_bulk_create_delete_port(self): - networks = [self.create_network(), self.create_network()] - # Creates 2 ports in one request - names = [data_utils.rand_name('port-') for i in range(len(networks))] - port_list = [] - state = [True, False] - for i in range(len(names)): - p1 = { - 'network_id': networks[i]['id'], - 'name': names[i], - 'admin_state_up': state[i], - } - port_list.append(p1) - del port_list[1]['name'] - body = self.client.create_bulk_port(port_list) - created_ports = body['ports'] - self.addCleanup(self._delete_ports, created_ports) - # Asserting that the ports are found in the list after creation - body = self.client.list_ports() - ports_list = [port['id'] for port in body['ports']] - for n in created_ports: - self.assertIsNotNone(n['id']) - self.assertIn(n['id'], ports_list) - - -class BulkNetworkOpsIpV6TestJSON(BulkNetworkOpsTestJSON): - _ip_version = 6 - - -class NetworksIpV6TestJSON(NetworksTestJSON): - _ip_version = 6 - - @test.attr(type='smoke') - @test.idempotent_id('e41a4888-65a6-418c-a095-f7c2ef4ad59a') - def test_create_delete_subnet_with_gw(self): - net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - gateway = str(netaddr.IPAddress(net.first + 2)) - name = data_utils.rand_name('network-') - network = self.create_network(network_name=name) - subnet = self.create_subnet(network, gateway) - # Verifies Subnet GW in IPv6 - self.assertEqual(subnet['gateway_ip'], gateway) - - @test.attr(type='smoke') - @test.idempotent_id('ebb4fd95-524f-46af-83c1-0305b239338f') - def test_create_delete_subnet_with_default_gw(self): - net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - gateway_ip = str(netaddr.IPAddress(net.first + 1)) - name = data_utils.rand_name('network-') - network = self.create_network(network_name=name) - subnet = self.create_subnet(network) - # Verifies Subnet GW in IPv6 - self.assertEqual(subnet['gateway_ip'], gateway_ip) - - @test.attr(type='smoke') - @test.idempotent_id('a9653883-b2a4-469b-8c3c-4518430a7e55') - def test_create_list_subnet_with_no_gw64_one_network(self): - name = data_utils.rand_name('network-') - network = self.create_network(name) - ipv6_gateway = self.subnet_dict(['gateway'])['gateway'] - subnet1 = self.create_subnet(network, - ip_version=6, - gateway=ipv6_gateway) - self.assertEqual(netaddr.IPNetwork(subnet1['cidr']).version, 6, - 'The created subnet is not IPv6') - subnet2 = self.create_subnet(network, - gateway=None, - ip_version=4) - self.assertEqual(netaddr.IPNetwork(subnet2['cidr']).version, 4, - 'The created subnet is not IPv4') - # Verifies Subnet GW is set in IPv6 - self.assertEqual(subnet1['gateway_ip'], ipv6_gateway) - # Verifies Subnet GW is None in IPv4 - self.assertIsNone(subnet2['gateway_ip']) - # Verifies all 2 subnets in the same network - body = self.client.list_subnets() - subnets = [sub['id'] for sub in body['subnets'] - if sub['network_id'] == network['id']] - test_subnet_ids = [sub['id'] for sub in (subnet1, subnet2)] - self.assertItemsEqual(subnets, - test_subnet_ids, - 'Subnet are not in the same network') - - -class NetworksIpV6TestAttrs(NetworksIpV6TestJSON): - - @classmethod - def resource_setup(cls): - if not CONF.network_feature_enabled.ipv6_subnet_attributes: - raise cls.skipException("IPv6 extended attributes for " - "subnets not available") - super(NetworksIpV6TestAttrs, cls).resource_setup() - - @test.attr(type='smoke') - @test.idempotent_id('da40cd1b-a833-4354-9a85-cd9b8a3b74ca') - def test_create_delete_subnet_with_v6_attributes_stateful(self): - self._create_verify_delete_subnet( - gateway=self._subnet_data[self._ip_version]['gateway'], - ipv6_ra_mode='dhcpv6-stateful', - ipv6_address_mode='dhcpv6-stateful') - - @test.attr(type='smoke') - @test.idempotent_id('176b030f-a923-4040-a755-9dc94329e60c') - def test_create_delete_subnet_with_v6_attributes_slaac(self): - self._create_verify_delete_subnet( - ipv6_ra_mode='slaac', - ipv6_address_mode='slaac') - - @test.attr(type='smoke') - @test.idempotent_id('7d410310-8c86-4902-adf9-865d08e31adb') - def test_create_delete_subnet_with_v6_attributes_stateless(self): - self._create_verify_delete_subnet( - ipv6_ra_mode='dhcpv6-stateless', - ipv6_address_mode='dhcpv6-stateless') - - def _test_delete_subnet_with_ports(self, mode): - """Create subnet and delete it with existing ports""" - slaac_network = self.create_network() - subnet_slaac = self.create_subnet(slaac_network, - **{'ipv6_ra_mode': mode, - 'ipv6_address_mode': mode}) - port = self.create_port(slaac_network) - self.assertIsNotNone(port['fixed_ips'][0]['ip_address']) - self.client.delete_subnet(subnet_slaac['id']) - self.subnets.pop() - subnets = self.client.list_subnets() - subnet_ids = [subnet['id'] for subnet in subnets['subnets']] - self.assertNotIn(subnet_slaac['id'], subnet_ids, - "Subnet wasn't deleted") - self.assertRaisesRegexp( - lib_exc.Conflict, - "There are one or more ports still in use on the network", - self.client.delete_network, - slaac_network['id']) - - @test.attr(type='smoke') - @test.idempotent_id('88554555-ebf8-41ef-9300-4926d45e06e9') - def test_create_delete_slaac_subnet_with_ports(self): - """Test deleting subnet with SLAAC ports - - Create subnet with SLAAC, create ports in network - and then you shall be able to delete subnet without port - deletion. But you still can not delete the network. - """ - self._test_delete_subnet_with_ports("slaac") - - @test.attr(type='smoke') - @test.idempotent_id('2de6ab5a-fcf0-4144-9813-f91a940291f1') - def test_create_delete_stateless_subnet_with_ports(self): - """Test deleting subnet with DHCPv6 stateless ports - - Create subnet with DHCPv6 stateless, create ports in network - and then you shall be able to delete subnet without port - deletion. But you still can not delete the network. - """ - self._test_delete_subnet_with_ports("dhcpv6-stateless") diff --git a/neutron/tests/api/test_networks_negative.py b/neutron/tests/api/test_networks_negative.py deleted file mode 100644 index c77d4c34a38..00000000000 --- a/neutron/tests/api/test_networks_negative.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2013 Huawei Technologies Co.,LTD. -# Copyright 2012 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from tempest.lib.common.utils import data_utils -from tempest.lib import exceptions as lib_exc -from tempest import test - -from neutron.tests.api import base - - -class NetworksNegativeTestJSON(base.BaseNetworkTest): - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('9293e937-824d-42d2-8d5b-e985ea67002a') - def test_show_non_existent_network(self): - non_exist_id = data_utils.rand_name('network') - self.assertRaises(lib_exc.NotFound, self.client.show_network, - non_exist_id) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('d746b40c-5e09-4043-99f7-cba1be8b70df') - def test_show_non_existent_subnet(self): - non_exist_id = data_utils.rand_name('subnet') - self.assertRaises(lib_exc.NotFound, self.client.show_subnet, - non_exist_id) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('a954861d-cbfd-44e8-b0a9-7fab111f235d') - def test_show_non_existent_port(self): - non_exist_id = data_utils.rand_name('port') - self.assertRaises(lib_exc.NotFound, self.client.show_port, - non_exist_id) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('98bfe4e3-574e-4012-8b17-b2647063de87') - def test_update_non_existent_network(self): - non_exist_id = data_utils.rand_name('network') - self.assertRaises(lib_exc.NotFound, self.client.update_network, - non_exist_id, name="new_name") - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('03795047-4a94-4120-a0a1-bd376e36fd4e') - def test_delete_non_existent_network(self): - non_exist_id = data_utils.rand_name('network') - self.assertRaises(lib_exc.NotFound, self.client.delete_network, - non_exist_id) diff --git a/neutron/tests/api/test_ports.py b/neutron/tests/api/test_ports.py index d5841470145..abb637ce518 100644 --- a/neutron/tests/api/test_ports.py +++ b/neutron/tests/api/test_ports.py @@ -13,61 +13,17 @@ # License for the specific language governing permissions and limitations # under the License. -import socket - -import netaddr -from tempest.common import custom_matchers -from tempest.lib.common.utils import data_utils from tempest import test from neutron.tests.api import base -from neutron.tests.api import base_security_groups as sec_base -from neutron.tests.tempest import config - -CONF = config.CONF -class PortsTestJSON(sec_base.BaseSecGroupTest): - - """ - Test the following operations for ports: - - port create - port delete - port list - port show - port update - """ +class PortsTestJSON(base.BaseNetworkTest): @classmethod def resource_setup(cls): super(PortsTestJSON, cls).resource_setup() cls.network = cls.create_network() - cls.port = cls.create_port(cls.network) - - def _delete_port(self, port_id): - self.client.delete_port(port_id) - body = self.client.list_ports() - ports_list = body['ports'] - self.assertNotIn(port_id, [n['id'] for n in ports_list]) - - @test.attr(type='smoke') - @test.idempotent_id('c72c1c0c-2193-4aca-aaa4-b1442640f51c') - def test_create_update_delete_port(self): - # Verify port creation - body = self.client.create_port(network_id=self.network['id']) - port = body['port'] - # Schedule port deletion with verification upon test completion - self.addCleanup(self._delete_port, port['id']) - self.assertTrue(port['admin_state_up']) - # Verify port update - new_name = "New_Port" - body = self.client.update_port(port['id'], - name=new_name, - admin_state_up=False) - updated_port = body['port'] - self.assertEqual(updated_port['name'], new_name) - self.assertFalse(updated_port['admin_state_up']) @test.attr(type='smoke') @test.idempotent_id('c72c1c0c-2193-4aca-bbb4-b1442640bbbb') @@ -86,333 +42,3 @@ class PortsTestJSON(sec_base.BaseSecGroupTest): self.assertEqual('d2', body['port']['description']) body = self.client.list_ports(id=body['port']['id'])['ports'][0] self.assertEqual('d2', body['description']) - - @test.idempotent_id('67f1b811-f8db-43e2-86bd-72c074d4a42c') - def test_create_bulk_port(self): - network1 = self.network - name = data_utils.rand_name('network-') - network2 = self.create_network(network_name=name) - network_list = [network1['id'], network2['id']] - port_list = [{'network_id': net_id} for net_id in network_list] - body = self.client.create_bulk_port(port_list) - created_ports = body['ports'] - port1 = created_ports[0] - port2 = created_ports[1] - self.addCleanup(self._delete_port, port1['id']) - self.addCleanup(self._delete_port, port2['id']) - self.assertEqual(port1['network_id'], network1['id']) - self.assertEqual(port2['network_id'], network2['id']) - self.assertTrue(port1['admin_state_up']) - self.assertTrue(port2['admin_state_up']) - - @classmethod - def _get_ipaddress_from_tempest_conf(cls): - """Return first subnet gateway for configured CIDR """ - if cls._ip_version == 4: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr) - - elif cls._ip_version == 6: - cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr) - - return netaddr.IPAddress(cidr) - - @test.attr(type='smoke') - @test.idempotent_id('0435f278-40ae-48cb-a404-b8a087bc09b1') - def test_create_port_in_allowed_allocation_pools(self): - network = self.create_network() - net_id = network['id'] - address = self._get_ipaddress_from_tempest_conf() - allocation_pools = {'allocation_pools': [{'start': str(address + 4), - 'end': str(address + 6)}]} - subnet = self.create_subnet(network, **allocation_pools) - self.addCleanup(self.client.delete_subnet, subnet['id']) - body = self.client.create_port(network_id=net_id) - self.addCleanup(self.client.delete_port, body['port']['id']) - port = body['port'] - ip_address = port['fixed_ips'][0]['ip_address'] - start_ip_address = allocation_pools['allocation_pools'][0]['start'] - end_ip_address = allocation_pools['allocation_pools'][0]['end'] - ip_range = netaddr.IPRange(start_ip_address, end_ip_address) - self.assertIn(ip_address, ip_range) - - @test.attr(type='smoke') - @test.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f') - def test_show_port(self): - # Verify the details of port - body = self.client.show_port(self.port['id']) - port = body['port'] - self.assertIn('id', port) - # TODO(Santosh)- This is a temporary workaround to compare create_port - # and show_port dict elements.Remove this once extra_dhcp_opts issue - # gets fixed in neutron.( bug - 1365341.) - self.assertThat(self.port, - custom_matchers.MatchesDictExceptForKeys - (port, excluded_keys=['extra_dhcp_opts'])) - - @test.attr(type='smoke') - @test.idempotent_id('45fcdaf2-dab0-4c13-ac6c-fcddfb579dbd') - def test_show_port_fields(self): - # Verify specific fields of a port - fields = ['id', 'mac_address'] - body = self.client.show_port(self.port['id'], - fields=fields) - port = body['port'] - self.assertEqual(sorted(port.keys()), sorted(fields)) - for field_name in fields: - self.assertEqual(port[field_name], self.port[field_name]) - - @test.attr(type='smoke') - @test.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e') - def test_list_ports(self): - # Verify the port exists in the list of all ports - body = self.client.list_ports() - ports = [port['id'] for port in body['ports'] - if port['id'] == self.port['id']] - self.assertNotEmpty(ports, "Created port not found in the list") - - @test.attr(type='smoke') - @test.idempotent_id('5ad01ed0-0e6e-4c5d-8194-232801b15c72') - def test_port_list_filter_by_router_id(self): - # Create a router - network = self.create_network() - self.addCleanup(self.client.delete_network, network['id']) - subnet = self.create_subnet(network) - self.addCleanup(self.client.delete_subnet, subnet['id']) - router = self.create_router(data_utils.rand_name('router-')) - self.addCleanup(self.client.delete_router, router['id']) - port = self.client.create_port(network_id=network['id']) - # Add router interface to port created above - self.client.add_router_interface_with_port_id( - router['id'], port['port']['id']) - self.addCleanup(self.client.remove_router_interface_with_port_id, - router['id'], port['port']['id']) - # List ports filtered by router_id - port_list = self.client.list_ports(device_id=router['id']) - ports = port_list['ports'] - self.assertEqual(len(ports), 1) - self.assertEqual(ports[0]['id'], port['port']['id']) - self.assertEqual(ports[0]['device_id'], router['id']) - - @test.attr(type='smoke') - @test.idempotent_id('ff7f117f-f034-4e0e-abff-ccef05c454b4') - def test_list_ports_fields(self): - # Verify specific fields of ports - fields = ['id', 'mac_address'] - body = self.client.list_ports(fields=fields) - ports = body['ports'] - self.assertNotEmpty(ports, "Port list returned is empty") - # Asserting the fields returned are correct - for port in ports: - self.assertEqual(sorted(fields), sorted(port.keys())) - - @test.attr(type='smoke') - @test.idempotent_id('63aeadd4-3b49-427f-a3b1-19ca81f06270') - def test_create_update_port_with_second_ip(self): - # Create a network with two subnets - network = self.create_network() - self.addCleanup(self.client.delete_network, network['id']) - subnet_1 = self.create_subnet(network) - self.addCleanup(self.client.delete_subnet, subnet_1['id']) - subnet_2 = self.create_subnet(network) - self.addCleanup(self.client.delete_subnet, subnet_2['id']) - fixed_ip_1 = [{'subnet_id': subnet_1['id']}] - fixed_ip_2 = [{'subnet_id': subnet_2['id']}] - - fixed_ips = fixed_ip_1 + fixed_ip_2 - - # Create a port with multiple IP addresses - port = self.create_port(network, - fixed_ips=fixed_ips) - self.addCleanup(self.client.delete_port, port['id']) - self.assertEqual(2, len(port['fixed_ips'])) - check_fixed_ips = [subnet_1['id'], subnet_2['id']] - for item in port['fixed_ips']: - self.assertIn(item['subnet_id'], check_fixed_ips) - - # Update the port to return to a single IP address - port = self.update_port(port, fixed_ips=fixed_ip_1) - self.assertEqual(1, len(port['fixed_ips'])) - - # Update the port with a second IP address from second subnet - port = self.update_port(port, fixed_ips=fixed_ips) - self.assertEqual(2, len(port['fixed_ips'])) - - def _update_port_with_security_groups(self, security_groups_names): - subnet_1 = self.create_subnet(self.network) - self.addCleanup(self.client.delete_subnet, subnet_1['id']) - fixed_ip_1 = [{'subnet_id': subnet_1['id']}] - - security_groups_list = list() - for name in security_groups_names: - group_create_body = self.client.create_security_group( - name=name) - self.addCleanup(self.client.delete_security_group, - group_create_body['security_group']['id']) - security_groups_list.append(group_create_body['security_group'] - ['id']) - # Create a port - sec_grp_name = data_utils.rand_name('secgroup') - security_group = self.client.create_security_group(name=sec_grp_name) - self.addCleanup(self.client.delete_security_group, - security_group['security_group']['id']) - post_body = { - "name": data_utils.rand_name('port-'), - "security_groups": [security_group['security_group']['id']], - "network_id": self.network['id'], - "admin_state_up": True, - "fixed_ips": fixed_ip_1} - body = self.client.create_port(**post_body) - self.addCleanup(self.client.delete_port, body['port']['id']) - port = body['port'] - - # Update the port with security groups - subnet_2 = self.create_subnet(self.network) - fixed_ip_2 = [{'subnet_id': subnet_2['id']}] - update_body = {"name": data_utils.rand_name('port-'), - "admin_state_up": False, - "fixed_ips": fixed_ip_2, - "security_groups": security_groups_list} - body = self.client.update_port(port['id'], **update_body) - port_show = body['port'] - # Verify the security groups and other attributes updated to port - exclude_keys = set(port_show).symmetric_difference(update_body) - exclude_keys.add('fixed_ips') - exclude_keys.add('security_groups') - self.assertThat(port_show, custom_matchers.MatchesDictExceptForKeys( - update_body, exclude_keys)) - self.assertEqual(fixed_ip_2[0]['subnet_id'], - port_show['fixed_ips'][0]['subnet_id']) - - for security_group in security_groups_list: - self.assertIn(security_group, port_show['security_groups']) - - @test.attr(type='smoke') - @test.idempotent_id('58091b66-4ff4-4cc1-a549-05d60c7acd1a') - def test_update_port_with_security_group_and_extra_attributes(self): - self._update_port_with_security_groups( - [data_utils.rand_name('secgroup')]) - - @test.attr(type='smoke') - @test.idempotent_id('edf6766d-3d40-4621-bc6e-2521a44c257d') - def test_update_port_with_two_security_groups_and_extra_attributes(self): - self._update_port_with_security_groups( - [data_utils.rand_name('secgroup'), - data_utils.rand_name('secgroup')]) - - @test.attr(type='smoke') - @test.idempotent_id('13e95171-6cbd-489c-9d7c-3f9c58215c18') - def test_create_show_delete_port_user_defined_mac(self): - # Create a port for a legal mac - body = self.client.create_port(network_id=self.network['id']) - old_port = body['port'] - free_mac_address = old_port['mac_address'] - self.client.delete_port(old_port['id']) - # Create a new port with user defined mac - body = self.client.create_port(network_id=self.network['id'], - mac_address=free_mac_address) - self.addCleanup(self.client.delete_port, body['port']['id']) - port = body['port'] - body = self.client.show_port(port['id']) - show_port = body['port'] - self.assertEqual(free_mac_address, - show_port['mac_address']) - - @test.attr(type='smoke') - @test.idempotent_id('4179dcb9-1382-4ced-84fe-1b91c54f5735') - def test_create_port_with_no_securitygroups(self): - network = self.create_network() - self.addCleanup(self.client.delete_network, network['id']) - subnet = self.create_subnet(network) - self.addCleanup(self.client.delete_subnet, subnet['id']) - port = self.create_port(network, security_groups=[]) - self.addCleanup(self.client.delete_port, port['id']) - self.assertIsNotNone(port['security_groups']) - self.assertEmpty(port['security_groups']) - - -class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest): - - @classmethod - def resource_setup(cls): - super(PortsAdminExtendedAttrsTestJSON, cls).resource_setup() - cls.network = cls.create_network() - cls.host_id = socket.gethostname() - - @test.attr(type='smoke') - @test.idempotent_id('8e8569c1-9ac7-44db-8bc1-f5fb2814f29b') - def test_create_port_binding_ext_attr(self): - post_body = {"network_id": self.network['id'], - "binding:host_id": self.host_id} - body = self.admin_client.create_port(**post_body) - port = body['port'] - self.addCleanup(self.admin_client.delete_port, port['id']) - host_id = port['binding:host_id'] - self.assertIsNotNone(host_id) - self.assertEqual(self.host_id, host_id) - - @test.attr(type='smoke') - @test.idempotent_id('6f6c412c-711f-444d-8502-0ac30fbf5dd5') - def test_update_port_binding_ext_attr(self): - post_body = {"network_id": self.network['id']} - body = self.admin_client.create_port(**post_body) - port = body['port'] - self.addCleanup(self.admin_client.delete_port, port['id']) - update_body = {"binding:host_id": self.host_id} - body = self.admin_client.update_port(port['id'], **update_body) - updated_port = body['port'] - host_id = updated_port['binding:host_id'] - self.assertIsNotNone(host_id) - self.assertEqual(self.host_id, host_id) - - @test.attr(type='smoke') - @test.idempotent_id('1c82a44a-6c6e-48ff-89e1-abe7eaf8f9f8') - def test_list_ports_binding_ext_attr(self): - # Create a new port - post_body = {"network_id": self.network['id']} - body = self.admin_client.create_port(**post_body) - port = body['port'] - self.addCleanup(self.admin_client.delete_port, port['id']) - - # Update the port's binding attributes so that is now 'bound' - # to a host - update_body = {"binding:host_id": self.host_id} - self.admin_client.update_port(port['id'], **update_body) - - # List all ports, ensure new port is part of list and its binding - # attributes are set and accurate - body = self.admin_client.list_ports() - ports_list = body['ports'] - pids_list = [p['id'] for p in ports_list] - self.assertIn(port['id'], pids_list) - listed_port = [p for p in ports_list if p['id'] == port['id']] - self.assertEqual(1, len(listed_port), - 'Multiple ports listed with id %s in ports listing: ' - '%s' % (port['id'], ports_list)) - self.assertEqual(self.host_id, listed_port[0]['binding:host_id']) - - @test.attr(type='smoke') - @test.idempotent_id('b54ac0ff-35fc-4c79-9ca3-c7dbd4ea4f13') - def test_show_port_binding_ext_attr(self): - body = self.admin_client.create_port(network_id=self.network['id']) - port = body['port'] - self.addCleanup(self.admin_client.delete_port, port['id']) - body = self.admin_client.show_port(port['id']) - show_port = body['port'] - self.assertEqual(port['binding:host_id'], - show_port['binding:host_id']) - self.assertEqual(port['binding:vif_type'], - show_port['binding:vif_type']) - self.assertEqual(port['binding:vif_details'], - show_port['binding:vif_details']) - - -class PortsIpV6TestJSON(PortsTestJSON): - _ip_version = 6 - _tenant_network_cidr = CONF.network.tenant_network_v6_cidr - _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits - - -class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON): - _ip_version = 6 - _tenant_network_cidr = CONF.network.tenant_network_v6_cidr - _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits diff --git a/neutron/tests/api/test_routers.py b/neutron/tests/api/test_routers.py index c10dcd68719..35e05618f34 100644 --- a/neutron/tests/api/test_routers.py +++ b/neutron/tests/api/test_routers.py @@ -40,45 +40,6 @@ class RoutersTest(base.BaseRouterTest): if cls._ip_version == 4 else CONF.network.tenant_network_v6_cidr) - @test.attr(type='smoke') - @test.idempotent_id('f64403e2-8483-4b34-8ccd-b09a87bcc68c') - def test_create_show_list_update_delete_router(self): - # Create a router - # NOTE(salv-orlando): Do not invoke self.create_router - # as we need to check the response code - name = data_utils.rand_name('router-') - create_body = self.client.create_router( - name, external_gateway_info={ - "network_id": CONF.network.public_network_id}, - admin_state_up=False) - self.addCleanup(self._delete_router, create_body['router']['id']) - self.assertEqual(create_body['router']['name'], name) - self.assertEqual( - create_body['router']['external_gateway_info']['network_id'], - CONF.network.public_network_id) - self.assertFalse(create_body['router']['admin_state_up']) - # Show details of the created router - show_body = self.client.show_router(create_body['router']['id']) - self.assertEqual(show_body['router']['name'], name) - self.assertEqual( - show_body['router']['external_gateway_info']['network_id'], - CONF.network.public_network_id) - self.assertFalse(show_body['router']['admin_state_up']) - # List routers and verify if created router is there in response - list_body = self.client.list_routers() - routers_list = list() - for router in list_body['routers']: - routers_list.append(router['id']) - self.assertIn(create_body['router']['id'], routers_list) - # Update the name of router and verify if it is updated - updated_name = 'updated ' + name - update_body = self.client.update_router(create_body['router']['id'], - name=updated_name) - self.assertEqual(update_body['router']['name'], updated_name) - show_body = self.client.show_router( - create_body['router']['id']) - self.assertEqual(show_body['router']['name'], updated_name) - @test.attr(type='smoke') @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640eeee') def test_create_update_router_description(self): @@ -95,24 +56,6 @@ class RoutersTest(base.BaseRouterTest): body = self.client.show_router(body['router']['id'])['router'] self.assertEqual('d2', body['description']) - @test.attr(type='smoke') - @test.idempotent_id('e54dd3a3-4352-4921-b09d-44369ae17397') - def test_create_router_setting_tenant_id(self): - # Test creating router from admin user setting tenant_id. - test_tenant = data_utils.rand_name('test_tenant_') - test_description = data_utils.rand_name('desc_') - tenant = self.identity_admin_client.create_tenant( - name=test_tenant, description=test_description)['tenant'] - tenant_id = tenant['id'] - self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id) - - name = data_utils.rand_name('router-') - create_body = self.admin_client.create_router(name, - tenant_id=tenant_id) - self.addCleanup(self.admin_client.delete_router, - create_body['router']['id']) - self.assertEqual(tenant_id, create_body['router']['tenant_id']) - @test.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8') @test.requires_ext(extension='ext-gw-mode', service='network') @test.attr(type='smoke') @@ -144,46 +87,6 @@ class RoutersTest(base.BaseRouterTest): self._verify_router_gateway(create_body['router']['id'], exp_ext_gw_info=external_gateway_info) - @test.attr(type='smoke') - @test.idempotent_id('b42e6e39-2e37-49cc-a6f4-8467e940900a') - def test_add_remove_router_interface_with_subnet_id(self): - network = self.create_network() - subnet = self.create_subnet(network) - router = self._create_router(data_utils.rand_name('router-')) - # Add router interface with subnet id - interface = self.client.add_router_interface_with_subnet_id( - router['id'], subnet['id']) - self.addCleanup(self._remove_router_interface_with_subnet_id, - router['id'], subnet['id']) - self.assertIn('subnet_id', interface.keys()) - self.assertIn('port_id', interface.keys()) - # Verify router id is equal to device id in port details - show_port_body = self.client.show_port( - interface['port_id']) - self.assertEqual(show_port_body['port']['device_id'], - router['id']) - - @test.attr(type='smoke') - @test.idempotent_id('2b7d2f37-6748-4d78-92e5-1d590234f0d5') - def test_add_remove_router_interface_with_port_id(self): - network = self.create_network() - self.create_subnet(network) - router = self._create_router(data_utils.rand_name('router-')) - port_body = self.client.create_port( - network_id=network['id']) - # add router interface to port created above - interface = self.client.add_router_interface_with_port_id( - router['id'], port_body['port']['id']) - self.addCleanup(self._remove_router_interface_with_port_id, - router['id'], port_body['port']['id']) - self.assertIn('subnet_id', interface.keys()) - self.assertIn('port_id', interface.keys()) - # Verify router id is equal to device id in port details - show_port_body = self.client.show_port( - interface['port_id']) - self.assertEqual(show_port_body['port']['device_id'], - router['id']) - def _verify_router_gateway(self, router_id, exp_ext_gw_info=None): show_body = self.admin_client.show_router(router_id) actual_ext_gw_info = show_body['router']['external_gateway_info'] @@ -208,20 +111,6 @@ class RoutersTest(base.BaseRouterTest): self.assertIn(public_subnet_id, [x['subnet_id'] for x in fixed_ips]) - @test.attr(type='smoke') - @test.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79') - def test_update_router_set_gateway(self): - router = self._create_router(data_utils.rand_name('router-')) - self.client.update_router( - router['id'], - external_gateway_info={ - 'network_id': CONF.network.public_network_id}) - # Verify operation - router - self._verify_router_gateway( - router['id'], - {'network_id': CONF.network.public_network_id}) - self._verify_gateway_port(router['id']) - @test.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33') @test.requires_ext(extension='ext-gw-mode', service='network') @test.attr(type='smoke') @@ -254,20 +143,6 @@ class RoutersTest(base.BaseRouterTest): 'enable_snat': False}) self._verify_gateway_port(router['id']) - @test.attr(type='smoke') - @test.idempotent_id('ad81b7ee-4f81-407b-a19c-17e623f763e8') - def test_update_router_unset_gateway(self): - router = self._create_router( - data_utils.rand_name('router-'), - external_network_id=CONF.network.public_network_id) - self.client.update_router(router['id'], external_gateway_info={}) - self._verify_router_gateway(router['id']) - # No gateway port expected - list_body = self.admin_client.list_ports( - network_id=CONF.network.public_network_id, - device_id=router['id']) - self.assertFalse(list_body['ports']) - @test.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374') @test.requires_ext(extension='ext-gw-mode', service='network') @test.attr(type='smoke') @@ -321,45 +196,6 @@ class RoutersTest(base.BaseRouterTest): def _delete_extra_routes(self, router_id): self.client.delete_extra_routes(router_id) - @test.attr(type='smoke') - @test.idempotent_id('a8902683-c788-4246-95c7-ad9c6d63a4d9') - def test_update_router_admin_state(self): - self.router = self._create_router(data_utils.rand_name('router-')) - self.assertFalse(self.router['admin_state_up']) - # Update router admin state - update_body = self.client.update_router(self.router['id'], - admin_state_up=True) - self.assertTrue(update_body['router']['admin_state_up']) - show_body = self.client.show_router(self.router['id']) - self.assertTrue(show_body['router']['admin_state_up']) - - @test.attr(type='smoke') - @test.idempotent_id('802c73c9-c937-4cef-824b-2191e24a6aab') - def test_add_multiple_router_interfaces(self): - network01 = self.create_network( - network_name=data_utils.rand_name('router-network01-')) - network02 = self.create_network( - network_name=data_utils.rand_name('router-network02-')) - subnet01 = self.create_subnet(network01) - sub02_cidr = netaddr.IPNetwork(self.tenant_cidr).next() - subnet02 = self.create_subnet(network02, cidr=sub02_cidr) - router = self._create_router(data_utils.rand_name('router-')) - interface01 = self._add_router_interface_with_subnet_id(router['id'], - subnet01['id']) - self._verify_router_interface(router['id'], subnet01['id'], - interface01['port_id']) - interface02 = self._add_router_interface_with_subnet_id(router['id'], - subnet02['id']) - self._verify_router_interface(router['id'], subnet02['id'], - interface02['port_id']) - - def _verify_router_interface(self, router_id, subnet_id, port_id): - show_port_body = self.client.show_port(port_id) - interface_port = show_port_body['port'] - self.assertEqual(router_id, interface_port['device_id']) - self.assertEqual(subnet_id, - interface_port['fixed_ips'][0]['subnet_id']) - @test.attr(type='smoke') @test.idempotent_id('01f185d1-d1a6-4cf9-abf7-e0e1384c169c') def test_network_attached_with_two_routers(self): diff --git a/neutron/tests/api/test_routers_negative.py b/neutron/tests/api/test_routers_negative.py index 1aa79dd40b5..0367c2f9df8 100644 --- a/neutron/tests/api/test_routers_negative.py +++ b/neutron/tests/api/test_routers_negative.py @@ -13,104 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. -import netaddr from tempest.lib.common.utils import data_utils from tempest.lib import exceptions as lib_exc from tempest import test import testtools from neutron.tests.api import base_routers as base -from neutron.tests.tempest import config - -CONF = config.CONF - - -class RoutersNegativeTest(base.BaseRouterTest): - - @classmethod - def resource_setup(cls): - super(RoutersNegativeTest, cls).resource_setup() - if not test.is_extension_enabled('router', 'network'): - msg = "router extension not enabled." - raise cls.skipException(msg) - cls.router = cls.create_router(data_utils.rand_name('router-')) - cls.network = cls.create_network() - cls.subnet = cls.create_subnet(cls.network) - cls.tenant_cidr = (CONF.network.tenant_network_cidr - if cls._ip_version == 4 else - CONF.network.tenant_network_v6_cidr) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('37a94fc0-a834-45b9-bd23-9a81d2fd1e22') - def test_router_add_gateway_invalid_network_returns_404(self): - self.assertRaises(lib_exc.NotFound, - self.client.update_router, - self.router['id'], - external_gateway_info={ - 'network_id': self.router['id']}) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('11836a18-0b15-4327-a50b-f0d9dc66bddd') - def test_router_add_gateway_net_not_external_returns_400(self): - alt_network = self.create_network( - network_name=data_utils.rand_name('router-negative-')) - sub_cidr = netaddr.IPNetwork(self.tenant_cidr).next() - self.create_subnet(alt_network, cidr=sub_cidr) - self.assertRaises(lib_exc.BadRequest, - self.client.update_router, - self.router['id'], - external_gateway_info={ - 'network_id': alt_network['id']}) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('957751a3-3c68-4fa2-93b6-eb52ea10db6e') - def test_add_router_interfaces_on_overlapping_subnets_returns_400(self): - network01 = self.create_network( - network_name=data_utils.rand_name('router-network01-')) - network02 = self.create_network( - network_name=data_utils.rand_name('router-network02-')) - subnet01 = self.create_subnet(network01) - subnet02 = self.create_subnet(network02) - self._add_router_interface_with_subnet_id(self.router['id'], - subnet01['id']) - self.assertRaises(lib_exc.BadRequest, - self._add_router_interface_with_subnet_id, - self.router['id'], - subnet02['id']) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('04df80f9-224d-47f5-837a-bf23e33d1c20') - def test_router_remove_interface_in_use_returns_409(self): - self.client.add_router_interface_with_subnet_id( - self.router['id'], self.subnet['id']) - self.assertRaises(lib_exc.Conflict, - self.client.delete_router, - self.router['id']) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('c2a70d72-8826-43a7-8208-0209e6360c47') - def test_show_non_existent_router_returns_404(self): - router = data_utils.rand_name('non_exist_router') - self.assertRaises(lib_exc.NotFound, self.client.show_router, - router) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('b23d1569-8b0c-4169-8d4b-6abd34fad5c7') - def test_update_non_existent_router_returns_404(self): - router = data_utils.rand_name('non_exist_router') - self.assertRaises(lib_exc.NotFound, self.client.update_router, - router, name="new_name") - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('c7edc5ad-d09d-41e6-a344-5c0c31e2e3e4') - def test_delete_non_existent_router_returns_404(self): - router = data_utils.rand_name('non_exist_router') - self.assertRaises(lib_exc.NotFound, self.client.delete_router, - router) - - -class RoutersNegativeIpV6Test(RoutersNegativeTest): - _ip_version = 6 class DvrRoutersNegativeTest(base.BaseRouterTest): diff --git a/neutron/tests/api/test_security_groups.py b/neutron/tests/api/test_security_groups.py index bf6092e2916..ae095ad2f98 100644 --- a/neutron/tests/api/test_security_groups.py +++ b/neutron/tests/api/test_security_groups.py @@ -13,20 +13,14 @@ # License for the specific language governing permissions and limitations # under the License. -import six from tempest.lib.common.utils import data_utils from tempest import test from neutron.tests.api import base_security_groups as base -from neutron.tests.tempest import config - -CONF = config.CONF class SecGroupTest(base.BaseSecGroupTest): - _tenant_network_cidr = CONF.network.tenant_network_cidr - @classmethod def resource_setup(cls): super(SecGroupTest, cls).resource_setup() @@ -34,53 +28,6 @@ class SecGroupTest(base.BaseSecGroupTest): msg = "security-group extension not enabled." raise cls.skipException(msg) - def _create_verify_security_group_rule(self, sg_id, direction, - ethertype, protocol, - port_range_min, - port_range_max, - remote_group_id=None, - remote_ip_prefix=None): - # Create Security Group rule with the input params and validate - # that SG rule is created with the same parameters. - rule_create_body = self.client.create_security_group_rule( - security_group_id=sg_id, - direction=direction, - ethertype=ethertype, - protocol=protocol, - port_range_min=port_range_min, - port_range_max=port_range_max, - remote_group_id=remote_group_id, - remote_ip_prefix=remote_ip_prefix - ) - - sec_group_rule = rule_create_body['security_group_rule'] - self.addCleanup(self._delete_security_group_rule, - sec_group_rule['id']) - - expected = {'direction': direction, 'protocol': protocol, - 'ethertype': ethertype, 'port_range_min': port_range_min, - 'port_range_max': port_range_max, - 'remote_group_id': remote_group_id, - 'remote_ip_prefix': remote_ip_prefix} - for key, value in six.iteritems(expected): - self.assertEqual(value, sec_group_rule[key], - "Field %s of the created security group " - "rule does not match with %s." % - (key, value)) - - @test.attr(type='smoke') - @test.idempotent_id('e30abd17-fef9-4739-8617-dc26da88e686') - def test_list_security_groups(self): - # Verify the that security group belonging to tenant exist in list - body = self.client.list_security_groups() - security_groups = body['security_groups'] - found = None - for n in security_groups: - if (n['name'] == 'default'): - found = n['id'] - msg = "Security-group list doesn't contain default security-group" - self.assertIsNotNone(found, msg) - @test.attr(type='smoke') @test.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802') def test_create_list_update_show_delete_security_group(self): @@ -109,153 +56,3 @@ class SecGroupTest(base.BaseSecGroupTest): self.assertEqual(show_body['security_group']['name'], new_name) self.assertEqual(show_body['security_group']['description'], new_description) - - @test.attr(type='smoke') - @test.idempotent_id('cfb99e0e-7410-4a3d-8a0c-959a63ee77e9') - def test_create_show_delete_security_group_rule(self): - group_create_body, _ = self._create_security_group() - - # Create rules for each protocol - protocols = ['tcp', 'udp', 'icmp'] - for protocol in protocols: - rule_create_body = self.client.create_security_group_rule( - security_group_id=group_create_body['security_group']['id'], - protocol=protocol, - direction='ingress', - ethertype=self.ethertype - ) - - # Show details of the created security rule - show_rule_body = self.client.show_security_group_rule( - rule_create_body['security_group_rule']['id'] - ) - create_dict = rule_create_body['security_group_rule'] - for key, value in six.iteritems(create_dict): - self.assertEqual(value, - show_rule_body['security_group_rule'][key], - "%s does not match." % key) - - # List rules and verify created rule is in response - rule_list_body = self.client.list_security_group_rules() - rule_list = [rule['id'] - for rule in rule_list_body['security_group_rules']] - self.assertIn(rule_create_body['security_group_rule']['id'], - rule_list) - - @test.attr(type='smoke') - @test.idempotent_id('c72c1c0c-2193-4aca-fff2-b1442640bbbb') - def test_create_security_group_rule_description(self): - if not test.is_extension_enabled('standard-attr-description', - 'network'): - msg = "standard-attr-description not enabled." - raise self.skipException(msg) - sg = self._create_security_group()[0]['security_group'] - rule = self.client.create_security_group_rule( - security_group_id=sg['id'], protocol='tcp', - direction='ingress', ethertype=self.ethertype, - description='d1' - )['security_group_rule'] - self.assertEqual('d1', rule['description']) - body = self.client.show_security_group_rule(rule['id']) - self.assertEqual('d1', body['security_group_rule']['description']) - - @test.attr(type='smoke') - @test.idempotent_id('87dfbcf9-1849-43ea-b1e4-efa3eeae9f71') - def test_create_security_group_rule_with_additional_args(self): - """Verify security group rule with additional arguments works. - - direction:ingress, ethertype:[IPv4/IPv6], - protocol:tcp, port_range_min:77, port_range_max:77 - """ - group_create_body, _ = self._create_security_group() - sg_id = group_create_body['security_group']['id'] - direction = 'ingress' - protocol = 'tcp' - port_range_min = 77 - port_range_max = 77 - self._create_verify_security_group_rule(sg_id, direction, - self.ethertype, protocol, - port_range_min, - port_range_max) - - @test.attr(type='smoke') - @test.idempotent_id('c9463db8-b44d-4f52-b6c0-8dbda99f26ce') - def test_create_security_group_rule_with_icmp_type_code(self): - """Verify security group rule for icmp protocol works. - - Specify icmp type (port_range_min) and icmp code - (port_range_max) with different values. A separate testcase - is added for icmp protocol as icmp validation would be - different from tcp/udp. - """ - group_create_body, _ = self._create_security_group() - - sg_id = group_create_body['security_group']['id'] - direction = 'ingress' - protocol = 'icmp' - icmp_type_codes = [(3, 2), (3, 0), (8, 0), (0, 0), (11, None)] - for icmp_type, icmp_code in icmp_type_codes: - self._create_verify_security_group_rule(sg_id, direction, - self.ethertype, protocol, - icmp_type, icmp_code) - - @test.attr(type='smoke') - @test.idempotent_id('c2ed2deb-7a0c-44d8-8b4c-a5825b5c310b') - def test_create_security_group_rule_with_remote_group_id(self): - # Verify creating security group rule with remote_group_id works - sg1_body, _ = self._create_security_group() - sg2_body, _ = self._create_security_group() - - sg_id = sg1_body['security_group']['id'] - direction = 'ingress' - protocol = 'udp' - port_range_min = 50 - port_range_max = 55 - remote_id = sg2_body['security_group']['id'] - self._create_verify_security_group_rule(sg_id, direction, - self.ethertype, protocol, - port_range_min, - port_range_max, - remote_group_id=remote_id) - - @test.attr(type='smoke') - @test.idempotent_id('16459776-5da2-4634-bce4-4b55ee3ec188') - def test_create_security_group_rule_with_remote_ip_prefix(self): - # Verify creating security group rule with remote_ip_prefix works - sg1_body, _ = self._create_security_group() - - sg_id = sg1_body['security_group']['id'] - direction = 'ingress' - protocol = 'tcp' - port_range_min = 76 - port_range_max = 77 - ip_prefix = self._tenant_network_cidr - self._create_verify_security_group_rule(sg_id, direction, - self.ethertype, protocol, - port_range_min, - port_range_max, - remote_ip_prefix=ip_prefix) - - @test.attr(type='smoke') - @test.idempotent_id('0a307599-6655-4220-bebc-fd70c64f2290') - def test_create_security_group_rule_with_protocol_integer_value(self): - # Verify creating security group rule with the - # protocol as integer value - # arguments : "protocol": 17 - group_create_body, _ = self._create_security_group() - direction = 'ingress' - protocol = 17 - security_group_id = group_create_body['security_group']['id'] - rule_create_body = self.client.create_security_group_rule( - security_group_id=security_group_id, - direction=direction, - protocol=protocol - ) - sec_group_rule = rule_create_body['security_group_rule'] - self.assertEqual(sec_group_rule['direction'], direction) - self.assertEqual(int(sec_group_rule['protocol']), protocol) - - -class SecGroupIPv6Test(SecGroupTest): - _ip_version = 6 - _tenant_network_cidr = CONF.network.tenant_network_v6_cidr diff --git a/neutron/tests/api/test_security_groups_negative.py b/neutron/tests/api/test_security_groups_negative.py index 777de18c8ec..4bcfca2213c 100644 --- a/neutron/tests/api/test_security_groups_negative.py +++ b/neutron/tests/api/test_security_groups_negative.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import uuid - from tempest.lib import exceptions as lib_exc from tempest import test @@ -35,98 +33,6 @@ class NegativeSecGroupTest(base.BaseSecGroupTest): msg = "security-group extension not enabled." raise cls.skipException(msg) - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('424fd5c3-9ddc-486a-b45f-39bf0c820fc6') - def test_show_non_existent_security_group(self): - non_exist_id = str(uuid.uuid4()) - self.assertRaises(lib_exc.NotFound, self.client.show_security_group, - non_exist_id) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('4c094c09-000b-4e41-8100-9617600c02a6') - def test_show_non_existent_security_group_rule(self): - non_exist_id = str(uuid.uuid4()) - self.assertRaises(lib_exc.NotFound, - self.client.show_security_group_rule, - non_exist_id) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('1f1bb89d-5664-4956-9fcd-83ee0fa603df') - def test_delete_non_existent_security_group(self): - non_exist_id = str(uuid.uuid4()) - self.assertRaises(lib_exc.NotFound, - self.client.delete_security_group, - non_exist_id - ) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('981bdc22-ce48-41ed-900a-73148b583958') - def test_create_security_group_rule_with_bad_protocol(self): - group_create_body, _ = self._create_security_group() - - # Create rule with bad protocol name - pname = 'bad_protocol_name' - self.assertRaises( - lib_exc.BadRequest, self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], - protocol=pname, direction='ingress', ethertype=self.ethertype) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('5f8daf69-3c5f-4aaa-88c9-db1d66f68679') - def test_create_security_group_rule_with_bad_remote_ip_prefix(self): - group_create_body, _ = self._create_security_group() - - # Create rule with bad remote_ip_prefix - prefix = ['192.168.1./24', '192.168.1.1/33', 'bad_prefix', '256'] - for remote_ip_prefix in prefix: - self.assertRaises( - lib_exc.BadRequest, self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], - protocol='tcp', direction='ingress', ethertype=self.ethertype, - remote_ip_prefix=remote_ip_prefix) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('4bf786fd-2f02-443c-9716-5b98e159a49a') - def test_create_security_group_rule_with_non_existent_remote_groupid(self): - group_create_body, _ = self._create_security_group() - non_exist_id = str(uuid.uuid4()) - - # Create rule with non existent remote_group_id - group_ids = ['bad_group_id', non_exist_id] - for remote_group_id in group_ids: - self.assertRaises( - lib_exc.NotFound, self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], - protocol='tcp', direction='ingress', ethertype=self.ethertype, - remote_group_id=remote_group_id) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('b5c4b247-6b02-435b-b088-d10d45650881') - def test_create_security_group_rule_with_remote_ip_and_group(self): - sg1_body, _ = self._create_security_group() - sg2_body, _ = self._create_security_group() - - # Create rule specifying both remote_ip_prefix and remote_group_id - prefix = self._tenant_network_cidr - self.assertRaises( - lib_exc.BadRequest, self.client.create_security_group_rule, - security_group_id=sg1_body['security_group']['id'], - protocol='tcp', direction='ingress', - ethertype=self.ethertype, remote_ip_prefix=prefix, - remote_group_id=sg2_body['security_group']['id']) - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('5666968c-fff3-40d6-9efc-df1c8bd01abb') - def test_create_security_group_rule_with_bad_ethertype(self): - group_create_body, _ = self._create_security_group() - - # Create rule with bad ethertype - ethertype = 'bad_ethertype' - self.assertRaises( - lib_exc.BadRequest, self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], - protocol='udp', direction='ingress', ethertype=ethertype) - @test.attr(type=['negative', 'gate']) @test.idempotent_id('0d9c7791-f2ad-4e2f-ac73-abf2373b0d2d') def test_create_security_group_rule_with_invalid_ports(self): @@ -159,50 +65,6 @@ class NegativeSecGroupTest(base.BaseSecGroupTest): direction='ingress', ethertype=self.ethertype) self.assertIn(msg, str(ex)) - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('2323061e-9fbf-4eb0-b547-7e8fafc90849') - def test_create_additional_default_security_group_fails(self): - # Create security group named 'default', it should be failed. - name = 'default' - self.assertRaises(lib_exc.Conflict, - self.client.create_security_group, - name=name) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('8fde898f-ce88-493b-adc9-4e4692879fc5') - def test_create_duplicate_security_group_rule_fails(self): - # Create duplicate security group rule, it should fail. - body, _ = self._create_security_group() - - min_port = 66 - max_port = 67 - # Create a rule with valid params - self.client.create_security_group_rule( - security_group_id=body['security_group']['id'], - direction='ingress', - ethertype=self.ethertype, - protocol='tcp', - port_range_min=min_port, - port_range_max=max_port - ) - - # Try creating the same security group rule, it should fail - self.assertRaises( - lib_exc.Conflict, self.client.create_security_group_rule, - security_group_id=body['security_group']['id'], - protocol='tcp', direction='ingress', ethertype=self.ethertype, - port_range_min=min_port, port_range_max=max_port) - - @test.attr(type=['negative', 'smoke']) - @test.idempotent_id('be308db6-a7cf-4d5c-9baf-71bafd73f35e') - def test_create_security_group_rule_with_non_existent_security_group(self): - # Create security group rules with not existing security group. - non_existent_sg = str(uuid.uuid4()) - self.assertRaises(lib_exc.NotFound, - self.client.create_security_group_rule, - security_group_id=non_existent_sg, - direction='ingress', ethertype=self.ethertype) - @test.attr(type=['negative', 'smoke']) @test.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c') def test_update_default_security_group_name(self): @@ -215,23 +77,3 @@ class NegativeSecGroupTest(base.BaseSecGroupTest): class NegativeSecGroupIPv6Test(NegativeSecGroupTest): _ip_version = 6 _tenant_network_cidr = CONF.network.tenant_network_v6_cidr - - @test.attr(type=['negative', 'gate']) - @test.idempotent_id('7607439c-af73-499e-bf64-f687fd12a842') - def test_create_security_group_rule_wrong_ip_prefix_version(self): - group_create_body, _ = self._create_security_group() - - # Create rule with bad remote_ip_prefix - pairs = ({'ethertype': 'IPv6', - 'ip_prefix': CONF.network.tenant_network_cidr}, - {'ethertype': 'IPv4', - 'ip_prefix': CONF.network.tenant_network_v6_cidr}) - for pair in pairs: - self.assertRaisesRegexp( - lib_exc.BadRequest, - "Conflicting value ethertype", - self.client.create_security_group_rule, - security_group_id=group_create_body['security_group']['id'], - protocol='tcp', direction='ingress', - ethertype=pair['ethertype'], - remote_ip_prefix=pair['ip_prefix'])