[ovn]: port forwarding -- feature support under ovn_db_sync

This is a subset of the changes for implementing the floating IP
port forwarding feature in neutron, using OVN as the backend.

This changeset updates ovn_db_sync utility and its tests to
ensure that floating ip port forwarding can be repaired.

Depends-On: https://review.opendev.org/#/c/741303/
Change-Id: I7a158173252e73e081914f242133634c41de7999
Partially-implements: ovn/port_forwarding
Partial-Bug: #1877447
This commit is contained in:
Flavio Fernandes 2020-07-29 10:24:11 -04:00
parent d74f409c82
commit 361b485318
4 changed files with 244 additions and 4 deletions

View File

@ -191,7 +191,9 @@ def main():
cfg.CONF.set_override('mechanism_drivers', ['ovn-sync'], 'ml2') cfg.CONF.set_override('mechanism_drivers', ['ovn-sync'], 'ml2')
conf.service_plugins = [ conf.service_plugins = [
'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin', 'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin',
'neutron.services.segments.plugin.Plugin'] 'neutron.services.segments.plugin.Plugin',
'port_forwarding',
]
else: else:
LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin) LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin)
return return

View File

@ -71,6 +71,11 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
core_plugin, ovn_api, ovn_driver) core_plugin, ovn_api, ovn_driver)
self.mode = mode self.mode = mode
self.l3_plugin = directory.get_plugin(plugin_constants.L3) self.l3_plugin = directory.get_plugin(plugin_constants.L3)
self.pf_plugin = directory.get_plugin(plugin_constants.PORTFORWARDING)
if not self.pf_plugin:
self.pf_plugin = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'port_forwarding')())
self._ovn_client = ovn_client.OVNClient(ovn_api, sb_ovn) self._ovn_client = ovn_client.OVNClient(ovn_api, sb_ovn)
self.segments_plugin = directory.get_plugin('segments') self.segments_plugin = directory.get_plugin('segments')
if not self.segments_plugin: if not self.segments_plugin:
@ -325,6 +330,62 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
return to_add, to_remove return to_add, to_remove
def _calculate_fip_pfs_differences(self, ovn_rtr_lb_pfs, db_pfs):
to_add_or_update = set()
to_remove = []
ovn_pfs = utils.parse_ovn_lb_port_forwarding(ovn_rtr_lb_pfs)
# check that all pfs are accounted for in ovn_pfs by building
# a set for each protocol and then comparing it with ovn_pfs
db_mapped_pfs = {}
for db_pf in db_pfs:
fip_id = db_pf.get('floatingip_id')
protocol = self.l3_plugin.port_forwarding.ovn_lb_protocol(
db_pf.get('protocol'))
db_vip = "{}:{} {}:{}".format(
db_pf.get('floating_ip_address'), db_pf.get('external_port'),
db_pf.get('internal_ip_address'), db_pf.get('internal_port'))
fip_dict = db_mapped_pfs.get(fip_id, {})
fip_dict_proto = fip_dict.get(protocol, set())
fip_dict_proto.add(db_vip)
if protocol not in fip_dict:
fip_dict[protocol] = fip_dict_proto
if fip_id not in db_mapped_pfs:
db_mapped_pfs[fip_id] = fip_dict
for fip_id in db_mapped_pfs:
ovn_pfs_fip_id = ovn_pfs.get(fip_id, {})
# check for cases when ovn has lbs for protocols that are not in
# neutron db
if len(db_mapped_pfs[fip_id]) != len(ovn_pfs_fip_id):
to_add_or_update.add(fip_id)
continue
# check that vips in each protocol are an exact match
for protocol in db_mapped_pfs[fip_id]:
ovn_fip_dict_proto = ovn_pfs_fip_id.get(protocol)
if db_mapped_pfs[fip_id][protocol] != ovn_fip_dict_proto:
to_add_or_update.add(fip_id)
# remove pf entries that exist in ovn lb but have no fip in
# neutron db.
for fip_id in ovn_pfs:
for db_pf in db_pfs:
pf_fip_id = db_pf.get('floatingip_id')
if pf_fip_id == fip_id:
break
else:
to_remove.append(fip_id)
return list(to_add_or_update), to_remove
def _create_or_update_floatingip_pfs(self, context, fip_id, txn):
self.l3_plugin.port_forwarding.db_sync_create_or_update(
context, fip_id, txn)
def _delete_floatingip_pfs(self, context, fip_id, txn):
self.l3_plugin.port_forwarding.db_sync_delete(
context, fip_id, txn)
def sync_routers_and_rports(self, ctx): def sync_routers_and_rports(self, ctx):
"""Sync Routers between neutron and NB. """Sync Routers between neutron and NB.
@ -358,6 +419,7 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
db_extends[router['id']]['routes'] = [] db_extends[router['id']]['routes'] = []
db_extends[router['id']]['snats'] = [] db_extends[router['id']]['snats'] = []
db_extends[router['id']]['fips'] = [] db_extends[router['id']]['fips'] = []
db_extends[router['id']]['fips_pfs'] = []
if not router.get(l3.EXTERNAL_GW_INFO): if not router.get(l3.EXTERNAL_GW_INFO):
continue continue
gateways = self._ovn_client._get_gw_info(ctx, router) gateways = self._ovn_client._get_gw_info(ctx, router)
@ -385,6 +447,11 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
ctx, {'router_id': list(db_routers.keys())}) ctx, {'router_id': list(db_routers.keys())})
for fip in fips: for fip in fips:
db_extends[fip['router_id']]['fips'].append(fip) db_extends[fip['router_id']]['fips'].append(fip)
if self.pf_plugin:
fip_pfs = self.pf_plugin.get_floatingip_port_forwardings(
ctx, fip['id'])
for fip_pf in fip_pfs:
db_extends[fip['router_id']]['fips_pfs'].append(fip_pf)
interfaces = self.l3_plugin._get_sync_interfaces( interfaces = self.l3_plugin._get_sync_interfaces(
ctx, list(db_routers.keys()), ctx, list(db_routers.keys()),
[constants.DEVICE_OWNER_ROUTER_INTF, [constants.DEVICE_OWNER_ROUTER_INTF,
@ -403,6 +470,7 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
update_lrport_list = [] update_lrport_list = []
update_snats_list = [] update_snats_list = []
update_fips_list = [] update_fips_list = []
update_pfs_list = []
for lrouter in lrouters: for lrouter in lrouters:
ovn_rtr_lb_pfs = self.ovn_api.get_router_floatingip_lbs( ovn_rtr_lb_pfs = self.ovn_api.get_router_floatingip_lbs(
utils.ovn_name(lrouter['name'])) utils.ovn_name(lrouter['name']))
@ -438,6 +506,12 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
update_fips_list.append({'id': lrouter['name'], update_fips_list.append({'id': lrouter['name'],
'add': add_fips, 'add': add_fips,
'del': del_fips}) 'del': del_fips})
db_fips_pfs = db_extends[lrouter['name']]['fips_pfs']
add_fip_pfs, del_fip_pfs = self._calculate_fip_pfs_differences(
ovn_rtr_lb_pfs, db_fips_pfs)
update_pfs_list.append({'id': lrouter['name'],
'add': add_fip_pfs,
'del': del_fip_pfs})
ovn_nats = lrouter['snats'] ovn_nats = lrouter['snats']
db_snats = db_extends[lrouter['name']]['snats'] db_snats = db_extends[lrouter['name']]['snats']
add_snats, del_snats = helpers.diff_list_of_dict( add_snats, del_snats = helpers.diff_list_of_dict(
@ -479,6 +553,14 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
{'id': router['id'], {'id': router['id'],
'add': db_extends[router['id']]['fips'], 'add': db_extends[router['id']]['fips'],
'del': []}) 'del': []})
if 'fips_pfs' in db_extends[router['id']]:
add_fip_pfs = {
db_pf['floatingip_id'] for
db_pf in db_extends[router['id']]['fips_pfs']}
update_pfs_list.append(
{'id': router['id'],
'add': list(add_fip_pfs),
'del': []})
except RuntimeError: except RuntimeError:
LOG.warning("Create router in OVN NB failed for router %s", LOG.warning("Create router in OVN NB failed for router %s",
router['id']) router['id'])
@ -578,6 +660,32 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
for nat in fip['add']: for nat in fip['add']:
self._ovn_client._create_or_update_floatingip( self._ovn_client._create_or_update_floatingip(
nat, txn=txn) nat, txn=txn)
for pf in update_pfs_list:
if pf['del']:
LOG.warning("Router %(id)s port forwarding for floating "
"ips %(fip)s found in OVN but not in Neutron",
{'id': pf['id'], 'fip': pf['del']})
if self.mode == SYNC_MODE_REPAIR:
LOG.warning(
"Delete port forwarding for fips %s from "
"OVN NB DB",
pf['del'])
for pf_id in pf['del']:
self._delete_floatingip_pfs(ctx, pf_id, txn)
if pf['add']:
LOG.warning("Router %(id)s port forwarding for floating "
"ips %(fip)s Neutron out of sync or missing "
"in OVN",
{'id': pf['id'], 'fip': pf['add']})
if self.mode == SYNC_MODE_REPAIR:
LOG.warning("Add port forwarding for fips %s "
"to OVN NB DB",
pf['add'])
for pf_fip_id in pf['add']:
self._create_or_update_floatingip_pfs(
ctx, pf_fip_id, txn)
for snat in update_snats_list: for snat in update_snats_list:
if snat['del']: if snat['del']:
LOG.warning("Router %(id)s snat %(snat)s " LOG.warning("Router %(id)s snat %(snat)s "

View File

@ -12,23 +12,30 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from collections import namedtuple
from neutron.common.ovn import acl as acl_utils from neutron.common.ovn import acl as acl_utils
from neutron.common.ovn import constants as ovn_const from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.services.portforwarding.drivers.ovn.driver import \
OVNPortForwarding as ovn_pf
from neutron.services.segments import db as segments_db from neutron.services.segments import db as segments_db
from neutron.tests.functional import base from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib.api.definitions import dns as dns_apidef from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import fip_pf_description as ext_pf_def
from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def
from neutron_lib.api.definitions import l3 from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import port_security as ps from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants from neutron_lib import constants
from neutron_lib import context from neutron_lib import context
from oslo_utils import uuidutils from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import idlutils from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import constants as ovsdbapp_const
class TestOvnNbSync(base.TestOVNFunctionalBase): class TestOvnNbSync(base.TestOVNFunctionalBase):
@ -56,6 +63,8 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.delete_lrouter_ports = [] self.delete_lrouter_ports = []
self.delete_lrouter_routes = [] self.delete_lrouter_routes = []
self.delete_lrouter_nats = [] self.delete_lrouter_nats = []
self.create_fip_fws = []
self.delete_fip_fws = []
self.delete_acls = [] self.delete_acls = []
self.create_port_groups = [] self.create_port_groups = []
self.delete_port_groups = [] self.delete_port_groups = []
@ -136,6 +145,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
update_port_ids_v4 = [] update_port_ids_v4 = []
update_port_ids_v6 = [] update_port_ids_v6 = []
n1_port_dict = {} n1_port_dict = {}
n1_port_details_dict = {}
for p in ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7']: for p in ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7']:
if p in ['p1', 'p5']: if p in ['p1', 'p5']:
port_kwargs = { port_kwargs = {
@ -152,6 +162,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
**port_kwargs) **port_kwargs)
port = self.deserialize(self.fmt, res) port = self.deserialize(self.fmt, res)
n1_port_dict[p] = port['port']['id'] n1_port_dict[p] = port['port']['id']
n1_port_details_dict[p] = port['port']
lport_name = port['port']['id'] lport_name = port['port']['id']
lswitch_name = 'neutron-' + n1['network']['id'] lswitch_name = 'neutron-' + n1['network']['id']
@ -432,6 +443,45 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.context, r1_f2['id'], {'floatingip': { self.context, r1_f2['id'], {'floatingip': {
'port_id': n1_port_dict['p2']}}) 'port_id': n1_port_dict['p2']}})
# Floating ip used for exercising port forwarding (via ovn lb)
r1_f3 = self.l3_plugin.create_floatingip(
self.context, {'floatingip': {
'tenant_id': self._tenant_id,
'floating_network_id': e1['network']['id'],
'floating_ip_address': '100.0.0.22',
'subnet_id': None,
'port_id': None}})
p5_ip = n1_port_details_dict['p5']['fixed_ips'][0]['ip_address']
fip_pf_args = {
pf_def.EXTERNAL_PORT: 2222,
pf_def.INTERNAL_PORT: 22,
pf_def.INTERNAL_PORT_ID: n1_port_dict['p5'],
pf_def.PROTOCOL: "tcp",
ext_pf_def.DESCRIPTION_FIELD: 'PortFwd r1_f3_p5:22 tcp',
pf_def.INTERNAL_IP_ADDRESS: p5_ip}
fip_args = {pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: fip_pf_args}}
self.pf_plugin.create_floatingip_port_forwarding(
self.context, r1_f3['id'], **fip_args)
# Add port forwarding with same external and internal value
fip_pf_args[pf_def.EXTERNAL_PORT] = 80
fip_pf_args[pf_def.INTERNAL_PORT] = 80
fip_pf_args[ext_pf_def.DESCRIPTION_FIELD] = 'PortFwd r1_f3_p5:80 tcp'
self.pf_plugin.create_floatingip_port_forwarding(
self.context, r1_f3['id'], **fip_args)
fip_pf_args = {
pf_def.EXTERNAL_PORT: 5353,
pf_def.INTERNAL_PORT: 53,
pf_def.INTERNAL_PORT_ID: n1_port_dict['p5'],
pf_def.PROTOCOL: "udp",
ext_pf_def.DESCRIPTION_FIELD: 'PortFwd r1_f3_p5:53 udp',
pf_def.INTERNAL_IP_ADDRESS: p5_ip}
fip_args = {pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: fip_pf_args}}
self.pf_plugin.create_floatingip_port_forwarding(
self.context, r1_f3['id'], **fip_args)
# update External subnet gateway ip to test function _subnet_update # update External subnet gateway ip to test function _subnet_update
# of L3 OVN plugin. # of L3 OVN plugin.
data = {'subnet': {'gateway_ip': '100.0.0.1'}} data = {'subnet': {'gateway_ip': '100.0.0.1'}}
@ -480,6 +530,21 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
'logical_ip': 'logical_ip':
r1_f1['fixed_ip_address'], r1_f1['fixed_ip_address'],
'type': 'dnat_and_snat'})) 'type': 'dnat_and_snat'}))
# Floating IP Port Forwardings
self.create_fip_fws.append(('pf-floatingip-{}-tcp'.format(r1_f3['id']),
{'vip': '{}:8080'.format(
r1_f3['floating_ip_address']),
'ips': ['{}:80'.format(p5_ip)],
'protocol': 'tcp',
'may_exist': False},
'neutron-' + r1['id'],))
self.delete_fip_fws.append(('pf-floatingip-{}-udp'.format(r1_f3['id']),
{'vip': '{}:5353'.format(
r1_f3['floating_ip_address']),
'if_exists': False}))
self.delete_fip_fws.append(('pf-floatingip-{}-tcp'.format(r1_f3['id']),
{'vip': '100.9.0.99:9999',
'if_exists': True}))
res = self._create_network(self.fmt, 'n4', True, **net_kwargs) res = self._create_network(self.fmt, 'n4', True, **net_kwargs)
n4 = self.deserialize(self.fmt, res) n4 = self.deserialize(self.fmt, res)
@ -736,6 +801,14 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
txn.add(self.nb_api.delete_nat_rule_in_lrouter( txn.add(self.nb_api.delete_nat_rule_in_lrouter(
lrouter_name, if_exists=True, **nat_dict)) lrouter_name, if_exists=True, **nat_dict))
for lb_name, lb_dict, lrouter_name in self.create_fip_fws:
txn.add(self.nb_api.lb_add(lb_name, **lb_dict))
txn.add(self.nb_api.lr_lb_add(lrouter_name, lb_name,
may_exist=True))
for lb_name, lb_dict in self.delete_fip_fws:
txn.add(self.nb_api.lb_del(lb_name, **lb_dict))
for acl in self.create_acls: for acl in self.create_acls:
txn.add(self.nb_api.add_acl(**acl)) txn.add(self.nb_api.add_acl(**acl))
@ -1139,9 +1212,12 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
fip_port = '' fip_port = ''
if fip['id'] in fip_macs: if fip['id'] in fip_macs:
fip_port = fip['port_id'] fip_port = fip['port_id']
db_nats[fip['router_id']].append( # Fips that do not have fip_port are used as port forwarding,
fip['floating_ip_address'] + fip['fixed_ip_address'] + # and we shall skip those in this iteration
'dnat_and_snat' + mac_address + fip_port) if fip_port:
db_nats[fip['router_id']].append(
fip['floating_ip_address'] + fip['fixed_ip_address'] +
'dnat_and_snat' + mac_address + fip_port)
_plugin_nb_ovn = self.mech_driver._nb_ovn _plugin_nb_ovn = self.mech_driver._nb_ovn
plugin_lrouter_ids = [ plugin_lrouter_ids = [
@ -1307,6 +1383,57 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
AssertionError, self.assertItemsEqual, r_nats, AssertionError, self.assertItemsEqual, r_nats,
monitor_nats) monitor_nats)
def _validate_fip_port_forwarding(self, should_match=True):
fip_pf_cmp = namedtuple(
'fip_pf_cmp',
'fip_id proto rtr_name ext_ip ext_port int_ip int_port')
# Helper function to break a single ovn lb entry into multiple
# floating ip port forwarding entries.
def _parse_ovn_lb_pf(ovn_lb):
protocol = (ovn_lb.protocol[0]
if ovn_lb.protocol else ovsdbapp_const.PROTO_TCP)
ext_ids = ovn_lb.external_ids
fip_id = ext_ids[ovn_const.OVN_FIP_EXT_ID_KEY]
router_name = ext_ids[ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY]
for vip, ips in ovn_lb.vips.items():
ext_ip, ext_port = vip.split(':')
for ip in ips.split(','):
int_ip, int_port = ip.split(':')
yield fip_pf_cmp(fip_id, protocol, router_name,
ext_ip, int(ext_port),
int_ip, int(int_port))
_plugin_nb_ovn = self.mech_driver._nb_ovn
db_pfs = []
fips = self._list('floatingips')
for fip in fips['floatingips']:
for pf in self.pf_plugin.get_floatingip_port_forwardings(
self.ctx, floatingip_id=fip['id']):
db_pfs.append(fip_pf_cmp(
fip['id'],
ovn_pf.ovn_lb_protocol(pf['protocol']),
utils.ovn_name(pf['router_id']),
pf['floating_ip_address'],
pf['external_port'],
pf['internal_ip_address'],
pf['internal_port'],
))
nb_pfs = []
rtr_names = [row.name for row in _plugin_nb_ovn._tables[
'Logical_Router'].rows.values()]
for rtr_name in rtr_names:
for ovn_lb in _plugin_nb_ovn.get_router_floatingip_lbs(rtr_name):
for pf in _parse_ovn_lb_pf(ovn_lb):
nb_pfs.append(pf)
if should_match:
self.assertItemsEqual(nb_pfs, db_pfs)
else:
self.assertRaises(AssertionError, self.assertItemsEqual,
nb_pfs, db_pfs)
def _validate_port_groups(self, should_match=True): def _validate_port_groups(self, should_match=True):
_plugin_nb_ovn = self.mech_driver._nb_ovn _plugin_nb_ovn = self.mech_driver._nb_ovn
@ -1381,6 +1508,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self._validate_dhcp_opts(should_match=should_match) self._validate_dhcp_opts(should_match=should_match)
self._validate_acls(should_match=should_match) self._validate_acls(should_match=should_match)
self._validate_routers_and_router_ports(should_match=should_match) self._validate_routers_and_router_ports(should_match=should_match)
self._validate_fip_port_forwarding(should_match=should_match)
self._validate_port_groups(should_match=should_match) self._validate_port_groups(should_match=should_match)
self._validate_dns_records(should_match=should_match) self._validate_dns_records(should_match=should_match)

