From ab84b7fb2b6febc9dfd9b0767be90fcb3277c192 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Thu, 26 Aug 2021 16:54:13 +0000 Subject: [PATCH] Allow to process FW OF rules belonging to a port in a single operation This patch adds a new configuration variable to control the OVS OpenFlow rule processing operations: * ``openflow_processed_per_port``: by default "False". If enabled, all OpenFlow rules associated to a port will be processed at once, in one single transaction. If disabled, the flows will be processed in batches of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules. With ``openflow_processed_per_port`` enabled, all Firewall OpenFlow rules related to a port are processed in one transaction (executed in one single command). That ensures the rules are written atomically and apply all of them at the same time. That means all needed rules to handle the ingress and egress traffic of a port using the Open vSwitch Firewall, are committed in the OVS DB at the same time. That will prevent from partially applied OpenFlow sets in the Firewall and inconsistencies when applying new SG rules or during the OVS agent restart. That will override, if needed, the hard limit of "AGENT_RES_PROCESSING_STEP=100" OpenFlow rules that could be processed in OVS at once. If the default configuration values are not modified, the behaviour of the OVS library does not change. Closes-Bug: #1934917 Change-Id: If4984dece266a789d607725f8497f1aac3d73d23 --- neutron/agent/common/ovs_lib.py | 57 ++++++++++++++----- .../linux/openvswitch_firewall/firewall.py | 36 +++++++++--- neutron/conf/agent/ovs_conf.py | 9 +++ .../tests/unit/agent/common/test_ovs_lib.py | 53 +++++++++-------- .../openvswitch_firewall/test_firewall.py | 18 +++--- ...rocessing-parameters-b38f7a1e88568798.yaml | 11 ++++ 6 files changed, 132 insertions(+), 52 deletions(-) create mode 100644 releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml diff --git a/neutron/agent/common/ovs_lib.py b/neutron/agent/common/ovs_lib.py index aa0c6f809c1..7e1409af727 100644 --- a/neutron/agent/common/ovs_lib.py +++ b/neutron/agent/common/ovs_lib.py @@ -16,7 +16,6 @@ import collections import functools import itertools -import operator import random import time import uuid @@ -78,6 +77,9 @@ CTRL_BURST_LIMIT_MIN = 25 # TODO(slaweq): move this to neutron_lib.constants TYPE_GRE_IP6 = 'ip6gre' +ActionFlowTuple = collections.namedtuple('ActionFlowTuple', + ['action', 'flow', 'flow_group_id']) + def _ovsdb_result_pending(result): """Return True if ovsdb indicates the result is still pending.""" @@ -251,6 +253,7 @@ class OVSBridge(BaseOVS): self.initial_protocols = { constants.OPENFLOW10, constants.OPENFLOW13, constants.OPENFLOW14} self.initial_protocols.add(self._highest_protocol_needed) + self._flows_per_port = cfg.CONF.OVS.openflow_processed_per_port @property def default_cookie(self): @@ -468,7 +471,22 @@ class OVSBridge(BaseOVS): self.br_name) raise RuntimeError(_('No datapath_id on bridge %s') % self.br_name) - def do_action_flows(self, action, kwargs_list, use_bundle=False): + def do_action_flows_by_group_id(self, action, flows_by_group_id, + use_bundle=False): + if self._flows_per_port: + # Group flow actions per port. + for flow_group_id, flows in flows_by_group_id.items(): + self.do_action_flows(action, flows, use_bundle=use_bundle, + flow_group_id=flow_group_id) + else: + # Group all actions in one single list without any group ID + # reference. + flows = [item for _list in flows_by_group_id.values() + for item in _list] + self.do_action_flows(action, flows, use_bundle=use_bundle) + + def do_action_flows(self, action, kwargs_list, use_bundle=False, + flow_group_id=None): # we can't mix strict and non-strict, so we'll use the first kw # and check against other kw being different strict = kwargs_list[0].get('strict', False) @@ -515,7 +533,15 @@ class OVSBridge(BaseOVS): if use_bundle: extra_param.append('--bundle') - step = common_constants.AGENT_RES_PROCESSING_STEP + if flow_group_id: + # NOTE(ralonsoh): all flows belonging to a port will be written + # atomically in the same command. + step = len(flow_strs) + else: + # No group ID defined (flows are not grouped per port). Use the + # default batch step value "openflow_number_processing_step". + step = common_constants.AGENT_RES_PROCESSING_STEP + for i in range(0, len(flow_strs), step): self.run_ofctl('%s-flows' % action, extra_param + ['-'], '\n'.join(flow_strs[i:i + step])) @@ -1243,14 +1269,15 @@ class DeferredOVSBridge(object): return getattr(self.br, name) raise AttributeError(name) - def add_flow(self, **kwargs): - self.action_flow_tuples.append(('add', kwargs)) + def add_flow(self, flow_group_id=None, **kwargs): + self.action_flow_tuples.append( + ActionFlowTuple('add', kwargs, flow_group_id)) def mod_flow(self, **kwargs): - self.action_flow_tuples.append(('mod', kwargs)) + self.action_flow_tuples.append(ActionFlowTuple('mod', kwargs, None)) def delete_flows(self, **kwargs): - self.action_flow_tuples.append(('del', kwargs)) + self.action_flow_tuples.append(ActionFlowTuple('del', kwargs, None)) def apply_flows(self): action_flow_tuples = self.action_flow_tuples @@ -1259,14 +1286,16 @@ class DeferredOVSBridge(object): return if not self.full_ordered: - action_flow_tuples.sort(key=lambda af: self.weights[af[0]]) + action_flow_tuples.sort(key=lambda flow: self.weights[flow.action]) - grouped = itertools.groupby(action_flow_tuples, - key=operator.itemgetter(0)) - itemgetter_1 = operator.itemgetter(1) - for action, action_flow_list in grouped: - flows = list(map(itemgetter_1, action_flow_list)) - self.br.do_action_flows(action, flows, self.use_bundle) + flows_by_action = itertools.groupby(action_flow_tuples, + key=lambda af: af.action) + for action, flows in flows_by_action: + flows_by_group_id = collections.defaultdict(list) + for flow in flows: + flows_by_group_id[flow.flow_group_id].append(flow.flow) + self.br.do_action_flows_by_group_id(action, flows_by_group_id, + self.use_bundle) def __enter__(self): return self diff --git a/neutron/agent/linux/openvswitch_firewall/firewall.py b/neutron/agent/linux/openvswitch_firewall/firewall.py index 4c9730b2e78..e5ada18a2ae 100644 --- a/neutron/agent/linux/openvswitch_firewall/firewall.py +++ b/neutron/agent/linux/openvswitch_firewall/firewall.py @@ -406,7 +406,7 @@ class ConjIPFlowManager(object): def _update_flows_for_vlan_subr(self, direction, ethertype, vlan_tag, flow_state, addr_to_conj, - conj_id_to_remove): + conj_id_to_remove, ofport): """Do the actual flow updates for given direction and ethertype.""" conj_id_to_remove = conj_id_to_remove or [] # Delete any current flow related to any deleted IP address, before @@ -450,9 +450,9 @@ class ConjIPFlowManager(object): continue for flow in rules.create_flows_for_ip_address( addr, direction, ethertype, vlan_tag, conj_ids): - self.driver._add_flow(**flow) + self.driver._add_flow(flow_group_id=ofport, **flow) - def update_flows_for_vlan(self, vlan_tag, conj_id_to_remove=None): + def update_flows_for_vlan(self, vlan_tag, ofport, conj_id_to_remove=None): """Install action=conjunction(conj_id, 1/2) flows, which depend on IP addresses of remote_group_id or remote_address_group_id. @@ -466,7 +466,7 @@ class ConjIPFlowManager(object): self._update_flows_for_vlan_subr( direction, ethertype, vlan_tag, self.flow_state[vlan_tag][(direction, ethertype)], - addr_to_conj, conj_id_to_remove) + addr_to_conj, conj_id_to_remove, ofport) self.flow_state[vlan_tag][(direction, ethertype)] = addr_to_conj def add(self, vlan_tag, sg_id, remote_id, direction, ethertype, @@ -522,7 +522,7 @@ class ConjIPFlowManager(object): update = True if update: - self.update_flows_for_vlan(vlan_tag, + self.update_flows_for_vlan(vlan_tag, None, conj_id_to_remove=conj_id_to_remove) @@ -595,7 +595,18 @@ class OVSFirewallDriver(firewall.FirewallDriver): for f in rules.create_accept_flows(flow): self._add_flow(**f) - def _add_flow(self, **kwargs): + def _add_flow(self, flow_group_id=None, **kwargs): + """Add a new flow. + + Most of the port related flows will have the parameters "reg_port" or + "in_port". If no "flow_group_id" is defined, "in_port" or "reg_port" + will be used instead (those parameters store the port "ofport"). The + flow group ID will be used to commit all flows related to a port in + the same transaction (for deferred OVS bridge implementation only). + """ + flow_group_id = (flow_group_id or + kwargs.get('in_port') or + kwargs.get('reg_port')) dl_type = kwargs.get('dl_type') create_reg_numbers(kwargs) if isinstance(dl_type, int): @@ -603,7 +614,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): if self._update_cookie: kwargs['cookie'] = self._update_cookie if self._deferred: - self.int_br.add_flow(**kwargs) + self.int_br.add_flow(flow_group_id=flow_group_id, **kwargs) else: self.int_br.br.add_flow(**kwargs) @@ -889,6 +900,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): actions += 'strip_vlan,resubmit(,{:d})'.format( ovs_consts.BASE_INGRESS_TABLE) self._add_flow( + flow_group_id=ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac, @@ -900,6 +912,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): actions += 'resubmit(,{:d})'.format( ovs_consts.BASE_INGRESS_TABLE) self._add_flow( + flow_group_id=ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac, @@ -944,6 +957,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): port.vlan_tag, port.network_type) self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.TRANSIENT_TABLE, priority=90, dl_dst=mac_addr, @@ -1198,6 +1212,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): # and if not, accept it for mac_addr in port.all_allowed_macs: self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.ACCEPT_OR_INGRESS_TABLE, priority=100, dl_dst=mac_addr, @@ -1240,6 +1255,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): # Prevent flood for accepted egress traffic self._add_flow( + flow_group_id=dst_port, table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE, priority=12, dl_dst=mac, @@ -1272,6 +1288,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): if patch_ofport is not ovs_lib.INVALID_OFPORT: self._add_flow( + flow_group_id=dst_port, table=ovs_consts.ACCEPTED_EGRESS_TRAFFIC_NORMAL_TABLE, priority=10, dl_src=mac, @@ -1296,6 +1313,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): def _initialize_tracked_egress(self, port): # Drop invalid packets self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.RULES_EGRESS_TABLE, priority=50, ct_state=ovsfw_consts.OF_STATE_INVALID, @@ -1411,6 +1429,7 @@ class OVSFirewallDriver(firewall.FirewallDriver): def _initialize_tracked_ingress(self, port): # Drop invalid packets self._add_flow( + flow_group_id=port.ofport, table=ovs_consts.RULES_INGRESS_TABLE, priority=50, ct_state=ovsfw_consts.OF_STATE_INVALID, @@ -1557,7 +1576,8 @@ class OVSFirewallDriver(firewall.FirewallDriver): self._add_non_ip_conj_flows(port) - self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag) + self.conj_ip_manager.update_flows_for_vlan(port.vlan_tag, + port.ofport) def _create_rules_generator_for_port(self, port): for sec_group in port.sec_groups: diff --git a/neutron/conf/agent/ovs_conf.py b/neutron/conf/agent/ovs_conf.py index f8fa8ef2d0e..e707bbefd80 100644 --- a/neutron/conf/agent/ovs_conf.py +++ b/neutron/conf/agent/ovs_conf.py @@ -44,6 +44,15 @@ OPTS = [ 'unregistered multicast packets to all ports. ' 'The switch will send unregistered multicast packets ' 'only to ports connected to multicast routers.')), + cfg.BoolOpt('openflow_processed_per_port', + default=False, + help=_('If enabled, all OpenFlow rules associated to a port ' + 'are processed at once, in one single transaction. ' + 'That avoids possible inconsistencies during OVS agent ' + 'restart and port updates. ' + 'If disabled, the flows will be processed in batches ' + 'of "openflow_number_processing_step" number of ' + 'OpenFlow rules.')), ] diff --git a/neutron/tests/unit/agent/common/test_ovs_lib.py b/neutron/tests/unit/agent/common/test_ovs_lib.py index f43e32a3c21..fc2178ef723 100644 --- a/neutron/tests/unit/agent/common/test_ovs_lib.py +++ b/neutron/tests/unit/agent/common/test_ovs_lib.py @@ -612,8 +612,8 @@ class TestDeferredOVSBridge(base.BaseTestCase): super(TestDeferredOVSBridge, self).setUp() self.br = mock.Mock() - self.mocked_do_action_flows = mock.patch.object( - self.br, 'do_action_flows').start() + self.mock_do_action_flows_by_group_id = mock.patch.object( + self.br, 'do_action_flows_by_group_id').start() self.add_flow_dict1 = dict(in_port=11, actions='drop') self.add_flow_dict2 = dict(in_port=12, actions='drop') @@ -628,15 +628,15 @@ class TestDeferredOVSBridge(base.BaseTestCase): ovs_lib.DeferredOVSBridge.ALLOWED_PASSTHROUGHS) def _verify_mock_call(self, expected_calls): - self.mocked_do_action_flows.assert_has_calls(expected_calls) + self.mock_do_action_flows_by_group_id.assert_has_calls(expected_calls) self.assertEqual(len(expected_calls), - len(self.mocked_do_action_flows.mock_calls)) + len(self.mock_do_action_flows_by_group_id.mock_calls)) def test_apply_on_exit(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call('del', [self.del_flow_dict1], False), + mock.call('add', {None: [self.add_flow_dict1]}, False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', {None: [self.del_flow_dict1]}, False), ] with ovs_lib.DeferredOVSBridge(self.br) as deferred_br: @@ -660,13 +660,16 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call('del', [self.del_flow_dict1], False), + mock.call('add', + {11: [self.add_flow_dict1], 12: [self.add_flow_dict2]}, + False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', {None: [self.del_flow_dict1]}, False), ] with ovs_lib.DeferredOVSBridge(self.br) as deferred_br: - deferred_br.add_flow(**self.add_flow_dict1) + deferred_br.add_flow(flow_group_id=11, **self.add_flow_dict1) + deferred_br.add_flow(flow_group_id=12, **self.add_flow_dict2) deferred_br.mod_flow(**self.mod_flow_dict1) deferred_br.delete_flows(**self.del_flow_dict1) self._verify_mock_call([]) @@ -676,12 +679,15 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply_order(self): expected_calls = [ - mock.call( - 'del', [self.del_flow_dict1, self.del_flow_dict2], False), - mock.call( - 'mod', [self.mod_flow_dict1, self.mod_flow_dict2], False), - mock.call( - 'add', [self.add_flow_dict1, self.add_flow_dict2], False), + mock.call('del', + {None: [self.del_flow_dict1, self.del_flow_dict2]}, + False), + mock.call('mod', + {None: [self.mod_flow_dict1, self.mod_flow_dict2]}, + False), + mock.call('add', + {None: [self.add_flow_dict1, self.add_flow_dict2]}, + False), ] order = 'del', 'mod', 'add' @@ -696,12 +702,13 @@ class TestDeferredOVSBridge(base.BaseTestCase): def test_apply_full_ordered(self): expected_calls = [ - mock.call('add', [self.add_flow_dict1], False), - mock.call('mod', [self.mod_flow_dict1], False), - mock.call( - 'del', [self.del_flow_dict1, self.del_flow_dict2], False), - mock.call('add', [self.add_flow_dict2], False), - mock.call('mod', [self.mod_flow_dict2], False), + mock.call('add', {None: [self.add_flow_dict1]}, False), + mock.call('mod', {None: [self.mod_flow_dict1]}, False), + mock.call('del', + {None: [self.del_flow_dict1, self.del_flow_dict2]}, + False), + mock.call('add', {None: [self.add_flow_dict2]}, False), + mock.call('mod', {None: [self.mod_flow_dict2]}, False), ] with ovs_lib.DeferredOVSBridge(self.br, diff --git a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py index 0976b2f646d..a7acf8e1b50 100644 --- a/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py +++ b/neutron/tests/unit/agent/linux/openvswitch_firewall/test_firewall.py @@ -423,7 +423,7 @@ class TestConjIPFlowManager(base.BaseTestCase): get_conj_id_mock.return_value = self.conj_id self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 0) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY) self.assertFalse(remote_group.get_ethertype_filtered_addresses.called) self.assertFalse(self.driver._add_flow.called) @@ -439,7 +439,7 @@ class TestConjIPFlowManager(base.BaseTestCase): get_conj_id_mock.return_value = self.conj_id self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 0) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, mock.ANY) self.assertTrue(remote_group.get_ethertype_filtered_addresses.called) self.assertTrue(self.driver._add_flow.called) @@ -454,20 +454,24 @@ class TestConjIPFlowManager(base.BaseTestCase): constants.INGRESS_DIRECTION, constants.IPv4, 0) self.manager.add(self.vlan_tag, 'sg', 'remote_id', constants.INGRESS_DIRECTION, constants.IPv4, 3) - self.manager.update_flows_for_vlan(self.vlan_tag) + self.manager.update_flows_for_vlan(self.vlan_tag, 'ofport1') self.assertEqual(self.driver._add_flow.call_args_list, [mock.call(actions='conjunction(16,1/2)', ct_state='+est-rel-rpl', dl_type=2048, nw_src='10.22.3.4/32', priority=70, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(17,1/2)', ct_state='+new-est', dl_type=2048, nw_src='10.22.3.4/32', priority=70, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(22,1/2)', ct_state='+est-rel-rpl', dl_type=2048, nw_src='10.22.3.4/32', priority=73, - reg_net=self.vlan_tag, table=82), + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1'), mock.call(actions='conjunction(23,1/2)', ct_state='+new-est', dl_type=2048, nw_src='10.22.3.4/32', priority=73, - reg_net=self.vlan_tag, table=82)]) + reg_net=self.vlan_tag, table=82, + flow_group_id='ofport1')]) def _sg_removed(self, sg_name): with mock.patch.object(self.manager.conj_id_map, diff --git a/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml b/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml new file mode 100644 index 00000000000..d3518d547ea --- /dev/null +++ b/releasenotes/notes/ovs-of-rules-processing-parameters-b38f7a1e88568798.yaml @@ -0,0 +1,11 @@ +--- +features: + - | + Added a new configuration variable, in ``[OVS]`` section, to control + the OVS OpenFlow rule processing operations when using the OVS native + firewall driver (``securitygroup.firewall_driver=openvswitch``): + + * ``openflow_processed_per_port``: by default "False". If enabled, all + OpenFlow rules associated to a port will be processed at once, in a + single transaction. If disabled, the flows will be processed in batches + of "AGENT_RES_PROCESSING_STEP=100" number of OpenFlow rules.