DVS: Disable port security and security group

When a port is being created on a vlan network
it is required to disable port security and security
groups, since they are not supported on DVS.

Change-Id: I5c6d61f7e50b39d33003a8ec72f2cb4fa400313c
Signed-off-by: Michal Kelner Mishali <mkelnermishal@vmware.com>
This commit is contained in:
Michal Kelner Mishali 2019-01-28 14:47:24 +02:00
parent 78948848e6
commit 4b685d66cd
2 changed files with 74 additions and 15 deletions

View File

@ -401,15 +401,19 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
# ATTR_NOT_SPECIFIED is for the case where a port is created on a # ATTR_NOT_SPECIFIED is for the case where a port is created on a
# shared network that is not owned by the tenant. # shared network that is not owned by the tenant.
port_data = port['port'] port_data = port['port']
network_type = self._dvs_get_network(context, port['port'][
'network_id'])['provider:network_type']
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
# First we allocate port in neutron database # First we allocate port in neutron database
neutron_db = super(NsxDvsV2, self).create_port(context, port) neutron_db = super(NsxDvsV2, self).create_port(context, port)
self._extension_manager.process_create_port( self._extension_manager.process_create_port(
context, port_data, neutron_db) context, port_data, neutron_db)
port_security = self._get_network_security_binding( if network_type and network_type == 'vlan':
context, neutron_db['network_id']) port_data[psec.PORTSECURITY] = False
port_data[psec.PORTSECURITY] = port_security else:
port_security = self._get_network_security_binding(
context, neutron_db['network_id'])
port_data[psec.PORTSECURITY] = port_security
self._process_port_port_security_create( self._process_port_port_security_create(
context, port_data, neutron_db) context, port_data, neutron_db)
# Update fields obtained from neutron db (eg: MAC address) # Update fields obtained from neutron db (eg: MAC address)
@ -417,12 +421,17 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
has_ip = self._ip_on_port(neutron_db) has_ip = self._ip_on_port(neutron_db)
# security group extension checks # security group extension checks
if has_ip: if network_type and network_type != 'vlan':
self._ensure_default_security_group_on_port(context, port) if has_ip:
elif validators.is_attr_set(port_data.get(ext_sg.SECURITYGROUPS)): self._ensure_default_security_group_on_port(context, port)
raise psec_exc.PortSecurityAndIPRequiredForSecurityGroups() elif validators.is_attr_set(port_data.get(
port_data[ext_sg.SECURITYGROUPS] = ( ext_sg.SECURITYGROUPS)):
self._get_security_groups_on_port(context, port)) raise psec_exc.PortSecurityAndIPRequiredForSecurityGroups()
if network_type and network_type == 'vlan':
port_data[ext_sg.SECURITYGROUPS] = []
else:
port_data[ext_sg.SECURITYGROUPS] = (
self._get_security_groups_on_port(context, port))
self._process_port_create_security_group( self._process_port_create_security_group(
context, port_data, port_data[ext_sg.SECURITYGROUPS]) context, port_data, port_data[ext_sg.SECURITYGROUPS])
self._process_portbindings_create_and_update(context, self._process_portbindings_create_and_update(context,
@ -466,7 +475,6 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
delete_addr_pairs = self._check_update_deletes_allowed_address_pairs( delete_addr_pairs = self._check_update_deletes_allowed_address_pairs(
port) port)
has_addr_pairs = self._check_update_has_allowed_address_pairs(port) has_addr_pairs = self._check_update_has_allowed_address_pairs(port)
with db_api.CONTEXT_WRITER.using(context): with db_api.CONTEXT_WRITER.using(context):
ret_port = super(NsxDvsV2, self).update_port( ret_port = super(NsxDvsV2, self).update_port(
context, id, port) context, id, port)
@ -477,8 +485,12 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
port['port'].pop('fixed_ips', None) port['port'].pop('fixed_ips', None)
ret_port.update(port['port']) ret_port.update(port['port'])
# populate port_security setting # populate port_security setting, ignoring vlan network ports.
if psec.PORTSECURITY not in port['port']: network_type = self._dvs_get_network(context,
ret_port['network_id'])[
'provider:network_type']
if (psec.PORTSECURITY not in port['port'] and network_type !=
'vlan'):
ret_port[psec.PORTSECURITY] = self._get_port_security_binding( ret_port[psec.PORTSECURITY] = self._get_port_security_binding(
context, id) context, id)
# validate port security and allowed address pairs # validate port security and allowed address pairs
@ -500,8 +512,11 @@ class NsxDvsV2(addr_pair_db.AllowedAddressPairsMixin,
context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS]) context, ret_port, ret_port[addr_apidef.ADDRESS_PAIRS])
if psec.PORTSECURITY in port['port']: if psec.PORTSECURITY in port['port']:
self._process_port_port_security_update( if network_type != 'vlan':
context, port['port'], ret_port) self._process_port_port_security_update(
context, port['port'], ret_port)
else:
ret_port[psec.PORTSECURITY] = False
self._process_vnic_type(context, port['port'], id) self._process_vnic_type(context, port['port'], id)
LOG.debug("Updating port: %s", port) LOG.debug("Updating port: %s", port)
self._extension_manager.process_update_port( self._extension_manager.process_update_port(

View File

@ -255,6 +255,50 @@ class NeutronSimpleDvsTest(NeutronSimpleDvsTestCase):
port_status = port['port']['status'] port_status = port['port']['status']
self.assertEqual(port_status, 'ACTIVE') self.assertEqual(port_status, 'ACTIVE')
def test_create_dvs_port_vlan_no_port_security(self):
params = {'provider:network_type': 'vlan',
'provider:physical_network': 'dvs',
'provider:segmentation_id': 7}
params['arg_list'] = tuple(params.keys())
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(self._plugin._dvs, 'delete_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'delete_port_group'):
with self.network(**params) as network,\
self.subnet(network) as subnet,\
self.port(subnet) as port:
self.assertEqual('dvs',
port['port'][portbindings.VIF_TYPE])
port_security = port['port']['port_security_enabled']
security_groups = port['port']['security_groups']
self.assertEqual(port_security, False)
self.assertEqual(security_groups, [])
def test_update_dvs_port_vlan_no_port_security(self):
params = {'provider:network_type': 'vlan',
'provider:physical_network': 'dvs',
'provider:segmentation_id': 7}
params['arg_list'] = tuple(params.keys())
with mock.patch.object(self._plugin._dvs, 'add_port_group'),\
mock.patch.object(self._plugin._dvs, 'delete_port_group'),\
mock.patch.object(dvs.DvsManager, 'get_dvs_moref_by_name'),\
mock.patch.object(dvs.DvsManager, 'add_port_group'),\
mock.patch.object(dvs.DvsManager, 'delete_port_group'):
with self.network(**params) as network,\
self.subnet(network) as subnet,\
self.port(subnet) as port:
self.assertEqual('dvs',
port['port'][portbindings.VIF_TYPE])
data = {'port': {'port_security_enabled': True}}
req = self.new_update_request('ports',
data, port['port']['id'])
res = self.deserialize('json', req.get_response(self.api))
port_security = res['port']['port_security_enabled']
security_groups = res['port']['security_groups']
self.assertEqual(port_security, False)
self.assertEqual(security_groups, [])
def test_create_router_only_dvs_backend(self): def test_create_router_only_dvs_backend(self):
data = {'router': {'tenant_id': 'whatever'}} data = {'router': {'tenant_id': 'whatever'}}
data['router']['name'] = 'router1' data['router']['name'] = 'router1'