View File

@ -373,6 +373,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
ovn_api = ovn_nb_synchronizer.ovn_api ovn_api = ovn_nb_synchronizer.ovn_api
ovn_driver = ovn_nb_synchronizer.ovn_driver ovn_driver = ovn_nb_synchronizer.ovn_driver
l3_plugin = ovn_nb_synchronizer.l3_plugin l3_plugin = ovn_nb_synchronizer.l3_plugin
pf_plugin = ovn_nb_synchronizer.pf_plugin
segments_plugin = ovn_nb_synchronizer.segments_plugin segments_plugin = ovn_nb_synchronizer.segments_plugin
core_plugin.get_networks = mock.Mock() core_plugin.get_networks = mock.Mock()
@ -445,6 +446,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
# end of router-sync block # end of router-sync block
l3_plugin.get_floatingips = mock.Mock() l3_plugin.get_floatingips = mock.Mock()
l3_plugin.get_floatingips.return_value = self.floating_ips l3_plugin.get_floatingips.return_value = self.floating_ips
pf_plugin.get_floatingip_port_forwardings = mock.Mock(return_value=[])
ovn_api.get_all_logical_switches_with_ports = mock.Mock() ovn_api.get_all_logical_switches_with_ports = mock.Mock()
ovn_api.get_all_logical_switches_with_ports.return_value = ( ovn_api.get_all_logical_switches_with_ports.return_value = (
self.lswitches_with_ports) self.lswitches_with_ports)