From 980e54713776584f2b810d136a369ce5a73b3a7f Mon Sep 17 00:00:00 2001 From: Maru Newby Date: Fri, 3 Apr 2015 23:42:31 +0000 Subject: [PATCH] Prepare for unit test reorg The unit test reorg is about moving files around so a test module is clearly associated with the code module it targets, but the test modules in this change needed to be manually merged because they both targeted the same module. test_api_v2 is also updated to use the path of neutron/tests/base.py as the root of path to test implementations of extensions. Change-Id: I432b84339e51c26ef0aa26d44e29b5a3311626ad Implements: bp/reorganize-unit-test-tree --- neutron/tests/unit/agent/l3/test_l3_router.py | 226 ------------------ .../tests/unit/agent/l3/test_router_info.py | 205 ++++++++++++++++ .../unit/agent/linux/test_process_monitor.py | 95 -------- neutron/tests/unit/test_api_v2.py | 3 +- neutron/tests/unit/test_db_plugin.py | 60 +++++ neutron/tests/unit/test_db_plugin_level.py | 82 ------- .../unit/test_extension_extended_attribute.py | 154 ------------ neutron/tests/unit/test_extensions.py | 126 ++++++++++ .../tests/unit/test_linux_external_process.py | 77 ++++++ 9 files changed, 469 insertions(+), 559 deletions(-) delete mode 100644 neutron/tests/unit/agent/l3/test_l3_router.py delete mode 100644 neutron/tests/unit/agent/linux/test_process_monitor.py delete mode 100644 neutron/tests/unit/test_db_plugin_level.py delete mode 100644 neutron/tests/unit/test_extension_extended_attribute.py diff --git a/neutron/tests/unit/agent/l3/test_l3_router.py b/neutron/tests/unit/agent/l3/test_l3_router.py deleted file mode 100644 index 27d5b8030cd..00000000000 --- a/neutron/tests/unit/agent/l3/test_l3_router.py +++ /dev/null @@ -1,226 +0,0 @@ -# Copyright (c) 2015 Openstack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from neutron.agent.l3 import router_info -from neutron.agent.linux import ip_lib -from neutron.common import constants as l3_constants -from neutron.common import exceptions as n_exc -from neutron.openstack.common import uuidutils -from neutron.tests import base - -_uuid = uuidutils.generate_uuid - - -class BasicRouterTestCaseFramework(base.BaseTestCase): - def _create_router(self, router=None, **kwargs): - if not router: - router = mock.MagicMock() - self.agent_conf = mock.Mock() - # NOTE The use_namespaces config will soon be deprecated - self.agent_conf.use_namespaces = True - self.router_id = _uuid() - return router_info.RouterInfo(self.router_id, - router, - self.agent_conf, - mock.sentinel.interface_driver, - **kwargs) - - -class TestBasicRouterOperations(BasicRouterTestCaseFramework): - - def test_get_floating_ips(self): - router = mock.MagicMock() - router.get.return_value = [mock.sentinel.floating_ip] - ri = self._create_router(router) - - fips = ri.get_floating_ips() - - self.assertEqual([mock.sentinel.floating_ip], fips) - - def test_process_floating_ip_nat_rules(self): - ri = self._create_router() - fips = [{'fixed_ip_address': mock.sentinel.ip, - 'floating_ip_address': mock.sentinel.fip}] - ri.get_floating_ips = mock.Mock(return_value=fips) - ri.iptables_manager = mock.MagicMock() - ipv4_nat = ri.iptables_manager.ipv4['nat'] - ri.floating_forward_rules = mock.Mock( - return_value=[(mock.sentinel.chain, mock.sentinel.rule)]) - - ri.process_floating_ip_nat_rules() - - # Be sure that the rules are cleared first and apply is called last - self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), - ipv4_nat.mock_calls[0]) - self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) - - # Be sure that add_rule is called somewhere in the middle - ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain, - mock.sentinel.rule, - tag='floating_ip') - - def test_process_floating_ip_nat_rules_removed(self): - ri = self._create_router() - ri.get_floating_ips = mock.Mock(return_value=[]) - ri.iptables_manager = mock.MagicMock() - ipv4_nat = ri.iptables_manager.ipv4['nat'] - - ri.process_floating_ip_nat_rules() - - # Be sure that the rules are cleared first and apply is called last - self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), - ipv4_nat.mock_calls[0]) - self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) - - # Be sure that add_rule is called somewhere in the middle - self.assertFalse(ipv4_nat.add_rule.called) - - def _test_add_fip_addr_to_device_error(self, device): - ri = self._create_router() - ip = '15.1.2.3' - - result = ri._add_fip_addr_to_device( - {'id': mock.sentinel.id, 'floating_ip_address': ip}, device) - - device.addr.add.assert_called_with(ip + '/32') - return result - - def test__add_fip_addr_to_device(self): - result = self._test_add_fip_addr_to_device_error(mock.Mock()) - self.assertTrue(result) - - def test__add_fip_addr_to_device_error(self): - device = mock.Mock() - device.addr.add.side_effect = RuntimeError - result = self._test_add_fip_addr_to_device_error(device) - self.assertFalse(result) - - def test_process_snat_dnat_for_fip(self): - ri = self._create_router() - ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception) - - self.assertRaises(n_exc.FloatingIpSetupException, - ri.process_snat_dnat_for_fip) - - ri.process_floating_ip_nat_rules.assert_called_once_with() - - def test_put_fips_in_error_state(self): - ri = self._create_router() - ri.router = mock.Mock() - ri.router.get.return_value = [{'id': mock.sentinel.id1}, - {'id': mock.sentinel.id2}] - - statuses = ri.put_fips_in_error_state() - - expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR, - mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}] - self.assertNotEqual(expected, statuses) - - def test_configure_fip_addresses(self): - ri = self._create_router() - ri.process_floating_ip_addresses = mock.Mock( - side_effect=Exception) - - self.assertRaises(n_exc.FloatingIpSetupException, - ri.configure_fip_addresses, - mock.sentinel.interface_name) - - ri.process_floating_ip_addresses.assert_called_once_with( - mock.sentinel.interface_name) - - def test_get_router_cidrs_returns_cidrs(self): - ri = self._create_router() - addresses = ['15.1.2.2/24', '15.1.2.3/32'] - device = mock.MagicMock() - device.addr.list.return_value = [{'cidr': addresses[0]}, - {'cidr': addresses[1]}] - self.assertEqual(set(addresses), ri.get_router_cidrs(device)) - - -@mock.patch.object(ip_lib, 'IPDevice') -class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework): - - def test_process_floating_ip_addresses_remap(self, IPDevice): - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - - IPDevice.return_value = device = mock.Mock() - device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] - ri = self._create_router() - ri.get_floating_ips = mock.Mock(return_value=[fip]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE}, - fip_statuses) - - self.assertFalse(device.addr.add.called) - self.assertFalse(device.addr.delete.called) - - def test_process_router_with_disabled_floating_ip(self, IPDevice): - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - - ri = self._create_router() - ri.floating_ips = [fip] - ri.get_floating_ips = mock.Mock(return_value=[]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - - self.assertIsNone(fip_statuses.get(fip_id)) - - def test_process_router_floating_ip_with_device_add_error(self, IPDevice): - IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError) - device.addr.list.return_value = [] - fip_id = _uuid() - fip = { - 'id': fip_id, 'port_id': _uuid(), - 'floating_ip_address': '15.1.2.3', - 'fixed_ip_address': '192.168.0.2' - } - ri = self._create_router() - ri.add_floating_ip = mock.Mock( - return_value=l3_constants.FLOATINGIP_STATUS_ERROR) - ri.get_floating_ips = mock.Mock(return_value=[fip]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - - self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR}, - fip_statuses) - - # TODO(mrsmith): refactor for DVR cases - def test_process_floating_ip_addresses_remove(self, IPDevice): - IPDevice.return_value = device = mock.Mock() - device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] - - ri = self._create_router() - ri.remove_floating_ip = mock.Mock() - ri.router.get = mock.Mock(return_value=[]) - - fip_statuses = ri.process_floating_ip_addresses( - mock.sentinel.interface_name) - self.assertEqual({}, fip_statuses) - ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32') diff --git a/neutron/tests/unit/agent/l3/test_router_info.py b/neutron/tests/unit/agent/l3/test_router_info.py index acb764d373c..04aa55748c3 100644 --- a/neutron/tests/unit/agent/l3/test_router_info.py +++ b/neutron/tests/unit/agent/l3/test_router_info.py @@ -14,6 +14,9 @@ import mock from neutron.agent.common import config as agent_config from neutron.agent.l3 import router_info +from neutron.agent.linux import ip_lib +from neutron.common import constants as l3_constants +from neutron.common import exceptions as n_exc from neutron.openstack.common import uuidutils from neutron.tests import base @@ -104,3 +107,205 @@ class TestRouterInfo(base.BaseTestCase): expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24', 'via', '10.100.10.30']] self._check_agent_method_called(expected) + + +class BasicRouterTestCaseFramework(base.BaseTestCase): + def _create_router(self, router=None, **kwargs): + if not router: + router = mock.MagicMock() + self.agent_conf = mock.Mock() + # NOTE The use_namespaces config will soon be deprecated + self.agent_conf.use_namespaces = True + self.router_id = _uuid() + return router_info.RouterInfo(self.router_id, + router, + self.agent_conf, + mock.sentinel.interface_driver, + **kwargs) + + +class TestBasicRouterOperations(BasicRouterTestCaseFramework): + + def test_get_floating_ips(self): + router = mock.MagicMock() + router.get.return_value = [mock.sentinel.floating_ip] + ri = self._create_router(router) + + fips = ri.get_floating_ips() + + self.assertEqual([mock.sentinel.floating_ip], fips) + + def test_process_floating_ip_nat_rules(self): + ri = self._create_router() + fips = [{'fixed_ip_address': mock.sentinel.ip, + 'floating_ip_address': mock.sentinel.fip}] + ri.get_floating_ips = mock.Mock(return_value=fips) + ri.iptables_manager = mock.MagicMock() + ipv4_nat = ri.iptables_manager.ipv4['nat'] + ri.floating_forward_rules = mock.Mock( + return_value=[(mock.sentinel.chain, mock.sentinel.rule)]) + + ri.process_floating_ip_nat_rules() + + # Be sure that the rules are cleared first and apply is called last + self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), + ipv4_nat.mock_calls[0]) + self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) + + # Be sure that add_rule is called somewhere in the middle + ipv4_nat.add_rule.assert_called_once_with(mock.sentinel.chain, + mock.sentinel.rule, + tag='floating_ip') + + def test_process_floating_ip_nat_rules_removed(self): + ri = self._create_router() + ri.get_floating_ips = mock.Mock(return_value=[]) + ri.iptables_manager = mock.MagicMock() + ipv4_nat = ri.iptables_manager.ipv4['nat'] + + ri.process_floating_ip_nat_rules() + + # Be sure that the rules are cleared first and apply is called last + self.assertEqual(mock.call.clear_rules_by_tag('floating_ip'), + ipv4_nat.mock_calls[0]) + self.assertEqual(mock.call.apply(), ri.iptables_manager.mock_calls[-1]) + + # Be sure that add_rule is called somewhere in the middle + self.assertFalse(ipv4_nat.add_rule.called) + + def _test_add_fip_addr_to_device_error(self, device): + ri = self._create_router() + ip = '15.1.2.3' + + result = ri._add_fip_addr_to_device( + {'id': mock.sentinel.id, 'floating_ip_address': ip}, device) + + device.addr.add.assert_called_with(ip + '/32') + return result + + def test__add_fip_addr_to_device(self): + result = self._test_add_fip_addr_to_device_error(mock.Mock()) + self.assertTrue(result) + + def test__add_fip_addr_to_device_error(self): + device = mock.Mock() + device.addr.add.side_effect = RuntimeError + result = self._test_add_fip_addr_to_device_error(device) + self.assertFalse(result) + + def test_process_snat_dnat_for_fip(self): + ri = self._create_router() + ri.process_floating_ip_nat_rules = mock.Mock(side_effect=Exception) + + self.assertRaises(n_exc.FloatingIpSetupException, + ri.process_snat_dnat_for_fip) + + ri.process_floating_ip_nat_rules.assert_called_once_with() + + def test_put_fips_in_error_state(self): + ri = self._create_router() + ri.router = mock.Mock() + ri.router.get.return_value = [{'id': mock.sentinel.id1}, + {'id': mock.sentinel.id2}] + + statuses = ri.put_fips_in_error_state() + + expected = [{mock.sentinel.id1: l3_constants.FLOATINGIP_STATUS_ERROR, + mock.sentinel.id2: l3_constants.FLOATINGIP_STATUS_ERROR}] + self.assertNotEqual(expected, statuses) + + def test_configure_fip_addresses(self): + ri = self._create_router() + ri.process_floating_ip_addresses = mock.Mock( + side_effect=Exception) + + self.assertRaises(n_exc.FloatingIpSetupException, + ri.configure_fip_addresses, + mock.sentinel.interface_name) + + ri.process_floating_ip_addresses.assert_called_once_with( + mock.sentinel.interface_name) + + def test_get_router_cidrs_returns_cidrs(self): + ri = self._create_router() + addresses = ['15.1.2.2/24', '15.1.2.3/32'] + device = mock.MagicMock() + device.addr.list.return_value = [{'cidr': addresses[0]}, + {'cidr': addresses[1]}] + self.assertEqual(set(addresses), ri.get_router_cidrs(device)) + + +@mock.patch.object(ip_lib, 'IPDevice') +class TestFloatingIpWithMockDevice(BasicRouterTestCaseFramework): + + def test_process_floating_ip_addresses_remap(self, IPDevice): + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + + IPDevice.return_value = device = mock.Mock() + device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] + ri = self._create_router() + ri.get_floating_ips = mock.Mock(return_value=[fip]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ACTIVE}, + fip_statuses) + + self.assertFalse(device.addr.add.called) + self.assertFalse(device.addr.delete.called) + + def test_process_router_with_disabled_floating_ip(self, IPDevice): + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + + ri = self._create_router() + ri.floating_ips = [fip] + ri.get_floating_ips = mock.Mock(return_value=[]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + + self.assertIsNone(fip_statuses.get(fip_id)) + + def test_process_router_floating_ip_with_device_add_error(self, IPDevice): + IPDevice.return_value = device = mock.Mock(side_effect=RuntimeError) + device.addr.list.return_value = [] + fip_id = _uuid() + fip = { + 'id': fip_id, 'port_id': _uuid(), + 'floating_ip_address': '15.1.2.3', + 'fixed_ip_address': '192.168.0.2' + } + ri = self._create_router() + ri.add_floating_ip = mock.Mock( + return_value=l3_constants.FLOATINGIP_STATUS_ERROR) + ri.get_floating_ips = mock.Mock(return_value=[fip]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + + self.assertEqual({fip_id: l3_constants.FLOATINGIP_STATUS_ERROR}, + fip_statuses) + + # TODO(mrsmith): refactor for DVR cases + def test_process_floating_ip_addresses_remove(self, IPDevice): + IPDevice.return_value = device = mock.Mock() + device.addr.list.return_value = [{'cidr': '15.1.2.3/32'}] + + ri = self._create_router() + ri.remove_floating_ip = mock.Mock() + ri.router.get = mock.Mock(return_value=[]) + + fip_statuses = ri.process_floating_ip_addresses( + mock.sentinel.interface_name) + self.assertEqual({}, fip_statuses) + ri.remove_floating_ip.assert_called_once_with(device, '15.1.2.3/32') diff --git a/neutron/tests/unit/agent/linux/test_process_monitor.py b/neutron/tests/unit/agent/linux/test_process_monitor.py deleted file mode 100644 index fe4093892f3..00000000000 --- a/neutron/tests/unit/agent/linux/test_process_monitor.py +++ /dev/null @@ -1,95 +0,0 @@ -# Copyright 2014 Red Hat Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import mock - -from neutron.agent.linux import external_process -from neutron.tests import base - -TEST_UUID = 'test-uuid' -TEST_SERVICE = 'testsvc' -TEST_PID = 1234 - - -class BaseTestProcessMonitor(base.BaseTestCase): - - def setUp(self): - super(BaseTestProcessMonitor, self).setUp() - self.log_patch = mock.patch("neutron.agent.linux.external_process." - "LOG.error") - self.error_log = self.log_patch.start() - - self.spawn_patch = mock.patch("eventlet.spawn") - self.eventlent_spawn = self.spawn_patch.start() - - # create a default process monitor - self.create_child_process_monitor('respawn') - - def create_child_process_monitor(self, action): - conf = mock.Mock() - conf.AGENT.check_child_processes_action = action - conf.AGENT.check_child_processes = True - self.pmonitor = external_process.ProcessMonitor( - config=conf, - resource_type='test') - - def get_monitored_process(self, uuid, service=None): - monitored_process = mock.Mock() - self.pmonitor.register(uuid=uuid, - service_name=service, - monitored_process=monitored_process) - return monitored_process - - -class TestProcessMonitor(BaseTestProcessMonitor): - - def test_error_logged(self): - pm = self.get_monitored_process(TEST_UUID) - pm.active = False - self.pmonitor._check_child_processes() - self.assertTrue(self.error_log.called) - - def test_exit_handler(self): - self.create_child_process_monitor('exit') - pm = self.get_monitored_process(TEST_UUID) - pm.active = False - with mock.patch.object(external_process.ProcessMonitor, - '_exit_handler') as exit_handler: - self.pmonitor._check_child_processes() - exit_handler.assert_called_once_with(TEST_UUID, None) - - def test_register(self): - pm = self.get_monitored_process(TEST_UUID) - self.assertEqual(len(self.pmonitor._monitored_processes), 1) - self.assertIn(pm, self.pmonitor._monitored_processes.values()) - - def test_register_same_service_twice(self): - self.get_monitored_process(TEST_UUID) - self.get_monitored_process(TEST_UUID) - self.assertEqual(len(self.pmonitor._monitored_processes), 1) - - def test_register_different_service_types(self): - self.get_monitored_process(TEST_UUID) - self.get_monitored_process(TEST_UUID, TEST_SERVICE) - self.assertEqual(len(self.pmonitor._monitored_processes), 2) - - def test_unregister(self): - self.get_monitored_process(TEST_UUID) - self.pmonitor.unregister(TEST_UUID, None) - self.assertEqual(len(self.pmonitor._monitored_processes), 0) - - def test_unregister_unknown_process(self): - self.pmonitor.unregister(TEST_UUID, None) - self.assertEqual(len(self.pmonitor._monitored_processes), 0) diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py index dd98d9046f5..c4b706bd5ab 100644 --- a/neutron/tests/unit/test_api_v2.py +++ b/neutron/tests/unit/test_api_v2.py @@ -40,8 +40,7 @@ from neutron.tests import fake_notifier from neutron.tests.unit import testlib_api -ROOTDIR = os.path.dirname(os.path.dirname(__file__)) -EXTDIR = os.path.join(ROOTDIR, 'unit/extensions') +EXTDIR = os.path.join(base.ROOTDIR, 'unit/extensions') _uuid = uuidutils.generate_uuid diff --git a/neutron/tests/unit/test_db_plugin.py b/neutron/tests/unit/test_db_plugin.py index b62e0f1da6a..842cc50c986 100644 --- a/neutron/tests/unit/test_db_plugin.py +++ b/neutron/tests/unit/test_db_plugin.py @@ -5458,3 +5458,63 @@ class NeutronDbPluginV2AsMixinTestCase(testlib_api.SqlTestCase): self.net_data['network']['status'] = 'BUILD' net = self.plugin.create_network(self.context, self.net_data) self.assertEqual(net['status'], 'BUILD') + + +class TestNetworks(testlib_api.SqlTestCase): + def setUp(self): + super(TestNetworks, self).setUp() + self._tenant_id = 'test-tenant' + + # Update the plugin + self.setup_coreplugin(DB_PLUGIN_KLASS) + + def _create_network(self, plugin, ctx, shared=True): + network = {'network': {'name': 'net', + 'shared': shared, + 'admin_state_up': True, + 'tenant_id': self._tenant_id}} + created_network = plugin.create_network(ctx, network) + return (network, created_network['id']) + + def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id): + port = {'port': {'name': 'port', + 'network_id': net_id, + 'mac_address': attributes.ATTR_NOT_SPECIFIED, + 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, + 'admin_state_up': True, + 'device_id': 'device_id', + 'device_owner': device_owner, + 'tenant_id': tenant_id}} + plugin.create_port(ctx, port) + + def _test_update_shared_net_used(self, + device_owner, + expected_exception=None): + plugin = manager.NeutronManager.get_plugin() + ctx = context.get_admin_context() + network, net_id = self._create_network(plugin, ctx) + + self._create_port(plugin, + ctx, + net_id, + device_owner, + self._tenant_id + '1') + + network['network']['shared'] = False + + if (expected_exception): + with testlib_api.ExpectedException(expected_exception): + plugin.update_network(ctx, net_id, network) + else: + plugin.update_network(ctx, net_id, network) + + def test_update_shared_net_used_fails(self): + self._test_update_shared_net_used('', n_exc.InvalidSharedSetting) + + def test_update_shared_net_used_as_router_gateway(self): + self._test_update_shared_net_used( + constants.DEVICE_OWNER_ROUTER_GW) + + def test_update_shared_net_used_by_floating_ip(self): + self._test_update_shared_net_used( + constants.DEVICE_OWNER_FLOATINGIP) diff --git a/neutron/tests/unit/test_db_plugin_level.py b/neutron/tests/unit/test_db_plugin_level.py deleted file mode 100644 index a0ebbb5e66e..00000000000 --- a/neutron/tests/unit/test_db_plugin_level.py +++ /dev/null @@ -1,82 +0,0 @@ -# Copyright (c) 2014 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.api.v2 import attributes -from neutron.common import constants -from neutron.common import exceptions as n_exc -from neutron import context -from neutron import manager -from neutron.tests.unit import test_db_plugin -from neutron.tests.unit import testlib_api - - -class TestNetworks(testlib_api.SqlTestCase): - def setUp(self): - super(TestNetworks, self).setUp() - self._tenant_id = 'test-tenant' - - # Update the plugin - self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS) - - def _create_network(self, plugin, ctx, shared=True): - network = {'network': {'name': 'net', - 'shared': shared, - 'admin_state_up': True, - 'tenant_id': self._tenant_id}} - created_network = plugin.create_network(ctx, network) - return (network, created_network['id']) - - def _create_port(self, plugin, ctx, net_id, device_owner, tenant_id): - port = {'port': {'name': 'port', - 'network_id': net_id, - 'mac_address': attributes.ATTR_NOT_SPECIFIED, - 'fixed_ips': attributes.ATTR_NOT_SPECIFIED, - 'admin_state_up': True, - 'device_id': 'device_id', - 'device_owner': device_owner, - 'tenant_id': tenant_id}} - plugin.create_port(ctx, port) - - def _test_update_shared_net_used(self, - device_owner, - expected_exception=None): - plugin = manager.NeutronManager.get_plugin() - ctx = context.get_admin_context() - network, net_id = self._create_network(plugin, ctx) - - self._create_port(plugin, - ctx, - net_id, - device_owner, - self._tenant_id + '1') - - network['network']['shared'] = False - - if (expected_exception): - with testlib_api.ExpectedException(expected_exception): - plugin.update_network(ctx, net_id, network) - else: - plugin.update_network(ctx, net_id, network) - - def test_update_shared_net_used_fails(self): - self._test_update_shared_net_used('', n_exc.InvalidSharedSetting) - - def test_update_shared_net_used_as_router_gateway(self): - self._test_update_shared_net_used( - constants.DEVICE_OWNER_ROUTER_GW) - - def test_update_shared_net_used_by_floating_ip(self): - self._test_update_shared_net_used( - constants.DEVICE_OWNER_FLOATINGIP) diff --git a/neutron/tests/unit/test_extension_extended_attribute.py b/neutron/tests/unit/test_extension_extended_attribute.py deleted file mode 100644 index 7ff48c837f0..00000000000 --- a/neutron/tests/unit/test_extension_extended_attribute.py +++ /dev/null @@ -1,154 +0,0 @@ -# Copyright 2013 VMware, Inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Unit tests for extension extended attribute -""" - -from oslo_config import cfg -import webob.exc as webexc - -import neutron -from neutron.api import extensions -from neutron.api.v2 import attributes -from neutron.common import config -from neutron import manager -from neutron.plugins.common import constants -from neutron.plugins.ml2 import plugin as ml2_plugin -from neutron import quota -from neutron.tests import base -from neutron.tests.unit.extensions import extendedattribute as extattr -from neutron.tests.unit import test_api_v2 -from neutron.tests.unit import testlib_api -from neutron import wsgi - -_uuid = test_api_v2._uuid -_get_path = test_api_v2._get_path -extensions_path = ':'.join(neutron.tests.unit.extensions.__path__) - - -class ExtensionExtendedAttributeTestPlugin( - ml2_plugin.Ml2Plugin): - - supported_extension_aliases = [ - 'ext-obj-test', "extended-ext-attr" - ] - - def __init__(self, configfile=None): - super(ExtensionExtendedAttributeTestPlugin, self) - self.objs = [] - self.objh = {} - - def create_ext_test_resource(self, context, ext_test_resource): - obj = ext_test_resource['ext_test_resource'] - id = _uuid() - obj['id'] = id - self.objs.append(obj) - self.objh.update({id: obj}) - return obj - - def get_ext_test_resources(self, context, filters=None, fields=None): - return self.objs - - def get_ext_test_resource(self, context, id, fields=None): - return self.objh[id] - - -class ExtensionExtendedAttributeTestCase(base.BaseTestCase): - def setUp(self): - super(ExtensionExtendedAttributeTestCase, self).setUp() - plugin = ( - "neutron.tests.unit.test_extension_extended_attribute." - "ExtensionExtendedAttributeTestPlugin" - ) - - # point config file to: neutron/tests/etc/neutron.conf.test - self.config_parse() - - self.setup_coreplugin(plugin) - - ext_mgr = extensions.PluginAwareExtensionManager( - extensions_path, - {constants.CORE: ExtensionExtendedAttributeTestPlugin} - ) - ext_mgr.extend_resources("2.0", {}) - extensions.PluginAwareExtensionManager._instance = ext_mgr - - app = config.load_paste_app('extensions_test_app') - self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr) - - self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1" - # Save the global RESOURCE_ATTRIBUTE_MAP - self.saved_attr_map = {} - for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems(): - self.saved_attr_map[resource] = attrs.copy() - # Add the resources to the global attribute map - # This is done here as the setup process won't - # initialize the main API router which extends - # the global attribute map - attributes.RESOURCE_ATTRIBUTE_MAP.update( - extattr.EXTENDED_ATTRIBUTES_2_0) - self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin() - self.addCleanup(self.restore_attribute_map) - - quota.QUOTAS._driver = None - cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver', - group='QUOTAS') - - def restore_attribute_map(self): - # Restore the original RESOURCE_ATTRIBUTE_MAP - attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map - - def _do_request(self, method, path, data=None, params=None, action=None): - content_type = 'application/json' - body = None - if data is not None: # empty dict is valid - body = wsgi.Serializer().serialize(data, content_type) - - req = testlib_api.create_request( - path, body, content_type, - method, query_string=params) - res = req.get_response(self._api) - if res.status_code >= 400: - raise webexc.HTTPClientError(detail=res.body, code=res.status_code) - if res.status_code != webexc.HTTPNoContent.code: - return res.json - - def _ext_test_resource_create(self, attr=None): - data = { - "ext_test_resource": { - "tenant_id": self._tenant_id, - "name": "test", - extattr.EXTENDED_ATTRIBUTE: attr - } - } - - res = self._do_request('POST', _get_path('ext_test_resources'), data) - return res['ext_test_resource'] - - def test_ext_test_resource_create(self): - ext_test_resource = self._ext_test_resource_create() - attr = _uuid() - ext_test_resource = self._ext_test_resource_create(attr) - self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr) - - def test_ext_test_resource_get(self): - attr = _uuid() - obj = self._ext_test_resource_create(attr) - obj_id = obj['id'] - res = self._do_request('GET', _get_path( - 'ext_test_resources/{0}'.format(obj_id))) - obj2 = res['ext_test_resource'] - self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr) diff --git a/neutron/tests/unit/test_extensions.py b/neutron/tests/unit/test_extensions.py index e1407299334..ae533032b03 100644 --- a/neutron/tests/unit/test_extensions.py +++ b/neutron/tests/unit/test_extensions.py @@ -16,25 +16,36 @@ import abc import mock +from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils import routes import webob +import webob.exc as webexc import webtest +import neutron from neutron.api import extensions +from neutron.api.v2 import attributes from neutron.common import config from neutron.common import exceptions from neutron.db import db_base_plugin_v2 +from neutron import manager from neutron.plugins.common import constants +from neutron.plugins.ml2 import plugin as ml2_plugin +from neutron import quota from neutron.tests import base from neutron.tests.unit import extension_stubs as ext_stubs import neutron.tests.unit.extensions +from neutron.tests.unit.extensions import extendedattribute as extattr +from neutron.tests.unit import test_api_v2 from neutron.tests.unit import testlib_api from neutron import wsgi LOG = logging.getLogger(__name__) +_uuid = test_api_v2._uuid +_get_path = test_api_v2._get_path extensions_path = ':'.join(neutron.tests.unit.extensions.__path__) @@ -677,3 +688,118 @@ class SimpleExtensionManager(object): if self.request_ext: request_extensions.append(self.request_ext) return request_extensions + + +class ExtensionExtendedAttributeTestPlugin( + ml2_plugin.Ml2Plugin): + + supported_extension_aliases = [ + 'ext-obj-test', "extended-ext-attr" + ] + + def __init__(self, configfile=None): + super(ExtensionExtendedAttributeTestPlugin, self) + self.objs = [] + self.objh = {} + + def create_ext_test_resource(self, context, ext_test_resource): + obj = ext_test_resource['ext_test_resource'] + id = _uuid() + obj['id'] = id + self.objs.append(obj) + self.objh.update({id: obj}) + return obj + + def get_ext_test_resources(self, context, filters=None, fields=None): + return self.objs + + def get_ext_test_resource(self, context, id, fields=None): + return self.objh[id] + + +class ExtensionExtendedAttributeTestCase(base.BaseTestCase): + def setUp(self): + super(ExtensionExtendedAttributeTestCase, self).setUp() + plugin = ( + "neutron.tests.unit.test_extensions." + "ExtensionExtendedAttributeTestPlugin" + ) + + # point config file to: neutron/tests/etc/neutron.conf.test + self.config_parse() + + self.setup_coreplugin(plugin) + + ext_mgr = extensions.PluginAwareExtensionManager( + extensions_path, + {constants.CORE: ExtensionExtendedAttributeTestPlugin} + ) + ext_mgr.extend_resources("2.0", {}) + extensions.PluginAwareExtensionManager._instance = ext_mgr + + app = config.load_paste_app('extensions_test_app') + self._api = extensions.ExtensionMiddleware(app, ext_mgr=ext_mgr) + + self._tenant_id = "8c70909f-b081-452d-872b-df48e6c355d1" + # Save the global RESOURCE_ATTRIBUTE_MAP + self.saved_attr_map = {} + for resource, attrs in attributes.RESOURCE_ATTRIBUTE_MAP.iteritems(): + self.saved_attr_map[resource] = attrs.copy() + # Add the resources to the global attribute map + # This is done here as the setup process won't + # initialize the main API router which extends + # the global attribute map + attributes.RESOURCE_ATTRIBUTE_MAP.update( + extattr.EXTENDED_ATTRIBUTES_2_0) + self.agentscheduler_dbMinxin = manager.NeutronManager.get_plugin() + self.addCleanup(self.restore_attribute_map) + + quota.QUOTAS._driver = None + cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver', + group='QUOTAS') + + def restore_attribute_map(self): + # Restore the original RESOURCE_ATTRIBUTE_MAP + attributes.RESOURCE_ATTRIBUTE_MAP = self.saved_attr_map + + def _do_request(self, method, path, data=None, params=None, action=None): + content_type = 'application/json' + body = None + if data is not None: # empty dict is valid + body = wsgi.Serializer().serialize(data, content_type) + + req = testlib_api.create_request( + path, body, content_type, + method, query_string=params) + res = req.get_response(self._api) + if res.status_code >= 400: + raise webexc.HTTPClientError(detail=res.body, code=res.status_code) + if res.status_code != webexc.HTTPNoContent.code: + return res.json + + def _ext_test_resource_create(self, attr=None): + data = { + "ext_test_resource": { + "tenant_id": self._tenant_id, + "name": "test", + extattr.EXTENDED_ATTRIBUTE: attr + } + } + + res = self._do_request('POST', _get_path('ext_test_resources'), data) + return res['ext_test_resource'] + + def test_ext_test_resource_create(self): + ext_test_resource = self._ext_test_resource_create() + attr = _uuid() + ext_test_resource = self._ext_test_resource_create(attr) + self.assertEqual(ext_test_resource[extattr.EXTENDED_ATTRIBUTE], attr) + + def test_ext_test_resource_get(self): + attr = _uuid() + obj = self._ext_test_resource_create(attr) + obj_id = obj['id'] + res = self._do_request('GET', _get_path( + 'ext_test_resources/{0}'.format(obj_id))) + obj2 = res['ext_test_resource'] + self.assertEqual(obj2[extattr.EXTENDED_ATTRIBUTE], attr) diff --git a/neutron/tests/unit/test_linux_external_process.py b/neutron/tests/unit/test_linux_external_process.py index c2dd542207e..b6ab45e251e 100644 --- a/neutron/tests/unit/test_linux_external_process.py +++ b/neutron/tests/unit/test_linux_external_process.py @@ -20,6 +20,83 @@ from neutron.agent.linux import utils from neutron.tests import base +TEST_UUID = 'test-uuid' +TEST_SERVICE = 'testsvc' +TEST_PID = 1234 + + +class BaseTestProcessMonitor(base.BaseTestCase): + + def setUp(self): + super(BaseTestProcessMonitor, self).setUp() + self.log_patch = mock.patch("neutron.agent.linux.external_process." + "LOG.error") + self.error_log = self.log_patch.start() + + self.spawn_patch = mock.patch("eventlet.spawn") + self.eventlent_spawn = self.spawn_patch.start() + + # create a default process monitor + self.create_child_process_monitor('respawn') + + def create_child_process_monitor(self, action): + conf = mock.Mock() + conf.AGENT.check_child_processes_action = action + conf.AGENT.check_child_processes = True + self.pmonitor = ep.ProcessMonitor( + config=conf, + resource_type='test') + + def get_monitored_process(self, uuid, service=None): + monitored_process = mock.Mock() + self.pmonitor.register(uuid=uuid, + service_name=service, + monitored_process=monitored_process) + return monitored_process + + +class TestProcessMonitor(BaseTestProcessMonitor): + + def test_error_logged(self): + pm = self.get_monitored_process(TEST_UUID) + pm.active = False + self.pmonitor._check_child_processes() + self.assertTrue(self.error_log.called) + + def test_exit_handler(self): + self.create_child_process_monitor('exit') + pm = self.get_monitored_process(TEST_UUID) + pm.active = False + with mock.patch.object(ep.ProcessMonitor, + '_exit_handler') as exit_handler: + self.pmonitor._check_child_processes() + exit_handler.assert_called_once_with(TEST_UUID, None) + + def test_register(self): + pm = self.get_monitored_process(TEST_UUID) + self.assertEqual(len(self.pmonitor._monitored_processes), 1) + self.assertIn(pm, self.pmonitor._monitored_processes.values()) + + def test_register_same_service_twice(self): + self.get_monitored_process(TEST_UUID) + self.get_monitored_process(TEST_UUID) + self.assertEqual(len(self.pmonitor._monitored_processes), 1) + + def test_register_different_service_types(self): + self.get_monitored_process(TEST_UUID) + self.get_monitored_process(TEST_UUID, TEST_SERVICE) + self.assertEqual(len(self.pmonitor._monitored_processes), 2) + + def test_unregister(self): + self.get_monitored_process(TEST_UUID) + self.pmonitor.unregister(TEST_UUID, None) + self.assertEqual(len(self.pmonitor._monitored_processes), 0) + + def test_unregister_unknown_process(self): + self.pmonitor.unregister(TEST_UUID, None) + self.assertEqual(len(self.pmonitor._monitored_processes), 0) + + class TestProcessManager(base.BaseTestCase): def setUp(self): super(TestProcessManager, self).setUp()