Parallel erase disk devices
Currently we erase the disks one by one, which takes a long time to finish, this patch adds support to the IPA so that it can erase disks in parallel if told so. Story: 1546949 Task: 11548 Co-Authored-By: yuan liang <leetpy2@gmail.com> Co-Authored-By: Kaifeng Wang <kaifeng.w@gmail.com> Change-Id: If5cfb6ec000a654d07103c4b378d4c135249e238
This commit is contained in:
parent
8f700c4cef
commit
1ee42cc3ff
@ -16,6 +16,7 @@ import abc
|
|||||||
import binascii
|
import binascii
|
||||||
import functools
|
import functools
|
||||||
import json
|
import json
|
||||||
|
from multiprocessing.pool import ThreadPool
|
||||||
import os
|
import os
|
||||||
import shlex
|
import shlex
|
||||||
import time
|
import time
|
||||||
@ -367,6 +368,9 @@ class HardwareManager(object):
|
|||||||
parent class. Upstream submissions of common functionality are
|
parent class. Upstream submissions of common functionality are
|
||||||
encouraged.
|
encouraged.
|
||||||
|
|
||||||
|
This interface could be called concurrently to speed up erasure, as
|
||||||
|
such, it should be implemented in a thread-safe way.
|
||||||
|
|
||||||
:param node: Ironic node object
|
:param node: Ironic node object
|
||||||
:param block_device: a BlockDevice indicating a device to be erased.
|
:param block_device: a BlockDevice indicating a device to be erased.
|
||||||
:raises IncompatibleHardwareMethodError: when there is no known way to
|
:raises IncompatibleHardwareMethodError: when there is no known way to
|
||||||
@ -390,10 +394,23 @@ class HardwareManager(object):
|
|||||||
"""
|
"""
|
||||||
erase_results = {}
|
erase_results = {}
|
||||||
block_devices = self.list_block_devices()
|
block_devices = self.list_block_devices()
|
||||||
|
if not len(block_devices):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
info = node.get('driver_internal_info', {})
|
||||||
|
max_pool_size = info.get('disk_erasure_concurrency', 1)
|
||||||
|
|
||||||
|
thread_pool = ThreadPool(min(max_pool_size, len(block_devices)))
|
||||||
for block_device in block_devices:
|
for block_device in block_devices:
|
||||||
result = dispatch_to_managers(
|
params = {'node': node, 'block_device': block_device}
|
||||||
'erase_block_device', node=node, block_device=block_device)
|
erase_results[block_device.name] = thread_pool.apply_async(
|
||||||
erase_results[block_device.name] = result
|
dispatch_to_managers, ('erase_block_device',), params)
|
||||||
|
thread_pool.close()
|
||||||
|
thread_pool.join()
|
||||||
|
|
||||||
|
for device_name, result in erase_results.items():
|
||||||
|
erase_results[device_name] = result.get()
|
||||||
|
|
||||||
return erase_results
|
return erase_results
|
||||||
|
|
||||||
def wait_for_disks(self):
|
def wait_for_disks(self):
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import binascii
|
import binascii
|
||||||
|
import multiprocessing
|
||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
|
|
||||||
@ -23,6 +24,7 @@ from oslo_concurrency import processutils
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_utils import units
|
from oslo_utils import units
|
||||||
import pyudev
|
import pyudev
|
||||||
|
import six
|
||||||
from stevedore import extension
|
from stevedore import extension
|
||||||
|
|
||||||
from ironic_python_agent import errors
|
from ironic_python_agent import errors
|
||||||
@ -1275,7 +1277,7 @@ class TestGenericHardwareManager(base.IronicAgentTest):
|
|||||||
mocked_listdir.assert_has_calls(expected_calls)
|
mocked_listdir.assert_has_calls(expected_calls)
|
||||||
|
|
||||||
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||||
def test_erase_devices(self, mocked_dispatch):
|
def test_erase_devices_no_parallel_by_default(self, mocked_dispatch):
|
||||||
mocked_dispatch.return_value = 'erased device'
|
mocked_dispatch.return_value = 'erased device'
|
||||||
|
|
||||||
self.hardware.list_block_devices = mock.Mock()
|
self.hardware.list_block_devices = mock.Mock()
|
||||||
@ -1290,6 +1292,66 @@ class TestGenericHardwareManager(base.IronicAgentTest):
|
|||||||
|
|
||||||
self.assertEqual(expected, result)
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
|
@mock.patch('multiprocessing.pool.ThreadPool.apply_async', autospec=True)
|
||||||
|
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||||
|
def test_erase_devices_concurrency(self, mocked_dispatch, mocked_async):
|
||||||
|
internal_info = self.node['driver_internal_info']
|
||||||
|
internal_info['disk_erasure_concurrency'] = 10
|
||||||
|
mocked_dispatch.return_value = 'erased device'
|
||||||
|
|
||||||
|
if six.PY3:
|
||||||
|
apply_result = multiprocessing.pool.ApplyResult({}, None, None)
|
||||||
|
else:
|
||||||
|
apply_result = multiprocessing.pool.ApplyResult({}, None)
|
||||||
|
apply_result._success = True
|
||||||
|
apply_result._ready = True
|
||||||
|
apply_result.get = lambda: 'erased device'
|
||||||
|
mocked_async.return_value = apply_result
|
||||||
|
|
||||||
|
self.hardware.list_block_devices = mock.Mock()
|
||||||
|
self.hardware.list_block_devices.return_value = [
|
||||||
|
hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
|
||||||
|
hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
|
||||||
|
]
|
||||||
|
|
||||||
|
expected = {'/dev/hdaa': 'erased device', '/dev/sdj': 'erased device'}
|
||||||
|
|
||||||
|
result = self.hardware.erase_devices(self.node, [])
|
||||||
|
|
||||||
|
self.assertTrue(mocked_async.called)
|
||||||
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
|
@mock.patch.object(hardware, 'ThreadPool', autospec=True)
|
||||||
|
def test_erase_devices_concurrency_pool_size(self, mocked_pool):
|
||||||
|
self.hardware.list_block_devices = mock.Mock()
|
||||||
|
self.hardware.list_block_devices.return_value = [
|
||||||
|
hardware.BlockDevice('/dev/sdj', 'big', 1073741824, True),
|
||||||
|
hardware.BlockDevice('/dev/hdaa', 'small', 65535, False),
|
||||||
|
]
|
||||||
|
|
||||||
|
# Test pool size 10 with 2 disks
|
||||||
|
internal_info = self.node['driver_internal_info']
|
||||||
|
internal_info['disk_erasure_concurrency'] = 10
|
||||||
|
|
||||||
|
self.hardware.erase_devices(self.node, [])
|
||||||
|
mocked_pool.assert_called_with(2)
|
||||||
|
|
||||||
|
# Test default pool size with 2 disks
|
||||||
|
internal_info = self.node['driver_internal_info']
|
||||||
|
del internal_info['disk_erasure_concurrency']
|
||||||
|
|
||||||
|
self.hardware.erase_devices(self.node, [])
|
||||||
|
mocked_pool.assert_called_with(1)
|
||||||
|
|
||||||
|
@mock.patch.object(hardware, 'dispatch_to_managers', autospec=True)
|
||||||
|
def test_erase_devices_without_disk(self, mocked_dispatch):
|
||||||
|
self.hardware.list_block_devices = mock.Mock()
|
||||||
|
self.hardware.list_block_devices.return_value = []
|
||||||
|
|
||||||
|
expected = {}
|
||||||
|
result = self.hardware.erase_devices({}, [])
|
||||||
|
self.assertEqual(expected, result)
|
||||||
|
|
||||||
@mock.patch.object(utils, 'execute', autospec=True)
|
@mock.patch.object(utils, 'execute', autospec=True)
|
||||||
def test_erase_block_device_ata_success(self, mocked_execute):
|
def test_erase_block_device_ata_success(self, mocked_execute):
|
||||||
mocked_execute.side_effect = [
|
mocked_execute.side_effect = [
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
Support parallel disk device erasure. This is controlled by the
|
||||||
|
``driver_internal_info['agent_enable_parallel_erasure']`` passed
|
||||||
|
by ironic.
|
||||||
|
other:
|
||||||
|
- |
|
||||||
|
The ``HardwareManager.erase_block_device`` interface could be called
|
||||||
|
concurrently to support the feature of parallel disk device erasure,
|
||||||
|
it should be implemented in a thread-safe way.
|
Loading…
x
Reference in New Issue
Block a user