diff --git a/vmware_nsx/db/nsxv_db.py b/vmware_nsx/db/nsxv_db.py index ba7a2772b6..b64ff4a427 100644 --- a/vmware_nsx/db/nsxv_db.py +++ b/vmware_nsx/db/nsxv_db.py @@ -549,6 +549,15 @@ def get_network_bindings_by_physical_net(session, phy_uuid): all()) +def get_network_bindings_by_physical_net_and_type(session, phy_uuid, + binding_type): + session = session or db.get_reader_session() + return (session.query(nsxv_models.NsxvTzNetworkBinding). + filter_by(phy_uuid=phy_uuid, + binding_type=binding_type). + all()) + + def delete_network_bindings(session, network_id): return (session.query(nsxv_models.NsxvTzNetworkBinding). filter_by(network_id=network_id).delete()) diff --git a/vmware_nsx/plugins/nsx_v/plugin.py b/vmware_nsx/plugins/nsx_v/plugin.py index 3ddac96581..ee6462f093 100644 --- a/vmware_nsx/plugins/nsx_v/plugin.py +++ b/vmware_nsx/plugins/nsx_v/plugin.py @@ -679,6 +679,15 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin, not self.nsx_v.vcns.validate_network_name( physical_network, network['name'])): err_msg = _("Portgroup name must match network name") + + # make sure no other neutron network is using it + bindings = ( + nsxv_db.get_network_bindings_by_physical_net_and_type( + context.elevated().session, physical_network, + network_type)) + if bindings: + err_msg = (_('protgroup %s is already used by ' + 'another network') % physical_network) else: err_msg = (_("%(net_type_param)s %(net_type_value)s not " "supported") % diff --git a/vmware_nsx/tests/unit/nsx_v/test_plugin.py b/vmware_nsx/tests/unit/nsx_v/test_plugin.py index d60db58b3e..657de36e59 100644 --- a/vmware_nsx/tests/unit/nsx_v/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v/test_plugin.py @@ -323,6 +323,30 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase): for k, v in expected: self.assertEqual(net['network'][k], v) + def test_create_portgroup_network(self): + name = 'pg_net' + expected = [('subnets', []), ('name', name), ('admin_state_up', True), + ('status', 'ACTIVE'), ('shared', False), + (pnet.NETWORK_TYPE, 'portgroup'), + (pnet.PHYSICAL_NETWORK, 'tzuuid')] + providernet_args = {pnet.NETWORK_TYPE: 'portgroup', + pnet.PHYSICAL_NETWORK: 'tzuuid'} + with self.network(name=name, + providernet_args=providernet_args, + arg_list=(pnet.NETWORK_TYPE, + pnet.PHYSICAL_NETWORK)) as net: + for k, v in expected: + self.assertEqual(net['network'][k], v) + + # try to create another one on the same physical net will failure + res = self._create_network( + self.fmt, name, True, + providernet_args=providernet_args, + arg_list=(pnet.NETWORK_TYPE, + pnet.PHYSICAL_NETWORK)) + data = self.deserialize(self.fmt, res) + self.assertIn('NeutronError', data) + def test_delete_network_after_removing_subnet(self): gateway_ip = '10.0.0.1' cidr = '10.0.0.0/24' diff --git a/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py b/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py index 685dbbda36..08231ce330 100644 --- a/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py +++ b/vmware_nsx/tests/unit/nsx_v/vshield/fake_vcns.py @@ -1109,7 +1109,7 @@ class FakeVcns(object): def create_spoofguard_policy(self, enforcement_points, name, enable): policy = {'name': name, - 'enforcement_point': enforcement_points[0], + 'enforcementPoints': [{'id': enforcement_points[0]}], 'operationMode': 'MANUAL' if enable else 'DISABLE'} policy_id = len(self._spoofguard_policies) self._spoofguard_policies.append(policy) @@ -1118,7 +1118,7 @@ class FakeVcns(object): def update_spoofguard_policy(self, policy_id, enforcement_points, name, enable): policy = {'name': name, - 'enforcement_point': enforcement_points[0], + 'enforcementPoints': [{'id': enforcement_points[0]}], 'operationMode': 'MANUAL' if enable else 'DISABLE'} self._spoofguard_policies[int(policy_id)] = policy return None, ''