Merge "Set trusted port only once in iptables firewall driver"

This commit is contained in:
Zuul 2018-03-18 14:48:51 +00:00 committed by Gerrit Code Review
commit c888425fa9
2 changed files with 69 additions and 2 deletions

View File

@ -67,6 +67,7 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
# list of port which has security group
self.filtered_ports = {}
self.unfiltered_ports = {}
self.trusted_ports = []
self.ipconntrack = ip_conntrack.get_conntrack(
self.iptables.get_rules_for_table, self.filtered_ports,
self.unfiltered_ports, namespace=namespace,
@ -111,11 +112,15 @@ class IptablesFirewallDriver(firewall.FirewallDriver):
def process_trusted_ports(self, port_ids):
"""Process ports that are trusted and shouldn't be filtered."""
for port in port_ids:
if port not in self.trusted_ports:
self._add_trusted_port_rules(port)
self.trusted_ports.append(port)
def remove_trusted_ports(self, port_ids):
for port in port_ids:
if port in self.trusted_ports:
self._remove_trusted_port_rules(port)
self.trusted_ports.remove(port)
def _add_trusted_port_rules(self, port):
device = self._get_device_name(port)

View File

@ -1080,6 +1080,68 @@ class IptablesFirewallTestCase(BaseIptablesFirewallTestCase):
ingress = None
self._test_prepare_port_filter(rule, ingress, egress)
def _test_process_trusted_ports(self, configured):
port = self._fake_port()
port['id'] = 'tapfake_dev'
calls = [
mock.call.add_chain('sg-fallback'),
mock.call.add_rule('sg-fallback',
'-j DROP', comment=ic.UNMATCH_DROP)]
if configured:
self.firewall.trusted_ports.append(port['id'])
else:
calls.append(
mock.call.add_rule('FORWARD',
'-m physdev --physdev-out tapfake_dev '
'--physdev-is-bridged '
'-j ACCEPT', comment=ic.TRUSTED_ACCEPT))
filter_inst = self.v4filter_inst
self.firewall.process_trusted_ports([port['id']])
comb = zip(calls, filter_inst.mock_calls)
for (l, r) in comb:
self.assertEqual(l, r)
filter_inst.assert_has_calls(calls)
self.assertIn(port['id'], self.firewall.trusted_ports)
def test_process_trusted_ports(self):
self._test_process_trusted_ports(False)
def test_process_trusted_ports_already_configured(self):
self._test_process_trusted_ports(True)
def _test_remove_trusted_ports(self, configured):
port = self._fake_port()
port['id'] = 'tapfake_dev'
calls = [
mock.call.add_chain('sg-fallback'),
mock.call.add_rule('sg-fallback',
'-j DROP', comment=ic.UNMATCH_DROP)]
if configured:
self.firewall.trusted_ports.append(port['id'])
calls.append(
mock.call.remove_rule('FORWARD',
'-m physdev --physdev-out tapfake_dev '
'--physdev-is-bridged -j ACCEPT'))
filter_inst = self.v4filter_inst
self.firewall.remove_trusted_ports([port['id']])
comb = zip(calls, filter_inst.mock_calls)
for (l, r) in comb:
self.assertEqual(l, r)
filter_inst.assert_has_calls(calls)
self.assertNotIn(port['id'], self.firewall.trusted_ports)
def test_remove_trusted_ports(self):
self._test_remove_trusted_ports(True)
def test_process_remove_ports_not_configured(self):
self._test_remove_trusted_ports(False)
def _test_prepare_port_filter(self,
rule,
ingress_expected_call=None,