diff --git a/swift/common/utils.py b/swift/common/utils.py index 491b6981b9..54efdf2b18 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -4740,3 +4740,13 @@ def parse_override_options(**kwargs): return OverrideOptions(devices=devices, partitions=partitions, policies=policies) + + +def distribute_evenly(items, num_buckets): + """ + Distribute items as evenly as possible into N buckets. + """ + out = [[] for _ in range(num_buckets)] + for index, item in enumerate(items): + out[index % num_buckets].append(item) + return out diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 4abe0ea56e..3a9cfe99ee 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -15,7 +15,6 @@ import json import errno -import math import os from os.path import join import random @@ -34,7 +33,7 @@ from swift.common.utils import ( whataremyips, unlink_older_than, compute_eta, get_logger, dump_recon_cache, mkdirs, config_true_value, tpool_reraise, GreenAsyncPile, Timestamp, remove_file, - load_recon_cache, parse_override_options) + load_recon_cache, parse_override_options, distribute_evenly) from swift.common.header_key_dict import HeaderKeyDict from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -228,16 +227,14 @@ class ObjectReconstructor(Daemon): yield dict(override_devices=override_opts.devices, override_partitions=override_opts.partitions) return - # for somewhat uniform load per worker use same max_devices_per_worker - # when handling all devices or just override devices... - max_devices_per_worker = int(math.ceil( - 1.0 * len(self.all_local_devices) / self.reconstructor_workers)) - # ...but only use enough workers for the actual devices being handled - n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker)) - override_devices_per_worker = [devices[i::n] for i in range(n)] - for override_devices_pw in override_devices_per_worker: - yield dict(override_devices=override_devices_pw, - override_partitions=override_opts.partitions) + # for somewhat uniform load per worker use same + # max_devices_per_worker when handling all devices or just override + # devices, but only use enough workers for the actual devices being + # handled + n_workers = min(self.reconstructor_workers, len(devices)) + for ods in distribute_evenly(devices, n_workers): + yield dict(override_partitions=override_opts.partitions, + override_devices=ods) def is_healthy(self): """ diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index ba1e3b8a78..ddebdb3a9b 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -35,7 +35,8 @@ from swift.common.utils import whataremyips, unlink_older_than, \ compute_eta, get_logger, dump_recon_cache, \ rsync_module_interpolation, mkdirs, config_true_value, \ tpool_reraise, config_auto_int_value, storage_directory, \ - load_recon_cache, PrefixLoggerAdapter, parse_override_options + load_recon_cache, PrefixLoggerAdapter, parse_override_options, \ + distribute_evenly from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE @@ -258,19 +259,14 @@ class ObjectReplicator(Daemon): # Distribute devices among workers as evenly as possible self.replicator_workers = min(self.replicator_workers, len(devices_to_replicate)) - worker_args = [ - { - 'override_devices': [], - 'override_partitions': override_opts.partitions, - 'override_policies': override_opts.policies, - 'have_overrides': have_overrides, - 'multiprocess_worker_index': i, - } - for i in range(self.replicator_workers)] - for index, device in enumerate(devices_to_replicate): - idx = index % self.replicator_workers - worker_args[idx]['override_devices'].append(device) - return worker_args + return [{'override_devices': devs, + 'override_partitions': override_opts.partitions, + 'override_policies': override_opts.policies, + 'have_overrides': have_overrides, + 'multiprocess_worker_index': index} + for index, devs in enumerate( + distribute_evenly(devices_to_replicate, + self.replicator_workers))] def is_healthy(self): """ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index c71b572e31..b9caaabf34 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -6622,5 +6622,39 @@ class TestPipeMutex(unittest.TestCase): eventlet.debug.hub_prevent_multiple_readers(True) +class TestDistributeEvenly(unittest.TestCase): + def test_evenly_divided(self): + out = utils.distribute_evenly(range(12), 3) + self.assertEqual(out, [ + [0, 3, 6, 9], + [1, 4, 7, 10], + [2, 5, 8, 11], + ]) + + out = utils.distribute_evenly(range(12), 4) + self.assertEqual(out, [ + [0, 4, 8], + [1, 5, 9], + [2, 6, 10], + [3, 7, 11], + ]) + + def test_uneven(self): + out = utils.distribute_evenly(range(11), 3) + self.assertEqual(out, [ + [0, 3, 6, 9], + [1, 4, 7, 10], + [2, 5, 8], + ]) + + def test_just_one(self): + out = utils.distribute_evenly(range(5), 1) + self.assertEqual(out, [[0, 1, 2, 3, 4]]) + + def test_more_buckets_than_items(self): + out = utils.distribute_evenly(range(5), 7) + self.assertEqual(out, [[0], [1], [2], [3], [4], [], []]) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 4007d1aa1c..be05adc089 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -1474,13 +1474,15 @@ class TestWorkerReconstructor(unittest.TestCase): self.assertEqual(2, reconstructor.reconstructor_workers) worker_args = list(reconstructor.get_worker_args( once=True, devices='sdb,sdd,sdf', partitions='99,333')) - self.assertEqual(1, len(worker_args)) - # 5 devices in total, 2 workers -> up to 3 devices per worker so a - # single worker should handle the requested override devices - self.assertEqual([ - {'override_partitions': [99, 333], 'override_devices': [ - 'sdb', 'sdd', 'sdf']}, - ], worker_args) + # 3 devices to operate on, 2 workers -> one worker gets two devices + # and the other worker just gets one + self.assertEqual([{ + 'override_partitions': [99, 333], + 'override_devices': ['sdb', 'sdf'], + }, { + 'override_partitions': [99, 333], + 'override_devices': ['sdd'], + }], worker_args) # with 4 override devices, expect 2 per worker worker_args = list(reconstructor.get_worker_args( @@ -1524,26 +1526,41 @@ class TestWorkerReconstructor(unittest.TestCase): {}, logger=self.logger) reconstructor.get_local_devices = lambda: [ 'd%s' % (i + 1) for i in range(21)] - # ... with many devices per worker, worker count is pretty granular - for i in range(1, 8): - reconstructor.reconstructor_workers = i - self.assertEqual(i, len(list(reconstructor.get_worker_args()))) - # ... then it gets sorta stair step - for i in range(9, 10): - reconstructor.reconstructor_workers = i - self.assertEqual(7, len(list(reconstructor.get_worker_args()))) - # 2-3 devices per worker - for args in reconstructor.get_worker_args(): - self.assertIn(len(args['override_devices']), (2, 3)) - for i in range(11, 20): - reconstructor.reconstructor_workers = i - self.assertEqual(11, len(list(reconstructor.get_worker_args()))) - # 1, 2 devices per worker - for args in reconstructor.get_worker_args(): - self.assertIn(len(args['override_devices']), (1, 2)) - # this is debatable, but maybe I'll argue if you're going to have - # *some* workers with > 1 device, it's better to have fewer workers - # with devices spread out evenly than a couple outliers? + + # With more devices than workers, the work is spread out as evenly + # as we can manage. When number-of-devices is a multiple of + # number-of-workers, every worker has the same number of devices to + # operate on. + reconstructor.reconstructor_workers = 7 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual([len(a['override_devices']) for a in worker_args], + [3] * 7) + + # When number-of-devices is not a multiple of number-of-workers, + # device counts differ by at most 1. + reconstructor.reconstructor_workers = 5 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + sorted([len(a['override_devices']) for a in worker_args]), + [4, 4, 4, 4, 5]) + + # With more workers than devices, we don't create useless workers. + # We'll only make one per device. + reconstructor.reconstructor_workers = 22 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + [len(a['override_devices']) for a in worker_args], + [1] * 21) + + # This is true even if we have far more workers than devices. + reconstructor.reconstructor_workers = 2 ** 16 + worker_args = list(reconstructor.get_worker_args()) + self.assertEqual( + [len(a['override_devices']) for a in worker_args], + [1] * 21) + + # Spot check one full result for sanity's sake + reconstructor.reconstructor_workers = 11 self.assertEqual([ {'override_partitions': [], 'override_devices': ['d1', 'd12']}, {'override_partitions': [], 'override_devices': ['d2', 'd13']}, @@ -1557,12 +1574,6 @@ class TestWorkerReconstructor(unittest.TestCase): {'override_partitions': [], 'override_devices': ['d10', 'd21']}, {'override_partitions': [], 'override_devices': ['d11']}, ], list(reconstructor.get_worker_args())) - # you can't get < than 1 device per worker - for i in range(21, 52): - reconstructor.reconstructor_workers = i - self.assertEqual(21, len(list(reconstructor.get_worker_args()))) - for args in reconstructor.get_worker_args(): - self.assertEqual(1, len(args['override_devices'])) def test_next_rcache_update_configured_with_stats_interval(self): now = time.time()