Merge "[ovn] Clean-up unused ACL method for DHCP"

This commit is contained in:
Zuul 2021-06-23 20:58:36 +00:00 committed by Gerrit Code Review
commit 9a760b9b14
3 changed files with 1 additions and 83 deletions

View File

@ -191,43 +191,6 @@ def add_sg_rule_acl_for_port_group(port_group, r, stateful, match):
return acl
def add_acl_dhcp(port, subnet, ovn_dhcp=True):
# Allow DHCP requests for OVN native DHCP service, while responses are
# allowed in ovn-northd.
# Allow both DHCP requests and responses to pass for other DHCP services.
# We do this even if DHCP isn't enabled for the subnet
acl_list = []
if not ovn_dhcp:
acl = {"lswitch": utils.ovn_name(port['network_id']),
"lport": port['id'],
"priority": ovn_const.ACL_PRIORITY_ALLOW,
"action": ovn_const.ACL_ACTION_ALLOW,
"log": False,
"name": [],
"severity": [],
"direction": 'to-lport',
"match": ('outport == "%s" && ip4 && ip4.src == %s && '
'udp && udp.src == 67 && udp.dst == 68'
) % (port['id'], subnet['cidr']),
"external_ids": {'neutron:lport': port['id']}}
acl_list.append(acl)
acl = {"lswitch": utils.ovn_name(port['network_id']),
"lport": port['id'],
"priority": ovn_const.ACL_PRIORITY_ALLOW,
"action": ovn_const.ACL_ACTION_ALLOW,
"log": False,
"name": [],
"severity": [],
"direction": 'from-lport',
"match": ('inport == "%s" && ip4 && '
'ip4.dst == {255.255.255.255, %s} && '
'udp && udp.src == 68 && udp.dst == 67'
) % (port['id'], subnet['cidr']),
"external_ids": {'neutron:lport': port['id']}}
acl_list.append(acl)
return acl_list
def _get_subnet_from_cache(plugin, admin_context, subnet_cache, subnet_id):
if subnet_id in subnet_cache:
return subnet_cache[subnet_id]

View File

@ -175,12 +175,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.expected_dns_records[0]['records'][hname] = port_ips
self.expected_ports_with_unknown_addr.append(lport_name)
if p == 'p1':
fake_subnet = {'cidr': '11.11.11.11/24'}
dhcp_acls = acl_utils.add_acl_dhcp(port['port'], fake_subnet)
for dhcp_acl in dhcp_acls:
self.create_acls.append(dhcp_acl)
elif p == 'p2':
if p == 'p2':
self.delete_lswitch_ports.append((lport_name, lswitch_name))
update_port_ids_v4.append(port['port']['id'])
update_port_ids_v6.append(port['port']['id'])
@ -729,10 +724,6 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
'external_ids': {'subnet_id': n3_s2['subnet']['id'],
'port_id': fake_port_id2}})
self.stale_lport_dhcpv6_options.append(stale_dhcpv6_options2)
fake_port = {'id': fake_port_id1, 'network_id': n3['network']['id']}
dhcp_acls = acl_utils.add_acl_dhcp(fake_port, n3_s1['subnet'])
for dhcp_acl in dhcp_acls:
self.create_acls.append(dhcp_acl)
columns = list(self.nb_api.tables['ACL'].columns)
if not (('name' in columns) and ('severity' in columns)):
for acl in self.create_acls:

View File

@ -75,42 +75,6 @@ class TestACLs(base.BaseTestCase):
if 'from-lport' in acl.values():
self.assertEqual(acl_from_lport, acl)
def test_add_acl_dhcp(self):
ovn_dhcp_acls = ovn_acl.add_acl_dhcp(self.fake_port, self.fake_subnet)
other_dhcp_acls = ovn_acl.add_acl_dhcp(self.fake_port,
self.fake_subnet,
ovn_dhcp=False)
expected_match_to_lport = (
'outport == "%s" && ip4 && ip4.src == %s && udp && udp.src == 67 '
'&& udp.dst == 68') % (self.fake_port['id'],
self.fake_subnet['cidr'])
acl_to_lport = {'action': 'allow', 'direction': 'to-lport',
'external_ids': {'neutron:lport': 'fake_port_id1'},
'log': False, 'name': [], 'severity': [],
'lport': 'fake_port_id1',
'lswitch': 'neutron-network_id1',
'match': expected_match_to_lport, 'priority': 1002}
expected_match_from_lport = (
'inport == "%s" && ip4 && '
'ip4.dst == {255.255.255.255, %s} && '
'udp && udp.src == 68 && udp.dst == 67'
) % (self.fake_port['id'], self.fake_subnet['cidr'])
acl_from_lport = {'action': 'allow', 'direction': 'from-lport',
'external_ids': {'neutron:lport': 'fake_port_id1'},
'log': False, 'name': [], 'severity': [],
'lport': 'fake_port_id1',
'lswitch': 'neutron-network_id1',
'match': expected_match_from_lport, 'priority': 1002}
self.assertEqual(1, len(ovn_dhcp_acls))
self.assertEqual(acl_from_lport, ovn_dhcp_acls[0])
self.assertEqual(2, len(other_dhcp_acls))
for acl in other_dhcp_acls:
if 'to-lport' in acl.values():
self.assertEqual(acl_to_lport, acl)
if 'from-lport' in acl.values():
self.assertEqual(acl_from_lport, acl)
def test_acl_protocol_and_ports_for_tcp_udp_and_sctp_number(self):
sg_rule = {'port_range_min': None,
'port_range_max': None}