ironic-inspector/ironic_inspector/test/unit/test_firewall.py
Dmitry Tantsur 41580add20 Make sure to clean the blacklist cache when disabling the firewall
Otherwise on reintrospection the firewall code may not update
iptables rules, assuming they are already correct.

Change-Id: Icc05174854bd9ab51bfed9d1360873bf5db9ed54
Closes-Bug: #1570447
2016-04-15 14:03:46 +02:00

339 lines
14 KiB
Python

# Copyright 2015 NEC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import subprocess
import mock
from oslo_config import cfg
from ironic_inspector.common import ironic as ir_utils
from ironic_inspector import firewall
from ironic_inspector import node_cache
from ironic_inspector.test import base as test_base
CONF = cfg.CONF
@mock.patch.object(firewall, '_iptables')
@mock.patch.object(ir_utils, 'get_client')
@mock.patch.object(subprocess, 'check_call')
class TestFirewall(test_base.NodeTest):
def test_update_filters_without_manage_firewall(self, mock_call,
mock_get_client,
mock_iptables):
CONF.set_override('manage_firewall', False, 'firewall')
firewall.update_filters()
self.assertEqual(0, mock_iptables.call_count)
def test_init_args(self, mock_call, mock_get_client, mock_iptables):
rootwrap_path = '/some/fake/path'
CONF.set_override('rootwrap_config', rootwrap_path)
firewall.init()
init_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport', '67',
'-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain)]
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(init_expected_args, call_args_list):
self.assertEqual(args, call[0])
expected = ('sudo', 'ironic-inspector-rootwrap', rootwrap_path,
'iptables', '-w')
self.assertEqual(expected, firewall.BASE_COMMAND)
def test_init_args_old_iptables(self, mock_call, mock_get_client,
mock_iptables):
rootwrap_path = '/some/fake/path'
CONF.set_override('rootwrap_config', rootwrap_path)
mock_call.side_effect = subprocess.CalledProcessError(2, '')
firewall.init()
init_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport', '67',
'-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain)]
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(init_expected_args, call_args_list):
self.assertEqual(args, call[0])
expected = ('sudo', 'ironic-inspector-rootwrap', rootwrap_path,
'iptables',)
self.assertEqual(expected, firewall.BASE_COMMAND)
def test_init_kwargs(self, mock_call, mock_get_client, mock_iptables):
firewall.init()
init_expected_kwargs = [
{'ignore': True},
{'ignore': True},
{'ignore': True}]
call_args_list = mock_iptables.call_args_list
for (kwargs, call) in zip(init_expected_kwargs, call_args_list):
self.assertEqual(kwargs, call[1])
def test_update_filters_args(self, mock_call, mock_get_client,
mock_iptables):
# Pretend that we have nodes on introspection
node_cache.add_node(self.node.uuid, bmc_address='1.2.3.4')
firewall.init()
update_filters_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-F', firewall.NEW_CHAIN),
('-X', firewall.NEW_CHAIN),
('-N', firewall.NEW_CHAIN),
('-A', firewall.NEW_CHAIN, '-j', 'ACCEPT'),
('-I', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-E', firewall.NEW_CHAIN, CONF.firewall.firewall_chain)
]
firewall.update_filters()
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(update_filters_expected_args,
call_args_list):
self.assertEqual(args, call[0])
def test_update_filters_kwargs(self, mock_call, mock_get_client,
mock_iptables):
firewall.init()
update_filters_expected_kwargs = [
{'ignore': True},
{'ignore': True},
{'ignore': True},
{},
{'ignore': True},
{'ignore': True},
{'ignore': True},
{},
{},
{},
{'ignore': True},
{'ignore': True},
{'ignore': True}
]
firewall.update_filters()
call_args_list = mock_iptables.call_args_list
for (kwargs, call) in zip(update_filters_expected_kwargs,
call_args_list):
self.assertEqual(kwargs, call[1])
def test_update_filters_with_blacklist(self, mock_call, mock_get_client,
mock_iptables):
active_macs = ['11:22:33:44:55:66', '66:55:44:33:22:11']
inactive_mac = ['AA:BB:CC:DD:EE:FF']
self.macs = active_macs + inactive_mac
self.ports = [mock.Mock(address=m) for m in self.macs]
mock_get_client.port.list.return_value = self.ports
node_cache.add_node(self.node.uuid, mac=active_macs,
bmc_address='1.2.3.4', foo=None)
firewall.init()
update_filters_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-F', firewall.NEW_CHAIN),
('-X', firewall.NEW_CHAIN),
('-N', firewall.NEW_CHAIN),
# Blacklist
('-A', firewall.NEW_CHAIN, '-m', 'mac', '--mac-source',
inactive_mac[0], '-j', 'DROP'),
('-A', firewall.NEW_CHAIN, '-j', 'ACCEPT'),
('-I', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-E', firewall.NEW_CHAIN, CONF.firewall.firewall_chain)
]
firewall.update_filters(mock_get_client)
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(update_filters_expected_args,
call_args_list):
self.assertEqual(args, call[0])
# check caching
mock_iptables.reset_mock()
firewall.update_filters(mock_get_client)
self.assertFalse(mock_iptables.called)
def test_update_filters_clean_cache_on_error(self, mock_call,
mock_get_client,
mock_iptables):
active_macs = ['11:22:33:44:55:66', '66:55:44:33:22:11']
inactive_mac = ['AA:BB:CC:DD:EE:FF']
self.macs = active_macs + inactive_mac
self.ports = [mock.Mock(address=m) for m in self.macs]
mock_get_client.port.list.return_value = self.ports
node_cache.add_node(self.node.uuid, mac=active_macs,
bmc_address='1.2.3.4', foo=None)
firewall.init()
update_filters_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-F', firewall.NEW_CHAIN),
('-X', firewall.NEW_CHAIN),
('-N', firewall.NEW_CHAIN),
# Blacklist
('-A', firewall.NEW_CHAIN, '-m', 'mac', '--mac-source',
inactive_mac[0], '-j', 'DROP'),
('-A', firewall.NEW_CHAIN, '-j', 'ACCEPT'),
('-I', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-E', firewall.NEW_CHAIN, CONF.firewall.firewall_chain)
]
mock_iptables.side_effect = [None, None, RuntimeError()]
self.assertRaises(RuntimeError, firewall.update_filters,
mock_get_client)
# check caching
mock_iptables.reset_mock()
mock_iptables.side_effect = None
firewall.update_filters(mock_get_client)
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(update_filters_expected_args,
call_args_list):
self.assertEqual(args, call[0])
def test_update_filters_args_node_not_found_hook(self, mock_call,
mock_get_client,
mock_iptables):
# DHCP should be always opened if node_not_found hook is set
CONF.set_override('node_not_found_hook', 'enroll', 'processing')
firewall.init()
update_filters_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-F', firewall.NEW_CHAIN),
('-X', firewall.NEW_CHAIN),
('-N', firewall.NEW_CHAIN),
('-A', firewall.NEW_CHAIN, '-j', 'ACCEPT'),
('-I', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-E', firewall.NEW_CHAIN, CONF.firewall.firewall_chain)
]
firewall.update_filters()
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(update_filters_expected_args,
call_args_list):
self.assertEqual(args, call[0])
def test_update_filters_args_no_introspection(self, mock_call,
mock_get_client,
mock_iptables):
firewall.init()
firewall.BLACKLIST_CACHE = ['foo']
mock_get_client.return_value.port.list.return_value = [
mock.Mock(address='foobar')]
update_filters_expected_args = [
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-N', CONF.firewall.firewall_chain),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-F', firewall.NEW_CHAIN),
('-X', firewall.NEW_CHAIN),
('-N', firewall.NEW_CHAIN),
('-A', firewall.NEW_CHAIN, '-j', 'REJECT'),
('-I', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', firewall.NEW_CHAIN),
('-D', 'INPUT', '-i', 'br-ctlplane', '-p', 'udp', '--dport',
'67', '-j', CONF.firewall.firewall_chain),
('-F', CONF.firewall.firewall_chain),
('-X', CONF.firewall.firewall_chain),
('-E', firewall.NEW_CHAIN, CONF.firewall.firewall_chain)
]
firewall.update_filters()
call_args_list = mock_iptables.call_args_list
for (args, call) in zip(update_filters_expected_args,
call_args_list):
self.assertEqual(args, call[0])
self.assertIsNone(firewall.BLACKLIST_CACHE)
# Check caching enabled flag
mock_iptables.reset_mock()
firewall.update_filters()
self.assertFalse(mock_iptables.called)
# Adding a node changes it back
node_cache.add_node(self.node.uuid, bmc_address='1.2.3.4')
mock_iptables.reset_mock()
firewall.update_filters()
mock_iptables.assert_any_call('-A', firewall.NEW_CHAIN, '-j', 'ACCEPT')
self.assertEqual({'foobar'}, firewall.BLACKLIST_CACHE)