Merge "Sharding: No stat updates before CLEAVED state"
This commit is contained in:
commit
247c17b60c
@ -61,6 +61,12 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
|||||||
ShardRange.ACTIVE, ShardRange.SHARDING,
|
ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||||
ShardRange.SHARDED, ShardRange.SHRINKING,
|
ShardRange.SHARDED, ShardRange.SHRINKING,
|
||||||
ShardRange.SHRUNK]
|
ShardRange.SHRUNK]
|
||||||
|
# shard's may not be fully populated while in the FOUND and CREATED
|
||||||
|
# state, so shards should only update their own shard range's object
|
||||||
|
# stats when they are in the following states
|
||||||
|
SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||||
|
ShardRange.SHARDING, ShardRange.SHARDED,
|
||||||
|
ShardRange.SHRINKING, ShardRange.SHRUNK]
|
||||||
|
|
||||||
# attribute names in order used when transforming shard ranges from dicts to
|
# attribute names in order used when transforming shard ranges from dicts to
|
||||||
# tuples and vice-versa
|
# tuples and vice-versa
|
||||||
@ -2351,10 +2357,14 @@ class ContainerBroker(DatabaseBroker):
|
|||||||
# object_count
|
# object_count
|
||||||
shard_size = object_count - progress
|
shard_size = object_count - progress
|
||||||
|
|
||||||
# NB shard ranges are created with a non-zero object count so that
|
# NB shard ranges are created with a non-zero object count for a
|
||||||
# the apparent container object count remains constant, and the
|
# few reasons:
|
||||||
# container is non-deletable while shards have been found but not
|
# 1. so that the apparent container object count remains
|
||||||
# yet cleaved
|
# consistent;
|
||||||
|
# 2. the container is non-deletable while shards have been found
|
||||||
|
# but not yet cleaved; and
|
||||||
|
# 3. So we have a rough idea of size of the shards should be
|
||||||
|
# while cleaving.
|
||||||
found_ranges.append(
|
found_ranges.append(
|
||||||
{'index': index,
|
{'index': index,
|
||||||
'lower': str(last_shard_upper),
|
'lower': str(last_shard_upper),
|
||||||
|
@ -40,7 +40,7 @@ from swift.common.utils import get_logger, config_true_value, \
|
|||||||
Everything, config_auto_int_value, ShardRangeList, config_percent_value
|
Everything, config_auto_int_value, ShardRangeList, config_percent_value
|
||||||
from swift.container.backend import ContainerBroker, \
|
from swift.container.backend import ContainerBroker, \
|
||||||
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
|
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
|
||||||
SHARD_UPDATE_STATES, sift_shard_ranges
|
SHARD_UPDATE_STATES, sift_shard_ranges, SHARD_UPDATE_STAT_STATES
|
||||||
from swift.container.replicator import ContainerReplicator
|
from swift.container.replicator import ContainerReplicator
|
||||||
|
|
||||||
|
|
||||||
@ -2260,20 +2260,29 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
if not own_shard_range:
|
if not own_shard_range:
|
||||||
return
|
return
|
||||||
|
|
||||||
# do a reclaim *now* in order to get best estimate of tombstone count
|
# Don't update the osr stats including tombstones unless its CLEAVED+
|
||||||
# that is consistent with the current object_count
|
if own_shard_range.state in SHARD_UPDATE_STAT_STATES:
|
||||||
reclaimer = self._reclaim(broker)
|
# do a reclaim *now* in order to get best estimate of tombstone
|
||||||
tombstones = reclaimer.get_tombstone_count()
|
# count that is consistent with the current object_count
|
||||||
self.logger.debug('tombstones in %s = %d',
|
reclaimer = self._reclaim(broker)
|
||||||
quote(broker.path), tombstones)
|
tombstones = reclaimer.get_tombstone_count()
|
||||||
own_shard_range.update_tombstones(tombstones)
|
self.logger.debug('tombstones in %s = %d',
|
||||||
update_own_shard_range_stats(broker, own_shard_range)
|
quote(broker.path), tombstones)
|
||||||
|
# shrinking candidates are found in the root DB so that's the only
|
||||||
|
# place we need up to date tombstone stats.
|
||||||
|
own_shard_range.update_tombstones(tombstones)
|
||||||
|
update_own_shard_range_stats(broker, own_shard_range)
|
||||||
|
|
||||||
|
if not own_shard_range.reported:
|
||||||
|
broker.merge_shard_ranges(own_shard_range)
|
||||||
|
|
||||||
|
# we can't use `state not in SHARD_UPDATE_STAT_STATES` to return
|
||||||
|
# because there are cases we still want to update root even if the
|
||||||
|
# stats are wrong. Such as it's a new shard or something else has
|
||||||
|
# decided to remove the latch to update root.
|
||||||
if own_shard_range.reported:
|
if own_shard_range.reported:
|
||||||
# no change to the stats metadata
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# stats metadata has been updated so persist it
|
|
||||||
broker.merge_shard_ranges(own_shard_range)
|
|
||||||
# now get a consistent list of own and other shard ranges
|
# now get a consistent list of own and other shard ranges
|
||||||
shard_ranges = broker.get_shard_ranges(
|
shard_ranges = broker.get_shard_ranges(
|
||||||
include_own=True,
|
include_own=True,
|
||||||
|
@ -2937,7 +2937,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||||||
self.sharders.once(**kwargs)
|
self.sharders.once(**kwargs)
|
||||||
|
|
||||||
def test_manage_shard_ranges(self):
|
def test_manage_shard_ranges(self):
|
||||||
obj_names = self._make_object_names(7)
|
obj_names = self._make_object_names(10)
|
||||||
self.put_objects(obj_names)
|
self.put_objects(obj_names)
|
||||||
|
|
||||||
client.post_container(self.url, self.admin_token, self.container_name,
|
client.post_container(self.url, self.admin_token, self.container_name,
|
||||||
@ -2956,17 +2956,38 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||||||
'swift-manage-shard-ranges',
|
'swift-manage-shard-ranges',
|
||||||
self.get_db_file(self.brain.part, self.brain.nodes[0]),
|
self.get_db_file(self.brain.part, self.brain.nodes[0]),
|
||||||
'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
|
'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
|
||||||
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
|
self.assert_container_state(self.brain.nodes[0], 'unsharded', 3)
|
||||||
|
|
||||||
# "Run container-replicator to replicate them to other nodes."
|
# "Run container-replicator to replicate them to other nodes."
|
||||||
self.replicators.once()
|
self.replicators.once()
|
||||||
# "Run container-sharder on all nodes to shard the container."
|
# "Run container-sharder on all nodes to shard the container."
|
||||||
|
# first pass cleaves 2 shards
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
self.assert_container_state(self.brain.nodes[0], 'sharding', 3)
|
||||||
|
self.assert_container_state(self.brain.nodes[1], 'sharding', 3)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[2], 'sharding', 3)
|
||||||
|
self.assert_container_listing(obj_names)
|
||||||
|
|
||||||
|
# make the un-cleaved shard update the root container...
|
||||||
|
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
|
||||||
|
shard_part, nodes = self.get_part_and_node_numbers(shard_ranges[2])
|
||||||
|
self.sharders_once(additional_args='--partitions=%s' % shard_part)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[2], 'sharding', 3)
|
||||||
|
# ...it does not report zero-stats despite being empty, because it has
|
||||||
|
# not yet reached CLEAVED state
|
||||||
|
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
|
||||||
|
|
||||||
|
# second pass cleaves final shard
|
||||||
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
# Everybody's settled
|
# Everybody's settled
|
||||||
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
|
self.assert_container_state(self.brain.nodes[0], 'sharded', 3)
|
||||||
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
|
self.assert_container_state(self.brain.nodes[1], 'sharded', 3)
|
||||||
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
shard_ranges = self.assert_container_state(
|
||||||
|
self.brain.nodes[2], 'sharded', 3)
|
||||||
|
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
|
||||||
self.assert_container_listing(obj_names)
|
self.assert_container_listing(obj_names)
|
||||||
|
|
||||||
def test_manage_shard_ranges_compact(self):
|
def test_manage_shard_ranges_compact(self):
|
||||||
|
@ -1588,6 +1588,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
# verify that objects are not missed if shard ranges change between
|
# verify that objects are not missed if shard ranges change between
|
||||||
# cleaving batches
|
# cleaving batches
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
|
# this root db has very few object rows...
|
||||||
objects = [
|
objects = [
|
||||||
('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
|
('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
|
||||||
('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0),
|
('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0),
|
||||||
@ -1603,8 +1604,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
broker.enable_sharding(Timestamp.now())
|
broker.enable_sharding(Timestamp.now())
|
||||||
|
|
||||||
shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
|
shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
|
||||||
|
# shard ranges start life with object count that is typically much
|
||||||
|
# larger than this DB's object population...
|
||||||
shard_ranges = self._make_shard_ranges(
|
shard_ranges = self._make_shard_ranges(
|
||||||
shard_bounds, state=ShardRange.CREATED)
|
shard_bounds, state=ShardRange.CREATED, object_count=500000)
|
||||||
expected_shard_dbs = []
|
expected_shard_dbs = []
|
||||||
for shard_range in shard_ranges:
|
for shard_range in shard_ranges:
|
||||||
db_hash = hash_path(shard_range.account, shard_range.container)
|
db_hash = hash_path(shard_range.account, shard_range.container)
|
||||||
@ -1634,8 +1637,8 @@ class TestSharder(BaseTestSharder):
|
|||||||
updated_shard_ranges = broker.get_shard_ranges()
|
updated_shard_ranges = broker.get_shard_ranges()
|
||||||
self.assertEqual(3, len(updated_shard_ranges))
|
self.assertEqual(3, len(updated_shard_ranges))
|
||||||
|
|
||||||
# first 2 shard ranges should have updated object count, bytes used and
|
# now they have reached CLEAVED state, the first 2 shard ranges should
|
||||||
# meta_timestamp
|
# have updated object count, bytes used and meta_timestamp
|
||||||
shard_ranges[0].bytes_used = 23
|
shard_ranges[0].bytes_used = 23
|
||||||
shard_ranges[0].object_count = 4
|
shard_ranges[0].object_count = 4
|
||||||
shard_ranges[0].state = ShardRange.CLEAVED
|
shard_ranges[0].state = ShardRange.CLEAVED
|
||||||
@ -1646,6 +1649,11 @@ class TestSharder(BaseTestSharder):
|
|||||||
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
|
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
|
||||||
self._check_objects(objects[:4], expected_shard_dbs[0])
|
self._check_objects(objects[:4], expected_shard_dbs[0])
|
||||||
self._check_objects(objects[4:7], expected_shard_dbs[1])
|
self._check_objects(objects[4:7], expected_shard_dbs[1])
|
||||||
|
# the actual object counts were set in the new shard brokers' own_sr's
|
||||||
|
shard_broker = ContainerBroker(expected_shard_dbs[0])
|
||||||
|
self.assertEqual(4, shard_broker.get_own_shard_range().object_count)
|
||||||
|
shard_broker = ContainerBroker(expected_shard_dbs[1])
|
||||||
|
self.assertEqual(2, shard_broker.get_own_shard_range().object_count)
|
||||||
self.assertFalse(os.path.exists(expected_shard_dbs[2]))
|
self.assertFalse(os.path.exists(expected_shard_dbs[2]))
|
||||||
|
|
||||||
# third shard range should be unchanged - not yet cleaved
|
# third shard range should be unchanged - not yet cleaved
|
||||||
@ -5276,6 +5284,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
|
|
||||||
def test_update_root_container_own_range(self):
|
def test_update_root_container_own_range(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
|
obj_names = []
|
||||||
|
|
||||||
# nothing to send
|
# nothing to send
|
||||||
with self._mock_sharder() as sharder:
|
with self._mock_sharder() as sharder:
|
||||||
@ -5290,16 +5299,27 @@ class TestSharder(BaseTestSharder):
|
|||||||
broker.merge_shard_ranges([own_shard_range])
|
broker.merge_shard_ranges([own_shard_range])
|
||||||
# add an object, expect to see it reflected in the own shard range
|
# add an object, expect to see it reflected in the own shard range
|
||||||
# that is sent
|
# that is sent
|
||||||
broker.put_object(str(own_shard_range.object_count + 1),
|
obj_names.append(uuid4())
|
||||||
|
broker.put_object(str(obj_names[-1]),
|
||||||
next(self.ts_iter).internal, 1, '', '')
|
next(self.ts_iter).internal, 1, '', '')
|
||||||
with mock_timestamp_now() as now:
|
with mock_timestamp_now() as now:
|
||||||
# force own shard range meta updates to be at fixed timestamp
|
# check if the state if in SHARD_UPDATE_STAT_STATES
|
||||||
expected_sent = [
|
if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||||
dict(own_shard_range,
|
ShardRange.SHARDING, ShardRange.SHARDED,
|
||||||
meta_timestamp=now.internal,
|
ShardRange.SHRINKING, ShardRange.SHRUNK]:
|
||||||
object_count=own_shard_range.object_count + 1,
|
exp_obj_count = len(obj_names)
|
||||||
bytes_used=own_shard_range.bytes_used + 1)]
|
expected_sent = [
|
||||||
|
dict(own_shard_range,
|
||||||
|
meta_timestamp=now.internal,
|
||||||
|
object_count=len(obj_names),
|
||||||
|
bytes_used=len(obj_names))]
|
||||||
|
else:
|
||||||
|
exp_obj_count = own_shard_range.object_count
|
||||||
|
expected_sent = [
|
||||||
|
dict(own_shard_range)]
|
||||||
self.check_shard_ranges_sent(broker, expected_sent)
|
self.check_shard_ranges_sent(broker, expected_sent)
|
||||||
|
self.assertEqual(
|
||||||
|
exp_obj_count, broker.get_own_shard_range().object_count)
|
||||||
|
|
||||||
# initialise tombstones
|
# initialise tombstones
|
||||||
with mock_timestamp_now(next(self.ts_iter)):
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
@ -5311,6 +5331,8 @@ class TestSharder(BaseTestSharder):
|
|||||||
with annotate_failure(state):
|
with annotate_failure(state):
|
||||||
check_only_own_shard_range_sent(state)
|
check_only_own_shard_range_sent(state)
|
||||||
|
|
||||||
|
init_obj_count = len(obj_names)
|
||||||
|
|
||||||
def check_tombstones_sent(state):
|
def check_tombstones_sent(state):
|
||||||
own_shard_range = broker.get_own_shard_range()
|
own_shard_range = broker.get_own_shard_range()
|
||||||
self.assertTrue(own_shard_range.update_state(
|
self.assertTrue(own_shard_range.update_state(
|
||||||
@ -5318,19 +5340,25 @@ class TestSharder(BaseTestSharder):
|
|||||||
broker.merge_shard_ranges([own_shard_range])
|
broker.merge_shard_ranges([own_shard_range])
|
||||||
# delete an object, expect to see it reflected in the own shard
|
# delete an object, expect to see it reflected in the own shard
|
||||||
# range that is sent
|
# range that is sent
|
||||||
broker.delete_object(str(own_shard_range.object_count),
|
broker.delete_object(str(obj_names.pop(-1)),
|
||||||
next(self.ts_iter).internal)
|
next(self.ts_iter).internal)
|
||||||
with mock_timestamp_now() as now:
|
with mock_timestamp_now() as now:
|
||||||
# force own shard range meta updates to be at fixed timestamp
|
# check if the state if in SHARD_UPDATE_STAT_STATES
|
||||||
expected_sent = [
|
if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||||
dict(own_shard_range,
|
ShardRange.SHARDING, ShardRange.SHARDED,
|
||||||
meta_timestamp=now.internal,
|
ShardRange.SHRINKING, ShardRange.SHRUNK]:
|
||||||
object_count=own_shard_range.object_count - 1,
|
expected_sent = [
|
||||||
bytes_used=own_shard_range.bytes_used - 1,
|
dict(own_shard_range,
|
||||||
tombstones=own_shard_range.tombstones + 1)]
|
meta_timestamp=now.internal,
|
||||||
|
object_count=len(obj_names),
|
||||||
|
bytes_used=len(obj_names),
|
||||||
|
tombstones=init_obj_count - len(obj_names))]
|
||||||
|
else:
|
||||||
|
expected_sent = [
|
||||||
|
dict(own_shard_range)]
|
||||||
self.check_shard_ranges_sent(broker, expected_sent)
|
self.check_shard_ranges_sent(broker, expected_sent)
|
||||||
|
|
||||||
for state in ShardRange.STATES:
|
for i, state in enumerate(ShardRange.STATES):
|
||||||
with annotate_failure(state):
|
with annotate_failure(state):
|
||||||
check_tombstones_sent(state)
|
check_tombstones_sent(state)
|
||||||
|
|
||||||
@ -5371,6 +5399,7 @@ class TestSharder(BaseTestSharder):
|
|||||||
other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))
|
other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))
|
||||||
self.assertTrue(other_shard_ranges[0].set_deleted())
|
self.assertTrue(other_shard_ranges[0].set_deleted())
|
||||||
broker.merge_shard_ranges(other_shard_ranges)
|
broker.merge_shard_ranges(other_shard_ranges)
|
||||||
|
obj_names = []
|
||||||
|
|
||||||
# own range missing - send nothing
|
# own range missing - send nothing
|
||||||
with self._mock_sharder() as sharder:
|
with self._mock_sharder() as sharder:
|
||||||
@ -5385,17 +5414,22 @@ class TestSharder(BaseTestSharder):
|
|||||||
broker.merge_shard_ranges([own_shard_range])
|
broker.merge_shard_ranges([own_shard_range])
|
||||||
# add an object, expect to see it reflected in the own shard range
|
# add an object, expect to see it reflected in the own shard range
|
||||||
# that is sent
|
# that is sent
|
||||||
broker.put_object(str(own_shard_range.object_count + 1),
|
obj_names.append(uuid4())
|
||||||
|
broker.put_object(str(obj_names[-1]),
|
||||||
next(self.ts_iter).internal, 1, '', '')
|
next(self.ts_iter).internal, 1, '', '')
|
||||||
with mock_timestamp_now() as now:
|
with mock_timestamp_now() as now:
|
||||||
shard_ranges = broker.get_shard_ranges(include_deleted=True)
|
shard_ranges = broker.get_shard_ranges(include_deleted=True)
|
||||||
expected_sent = sorted([
|
exp_own_shard_range = own_shard_range.copy()
|
||||||
own_shard_range.copy(
|
# check if the state if in SHARD_UPDATE_STAT_STATES
|
||||||
meta_timestamp=now.internal,
|
if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||||
object_count=own_shard_range.object_count + 1,
|
ShardRange.SHARDING, ShardRange.SHARDED,
|
||||||
bytes_used=own_shard_range.bytes_used + 1,
|
ShardRange.SHRINKING, ShardRange.SHRUNK]:
|
||||||
tombstones=0)] +
|
exp_own_shard_range.object_count = len(obj_names)
|
||||||
shard_ranges,
|
exp_own_shard_range.bytes_used = len(obj_names)
|
||||||
|
exp_own_shard_range.meta_timestamp = now.internal
|
||||||
|
exp_own_shard_range.tombstones = 0
|
||||||
|
expected_sent = sorted(
|
||||||
|
[exp_own_shard_range] + shard_ranges,
|
||||||
key=lambda sr: (sr.upper, sr.state, sr.lower))
|
key=lambda sr: (sr.upper, sr.state, sr.lower))
|
||||||
self.check_shard_ranges_sent(
|
self.check_shard_ranges_sent(
|
||||||
broker, [dict(sr) for sr in expected_sent])
|
broker, [dict(sr) for sr in expected_sent])
|
||||||
|
Loading…
Reference in New Issue
Block a user