From 8ee631cceeac560cc272693b4a70567a271f2754 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Mon, 15 Nov 2021 18:38:12 +0000 Subject: [PATCH] reconstructor: restrict max objects per revert job Previously the ssync Sender would attempt to revert all objects in a partition within a single SSYNC request. With this change the reconstructor daemon option max_objects_per_revert can be used to limit the number of objects reverted inside a single SSYNC request for revert type jobs i.e. when reverting handoff partitions. If more than max_objects_per_revert are available, the remaining objects will remain in the sender partition and will not be reverted until the next call to ssync.Sender, which would currrently be the next time the reconstructor visits that handoff partition. Note that the option only applies to handoff revert jobs, not to sync jobs. Change-Id: If81760c80a4692212e3774e73af5ce37c02e8aff --- doc/source/config/object_server_config.rst | 57 +++++++++ etc/object-server.conf-sample | 14 +++ swift/obj/reconstructor.py | 14 ++- swift/obj/ssync_sender.py | 14 ++- test/unit/obj/test_reconstructor.py | 89 +++++++++++++- test/unit/obj/test_ssync_sender.py | 130 ++++++++++++++++++++- 6 files changed, 308 insertions(+), 10 deletions(-) diff --git a/doc/source/config/object_server_config.rst b/doc/source/config/object_server_config.rst index 8dc5e3127d..7f5c4737ec 100644 --- a/doc/source/config/object_server_config.rst +++ b/doc/source/config/object_server_config.rst @@ -466,6 +466,63 @@ handoffs_only false The handoffs_only mode op temporary use and should be disabled as soon as the emergency situation has been resolved. +rebuild_handoff_node_count 2 The default strategy for unmounted + drives will stage + rebuilt data on a + handoff node until + updated rings are + deployed. Because + fragments are rebuilt on + offset handoffs based on + fragment index and the + proxy limits how deep it + will search for EC frags + we restrict how many + nodes we'll try. + Setting to 0 will + disable rebuilds to + handoffs and only + rebuild fragments for + unmounted devices to + mounted primaries after + a ring change. Setting + to -1 means "no limit". +max_objects_per_revert 0 By default the reconstructor + attempts to revert all + objects from handoff + partitions in a single + batch using a single + SSYNC request. In + exceptional + circumstances + max_objects_per_revert + can be used to + temporarily limit the + number of objects + reverted by each + reconstructor revert + type job. If more than + max_objects_per_revert + are available in a + sender's handoff + partition, the remaining + objects will remain in + the handoff partition + and will not be reverted + until the next time the + reconstructor visits + that handoff partition + i.e. with this option + set, a single cycle of + the reconstructor may + not completely revert + all handoff partitions. + The option has no effect + on reconstructor sync + type jobs between + primary partitions. A + value of 0 (the default) + means there is no limit. node_timeout DEFAULT or 10 Request timeout to external services. The value used is the value set in this section, or the value set diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index db4d226360..e9557b6c27 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -358,6 +358,7 @@ use = egg:swift#recon # lockup_timeout = 1800 # ring_check_interval = 15.0 # recon_cache_path = /var/cache/swift +# # The handoffs_only mode option is for special case emergency situations during # rebalance such as disk full in the cluster. This option SHOULD NOT BE # CHANGED, except for extreme situations. When handoffs_only mode is enabled @@ -380,6 +381,19 @@ use = egg:swift#recon # Setting to -1 means "no limit". # rebuild_handoff_node_count = 2 # +# By default the reconstructor attempts to revert all objects from handoff +# partitions in a single batch using a single SSYNC request. In exceptional +# circumstances max_objects_per_revert can be used to temporarily limit the +# number of objects reverted by each reconstructor revert type job. If more +# than max_objects_per_revert are available in a sender's handoff partition, +# the remaining objects will remain in the handoff partition and will not be +# reverted until the next time the reconstructor visits that handoff partition +# i.e. with this option set, a single cycle of the reconstructor may not +# completely revert all handoff partitions. The option has no effect on +# reconstructor sync type jobs between primary partitions. A value of 0 (the +# default) means there is no limit. +# max_objects_per_revert = 0 +# # You can set scheduling priority of processes. Niceness values range from -20 # (most favorable to the process) to 19 (least favorable to the process). # nice_priority = diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index accda48510..c1c3636492 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -241,7 +241,8 @@ class ObjectReconstructor(Daemon): conf.get('reclaim_age', DEFAULT_RECLAIM_AGE))) self.request_node_count = config_request_node_count_value( conf.get('request_node_count', '2 * replicas')) - + self.max_objects_per_revert = non_negative_int( + conf.get('max_objects_per_revert', 0)) # When upgrading from liberasurecode<=1.5.0, you may want to continue # writing legacy CRCs until all nodes are upgraded and capabale of # reading fragments with zlib CRCs. @@ -1058,9 +1059,13 @@ class ObjectReconstructor(Daemon): if not suffixes: continue - # ssync any out-of-sync suffixes with the remote node + # ssync any out-of-sync suffixes with the remote node; do not limit + # max_objects - we need to check them all because, unlike a revert + # job, we don't purge any objects so start with the same set each + # cycle success, _ = ssync_sender( - self, node, job, suffixes, include_non_durable=False)() + self, node, job, suffixes, include_non_durable=False, + max_objects=0)() # update stats for this attempt self.suffix_sync += len(suffixes) self.logger.update_stats('suffix.syncs', len(suffixes)) @@ -1088,7 +1093,8 @@ class ObjectReconstructor(Daemon): node['index']) success, in_sync_objs = ssync_sender( self, node, job, job['suffixes'], - include_non_durable=True)() + include_non_durable=True, + max_objects=self.max_objects_per_revert)() if success: syncd_with += 1 reverted_objs.update(in_sync_objs) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 700b097c45..309bcc1b61 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -144,7 +144,7 @@ class Sender(object): """ def __init__(self, daemon, node, job, suffixes, remote_check_objs=None, - include_non_durable=False): + include_non_durable=False, max_objects=0): self.daemon = daemon self.df_mgr = self.daemon._df_router[job['policy']] self.node = node @@ -154,6 +154,7 @@ class Sender(object): # make sure those objects exist or not in remote. self.remote_check_objs = remote_check_objs self.include_non_durable = include_non_durable + self.max_objects = max_objects def __call__(self): """ @@ -319,6 +320,17 @@ class Sender(object): sleep() # Gives a chance for other greenthreads to run nlines += 1 nbytes += len(msg) + if 0 < self.max_objects <= nlines: + for _ in hash_gen: + # only log truncation if there were more hashes to come... + self.daemon.logger.info( + 'ssync missing_check truncated after %d objects: ' + 'device: %s, part: %s, policy: %s, last object hash: ' + '%s', nlines, self.job['device'], + self.job['partition'], int(self.job['policy']), + object_hash) + break + break with exceptions.MessageTimeout( self.daemon.node_timeout, 'missing_check end'): msg = b':MISSING_CHECK: END\r\n' diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index ef3999296f..f8f71e3581 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -1108,12 +1108,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): """ class _fake_ssync(object): def __init__(self, daemon, node, job, suffixes, - include_non_durable=False, **kwargs): + include_non_durable=False, max_objects=0, + **kwargs): # capture context and generate an available_map of objs context = {} context['node'] = node context['job'] = job context['suffixes'] = suffixes + context['max_objects'] = max_objects self.suffixes = suffixes self.daemon = daemon self.job = job @@ -1124,8 +1126,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): frag_index=self.job.get('frag_index'), frag_prefs=frag_prefs) self.available_map = {} + nlines = 0 for hash_, timestamps in hash_gen: self.available_map[hash_] = timestamps + nlines += 1 + if 0 < max_objects <= nlines: + break + context['available_map'] = self.available_map ssync_calls.append(context) self.success = True @@ -1179,6 +1186,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): context['available_map'])) else: self.assertFalse(context.get('include_non_durable')) + self.assertEqual(0, context.get('max_objects')) mock_delete.assert_has_calls(expected_calls, any_order=True) @@ -1207,10 +1215,32 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): filename.endswith(data_file_tail), filename) else: self.assertFalse(context.get('include_non_durable')) + self.assertEqual(0, context.get('max_objects')) # sanity check that some files should were deleted self.assertGreater(n_files, n_files_after) + def test_max_objects_per_revert_only_for_revert_jobs(self): + # verify max_objects_per_revert option is only passed to revert jobs + ssync_calls = [] + conf = dict(self.conf, max_objects_per_revert=2) + with mock.patch('swift.obj.reconstructor.ssync_sender', + self._make_fake_ssync(ssync_calls)), \ + mocked_http_conn(*[200] * 6, body=pickle.dumps({})): + reconstructor = object_reconstructor.ObjectReconstructor( + conf, logger=self.logger) + reconstructor.reconstruct() + reverts = syncs = 0 + for context in ssync_calls: + if context['job']['job_type'] == REVERT: + self.assertEqual(2, context.get('max_objects')) + reverts += 1 + else: + self.assertEqual(0, context.get('max_objects')) + syncs += 1 + self.assertGreater(reverts, 0) + self.assertGreater(syncs, 0) + def test_delete_reverted_nondurable(self): # verify reconstructor only deletes reverted nondurable fragments older # commit_window @@ -1419,6 +1449,63 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): with self.assertRaises(DiskFileError): df_older.writer().commit(ts_old) + def test_delete_reverted_max_objects_per_revert(self): + # verify reconstructor only deletes objects that were actually reverted + # when ssync is limited by max_objects_per_revert + shutil.rmtree(self.ec_obj_path) + ips = utils.whataremyips(self.reconstructor.bind_ip) + local_devs = [dev for dev in self.ec_obj_ring.devs + if dev and dev['replication_ip'] in ips and + dev['replication_port'] == + self.reconstructor.port] + partition = (local_devs[0]['id'] + 1) % 3 + # 2 durable objects + df_0 = self._create_diskfile( + object_name='zero', part=partition) + datafile_0 = df_0.manager.cleanup_ondisk_files( + df_0._datadir, frag_prefs=[])['data_file'] + self.assertTrue(os.path.exists(datafile_0)) + df_1 = self._create_diskfile( + object_name='one', part=partition) + datafile_1 = df_1.manager.cleanup_ondisk_files( + df_1._datadir, frag_prefs=[])['data_file'] + self.assertTrue(os.path.exists(datafile_1)) + df_2 = self._create_diskfile( + object_name='two', part=partition) + datafile_2 = df_2.manager.cleanup_ondisk_files( + df_2._datadir, frag_prefs=[])['data_file'] + self.assertTrue(os.path.exists(datafile_2)) + + datafiles = [datafile_0, datafile_1, datafile_2] + actual_datafiles = [df for df in datafiles if os.path.exists(df)] + self.assertEqual(datafiles, actual_datafiles) + + # only two object will be sync'd and purged... + ssync_calls = [] + conf = dict(self.conf, max_objects_per_revert=2, handoffs_only=True) + self.reconstructor = object_reconstructor.ObjectReconstructor( + conf, logger=self.logger) + with mock.patch('swift.obj.reconstructor.ssync_sender', + self._make_fake_ssync(ssync_calls)): + self.reconstructor.reconstruct() + for context in ssync_calls: + self.assertEqual(REVERT, context['job']['job_type']) + self.assertEqual(2, context.get('max_objects')) + actual_datafiles = [df for df in datafiles if os.path.exists(df)] + self.assertEqual(1, len(actual_datafiles), actual_datafiles) + + # ...until next reconstructor run which will sync and purge the last + # object + ssync_calls = [] + with mock.patch('swift.obj.reconstructor.ssync_sender', + self._make_fake_ssync(ssync_calls)): + self.reconstructor.reconstruct() + for context in ssync_calls: + self.assertEqual(REVERT, context['job']['job_type']) + self.assertEqual(2, context.get('max_objects')) + actual_datafiles = [df for df in datafiles if os.path.exists(df)] + self.assertEqual([], actual_datafiles) + def test_no_delete_failed_revert(self): # test will only process revert jobs self.reconstructor.handoffs_only = True diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index d27ae79a01..cd7999e41e 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -102,10 +102,10 @@ class TestSender(BaseTest): self.daemon_logger = debug_logger('test-ssync-sender') self.daemon = ObjectReplicator(self.daemon_conf, self.daemon_logger) - job = {'policy': POLICIES.legacy, - 'device': 'test-dev', - 'partition': '99'} # sufficient for Sender.__init__ - self.sender = ssync_sender.Sender(self.daemon, None, job, None) + self.job = {'policy': POLICIES.legacy, + 'device': 'test-dev', + 'partition': '99'} # sufficient for Sender.__init__ + self.sender = ssync_sender.Sender(self.daemon, None, self.job, None) def test_call_catches_MessageTimeout(self): @@ -810,6 +810,9 @@ class TestSender(BaseTest): {'ts_data': Timestamp(1380144471.00000)}) connection = FakeConnection() response = FakeResponse() + # max_objects unlimited + self.sender = ssync_sender.Sender(self.daemon, None, self.job, None, + max_objects=0) self.sender.daemon.node_timeout = 0.01 self.sender.df_mgr.yield_hashes = yield_hashes sleeps = [0, 0, 1] @@ -874,6 +877,10 @@ class TestSender(BaseTest): 'No match for %r %r %r %r' % (device, partition, policy, suffixes)) + # note: max_objects > number that would yield + self.sender = ssync_sender.Sender(self.daemon, None, self.job, None, + max_objects=4) + connection = FakeConnection() self.sender.job = { 'device': 'dev', @@ -908,6 +915,121 @@ class TestSender(BaseTest): ts_meta=Timestamp(1380144475.44444), ts_ctype=Timestamp(1380144474.44448)))] self.assertEqual(available_map, dict(candidates)) + self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) + + def test_missing_check_max_objects_less_than_actual_objects(self): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + # verify missing_check stops after 2 objects even though more + # objects would yield + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and + suffixes == ['abc', 'def']): + yield ( + '9d41d8cd98f00b204e9800998ecf0abc', + {'ts_data': Timestamp(1380144470.00000)}) + yield ( + '9d41d8cd98f00b204e9800998ecf0def', + {'ts_data': Timestamp(1380144472.22222), + 'ts_meta': Timestamp(1380144473.22222)}) + yield ( + '9d41d8cd98f00b204e9800998ecf1def', + {'ts_data': Timestamp(1380144474.44444), + 'ts_ctype': Timestamp(1380144474.44448), + 'ts_meta': Timestamp(1380144475.44444)}) + else: + raise Exception( + 'No match for %r %r %r %r' % (device, partition, + policy, suffixes)) + + # max_objects < number that would yield + self.sender = ssync_sender.Sender(self.daemon, None, self.job, None, + max_objects=2) + + connection = FakeConnection() + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } + self.sender.suffixes = ['abc', 'def'] + response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.df_mgr.yield_hashes = yield_hashes + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 ' + b'm:186a0\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual(send_map, {}) + candidates = [('9d41d8cd98f00b204e9800998ecf0abc', + dict(ts_data=Timestamp(1380144470.00000))), + ('9d41d8cd98f00b204e9800998ecf0def', + dict(ts_data=Timestamp(1380144472.22222), + ts_meta=Timestamp(1380144473.22222)))] + self.assertEqual(available_map, dict(candidates)) + self.assertEqual( + ['ssync missing_check truncated after 2 objects: device: dev, ' + 'part: 9, policy: 0, last object hash: ' + '9d41d8cd98f00b204e9800998ecf0def'], + self.daemon_logger.get_lines_for_level('info')) + + def test_missing_check_max_objects_exactly_actual_objects(self): + def yield_hashes(device, partition, policy, suffixes=None, **kwargs): + if (device == 'dev' and partition == '9' and + policy == POLICIES.legacy and + suffixes == ['abc', 'def']): + yield ( + '9d41d8cd98f00b204e9800998ecf0abc', + {'ts_data': Timestamp(1380144470.00000)}) + yield ( + '9d41d8cd98f00b204e9800998ecf0def', + {'ts_data': Timestamp(1380144472.22222), + 'ts_meta': Timestamp(1380144473.22222)}) + else: + raise Exception( + 'No match for %r %r %r %r' % (device, partition, + policy, suffixes)) + + # max_objects == number that would yield + self.sender = ssync_sender.Sender(self.daemon, None, self.job, None, + max_objects=2) + + connection = FakeConnection() + self.sender.job = { + 'device': 'dev', + 'partition': '9', + 'policy': POLICIES.legacy, + } + self.sender.suffixes = ['abc', 'def'] + response = FakeResponse( + chunk_body=( + ':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n')) + self.sender.df_mgr.yield_hashes = yield_hashes + available_map, send_map = self.sender.missing_check(connection, + response) + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 ' + b'm:186a0\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertEqual(send_map, {}) + candidates = [('9d41d8cd98f00b204e9800998ecf0abc', + dict(ts_data=Timestamp(1380144470.00000))), + ('9d41d8cd98f00b204e9800998ecf0def', + dict(ts_data=Timestamp(1380144472.22222), + ts_meta=Timestamp(1380144473.22222)))] + self.assertEqual(available_map, dict(candidates)) + # nothing logged re: truncation + self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) def test_missing_check_far_end_disconnect(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):