Merge "Simplify notify transaction tests"

This commit is contained in:
Jenkins 2017-02-06 20:53:16 +00:00 committed by Gerrit Code Review
commit ae3eea1664

@ -37,7 +37,6 @@ from neutron.common import utils
from neutron import context
from neutron.db import agents_db
from neutron.db import api as db_api
from neutron.db import db_base_plugin_v2 as base_plugin
from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.db import provisioning_blocks
@ -61,7 +60,6 @@ from neutron.services.l3_router import l3_router_plugin
from neutron.services.revisions import revision_plugin
from neutron.services.segments import db as segments_plugin_db
from neutron.services.segments import plugin as segments_plugin
from neutron.tests import base
from neutron.tests.common import helpers
from neutron.tests.unit import _test_extension_portbindings as test_bindings
from neutron.tests.unit.agent import test_securitygroups_rpc as test_sg_rpc
@ -697,6 +695,33 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self.assertIsNone(plugin._port_provisioned('port', 'evt', 'trigger',
self.context, port_id))
def test_port_after_create_outside_transaction(self):
self.tx_open = True
receive = lambda *a, **k: setattr(self, 'tx_open',
k['context'].session.is_active)
registry.subscribe(receive, resources.PORT, events.AFTER_CREATE)
with self.port():
self.assertFalse(self.tx_open)
def test_port_after_update_outside_transaction(self):
self.tx_open = True
receive = lambda *a, **k: setattr(self, 'tx_open',
k['context'].session.is_active)
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_UPDATE)
self._update('ports', p['port']['id'],
{'port': {'name': 'update'}})
self.assertFalse(self.tx_open)
def test_port_after_delete_outside_transaction(self):
self.tx_open = True
receive = lambda *a, **k: setattr(self, 'tx_open',
k['context'].session.is_active)
with self.port() as p:
registry.subscribe(receive, resources.PORT, events.AFTER_DELETE)
self._delete('ports', p['port']['id'])
self.assertFalse(self.tx_open)
def test_create_router_port_and_fail_create_postcommit(self):
with mock.patch.object(managers.MechanismManager,
@ -2451,140 +2476,6 @@ class TestML2PluggableIPAM(test_ipam.UseIpamMixin, TestMl2SubnetsV2):
subnet['subnet']['id'])
class TestMl2PluginCreateUpdateDeletePort(base.BaseTestCase):
def setUp(self):
super(TestMl2PluginCreateUpdateDeletePort, self).setUp()
# TODO(ihrachys): revisit plugin setup once we decouple
# neutron.objects.db.api from core plugin instance
self.setup_coreplugin(PLUGIN_NAME, load_plugins=False)
self.context = mock.MagicMock()
self.context.session.is_active = False
self.notify_p = mock.patch('neutron.callbacks.registry.notify')
self.notify = self.notify_p.start()
def _ensure_transaction_is_closed(self):
transaction = self.context.session.begin(subtransactions=True)
enter = transaction.__enter__.call_count
exit = transaction.__exit__.call_count
self.assertEqual(enter, exit)
def _create_plugin_for_create_update_port(self):
plugin = ml2_plugin.Ml2Plugin()
directory.add_plugin(constants.CORE, plugin)
plugin.extension_manager = mock.Mock()
plugin.type_manager = mock.Mock()
plugin.mechanism_manager = mock.Mock()
plugin.notifier = mock.Mock()
plugin._check_mac_update_allowed = mock.Mock(return_value=True)
plugin._extend_availability_zone = mock.Mock()
self.notify.side_effect = (
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
return plugin
def test_create_port_rpc_outside_transaction(self):
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'_make_port_dict') as make_port, \
mock.patch.object(base_plugin.NeutronDbPluginV2,
'update_port'),\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'create_port_db'),\
mock.patch.object(ml2_plugin.Ml2Plugin,
'_get_network_mtu'):
init.return_value = None
new_port = mock.MagicMock()
make_port.return_value = new_port
plugin = self._create_plugin_for_create_update_port()
plugin.create_port(self.context, mock.MagicMock())
kwargs = {'context': self.context, 'port': new_port}
self.notify.assert_called_once_with('port', 'after_create',
plugin, **kwargs)
def test_update_port_rpc_outside_transaction(self):
port_id = 'fake_id'
net_id = 'mynet'
original_port_db = models_v2.Port(
id=port_id,
tenant_id='tenant',
network_id=net_id,
mac_address='08:00:01:02:03:04',
admin_state_up=True,
status='ACTIVE',
device_id='vm_id',
device_owner=DEVICE_OWNER_COMPUTE)
binding = mock.Mock()
binding.port_id = port_id
binding.host = 'vm_host'
binding.vnic_type = portbindings.VNIC_NORMAL
binding.profile = ''
binding.vif_type = ''
binding.vif_details = ''
with mock.patch.object(ml2_plugin.Ml2Plugin, '__init__') as init,\
mock.patch.object(ml2_db, 'get_locked_port_and_binding',
return_value=(original_port_db, binding)),\
mock.patch.object(base_plugin.NeutronDbPluginV2,
'update_port') as db_update_port,\
mock.patch.object(ml2_plugin.Ml2Plugin,
'_get_network_mtu'):
init.return_value = None
updated_port = mock.MagicMock()
db_update_port.return_value = updated_port
plugin = self._create_plugin_for_create_update_port()
original_port = plugin._make_port_dict(original_port_db)
res = plugin.update_port(self.context, port_id, mock.MagicMock())
first_update = {
'context': self.context,
'port': updated_port,
'mac_address_updated': True,
'original_port': original_port,
}
bind_update = {
'context': self.context,
'port': res,
'mac_address_updated': False,
'original_port': original_port,
}
expected = [
mock.call('port', 'after_update', plugin, **first_update),
mock.call('port', 'after_update', plugin, **bind_update)
]
self.notify.assert_has_calls(expected)
def test_notify_outside_of_delete_transaction(self):
self.notify.side_effect = (
lambda r, e, t, **kwargs: self._ensure_transaction_is_closed())
l3plugin = mock.Mock()
l3plugin.supported_extension_aliases = [
'router', constants.L3_AGENT_SCHEDULER_EXT_ALIAS,
constants.L3_DISTRIBUTED_EXT_ALIAS
]
with mock.patch.object(ml2_plugin.Ml2Plugin,
'__init__',
return_value=None),\
mock.patch.object(directory,
'get_plugins',
return_value={constants.L3: l3plugin}),\
mock.patch.object(ml2_plugin.Ml2Plugin,
'_get_network_mtu'):
plugin = self._create_plugin_for_create_update_port()
# Set backend manually here since __init__ was mocked
plugin.set_ipam_backend()
# deleting the port will call registry.notify, which will
# run the transaction balancing function defined in this test
plugin.delete_port(self.context, 'fake_id')
self.assertTrue(self.notify.call_count)
class TestTransactionGuard(Ml2PluginV2TestCase):
def test_delete_network_guard(self):
plugin = directory.get_plugin()