Merge "Parallel erase disk devices"
This commit is contained in:
commit
67a516bdcf
@ -16,6 +16,7 @@ import abc
|
||||
import binascii
|
||||
import functools
|
||||
import json
|
||||
from multiprocessing.pool import ThreadPool
|
||||
import os
|
||||
import shlex
|
||||
import time
|
||||
@ -367,6 +368,9 @@ class HardwareManager(object):
|
||||
parent class. Upstream submissions of common functionality are
|
||||
encouraged.
|
||||
|
||||
This interface could be called concurrently to speed up erasure, as
|
||||
such, it should be implemented in a thread-safe way.
|
||||
|
||||
:param node: Ironic node object
|
||||
:param block_device: a BlockDevice indicating a device to be erased.
|
||||
:raises IncompatibleHardwareMethodError: when there is no known way to
|
||||
@ -390,10 +394,23 @@ class HardwareManager(object):
|
||||
"""
|
||||
erase_results = {}
|
||||
block_devices = self.list_block_devices()
|
||||
if not len(block_devices):
|
||||
return {}
|
||||
|
||||
info = node.get('driver_internal_info', {})
|
||||
max_pool_size = info.get('disk_erasure_concurrency', 1)
|
||||
|
||||
thread_pool = ThreadPool(min(max_pool_size, len(block_devices)))
|
||||
for block_device in block_devices:
|
||||
result = dispatch_to_managers(
|
||||
'erase_block_device', node=node, block_device=block_device)
|
||||
erase_results[block_device.name] = result
|
||||
params = {'node': node, 'block_device': block_device}
|
||||
erase_results[block_device.name] = thread_pool.apply_async(
|
||||
dispatch_to_managers, ('erase_block_device',), params)
|
||||
thread_pool.close()
|
||||
thread_pool.join()
|
||||
|
||||
for device_name, result in erase_results.items():
|
||||
erase_results[device_name] = result.get()
|
||||
|
||||
return erase_results
|
||||
|
||||
def wait_for_disks(self):
|
||||
|
@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import binascii
|
||||
import multiprocessing
|
||||
import os
|
||||
import time
|
||||
|
||||
@ -23,6 +24,7 @@ from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import units
|
||||
import pyudev
|
||||
import six
|
||||
from stevedore import extension
|
||||
|
||||
from ironic_python_agent import errors
|
||||
@ -1275,7 +1277,7 @@ class TestGenericHardwareManager(base.IronicAgentTest):
|
||||
mocked_listdir.assert_has_calls(expected_calls)
|
||||
|
||||
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||
def test_erase_devices(self, mocked_dispatch):
|
||||
def test_erase_devices_no_parallel_by_default(self, mocked_dispatch):
|
||||
mocked_dispatch.return_value = 'erased device'
|
||||
|
||||
self.hardware.list_block_devices = mock.Mock()
|
||||
@ -1290,6 +1292,66 @@ class TestGenericHardwareManager(base.IronicAgentTest):
|
||||
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
@mock.patch('multiprocessing.pool.ThreadPool.apply_async', autospec=True)
|
||||
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||
def test_erase_devices_concurrency(self, mocked_dispatch, mocked_async):
|
||||
internal_info = self.node['driver_internal_info']
|
||||
internal_info['disk_erasure_concurrency'] = 10
|
||||
mocked_dispatch.return_value = 'erased device'
|
||||
|
||||
if six.PY3:
|
||||
apply_result = multiprocessing.pool.ApplyResult({}, None, None)
|
||||
else:
|
||||
apply_result = multiprocessing.pool.ApplyResult({}, None)
|
||||
apply_result._success = True
|
||||
apply_result._ready = True
|
||||
apply_result.get = lambda: 'erased device'
|
||||
mocked_async.return_value = apply_result
|
||||
|
||||
self.hardware.list_block_devices = mock.Mock()
|
||||
self.hardware.list_block_devices.return_value = [
|
||||
hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
|
||||
hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
|
||||
]
|
||||
|
||||
expected = {'/dev/hdaa': 'erased device', '/dev/sdj': 'erased device'}
|
||||
|
||||
result = self.hardware.erase_devices(self.node, [])
|
||||
|
||||
self.assertTrue(mocked_async.called)
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
@mock.patch.object(hardware, 'ThreadPool', autospec=True)
|
||||
def test_erase_devices_concurrency_pool_size(self, mocked_pool):
|
||||
self.hardware.list_block_devices = mock.Mock()
|
||||
self.hardware.list_block_devices.return_value = [
|
||||
hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
|
||||
hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
|
||||
]
|
||||
|
||||
# Test pool size 10 with 2 disks
|
||||
internal_info = self.node['driver_internal_info']
|
||||
internal_info['disk_erasure_concurrency'] = 10
|
||||
|
||||
self.hardware.erase_devices(self.node, [])
|
||||
mocked_pool.assert_called_with(2)
|
||||
|
||||
# Test default pool size with 2 disks
|
||||
internal_info = self.node['driver_internal_info']
|
||||
del internal_info['disk_erasure_concurrency']
|
||||
|
||||
self.hardware.erase_devices(self.node, [])
|
||||
mocked_pool.assert_called_with(1)
|
||||
|
||||
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||
def test_erase_devices_without_disk(self, mocked_dispatch):
|
||||
self.hardware.list_block_devices = mock.Mock()
|
||||
self.hardware.list_block_devices.return_value = []
|
||||
|
||||
expected = {}
|
||||
result = self.hardware.erase_devices({}, [])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
@mock.patch.object(utils, 'execute', autospec=True)
|
||||
def test_erase_block_device_ata_success(self, mocked_execute):
|
||||
mocked_execute.side_effect = [
|
||||
|
@ -0,0 +1,11 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Support parallel disk device erasure. This is controlled by the
|
||||
``driver_internal_info['agent_enable_parallel_erasure']`` passed
|
||||
by ironic.
|
||||
other:
|
||||
- |
|
||||
The ``HardwareManager.erase_block_device`` interface could be called
|
||||
concurrently to support the feature of parallel disk device erasure,
|
||||
it should be implemented in a thread-safe way.
|
Loading…
Reference in New Issue
Block a user