diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample
index a337e7149b..09f3bc7e97 100644
--- a/etc/container-server.conf-sample
+++ b/etc/container-server.conf-sample
@@ -440,6 +440,17 @@ use = egg:swift#xprofile
 # integer value. A negative value implies no limit.
 # recon_candidates_limit = 5
 #
+# As the sharder visits each container that is currently sharding, it dumps
+# that container's progress to recon. For progress to be marked as completed,
+# this in-progress check also needs to monitor containers that have just
+# finished sharding. The recon_sharded_timeout parameter sets how long a
+# container that has just finished sharding should continue to be reported by
+# the in-progress check, so that anything monitoring the sharding recon dump
+# has enough time to collate the results and see them complete. The time is
+# capped at reclaim_age, so this parameter should be less than or equal to
+# reclaim_age. The default is 12 hours (12 x 60 x 60).
+# recon_sharded_timeout = 43200
+#
 # Large databases tend to take a while to work with, but we want to make sure
 # we write down our progress. Use a larger-than-normal broker timeout to make
 # us less likely to bomb out on a LockTimeout.
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 0bfa42a231..d577f03161 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -518,6 +518,8 @@ class ContainerSharder(ContainerReplicator):
         self.shrinking_candidates = []
         self.recon_candidates_limit = int(
             conf.get('recon_candidates_limit', 5))
+        self.recon_sharded_timeout = int(
+            conf.get('recon_sharded_timeout', 43200))
         self.broker_timeout = config_positive_int_value(
             conf.get('broker_timeout', 60))
         replica_count = self.ring.replica_count
@@ -645,9 +647,19 @@ class ContainerSharder(ContainerReplicator):
 
     def _record_sharding_progress(self, broker, node, error):
         own_shard_range = broker.get_own_shard_range()
-        if (broker.get_db_state() in (UNSHARDED, SHARDING) and
-                own_shard_range.state in (ShardRange.SHARDING,
-                                          ShardRange.SHARDED)):
+        db_state = broker.get_db_state()
+        if (db_state in (UNSHARDED, SHARDING, SHARDED)
+                and own_shard_range.state in (ShardRange.SHARDING,
+                                              ShardRange.SHARDED)):
+            if db_state == SHARDED:
+                context_ts = max([float(ts) for c, ts in
+                                  CleavingContext.load_all(broker)]) or None
+                if not context_ts or (context_ts + self.recon_sharded_timeout
+                                      < Timestamp.now().timestamp):
+                    # no contexts, or last context timestamp is too old, for
+                    # the broker to be recorded
+                    return
+
             info = self._make_stats_info(broker, node, own_shard_range)
             info['state'] = own_shard_range.state_text
             info['db_state'] = broker.get_db_state()
@@ -995,8 +1007,11 @@ class ContainerSharder(ContainerReplicator):
     def _audit_cleave_contexts(self, broker):
         now = Timestamp.now()
         for context, last_mod in CleavingContext.load_all(broker):
-            if Timestamp(last_mod).timestamp + self.reclaim_age < \
-                    now.timestamp:
+            last_mod = Timestamp(last_mod)
+            is_done = context.done() and last_mod.timestamp + \
+                self.recon_sharded_timeout < now.timestamp
+            is_stale = last_mod.timestamp + self.reclaim_age < now.timestamp
+            if is_done or is_stale:
                 context.delete(broker)
 
     def _audit_container(self, broker):
@@ -1646,7 +1661,6 @@ class ContainerSharder(ContainerReplicator):
             modified_shard_ranges.append(own_shard_range)
             broker.merge_shard_ranges(modified_shard_ranges)
             if broker.set_sharded_state():
-                cleaving_context.delete(broker)
                 return True
             else:
                 self.logger.warning(
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index b8154fdf81..3cf785a50b 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -41,6 +41,7 @@
 from test.probe.brain import BrainSplitter
 from test.probe.common import ReplProbeTest, get_server_number, \
     wait_for_server_to_hangup
 from test.debug_logger import debug_logger
+import mock
 
 MIN_SHARD_CONTAINER_THRESHOLD = 4
@@ -888,8 +889,10 @@ class TestContainerSharding(BaseTestContainerSharding):
         self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
         self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
                          [sr['state'] for sr in orig_root_shard_ranges])
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual([], contexts)  # length check
+        # Contexts should still be there, and should be complete
+        contexts = set([ctx.done()
+                        for ctx, _ in CleavingContext.load_all(broker)])
+        self.assertEqual({True}, contexts)
         self.direct_delete_container(expect_failure=True)
 
         self.assertLengthEqual(found['normal_dbs'], 2)
@@ -939,9 +942,10 @@ class TestContainerSharding(BaseTestContainerSharding):
                                 orig['state_timestamp'])
         self.assertGreaterEqual(updated.meta_timestamp,
                                 orig['meta_timestamp'])
-
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual([], contexts)  # length check
+        # Contexts should still be there, and should be complete
+        contexts = set([ctx.done()
+                        for ctx, _ in CleavingContext.load_all(broker)])
+        self.assertEqual({True}, contexts)
 
         # Check that entire listing is available
         headers, actual_listing = self.assert_container_listing(obj_names)
@@ -1197,9 +1201,11 @@ class TestContainerSharding(BaseTestContainerSharding):
             [ShardRange.ACTIVE] * 3,
             [sr.state for sr in broker.get_shard_ranges()])
 
-        # Make sure our cleaving contexts got cleaned up
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual([], contexts)
+        # Contexts should still be there, and should be complete
+        contexts = set([ctx.done()
+                        for ctx, _
+                        in CleavingContext.load_all(broker)])
+        self.assertEqual({True}, contexts)
 
         # check root shard ranges
         root_shard_ranges = self.direct_get_container_shard_ranges()
@@ -2021,6 +2027,26 @@ class TestContainerSharding(BaseTestContainerSharding):
         self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
                          [sr.state for sr in shard_ranges])
 
+        # Check the current progress. It shouldn't be complete.
+        recon = direct_client.direct_get_recon(leader_node, "sharding")
+        expected_in_progress = {'all': [{'account': 'AUTH_test',
+                                         'active': 0,
+                                         'cleaved': 2,
+                                         'created': 2,
+                                         'found': 0,
+                                         'db_state': 'sharding',
+                                         'state': 'sharding',
+                                         'error': None,
+                                         'file_size': mock.ANY,
+                                         'meta_timestamp': mock.ANY,
+                                         'node_index': 0,
+                                         'object_count': len(obj_names),
+                                         'container': mock.ANY,
+                                         'path': mock.ANY,
+                                         'root': mock.ANY}]}
+        actual = recon['sharding_stats']['sharding']['sharding_in_progress']
+        self.assertEqual(expected_in_progress, actual)
+
         # stop *all* container servers for third shard range
         sr_part, sr_node_nums = self.get_part_and_node_numbers(shard_ranges[2])
         for node_num in sr_node_nums:
@@ -2078,6 +2104,26 @@ class TestContainerSharding(BaseTestContainerSharding):
         self.assertEqual([ShardRange.ACTIVE] * 4,
                          [sr.state for sr in shard_ranges])
 
+        # Check the leader's progress again; this time it should be complete
+        recon = direct_client.direct_get_recon(leader_node, "sharding")
+        expected_in_progress = {'all': [{'account': 'AUTH_test',
+                                         'active': 4,
+                                         'cleaved': 0,
+                                         'created': 0,
+                                         'found': 0,
+                                         'db_state': 'sharded',
+                                         'state': 'sharded',
+                                         'error': None,
+                                         'file_size': mock.ANY,
+                                         'meta_timestamp': mock.ANY,
+                                         'node_index': 0,
+                                         'object_count': len(obj_names),
+                                         'container': mock.ANY,
+                                         'path': mock.ANY,
+                                         'root': mock.ANY}]}
+        actual = recon['sharding_stats']['sharding']['sharding_in_progress']
+        self.assertEqual(expected_in_progress, actual)
+
     def test_sharded_delete(self):
         all_obj_names = self._make_object_names(self.max_shard_size)
         self.put_objects(all_obj_names)
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index b7cd4e2a14..c89f0eedfb 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -170,6 +170,7 @@ class TestSharder(BaseTestSharder):
             'shards_account_prefix': '.shards_',
             'auto_shard': False,
             'recon_candidates_limit': 5,
+            'recon_sharded_timeout': 43200,
             'shard_replication_quorum': 2,
             'existing_shard_replication_quorum': 2
         }
@@ -201,6 +202,7 @@ class TestSharder(BaseTestSharder):
             'auto_create_account_prefix': '...',
             'auto_shard': 'yes',
             'recon_candidates_limit': 10,
+            'recon_sharded_timeout': 7200,
             'shard_replication_quorum': 1,
             'existing_shard_replication_quorum': 0
         }
@@ -223,6 +225,7 @@ class TestSharder(BaseTestSharder):
             'shards_account_prefix': '...shards_',
             'auto_shard': True,
             'recon_candidates_limit': 10,
+            'recon_sharded_timeout': 7200,
             'shard_replication_quorum': 1,
             'existing_shard_replication_quorum': 0
         }
@@ -694,6 +697,114 @@ class TestSharder(BaseTestSharder):
             expected_candidate_stats, sharder, 'sharding_candidates')
         self._assert_recon_stats(None, sharder, 'sharding_progress')
 
+        # let's progress broker 1 (brokers[0])
+        brokers[0].enable_sharding(next(self.ts_iter))
+        brokers[0].set_sharding_state()
+        shard_ranges = brokers[0].get_shard_ranges()
+        for sr in shard_ranges[:-1]:
+            sr.update_state(ShardRange.CLEAVED)
+        brokers[0].merge_shard_ranges(shard_ranges)
+
+        with mock.patch('eventlet.sleep'), mock.patch.object(
+                sharder, '_process_broker'
+        ) as mock_process_broker:
+            sharder._local_device_ids = {999}
+            sharder._one_shard_cycle(Everything(), Everything())
+
+        expected_in_progress_stats = {
+            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
+                     'meta_timestamp': mock.ANY,
+                     'file_size': os.stat(brokers[0].db_file).st_size,
+                     'path': brokers[0].db_file, 'root': 'a/c0',
+                     'node_index': 0,
+                     'found': 1, 'created': 0, 'cleaved': 3, 'active': 1,
+                     'state': 'sharding', 'db_state': 'sharding',
+                     'error': None},
+                    {'object_count': 0, 'account': 'a', 'container': 'c1',
+                     'meta_timestamp': mock.ANY,
+                     'file_size': os.stat(brokers[1].db_file).st_size,
+                     'path': brokers[1].db_file, 'root': 'a/c1',
+                     'node_index': 1,
+                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+                     'state': 'sharding', 'db_state': 'unsharded',
+                     'error': None}]}
+        self._assert_stats(
+            expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+        # Now complete sharding broker 1.
+        shard_ranges[-1].update_state(ShardRange.CLEAVED)
+        own_sr = brokers[0].get_own_shard_range()
+        own_sr.update_state(ShardRange.SHARDED)
+        brokers[0].merge_shard_ranges(shard_ranges + [own_sr])
+        # Make and complete a cleaving context; this is used for the
+        # recon_sharded_timeout timer.
+        cxt = CleavingContext.load(brokers[0])
+        cxt.misplaced_done = cxt.cleaving_done = True
+        ts_now = next(self.ts_iter)
+        with mock_timestamp_now(ts_now):
+            cxt.store(brokers[0])
+        self.assertTrue(brokers[0].set_sharded_state())
+
+        with mock.patch('eventlet.sleep'), \
+                mock.patch.object(sharder, '_process_broker') \
+                as mock_process_broker, mock_timestamp_now(ts_now):
+            sharder._local_device_ids = {999}
+            sharder._one_shard_cycle(Everything(), Everything())
+
+        expected_in_progress_stats = {
+            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
+                     'meta_timestamp': mock.ANY,
+                     'file_size': os.stat(brokers[0].db_file).st_size,
+                     'path': brokers[0].db_file, 'root': 'a/c0',
+                     'node_index': 0,
+                     'found': 0, 'created': 0, 'cleaved': 4, 'active': 1,
+                     'state': 'sharded', 'db_state': 'sharded',
+                     'error': None},
+                    {'object_count': 0, 'account': 'a', 'container': 'c1',
+                     'meta_timestamp': mock.ANY,
+                     'file_size': os.stat(brokers[1].db_file).st_size,
+                     'path': brokers[1].db_file, 'root': 'a/c1',
+                     'node_index': 1,
+                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+                     'state': 'sharding', 'db_state': 'unsharded',
+                     'error': None}]}
+        self._assert_stats(
+            expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+        # One more cycle at recon_sharded_timeout seconds into the
+        # future to check that the completed broker is still reported.
+        ts_now = Timestamp(ts_now.timestamp +
+                           sharder.recon_sharded_timeout)
+        with mock.patch('eventlet.sleep'), \
+                mock.patch.object(sharder, '_process_broker') \
+                as mock_process_broker, mock_timestamp_now(ts_now):
+            sharder._local_device_ids = {999}
+            sharder._one_shard_cycle(Everything(), Everything())
+        self._assert_stats(
+            expected_in_progress_stats, sharder, 'sharding_in_progress')
+
+        # When we move recon_sharded_timeout + 1 seconds into the future,
+        # broker 1 will be removed from the progress report.
+        ts_now = Timestamp(ts_now.timestamp +
+                           sharder.recon_sharded_timeout + 1)
+        with mock.patch('eventlet.sleep'), \
+                mock.patch.object(sharder, '_process_broker') \
+                as mock_process_broker, mock_timestamp_now(ts_now):
+            sharder._local_device_ids = {999}
+            sharder._one_shard_cycle(Everything(), Everything())
+
+        expected_in_progress_stats = {
+            'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
+                     'meta_timestamp': mock.ANY,
+                     'file_size': os.stat(brokers[1].db_file).st_size,
+                     'path': brokers[1].db_file, 'root': 'a/c1',
+                     'node_index': 1,
+                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
+                     'state': 'sharding', 'db_state': 'unsharded',
+                     'error': None}]}
+        self._assert_stats(
+            expected_in_progress_stats, sharder, 'sharding_in_progress')
+
     def test_ratelimited_roundrobin(self):
         n_databases = 100
 
@@ -5426,21 +5537,21 @@ class TestSharder(BaseTestSharder):
     def test_audit_cleave_contexts(self):
-        def add_cleave_context(id, last_modified):
+        def add_cleave_context(id, last_modified, cleaving_done):
             params = {'ref': id,
                       'cursor': 'curs',
                       'max_row': 2,
                       'cleave_to_row': 2,
                       'last_cleave_to_row': 1,
-                      'cleaving_done': False,
+                      'cleaving_done': cleaving_done,
                       'misplaced_done': True,
                       'ranges_done': 2,
                       'ranges_todo': 4}
             key = 'X-Container-Sysmeta-Shard-Context-%s' % id
-            with mock_timestamp_now(Timestamp(last_modified)):
+            with mock_timestamp_now(last_modified):
                 broker.update_metadata(
                     {key: (json.dumps(params),
-                           Timestamp(last_modified).internal)})
+                           last_modified.internal)})
 
         def get_context(id, broker):
             data = broker.get_sharding_sysmeta().get('Context-%s' % id)
@@ -5449,6 +5560,7 @@ class TestSharder(BaseTestSharder):
             return data
 
         reclaim_age = 100
+        recon_sharded_timeout = 50
         broker = self._make_broker()
 
         # sanity check
@@ -5456,25 +5568,43 @@ class TestSharder(BaseTestSharder):
         self.assertEqual(UNSHARDED, broker.get_db_state())
 
         # Setup some cleaving contexts
-        id_old, id_newish = [str(uuid4()) for _ in range(2)]
-        contexts = ((id_old, 1),
-                    (id_newish, reclaim_age // 2))
-        for id, last_modified in contexts:
-            add_cleave_context(id, last_modified)
+        id_old, id_newish, id_complete = [str(uuid4()) for _ in range(3)]
+        ts_old, ts_newish, ts_complete = (
+            Timestamp(1),
+            Timestamp(reclaim_age // 2),
+            Timestamp(reclaim_age - recon_sharded_timeout))
+        contexts = ((id_old, ts_old, False),
+                    (id_newish, ts_newish, False),
+                    (id_complete, ts_complete, True))
+        for id, last_modified, cleaving_done in contexts:
+            add_cleave_context(id, last_modified, cleaving_done)
 
-        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+        sharder_conf = {'reclaim_age': str(reclaim_age),
+                        'recon_sharded_timeout': str(recon_sharded_timeout)}
+
+        with self._mock_sharder(sharder_conf) as sharder:
             with mock_timestamp_now(Timestamp(reclaim_age + 2)):
                 sharder._audit_cleave_contexts(broker)
 
+        # The old context is stale, i.e. last modified reached reclaim_age,
+        # and it was never completed (done).
         old_ctx = get_context(id_old, broker)
         self.assertEqual(old_ctx, "")
 
+        # The newish context is almost stale: it's been half a reclaim_age
+        # since it was last modified, but it is not completed, so it hasn't
+        # been cleaned up.
         newish_ctx = get_context(id_newish, broker)
         self.assertEqual(newish_ctx.ref, id_newish)
 
-        # If we push time another reclaim age later, and they all be removed
-        # minus id_missing_lm as it has a later last_modified.
-        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+        # The complete context is done and recon_sharded_timeout seconds
+        # have passed since it was marked completed, so it has been
+        # removed.
+        complete_ctx = get_context(id_complete, broker)
+        self.assertEqual(complete_ctx, "")
+
+        # If we push time another reclaim_age later, they are all removed
+        with self._mock_sharder(sharder_conf) as sharder:
             with mock_timestamp_now(Timestamp(reclaim_age * 2)):
                 sharder._audit_cleave_contexts(broker)
 
@@ -5853,6 +5983,7 @@ class TestCleavingContext(BaseTestSharder):
         # Now let's delete it. When deleted the metadata key will exist, but
         # the value will be "" as this means it'll be reaped later.
         ctx.delete(broker)
+
         sysmeta = broker.get_sharding_sysmeta()
         for key, val in sysmeta.items():
             if key == "Context-%s" % db_id:
diff --git a/tools/playbooks/common/install_dependencies.yaml b/tools/playbooks/common/install_dependencies.yaml
index 893b7d03b9..687607fad4 100644
--- a/tools/playbooks/common/install_dependencies.yaml
+++ b/tools/playbooks/common/install_dependencies.yaml
@@ -40,3 +40,4 @@
       - nose
       - pyeclib
       - python-swiftclient
+      - mock
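
For reference, a minimal sketch of how the new in-progress recon entries might be inspected by hand, reusing the same direct_client.direct_get_recon() call and recon keys exercised in the probe test above; the node address, port and device values here are assumptions for a SAIO-style setup and should be taken from a real container ring.

# Sketch only: assumes a SAIO-style environment; the node dict values are
# placeholders, not part of the patch.
from swift.common import direct_client

node = {'ip': '127.0.0.1', 'port': 6011, 'device': 'sdb1'}

# The sharder dumps its progress under
# sharding_stats -> sharding -> sharding_in_progress -> all,
# as asserted in the probe test above.
recon = direct_client.direct_get_recon(node, 'sharding')
in_progress = recon['sharding_stats']['sharding']['sharding_in_progress']
for entry in in_progress['all']:
    # db_state is 'sharding' while cleaving and becomes 'sharded' once done;
    # with this change, sharded entries linger for recon_sharded_timeout
    # seconds so that monitoring has time to see them complete.
    print(entry['path'], entry['db_state'],
          entry['cleaved'], entry['active'], entry['error'])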