Make reconstructor go faster with --override-devices
The object reconstructor will now fork all available worker processes when operating on a subset of local devices. Example: A system has 24 disks, named "d1" through "d24" reconstructor_workers = 8 invoked with --override-devices=d1,d2,d3,d4,d5,d6 In this case, the reconstructor will now use 6 worker processes, one per disk. The old behavior was to use 2 worker processes, one for d1, d3, and d5 and the other for d2, d4, and d6 (because 24 / 8 = 3, so we assigned 3 disks per worker before creating another). I think the new behavior better matches operators' expectations. If I give a concurrent program six tasks to do and tell it to operate on up to eight at a time, I'd expect it to do all six tasks at once, not run two concurrent batches of three tasks apiece. This has no effect when --override-devices is not specified. When operating on all local devices instead of a subset, the new and old code produce the same result. The reconstructor's behavior now matches the object replicator's behavior. Change-Id: Ib308c156c77b9b92541a12dd7e9b1a8ea8307a30
This commit is contained in:
parent
47efb5b969
commit
c4751d0d55
@ -4740,3 +4740,13 @@ def parse_override_options(**kwargs):
|
|||||||
|
|
||||||
return OverrideOptions(devices=devices, partitions=partitions,
|
return OverrideOptions(devices=devices, partitions=partitions,
|
||||||
policies=policies)
|
policies=policies)
|
||||||
|
|
||||||
|
|
||||||
|
def distribute_evenly(items, num_buckets):
|
||||||
|
"""
|
||||||
|
Distribute items as evenly as possible into N buckets.
|
||||||
|
"""
|
||||||
|
out = [[] for _ in range(num_buckets)]
|
||||||
|
for index, item in enumerate(items):
|
||||||
|
out[index % num_buckets].append(item)
|
||||||
|
return out
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
|
|
||||||
import json
|
import json
|
||||||
import errno
|
import errno
|
||||||
import math
|
|
||||||
import os
|
import os
|
||||||
from os.path import join
|
from os.path import join
|
||||||
import random
|
import random
|
||||||
@ -34,7 +33,7 @@ from swift.common.utils import (
|
|||||||
whataremyips, unlink_older_than, compute_eta, get_logger,
|
whataremyips, unlink_older_than, compute_eta, get_logger,
|
||||||
dump_recon_cache, mkdirs, config_true_value,
|
dump_recon_cache, mkdirs, config_true_value,
|
||||||
tpool_reraise, GreenAsyncPile, Timestamp, remove_file,
|
tpool_reraise, GreenAsyncPile, Timestamp, remove_file,
|
||||||
load_recon_cache, parse_override_options)
|
load_recon_cache, parse_override_options, distribute_evenly)
|
||||||
from swift.common.header_key_dict import HeaderKeyDict
|
from swift.common.header_key_dict import HeaderKeyDict
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
@ -228,16 +227,14 @@ class ObjectReconstructor(Daemon):
|
|||||||
yield dict(override_devices=override_opts.devices,
|
yield dict(override_devices=override_opts.devices,
|
||||||
override_partitions=override_opts.partitions)
|
override_partitions=override_opts.partitions)
|
||||||
return
|
return
|
||||||
# for somewhat uniform load per worker use same max_devices_per_worker
|
# for somewhat uniform load per worker use same
|
||||||
# when handling all devices or just override devices...
|
# max_devices_per_worker when handling all devices or just override
|
||||||
max_devices_per_worker = int(math.ceil(
|
# devices, but only use enough workers for the actual devices being
|
||||||
1.0 * len(self.all_local_devices) / self.reconstructor_workers))
|
# handled
|
||||||
# ...but only use enough workers for the actual devices being handled
|
n_workers = min(self.reconstructor_workers, len(devices))
|
||||||
n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker))
|
for ods in distribute_evenly(devices, n_workers):
|
||||||
override_devices_per_worker = [devices[i::n] for i in range(n)]
|
yield dict(override_partitions=override_opts.partitions,
|
||||||
for override_devices_pw in override_devices_per_worker:
|
override_devices=ods)
|
||||||
yield dict(override_devices=override_devices_pw,
|
|
||||||
override_partitions=override_opts.partitions)
|
|
||||||
|
|
||||||
def is_healthy(self):
|
def is_healthy(self):
|
||||||
"""
|
"""
|
||||||
|
@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \
|
|||||||
compute_eta, get_logger, dump_recon_cache, \
|
compute_eta, get_logger, dump_recon_cache, \
|
||||||
rsync_module_interpolation, mkdirs, config_true_value, \
|
rsync_module_interpolation, mkdirs, config_true_value, \
|
||||||
tpool_reraise, config_auto_int_value, storage_directory, \
|
tpool_reraise, config_auto_int_value, storage_directory, \
|
||||||
load_recon_cache, PrefixLoggerAdapter, parse_override_options
|
load_recon_cache, PrefixLoggerAdapter, parse_override_options, \
|
||||||
|
distribute_evenly
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||||
@ -258,19 +259,14 @@ class ObjectReplicator(Daemon):
|
|||||||
# Distribute devices among workers as evenly as possible
|
# Distribute devices among workers as evenly as possible
|
||||||
self.replicator_workers = min(self.replicator_workers,
|
self.replicator_workers = min(self.replicator_workers,
|
||||||
len(devices_to_replicate))
|
len(devices_to_replicate))
|
||||||
worker_args = [
|
return [{'override_devices': devs,
|
||||||
{
|
|
||||||
'override_devices': [],
|
|
||||||
'override_partitions': override_opts.partitions,
|
'override_partitions': override_opts.partitions,
|
||||||
'override_policies': override_opts.policies,
|
'override_policies': override_opts.policies,
|
||||||
'have_overrides': have_overrides,
|
'have_overrides': have_overrides,
|
||||||
'multiprocess_worker_index': i,
|
'multiprocess_worker_index': index}
|
||||||
}
|
for index, devs in enumerate(
|
||||||
for i in range(self.replicator_workers)]
|
distribute_evenly(devices_to_replicate,
|
||||||
for index, device in enumerate(devices_to_replicate):
|
self.replicator_workers))]
|
||||||
idx = index % self.replicator_workers
|
|
||||||
worker_args[idx]['override_devices'].append(device)
|
|
||||||
return worker_args
|
|
||||||
|
|
||||||
def is_healthy(self):
|
def is_healthy(self):
|
||||||
"""
|
"""
|
||||||
|
@ -6622,5 +6622,39 @@ class TestPipeMutex(unittest.TestCase):
|
|||||||
eventlet.debug.hub_prevent_multiple_readers(True)
|
eventlet.debug.hub_prevent_multiple_readers(True)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDistributeEvenly(unittest.TestCase):
|
||||||
|
def test_evenly_divided(self):
|
||||||
|
out = utils.distribute_evenly(range(12), 3)
|
||||||
|
self.assertEqual(out, [
|
||||||
|
[0, 3, 6, 9],
|
||||||
|
[1, 4, 7, 10],
|
||||||
|
[2, 5, 8, 11],
|
||||||
|
])
|
||||||
|
|
||||||
|
out = utils.distribute_evenly(range(12), 4)
|
||||||
|
self.assertEqual(out, [
|
||||||
|
[0, 4, 8],
|
||||||
|
[1, 5, 9],
|
||||||
|
[2, 6, 10],
|
||||||
|
[3, 7, 11],
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_uneven(self):
|
||||||
|
out = utils.distribute_evenly(range(11), 3)
|
||||||
|
self.assertEqual(out, [
|
||||||
|
[0, 3, 6, 9],
|
||||||
|
[1, 4, 7, 10],
|
||||||
|
[2, 5, 8],
|
||||||
|
])
|
||||||
|
|
||||||
|
def test_just_one(self):
|
||||||
|
out = utils.distribute_evenly(range(5), 1)
|
||||||
|
self.assertEqual(out, [[0, 1, 2, 3, 4]])
|
||||||
|
|
||||||
|
def test_more_buckets_than_items(self):
|
||||||
|
out = utils.distribute_evenly(range(5), 7)
|
||||||
|
self.assertEqual(out, [[0], [1], [2], [3], [4], [], []])
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -1474,13 +1474,15 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||||
worker_args = list(reconstructor.get_worker_args(
|
worker_args = list(reconstructor.get_worker_args(
|
||||||
once=True, devices='sdb,sdd,sdf', partitions='99,333'))
|
once=True, devices='sdb,sdd,sdf', partitions='99,333'))
|
||||||
self.assertEqual(1, len(worker_args))
|
# 3 devices to operate on, 2 workers -> one worker gets two devices
|
||||||
# 5 devices in total, 2 workers -> up to 3 devices per worker so a
|
# and the other worker just gets one
|
||||||
# single worker should handle the requested override devices
|
self.assertEqual([{
|
||||||
self.assertEqual([
|
'override_partitions': [99, 333],
|
||||||
{'override_partitions': [99, 333], 'override_devices': [
|
'override_devices': ['sdb', 'sdf'],
|
||||||
'sdb', 'sdd', 'sdf']},
|
}, {
|
||||||
], worker_args)
|
'override_partitions': [99, 333],
|
||||||
|
'override_devices': ['sdd'],
|
||||||
|
}], worker_args)
|
||||||
|
|
||||||
# with 4 override devices, expect 2 per worker
|
# with 4 override devices, expect 2 per worker
|
||||||
worker_args = list(reconstructor.get_worker_args(
|
worker_args = list(reconstructor.get_worker_args(
|
||||||
@ -1524,26 +1526,41 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
{}, logger=self.logger)
|
{}, logger=self.logger)
|
||||||
reconstructor.get_local_devices = lambda: [
|
reconstructor.get_local_devices = lambda: [
|
||||||
'd%s' % (i + 1) for i in range(21)]
|
'd%s' % (i + 1) for i in range(21)]
|
||||||
# ... with many devices per worker, worker count is pretty granular
|
|
||||||
for i in range(1, 8):
|
# With more devices than workers, the work is spread out as evenly
|
||||||
reconstructor.reconstructor_workers = i
|
# as we can manage. When number-of-devices is a multiple of
|
||||||
self.assertEqual(i, len(list(reconstructor.get_worker_args())))
|
# number-of-workers, every worker has the same number of devices to
|
||||||
# ... then it gets sorta stair step
|
# operate on.
|
||||||
for i in range(9, 10):
|
reconstructor.reconstructor_workers = 7
|
||||||
reconstructor.reconstructor_workers = i
|
worker_args = list(reconstructor.get_worker_args())
|
||||||
self.assertEqual(7, len(list(reconstructor.get_worker_args())))
|
self.assertEqual([len(a['override_devices']) for a in worker_args],
|
||||||
# 2-3 devices per worker
|
[3] * 7)
|
||||||
for args in reconstructor.get_worker_args():
|
|
||||||
self.assertIn(len(args['override_devices']), (2, 3))
|
# When number-of-devices is not a multiple of number-of-workers,
|
||||||
for i in range(11, 20):
|
# device counts differ by at most 1.
|
||||||
reconstructor.reconstructor_workers = i
|
reconstructor.reconstructor_workers = 5
|
||||||
self.assertEqual(11, len(list(reconstructor.get_worker_args())))
|
worker_args = list(reconstructor.get_worker_args())
|
||||||
# 1, 2 devices per worker
|
self.assertEqual(
|
||||||
for args in reconstructor.get_worker_args():
|
sorted([len(a['override_devices']) for a in worker_args]),
|
||||||
self.assertIn(len(args['override_devices']), (1, 2))
|
[4, 4, 4, 4, 5])
|
||||||
# this is debatable, but maybe I'll argue if you're going to have
|
|
||||||
# *some* workers with > 1 device, it's better to have fewer workers
|
# With more workers than devices, we don't create useless workers.
|
||||||
# with devices spread out evenly than a couple outliers?
|
# We'll only make one per device.
|
||||||
|
reconstructor.reconstructor_workers = 22
|
||||||
|
worker_args = list(reconstructor.get_worker_args())
|
||||||
|
self.assertEqual(
|
||||||
|
[len(a['override_devices']) for a in worker_args],
|
||||||
|
[1] * 21)
|
||||||
|
|
||||||
|
# This is true even if we have far more workers than devices.
|
||||||
|
reconstructor.reconstructor_workers = 2 ** 16
|
||||||
|
worker_args = list(reconstructor.get_worker_args())
|
||||||
|
self.assertEqual(
|
||||||
|
[len(a['override_devices']) for a in worker_args],
|
||||||
|
[1] * 21)
|
||||||
|
|
||||||
|
# Spot check one full result for sanity's sake
|
||||||
|
reconstructor.reconstructor_workers = 11
|
||||||
self.assertEqual([
|
self.assertEqual([
|
||||||
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
|
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
|
||||||
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
|
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
|
||||||
@ -1557,12 +1574,6 @@ class TestWorkerReconstructor(unittest.TestCase):
|
|||||||
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
|
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
|
||||||
{'override_partitions': [], 'override_devices': ['d11']},
|
{'override_partitions': [], 'override_devices': ['d11']},
|
||||||
], list(reconstructor.get_worker_args()))
|
], list(reconstructor.get_worker_args()))
|
||||||
# you can't get < than 1 device per worker
|
|
||||||
for i in range(21, 52):
|
|
||||||
reconstructor.reconstructor_workers = i
|
|
||||||
self.assertEqual(21, len(list(reconstructor.get_worker_args())))
|
|
||||||
for args in reconstructor.get_worker_args():
|
|
||||||
self.assertEqual(1, len(args['override_devices']))
|
|
||||||
|
|
||||||
def test_next_rcache_update_configured_with_stats_interval(self):
|
def test_next_rcache_update_configured_with_stats_interval(self):
|
||||||
now = time.time()
|
now = time.time()
|
||||||
|
Loading…
Reference in New Issue
Block a user