From ece4b04e8215f80b55c367abc523290b503efc76 Mon Sep 17 00:00:00 2001 From: Matthew Oliver Date: Mon, 8 Aug 2022 16:25:55 +1000 Subject: [PATCH] Sharding: No stat updates before CLEAVED state Once a shard container has been created as part of the sharder cycle, the sharder pulls the shard's own_shard_range, updates the object_count and bytes_used, and pushes this to the root container. The root container can use these to display the current container stats. However, it is not until a shard reaches the CLEAVED state that it holds enough information about its namespace, so before this the numbers it reports are incorrect. Further, when we find and create a shard, it starts out with the number of objects that are, at the time, expected to go into it. This is a better answer than, say, nothing. So it's better for the shard to send its current own_shard_range but not update the stats until it can be authoritative about them. This patch adds a new SHARD_UPDATE_STAT_STATES that tracks which ShardRange states a shard needs to be in before it updates its own shard range stats; the current definition is: SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.SHARDED, ShardRange.SHRINKING, ShardRange.SHRUNK] As we don't want to update the OSR stats or the meta_timestamp before then, tombstone updates are also moved so that they only happen when the shard is in a SHARD_UPDATE_STAT_STATES state. Change-Id: I838dbba3c791fffa6a36ffdcf73eceeaff718373 --- swift/container/backend.py | 18 ++++-- swift/container/sharder.py | 33 +++++++---- test/probe/test_sharder.py | 31 ++++++++-- test/unit/container/test_sharder.py | 88 ++++++++++++++++++++--------- 4 files changed, 122 insertions(+), 48 deletions(-) diff --git a/swift/container/backend.py b/swift/container/backend.py index 90f2bf73f6..132f9ed551 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -61,6 +61,12 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.SHARDED, ShardRange.SHRINKING, ShardRange.SHRUNK] +# shards may not be fully populated while in the FOUND and CREATED +# states, so shards should only update their own shard range's object +# stats when they are in the following states +SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE, + ShardRange.SHARDING, ShardRange.SHARDED, + ShardRange.SHRINKING, ShardRange.SHRUNK] # attribute names in order used when transforming shard ranges from dicts to # tuples and vice-versa @@ -2351,10 +2357,14 @@ class ContainerBroker(DatabaseBroker): # object_count shard_size = object_count - progress - # NB shard ranges are created with a non-zero object count so that - # the apparent container object count remains constant, and the - # container is non-deletable while shards have been found but not - # yet cleaved + # NB shard ranges are created with a non-zero object count for a + # few reasons: + # 1. so that the apparent container object count remains + # consistent; + # 2. so that the container is non-deletable while shards have been + # found but not yet cleaved; and + # 3. so that we have a rough idea of how big the shards should be + # while cleaving.
found_ranges.append( {'index': index, 'lower': str(last_shard_upper), diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 14de69d0dc..7afa3d840f 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -40,7 +40,7 @@ from swift.common.utils import get_logger, config_true_value, \ Everything, config_auto_int_value, ShardRangeList, config_percent_value from swift.container.backend import ContainerBroker, \ RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \ - SHARD_UPDATE_STATES, sift_shard_ranges + SHARD_UPDATE_STATES, sift_shard_ranges, SHARD_UPDATE_STAT_STATES from swift.container.replicator import ContainerReplicator @@ -2260,20 +2260,29 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): if not own_shard_range: return - # do a reclaim *now* in order to get best estimate of tombstone count - # that is consistent with the current object_count - reclaimer = self._reclaim(broker) - tombstones = reclaimer.get_tombstone_count() - self.logger.debug('tombstones in %s = %d', - quote(broker.path), tombstones) - own_shard_range.update_tombstones(tombstones) - update_own_shard_range_stats(broker, own_shard_range) + # Don't update the osr stats, including tombstones, unless it's CLEAVED+ + if own_shard_range.state in SHARD_UPDATE_STAT_STATES: + # do a reclaim *now* in order to get best estimate of tombstone + # count that is consistent with the current object_count + reclaimer = self._reclaim(broker) + tombstones = reclaimer.get_tombstone_count() + self.logger.debug('tombstones in %s = %d', + quote(broker.path), tombstones) + # shrinking candidates are found in the root DB, so that's the only + # place we need up-to-date tombstone stats. + own_shard_range.update_tombstones(tombstones) + update_own_shard_range_stats(broker, own_shard_range) + + if not own_shard_range.reported: + broker.merge_shard_ranges(own_shard_range) + + # we can't use `state not in SHARD_UPDATE_STAT_STATES` to return + # here, because there are cases where we still want to update the root + # even if the stats are wrong, such as when it's a new shard or + # something else has decided to remove the latch to update root. if own_shard_range.reported: - # no change to the stats metadata return - # stats metadata has been updated so persist it - broker.merge_shard_ranges(own_shard_range) # now get a consistent list of own and other shard ranges shard_ranges = broker.get_shard_ranges( include_own=True, diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 090d7048c1..302783a56b 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -2935,7 +2935,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding): self.sharders.once(**kwargs) def test_manage_shard_ranges(self): - obj_names = self._make_object_names(7) + obj_names = self._make_object_names(10) self.put_objects(obj_names) client.post_container(self.url, self.admin_token, self.container_name, @@ -2954,17 +2954,38 @@ class TestManagedContainerSharding(BaseTestContainerSharding): 'swift-manage-shard-ranges', self.get_db_file(self.brain.part, self.brain.nodes[0]), 'find_and_replace', '3', '--enable', '--minimum-shard-size', '2']) - self.assert_container_state(self.brain.nodes[0], 'unsharded', 2) + self.assert_container_state(self.brain.nodes[0], 'unsharded', 3) # "Run container-replicator to replicate them to other nodes." self.replicators.once() # "Run container-sharder on all nodes to shard the container."
+ # first pass cleaves 2 shards + self.sharders_once(additional_args='--partitions=%s' % self.brain.part) + self.assert_container_state(self.brain.nodes[0], 'sharding', 3) + self.assert_container_state(self.brain.nodes[1], 'sharding', 3) + shard_ranges = self.assert_container_state( + self.brain.nodes[2], 'sharding', 3) + self.assert_container_listing(obj_names) + + # make the un-cleaved shard update the root container... + self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges]) + shard_part, nodes = self.get_part_and_node_numbers(shard_ranges[2]) + self.sharders_once(additional_args='--partitions=%s' % shard_part) + shard_ranges = self.assert_container_state( + self.brain.nodes[2], 'sharding', 3) + # ...it does not report zero-stats despite being empty, because it has + # not yet reached CLEAVED state + self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges]) + + # second pass cleaves final shard self.sharders_once(additional_args='--partitions=%s' % self.brain.part) # Everybody's settled - self.assert_container_state(self.brain.nodes[0], 'sharded', 2) - self.assert_container_state(self.brain.nodes[1], 'sharded', 2) - self.assert_container_state(self.brain.nodes[2], 'sharded', 2) + self.assert_container_state(self.brain.nodes[0], 'sharded', 3) + self.assert_container_state(self.brain.nodes[1], 'sharded', 3) + shard_ranges = self.assert_container_state( + self.brain.nodes[2], 'sharded', 3) + self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges]) self.assert_container_listing(obj_names) def test_manage_shard_ranges_compact(self): diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 0ee97eafcc..e76565b125 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -1588,6 +1588,7 @@ class TestSharder(BaseTestSharder): # verify that objects are not missed if shard ranges change between # cleaving batches broker = self._make_broker() + # this root db has very few object rows... objects = [ ('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0), ('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0), @@ -1603,8 +1604,10 @@ class TestSharder(BaseTestSharder): broker.enable_sharding(Timestamp.now()) shard_bounds = (('', 'd'), ('d', 'x'), ('x', '')) + # shard ranges start life with object count that is typically much + # larger than this DB's object population... 
shard_ranges = self._make_shard_ranges( - shard_bounds, state=ShardRange.CREATED) + shard_bounds, state=ShardRange.CREATED, object_count=500000) expected_shard_dbs = [] for shard_range in shard_ranges: db_hash = hash_path(shard_range.account, shard_range.container) @@ -1634,8 +1637,8 @@ class TestSharder(BaseTestSharder): updated_shard_ranges = broker.get_shard_ranges() self.assertEqual(3, len(updated_shard_ranges)) - # first 2 shard ranges should have updated object count, bytes used and - # meta_timestamp + # now that they have reached CLEAVED state, the first 2 shard ranges + # should have updated object count, bytes used and meta_timestamp shard_ranges[0].bytes_used = 23 shard_ranges[0].object_count = 4 shard_ranges[0].state = ShardRange.CLEAVED @@ -1646,6 +1649,11 @@ class TestSharder(BaseTestSharder): self._check_shard_range(shard_ranges[1], updated_shard_ranges[1]) self._check_objects(objects[:4], expected_shard_dbs[0]) self._check_objects(objects[4:7], expected_shard_dbs[1]) + # the actual object counts were set in the new shard brokers' own_sr's + shard_broker = ContainerBroker(expected_shard_dbs[0]) + self.assertEqual(4, shard_broker.get_own_shard_range().object_count) + shard_broker = ContainerBroker(expected_shard_dbs[1]) + self.assertEqual(2, shard_broker.get_own_shard_range().object_count) self.assertFalse(os.path.exists(expected_shard_dbs[2])) # third shard range should be unchanged - not yet cleaved @@ -5276,6 +5284,7 @@ class TestSharder(BaseTestSharder): def test_update_root_container_own_range(self): broker = self._make_broker() + obj_names = [] # nothing to send with self._mock_sharder() as sharder: @@ -5290,16 +5299,27 @@ class TestSharder(BaseTestSharder): broker.merge_shard_ranges([own_shard_range]) # add an object, expect to see it reflected in the own shard range # that is sent - broker.put_object(str(own_shard_range.object_count + 1), + obj_names.append(uuid4()) + broker.put_object(str(obj_names[-1]), next(self.ts_iter).internal, 1, '', '') with mock_timestamp_now() as now: - # force own shard range meta updates to be at fixed timestamp - expected_sent = [ - dict(own_shard_range, - meta_timestamp=now.internal, - object_count=own_shard_range.object_count + 1, - bytes_used=own_shard_range.bytes_used + 1)] + # check if the state is in SHARD_UPDATE_STAT_STATES + if state in [ShardRange.CLEAVED, ShardRange.ACTIVE, + ShardRange.SHARDING, ShardRange.SHARDED, + ShardRange.SHRINKING, ShardRange.SHRUNK]: + exp_obj_count = len(obj_names) + expected_sent = [ + dict(own_shard_range, + meta_timestamp=now.internal, + object_count=len(obj_names), + bytes_used=len(obj_names))] + else: + exp_obj_count = own_shard_range.object_count + expected_sent = [ + dict(own_shard_range)] self.check_shard_ranges_sent(broker, expected_sent) + self.assertEqual( + exp_obj_count, broker.get_own_shard_range().object_count) # initialise tombstones with mock_timestamp_now(next(self.ts_iter)): @@ -5311,6 +5331,8 @@ class TestSharder(BaseTestSharder): with annotate_failure(state): check_only_own_shard_range_sent(state) + init_obj_count = len(obj_names) + def check_tombstones_sent(state): own_shard_range = broker.get_own_shard_range() self.assertTrue(own_shard_range.update_state( @@ -5318,19 +5340,25 @@ class TestSharder(BaseTestSharder): broker.merge_shard_ranges([own_shard_range]) # delete an object, expect to see it reflected in the own shard # range that is sent - broker.delete_object(str(own_shard_range.object_count), + broker.delete_object(str(obj_names.pop(-1)), next(self.ts_iter).internal) with 
mock_timestamp_now() as now: - # force own shard range meta updates to be at fixed timestamp - expected_sent = [ - dict(own_shard_range, - meta_timestamp=now.internal, - object_count=own_shard_range.object_count - 1, - bytes_used=own_shard_range.bytes_used - 1, - tombstones=own_shard_range.tombstones + 1)] + # check if the state is in SHARD_UPDATE_STAT_STATES + if state in [ShardRange.CLEAVED, ShardRange.ACTIVE, + ShardRange.SHARDING, ShardRange.SHARDED, + ShardRange.SHRINKING, ShardRange.SHRUNK]: + expected_sent = [ + dict(own_shard_range, + meta_timestamp=now.internal, + object_count=len(obj_names), + bytes_used=len(obj_names), + tombstones=init_obj_count - len(obj_names))] + else: + expected_sent = [ + dict(own_shard_range)] self.check_shard_ranges_sent(broker, expected_sent) - for state in ShardRange.STATES: + for i, state in enumerate(ShardRange.STATES): with annotate_failure(state): check_tombstones_sent(state) @@ -5371,6 +5399,7 @@ class TestSharder(BaseTestSharder): other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', ''))) self.assertTrue(other_shard_ranges[0].set_deleted()) broker.merge_shard_ranges(other_shard_ranges) + obj_names = [] # own range missing - send nothing with self._mock_sharder() as sharder: @@ -5385,17 +5414,22 @@ class TestSharder(BaseTestSharder): broker.merge_shard_ranges([own_shard_range]) # add an object, expect to see it reflected in the own shard range # that is sent - broker.put_object(str(own_shard_range.object_count + 1), + obj_names.append(uuid4()) + broker.put_object(str(obj_names[-1]), next(self.ts_iter).internal, 1, '', '') with mock_timestamp_now() as now: shard_ranges = broker.get_shard_ranges(include_deleted=True) - expected_sent = sorted([ - own_shard_range.copy( - meta_timestamp=now.internal, - object_count=own_shard_range.object_count + 1, - bytes_used=own_shard_range.bytes_used + 1, - tombstones=0)] + - shard_ranges, + exp_own_shard_range = own_shard_range.copy() + # check if the state is in SHARD_UPDATE_STAT_STATES + if state in [ShardRange.CLEAVED, ShardRange.ACTIVE, + ShardRange.SHARDING, ShardRange.SHARDED, + ShardRange.SHRINKING, ShardRange.SHRUNK]: + exp_own_shard_range.object_count = len(obj_names) + exp_own_shard_range.bytes_used = len(obj_names) + exp_own_shard_range.meta_timestamp = now.internal + exp_own_shard_range.tombstones = 0 + expected_sent = sorted( + [exp_own_shard_range] + shard_ranges, key=lambda sr: (sr.upper, sr.state, sr.lower)) self.check_shard_ranges_sent( broker, [dict(sr) for sr in expected_sent])
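For reviewers, a minimal sketch of the gating behaviour this patch introduces, written against the SHARD_UPDATE_STAT_STATES constant added above. It is not code from the patch; the helper name maybe_update_own_stats and its arguments are illustrative stand-ins for the logic in ContainerSharder._update_root_container:

    from swift.container.backend import SHARD_UPDATE_STAT_STATES

    def maybe_update_own_stats(broker, own_sr, tombstones):
        # FOUND/CREATED shards keep the estimated object_count they were
        # created with; only shards that are CLEAVED or later refresh their
        # stats, tombstones and meta_timestamp before reporting to the root.
        if own_sr.state in SHARD_UPDATE_STAT_STATES:
            info = broker.get_info()
            own_sr.update_tombstones(tombstones)
            own_sr.update_meta(info['object_count'], info['bytes_used'])
        return own_sr

Either way the own_shard_range is still sent to the root, so a new shard reports its estimated stats rather than nothing.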