Validate vmedia for vmedia usage

Virtual media devices based logic needs to be
guarded from being used or considered based upon
if the machine actually booted from virtual media,
or not.

At the same time, actual devices need to be checked
in order to make sure they align with what we expect
in order to prevent consideration of content which
should not be leveraged.

Change-Id: If2d5c6f4815c9e42798a2d96d59015e1b1dbd457
Story: 2008749
Task: 42108
This commit is contained in:
Julia Kreger 2021-03-25 13:02:04 -07:00
parent 2a64413bb6
commit 8dd6589e66
3 changed files with 341 additions and 65 deletions

View File

@ -148,45 +148,59 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest):
self.assertEqual('sdc', vmedia_device_returned)
@mock.patch.object(utils, 'execute', autospec=True)
def test__find_device_by_labels(self, execute_mock):
execute_mock.side_effect = [
processutils.ProcessExecutionError,
('/dev/fake', ''),
]
self.assertEqual('/dev/fake',
utils._find_device_by_labels(['l1', 'l2']))
execute_mock.assert_has_calls([
mock.call('blkid', '-L', item)
for item in ('l1', 'l2')
])
@mock.patch.object(utils, 'execute', autospec=True)
def test__find_device_by_labels_upper(self, execute_mock):
execute_mock.side_effect = [
processutils.ProcessExecutionError,
processutils.ProcessExecutionError,
('/dev/fake', ''),
]
self.assertEqual('/dev/fake',
utils._find_device_by_labels(['l1', 'l2']))
execute_mock.assert_has_calls([
mock.call('blkid', '-L', item)
for item in ('l1', 'l2', 'L1')
])
@mock.patch.object(utils, 'execute', autospec=True)
def test__find_device_by_labels_not_found(self, execute_mock):
def test__find_vmedia_device_by_labels_handles_exec_error(self,
execute_mock):
execute_mock.side_effect = processutils.ProcessExecutionError
self.assertIsNone(utils._find_device_by_labels(['l1', 'l2']))
self.assertIsNone(utils._find_vmedia_device_by_labels(['l1', 'l2']))
execute_mock.assert_called_once_with('lsblk', '-P', '-oPATH,LABEL')
@mock.patch.object(utils, 'execute', autospec=True)
def test__find_vmedia_device_by_labels(self, execute_mock):
# NOTE(TheJulia): Case is intentionally mixed here to ensure
# proper matching occurs
disk_list = ('PATH="/dev/sda" LABEL=""\n'
'PATH="/dev/sda2" LABEL="Meow"\n'
'PATH="/dev/sda3" LABEL="Recovery HD"\n'
'PATH="/dev/sda1" LABEL="EFI"\n'
'PATH="/dev/sdb" LABEL=""\n'
'PATH="/dev/sdb1" LABEL=""\n'
'PATH="/dev/sdb2" LABEL=""\n'
'PATH="/dev/sdc" LABEL="meow"\n')
invalid_disk = ('KNAME="sda1" SIZE="1610612736" TYPE="part" TRAN=""\n'
'KNAME="sda" SIZE="1610612736" TYPE="disk" '
'TRAN="sata"\n')
valid_disk = ('KNAME="sdc" SIZE="1610612736" TYPE="disk" TRAN="usb"\n')
execute_mock.side_effect = [
(disk_list, ''),
(invalid_disk, ''),
(valid_disk, ''),
]
self.assertEqual('/dev/sdc',
utils._find_vmedia_device_by_labels(['cat', 'meOw']))
execute_mock.assert_has_calls([
mock.call('blkid', '-L', item)
for item in ('l1', 'l2', 'L1', 'L2')
mock.call('lsblk', '-P', '-oPATH,LABEL'),
mock.call('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE', '/dev/sda2'),
mock.call('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE', '/dev/sdc'),
])
@mock.patch.object(utils, '_find_device_by_labels', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
def test__find_vmedia_device_by_labels_not_found(self, execute_mock):
disk_list = ('PATH="/dev/sdb" LABEL="evil"\n'
'PATH="/dev/sdb1" LABEL="banana"\n'
'PATH="/dev/sdb2" LABEL=""\n')
execute_mock.return_value = (disk_list, '')
self.assertIsNone(utils._find_vmedia_device_by_labels(['l1', 'l2']))
execute_mock.assert_called_once_with('lsblk', '-P', '-oPATH,LABEL')
@mock.patch.object(utils, '_check_vmedia_device', autospec=True)
@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True)
@mock.patch.object(utils, '_read_params_from_file', autospec=True)
@mock.patch.object(ironic_utils, 'mounted', autospec=True)
def test__get_vmedia_params(self, mount_mock, read_params_mock, find_mock):
def test__get_vmedia_params(self, mount_mock, read_params_mock, find_mock,
check_vmedia_mock):
check_vmedia_mock.return_value = True
find_mock.return_value = '/dev/fake'
mount_mock.return_value.__enter__.return_value = '/tempdir'
expected_params = {'a': 'b'}
@ -198,12 +212,15 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest):
read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
self.assertEqual(expected_params, returned_params)
@mock.patch.object(utils, '_find_device_by_labels', autospec=True)
@mock.patch.object(utils, '_check_vmedia_device', autospec=True)
@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True)
@mock.patch.object(utils, '_get_vmedia_device', autospec=True)
@mock.patch.object(utils, '_read_params_from_file', autospec=True)
@mock.patch.object(ironic_utils, 'mounted', autospec=True)
def test__get_vmedia_params_by_device(self, mount_mock, read_params_mock,
get_device_mock, find_mock):
get_device_mock, find_mock,
check_vmedia_mock):
check_vmedia_mock.return_value = True
find_mock.return_value = None
mount_mock.return_value.__enter__.return_value = '/tempdir'
expected_params = {'a': 'b'}
@ -215,15 +232,37 @@ class GetAgentParamsTestCase(ironic_agent_base.IronicAgentTest):
mount_mock.assert_called_once_with('/dev/sda')
read_params_mock.assert_called_once_with("/tempdir/parameters.txt")
self.assertEqual(expected_params, returned_params)
check_vmedia_mock.assert_called_with('/dev/sda')
@mock.patch.object(utils, '_find_device_by_labels', autospec=True)
@mock.patch.object(utils, '_check_vmedia_device', autospec=True)
@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True)
@mock.patch.object(utils, '_get_vmedia_device', autospec=True)
@mock.patch.object(utils, '_read_params_from_file', autospec=True)
@mock.patch.object(ironic_utils, 'mounted', autospec=True)
def test__get_vmedia_params_by_device_device_invalid(
self, mount_mock, read_params_mock,
get_device_mock, find_mock,
check_vmedia_mock):
check_vmedia_mock.return_value = False
find_mock.return_value = None
expected_params = {}
read_params_mock.return_value = expected_params
get_device_mock.return_value = "sda"
returned_params = utils._get_vmedia_params()
mount_mock.assert_not_called()
read_params_mock.assert_not_called
self.assertEqual(expected_params, returned_params)
check_vmedia_mock.assert_called_with('/dev/sda')
@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True)
@mock.patch.object(utils, '_get_vmedia_device', autospec=True)
def test__get_vmedia_params_cannot_find_dev(self, get_device_mock,
find_mock):
find_mock.return_value = None
get_device_mock.return_value = None
self.assertRaises(errors.VirtualMediaBootError,
utils._get_vmedia_params)
self.assertEqual({}, utils._get_vmedia_params())
class TestFailures(testtools.TestCase):
@ -913,22 +952,42 @@ class TestGetEfiPart(testtools.TestCase):
self.assertIsNone(utils.get_efi_part_on_device('/dev/sda'))
@mock.patch.object(utils, '_find_device_by_labels', autospec=True)
@mock.patch.object(utils, '_booted_from_vmedia', autospec=True)
@mock.patch.object(utils, '_check_vmedia_device', autospec=True)
@mock.patch.object(utils, '_find_vmedia_device_by_labels', autospec=True)
@mock.patch.object(shutil, 'copy', autospec=True)
@mock.patch.object(ironic_utils, 'mounted', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
class TestCopyConfigFromVmedia(testtools.TestCase):
def test_no_vmedia(self, mock_execute, mock_mount, mock_copy,
mock_find_device):
def test_vmedia_found_not_booted_from_vmedia(
self, mock_execute, mock_mount, mock_copy,
mock_find_device, mock_check_vmedia, mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = False
mock_find_device.return_value = '/dev/fake'
utils.copy_config_from_vmedia()
mock_mount.assert_not_called()
mock_execute.assert_not_called()
mock_copy.assert_not_called()
mock_check_vmedia.assert_not_called()
self.assertTrue(mock_booted_from_vmedia.called)
def test_no_vmedia(
self, mock_execute, mock_mount, mock_copy,
mock_find_device, mock_check_vmedia, mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = True
mock_find_device.return_value = None
utils.copy_config_from_vmedia()
mock_mount.assert_not_called()
mock_execute.assert_not_called()
mock_copy.assert_not_called()
mock_check_vmedia.assert_not_called()
self.assertFalse(mock_booted_from_vmedia.called)
def test_no_files(self, mock_execute, mock_mount, mock_copy,
mock_find_device):
def test_no_files(
self, mock_execute, mock_mount, mock_copy,
mock_find_device, mock_check_vmedia, mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = True
temp_path = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(temp_path))
@ -940,9 +999,13 @@ class TestCopyConfigFromVmedia(testtools.TestCase):
mock_execute.assert_called_once_with('findmnt', '-n', '-oTARGET',
'/dev/something')
mock_copy.assert_not_called()
self.assertTrue(mock_booted_from_vmedia.called)
def test_mounted_no_files(self, mock_execute, mock_mount, mock_copy,
mock_find_device):
def test_mounted_no_files(
self, mock_execute, mock_mount, mock_copy,
mock_find_device, mock_check_vmedia, mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = True
mock_execute.return_value = '/some/path', ''
mock_find_device.return_value = '/dev/something'
utils.copy_config_from_vmedia()
@ -950,10 +1013,14 @@ class TestCopyConfigFromVmedia(testtools.TestCase):
'findmnt', '-n', '-oTARGET', '/dev/something')
mock_copy.assert_not_called()
mock_mount.assert_not_called()
self.assertTrue(mock_booted_from_vmedia.called)
@mock.patch.object(os, 'makedirs', autospec=True)
def test_copy(self, mock_makedirs, mock_execute, mock_mount, mock_copy,
mock_find_device):
def test_copy(
self, mock_makedirs, mock_execute, mock_mount, mock_copy,
mock_find_device, mock_check_vmedia, mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = True
mock_find_device.return_value = '/dev/something'
mock_execute.side_effect = processutils.ProcessExecutionError("")
path = tempfile.mkdtemp()
@ -989,10 +1056,14 @@ class TestCopyConfigFromVmedia(testtools.TestCase):
mock.call(mock.ANY, '/etc/ironic-python-agent/ironic.crt'),
mock.call(mock.ANY, '/etc/ironic-python-agent.d/ironic.conf'),
], any_order=True)
self.assertTrue(mock_booted_from_vmedia.called)
@mock.patch.object(os, 'makedirs', autospec=True)
def test_copy_mounted(self, mock_makedirs, mock_execute, mock_mount,
mock_copy, mock_find_device):
def test_copy_mounted(
self, mock_makedirs, mock_execute, mock_mount,
mock_copy, mock_find_device, mock_check_vmedia,
mock_booted_from_vmedia):
mock_booted_from_vmedia.return_value = True
mock_find_device.return_value = '/dev/something'
path = tempfile.mkdtemp()
self.addCleanup(lambda: shutil.rmtree(path))
@ -1026,6 +1097,7 @@ class TestCopyConfigFromVmedia(testtools.TestCase):
mock.call(mock.ANY, '/etc/ironic-python-agent.d/ironic.conf'),
], any_order=True)
mock_mount.assert_not_called()
self.assertTrue(mock_booted_from_vmedia.called)
@mock.patch.object(requests, 'get', autospec=True)
@ -1056,3 +1128,70 @@ class TestStreamingClient(ironic_agent_base.IronicAgentTest):
mock_get.assert_called_with("http://url", verify=True, cert=None,
stream=True, timeout=60)
self.assertEqual(2, mock_get.call_count)
class TestCheckVirtualMedia(ironic_agent_base.IronicAgentTest):
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device(self, mock_execute):
lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="usb"\n'
mock_execute.return_value = (lsblk, '')
self.assertTrue(utils._check_vmedia_device('/dev/sdh'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_rom(self, mock_execute):
lsblk = 'KNAME="sr0" SIZE="1610612736" TYPE="rom" TRAN="usb"\n'
mock_execute.return_value = (lsblk, '')
self.assertTrue(utils._check_vmedia_device('/dev/sr0'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sr0')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_too_large(self, mock_execute):
lsblk = 'KNAME="sdh" SIZE="1610612736000" TYPE="disk" TRAN="usb"\n'
mock_execute.return_value = (lsblk, '')
self.assertFalse(utils._check_vmedia_device('/dev/sdh'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_part(self, mock_execute):
lsblk = ('KNAME="sdh1" SIZE="1610612736" TYPE="part" TRAN=""\n'
'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="sata"\n')
mock_execute.return_value = (lsblk, '')
self.assertFalse(utils._check_vmedia_device('/dev/sdh1'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh1')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_other(self, mock_execute):
lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="other" TRAN="usb"\n'
mock_execute.return_value = (lsblk, '')
self.assertFalse(utils._check_vmedia_device('/dev/sdh'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_sata(self, mock_execute):
lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="disk" TRAN="sata"\n'
mock_execute.return_value = (lsblk, '')
self.assertFalse(utils._check_vmedia_device('/dev/sdh'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh')
@mock.patch.object(utils, 'execute', autospec=True)
def test_check_vmedia_device_scsi(self, mock_execute):
lsblk = 'KNAME="sdh" SIZE="1610612736" TYPE="other" TRAN="scsi"\n'
mock_execute.return_value = (lsblk, '')
self.assertFalse(utils._check_vmedia_device('/dev/sdh'))
mock_execute.assert_called_with('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
'/dev/sdh')

View File

@ -125,15 +125,33 @@ def _get_vmedia_device():
pass
def _find_device_by_labels(labels):
"""Find device matching any of the provided labels."""
for label in labels + [lbl.upper() for lbl in labels]:
def _find_vmedia_device_by_labels(labels):
"""Find device matching any of the provided labels for virtual media"""
candidates = []
try:
path, _e = execute('blkid', '-L', label)
except processutils.ProcessExecutionError:
pass
lsblk_output, _e = execute('lsblk', '-P', '-oPATH,LABEL')
except processutils.ProcessExecutionError as e:
_early_log('Was unable to execute the lsblk command. %s', e)
return
for device in ironic_utils.parse_device_tags(lsblk_output):
for label in labels:
if label.upper() == device['LABEL'].upper():
candidates.append(device['PATH'])
for candidate in candidates:
# We explicitly take the device and run it past _check_vmedia_device
# as there *can* be candidate entries, and we only want to return
# one that seems most likely to be the actual device, and the vmedia
# check code also evaluates the device overall, instead of just the
# block device with a label of some sort.
if _check_vmedia_device(candidate):
return candidate
else:
return path.strip()
_early_log('Found possible vmedia candidate %s, however '
'the device failed vmedia validity checking.',
candidate)
_early_log('Did not identify any virtual media candidates devices.')
def _get_vmedia_params():
@ -143,18 +161,22 @@ def _get_vmedia_params():
:raises: VirtualMediaBootError when it cannot find the virtual media device
"""
parameters_file = "parameters.txt"
vmedia_device_file = _find_device_by_labels(['ir-vfd-dev'])
vmedia_device_file = _find_vmedia_device_by_labels(['ir-vfd-dev'])
if not vmedia_device_file:
# TODO(rameshg87): This block of code is there only for compatibility
# reasons (so that newer agent can work with older Ironic). Remove
# this after Liberty release.
# This falls back to trying to find a matching device by name/type.
# if not found, it is likely okay to just fail out and treat it as
# No device found as there are multiple ways to launch IPA, and all
# vmedia styles should be treated consistently.
vmedia_device = _get_vmedia_device()
if not vmedia_device:
msg = "Unable to find virtual media device"
raise errors.VirtualMediaBootError(msg)
return {}
vmedia_device_file = os.path.join("/dev", vmedia_device)
if not _check_vmedia_device(vmedia_device_file):
# If the device is not valid, return an empty dictionary.
return {}
with ironic_utils.mounted(vmedia_device_file) as vmedia_mount_point:
parameters_file_path = os.path.join(vmedia_mount_point,
parameters_file)
@ -201,17 +223,102 @@ def _find_mount_point(device):
return path.strip()
def _check_vmedia_device(vmedia_device_file):
"""Check if a virtual media device appears valid.
Explicitly ignores partitions, actual disks, and other itmes that
seem unlikely to be virtual media based items being provided
into the running operating system via a BMC.
:param vmedia_device_file: Path to the device to examine.
:returns: False by default, True if the device appears to be
valid.
"""
try:
output, _e = execute('lsblk', '-n', '-s', '-P', '-b',
'-oKNAME,TRAN,TYPE,SIZE',
vmedia_device_file)
except processutils.ProcessExecutionError as e:
_early_log('Failed to execute lsblk. lsblk is required for '
'virtual media identification. %s', e)
return False
try:
for device in ironic_utils.parse_device_tags(output):
if device['TYPE'] == 'part':
_early_log('Excluding device %s from virtual media'
'consideration as it is a partition.',
device['KNAME'])
return False
if device['TYPE'] == 'rom':
# Media is a something like /dev/sr0, a Read only media type.
# The kernel decides this by consulting the underlying type
# registered for the scsi transport and thus type used.
# This will most likely be a qemu driven testing VM,
# or an older machine where SCSI transport is directly
# used to convey in a virtual
return True
if device['TYPE'] == 'disk' and device['TRAN'] == 'usb':
# We know from experience on HPE machines, with ilo4/5, we see
# and redfish with edgeline gear, return attachment from
# pci device 0c-03.
# https://linux-hardware.org/?probe=4d2526e9f4
# https://linux-hardware.org/?id=pci:103c-22f6-1590-00e4
#
# Dell hardware takes a similar approach, using an Aten usb hub
# which provides the standing connection for the BMC attached
# virtual kvm.
# https://linux-hardware.org/?id=usb:0557-8021
#
# Supermicro also uses Aten on X11, X10, X8
# https://linux-hardware.org/?probe=4d0ed95e02
#
# Lenovo appears in some hardware to use an Emulux Pilot4
# integrated hub to proivide device access on some hardware.
# https://linux-hardware.org/index.php?id=usb:2a4b-0400
#
# ??? but the virtual devices appear to be American Megatrends
# https://linux-hardware.org/?probe=076bcef32e
#
# Fujitsu hardware is more uncertian, but appears to be similar
# in use of a USB pass-through
# http://linux-hardware.org/index.php?probe=cca9eab7fe&log=dmesg
if device['SIZE'] != "" and int(device['SIZE']) < 4294967296:
# Device is a usb backed block device which is smaller
# than 4 GiB
return True
else:
_early_log('Device %s appears to not qualify as virtual '
'due to the device size. Size: %s',
device['KNAME'], device['SIZE'])
_early_log('Device %s was disqualified as virtual media. '
'Type: %s, Transport: %s',
device['KNAME'], device['TYPE'], device['TRAN'])
return False
except KeyError:
return False
def _booted_from_vmedia():
"""Indicates if the machine was booted via vmedia."""
params = _read_params_from_file('/proc/cmdline')
return params.get('boot_method') == 'vmedia'
def copy_config_from_vmedia():
"""Copies any configuration from a virtual media device.
Copies files under /etc/ironic-python-agent and /etc/ironic-python-agent.d.
"""
vmedia_device_file = _find_device_by_labels(
vmedia_device_file = _find_vmedia_device_by_labels(
['config-2', 'vmedia_boot_iso'])
if not vmedia_device_file:
_early_log('No virtual media device detected')
return
if not _booted_from_vmedia():
_early_log('Cannot use configuration from virtual media as the '
'agent was not booted from virtual media.')
return
# Determine the device
mounted = _find_mount_point(vmedia_device_file)
if mounted:
_copy_config_from(mounted)

View File

@ -0,0 +1,30 @@
---
security:
- |
Addresses a potential vector in which an system authenticated malicious
actor could leveraged data left on disk in some limited cases to make the
API of the ``ironic-python-agent`` attackable, or possibly break cleaning
processes to prevent the machine from being able to be returned to the
available pool. Please see `story 2008749 <https://storyboard.openstack.org/#!/story/2008749>`_
for more information.
fixes:
- |
Adds validation of Virtual Media devices in order to prevent existing
partitions on the system from being considered as potential sources of IPA
configuration data.
- |
Adds check into the configuration load from virtual media, to ensure it
only occurs when the machine booted from virtual media.
issues:
- |
Logic around virtual media device validation is now much more strict,
and may not work in all cases. Should you discover a case, please provide
the output from ``lsblk -P -O`` with a virtual media device attached to the
Ironic development community via
`Storyboard <https://storyboard.openstack.org/#!/project/947>`_.
- |
Internal logic to copy configuration data from virtual media now requires
the ``boot_method=vmedia`` flag to be set on the kernel command line of
the bootloader for the virtual media. Operators crafting custom boot
ISOs, should ensure that the appropriate command line is being added in
any custom build processes.