Report final in_progress when sharding is complete

On every sharder cycle we update in-progress recon stats for each sharding
container. However, we never run this one final time once sharding
is complete because the DB state is changed to SHARDED and therefore the
in_progress stats never get their final update.
For anyone collecting this data to monitor sharding, this makes the
sharding/cleaving of shards appear to never complete.

This patch adds a new option `recon_sharded_timeout` which now
allows sharded containers to be processed by `_record_sharding_progress()`
for an amount of time after they have finished sharding.

Change-Id: I5fa39d41f9cd3b211e45d2012fd709f4135f595e
This commit is contained in:
Matthew Oliver 2021-02-24 15:06:08 +11:00 committed by Alistair Coles
parent b1309c95e5
commit 1de9834816
5 changed files with 230 additions and 27 deletions

View File

@ -440,6 +440,17 @@ use = egg:swift#xprofile
# integer value. A negative value implies no limit.
# recon_candidates_limit = 5
#
# As the sharder visits each container that's currently sharding it dumps to
# recon their current progress. To be able to mark their progress as completed
# this in-progress check will need to monitor containers that have just
completed sharding. The recon_sharded_timeout parameter says for how long a
# container that has just finished sharding should be checked by the in-progress
# check. This is to allow anything monitoring the sharding recon dump to have
# enough time to collate and see things complete. The time is capped at
# reclaim_age, so this parameter should be less than or equal to reclaim_age.
# The default is 12 hours (12 x 60 x 60)
# recon_sharded_timeout = 43200
#
# Large databases tend to take a while to work with, but we want to make sure
# we write down our progress. Use a larger-than-normal broker timeout to make
# us less likely to bomb out on a LockTimeout.

View File

@ -382,6 +382,8 @@ class ContainerSharder(ContainerReplicator):
self.sharding_candidates = []
self.recon_candidates_limit = int(
conf.get('recon_candidates_limit', 5))
self.recon_sharded_timeout = int(
conf.get('recon_sharded_timeout', 43200))
self.broker_timeout = config_positive_int_value(
conf.get('broker_timeout', 60))
replica_count = self.ring.replica_count
@ -493,9 +495,19 @@ class ContainerSharder(ContainerReplicator):
def _record_sharding_progress(self, broker, node, error):
own_shard_range = broker.get_own_shard_range()
if (broker.get_db_state() in (UNSHARDED, SHARDING) and
own_shard_range.state in (ShardRange.SHARDING,
ShardRange.SHARDED)):
db_state = broker.get_db_state()
if (db_state in (UNSHARDED, SHARDING, SHARDED)
and own_shard_range.state in (ShardRange.SHARDING,
ShardRange.SHARDED)):
if db_state == SHARDED:
context_ts = max([float(ts) for c, ts in
CleavingContext.load_all(broker)]) or None
if not context_ts or (context_ts + self.recon_sharded_timeout
< Timestamp.now().timestamp):
# no contexts or last context timestamp too old for the
# broker to be recorded
return
info = self._make_stats_info(broker, node, own_shard_range)
info['state'] = own_shard_range.state_text
info['db_state'] = broker.get_db_state()
@ -834,8 +846,11 @@ class ContainerSharder(ContainerReplicator):
def _audit_cleave_contexts(self, broker):
    """Reap cleaving contexts stored in the broker's sysmeta.

    A context is deleted when either:

    * it is done (cleaving complete) and its last-modified time is more
      than ``recon_sharded_timeout`` seconds in the past -- the delay
      gives the in-progress recon stats time to report the completed
      state before the context disappears; or
    * it has not been modified for ``reclaim_age`` seconds, i.e. it is
      stale regardless of completion.

    :param broker: a container broker whose cleaving contexts are
        audited.
    """
    now = Timestamp.now()
    for context, last_mod in CleavingContext.load_all(broker):
        last_mod = Timestamp(last_mod)
        # completed contexts linger for recon_sharded_timeout so that
        # _record_sharding_progress() can report their final state
        is_done = context.done() and last_mod.timestamp + \
            self.recon_sharded_timeout < now.timestamp
        is_stale = last_mod.timestamp + self.reclaim_age < now.timestamp
        if is_done or is_stale:
            context.delete(broker)
def _audit_container(self, broker):
@ -1485,7 +1500,6 @@ class ContainerSharder(ContainerReplicator):
modified_shard_ranges.append(own_shard_range)
broker.merge_shard_ranges(modified_shard_ranges)
if broker.set_sharded_state():
cleaving_context.delete(broker)
return True
else:
self.logger.warning(

View File

@ -41,6 +41,7 @@ from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest, get_server_number, \
wait_for_server_to_hangup
from test.debug_logger import debug_logger
import mock
MIN_SHARD_CONTAINER_THRESHOLD = 4
@ -888,8 +889,10 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
[sr['state'] for sr in orig_root_shard_ranges])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
# Contexts should still be there, and should be complete
contexts = set([ctx.done()
for ctx, _ in CleavingContext.load_all(broker)])
self.assertEqual({True}, contexts)
self.direct_delete_container(expect_failure=True)
self.assertLengthEqual(found['normal_dbs'], 2)
@ -939,9 +942,10 @@ class TestContainerSharding(BaseTestContainerSharding):
orig['state_timestamp'])
self.assertGreaterEqual(updated.meta_timestamp,
orig['meta_timestamp'])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
# Contexts should still be there, and should be complete
contexts = set([ctx.done()
for ctx, _ in CleavingContext.load_all(broker)])
self.assertEqual({True}, contexts)
# Check that entire listing is available
headers, actual_listing = self.assert_container_listing(obj_names)
@ -1197,9 +1201,11 @@ class TestContainerSharding(BaseTestContainerSharding):
[ShardRange.ACTIVE] * 3,
[sr.state for sr in broker.get_shard_ranges()])
# Make sure our cleaving contexts got cleaned up
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts)
# Contexts should still be there, and should be complete
contexts = set([ctx.done()
for ctx, _
in CleavingContext.load_all(broker)])
self.assertEqual({True}, contexts)
# check root shard ranges
root_shard_ranges = self.direct_get_container_shard_ranges()
@ -2021,6 +2027,26 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
[sr.state for sr in shard_ranges])
# Check the current progress. It shouldn't be complete.
recon = direct_client.direct_get_recon(leader_node, "sharding")
expected_in_progress = {'all': [{'account': 'AUTH_test',
'active': 0,
'cleaved': 2,
'created': 2,
'found': 0,
'db_state': 'sharding',
'state': 'sharding',
'error': None,
'file_size': mock.ANY,
'meta_timestamp': mock.ANY,
'node_index': 0,
'object_count': len(obj_names),
'container': mock.ANY,
'path': mock.ANY,
'root': mock.ANY}]}
actual = recon['sharding_stats']['sharding']['sharding_in_progress']
self.assertEqual(expected_in_progress, actual)
# stop *all* container servers for third shard range
sr_part, sr_node_nums = self.get_part_and_node_numbers(shard_ranges[2])
for node_num in sr_node_nums:
@ -2078,6 +2104,26 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual([ShardRange.ACTIVE] * 4,
[sr.state for sr in shard_ranges])
# Check the leader's progress again, this time is should be complete
recon = direct_client.direct_get_recon(leader_node, "sharding")
expected_in_progress = {'all': [{'account': 'AUTH_test',
'active': 4,
'cleaved': 0,
'created': 0,
'found': 0,
'db_state': 'sharded',
'state': 'sharded',
'error': None,
'file_size': mock.ANY,
'meta_timestamp': mock.ANY,
'node_index': 0,
'object_count': len(obj_names),
'container': mock.ANY,
'path': mock.ANY,
'root': mock.ANY}]}
actual = recon['sharding_stats']['sharding']['sharding_in_progress']
self.assertEqual(expected_in_progress, actual)
def test_sharded_delete(self):
all_obj_names = self._make_object_names(self.max_shard_size)
self.put_objects(all_obj_names)

View File

@ -167,6 +167,7 @@ class TestSharder(BaseTestSharder):
'shards_account_prefix': '.shards_',
'auto_shard': False,
'recon_candidates_limit': 5,
'recon_sharded_timeout': 43200,
'shard_replication_quorum': 2,
'existing_shard_replication_quorum': 2
}
@ -198,6 +199,7 @@ class TestSharder(BaseTestSharder):
'auto_create_account_prefix': '...',
'auto_shard': 'yes',
'recon_candidates_limit': 10,
'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
}
@ -220,6 +222,7 @@ class TestSharder(BaseTestSharder):
'shards_account_prefix': '...shards_',
'auto_shard': True,
'recon_candidates_limit': 10,
'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
}
@ -685,6 +688,114 @@ class TestSharder(BaseTestSharder):
expected_candidate_stats, sharder, 'sharding_candidates')
self._assert_recon_stats(None, sharder, 'sharding_progress')
# let's progress broker 1 (broker[0])
brokers[0].enable_sharding(next(self.ts_iter))
brokers[0].set_sharding_state()
shard_ranges = brokers[0].get_shard_ranges()
for sr in shard_ranges[:-1]:
sr.update_state(ShardRange.CLEAVED)
brokers[0].merge_shard_ranges(shard_ranges)
with mock.patch('eventlet.sleep'), mock.patch.object(
sharder, '_process_broker'
) as mock_process_broker:
sharder._local_device_ids = {999}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[0].db_file).st_size,
'path': brokers[0].db_file, 'root': 'a/c0',
'node_index': 0,
'found': 1, 'created': 0, 'cleaved': 3, 'active': 1,
'state': 'sharding', 'db_state': 'sharding',
'error': None},
{'object_count': 0, 'account': 'a', 'container': 'c1',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
'path': brokers[1].db_file, 'root': 'a/c1',
'node_index': 1,
'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
'state': 'sharding', 'db_state': 'unsharded',
'error': None}]}
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
# Now complete sharding broker 1.
shard_ranges[-1].update_state(ShardRange.CLEAVED)
own_sr = brokers[0].get_own_shard_range()
own_sr.update_state(ShardRange.SHARDED)
brokers[0].merge_shard_ranges(shard_ranges + [own_sr])
# make and complete a cleave context, this is used for the
# recon_sharded_timeout timer.
cxt = CleavingContext.load(brokers[0])
cxt.misplaced_done = cxt.cleaving_done = True
ts_now = next(self.ts_iter)
with mock_timestamp_now(ts_now):
cxt.store(brokers[0])
self.assertTrue(brokers[0].set_sharded_state())
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[0].db_file).st_size,
'path': brokers[0].db_file, 'root': 'a/c0',
'node_index': 0,
'found': 0, 'created': 0, 'cleaved': 4, 'active': 1,
'state': 'sharded', 'db_state': 'sharded',
'error': None},
{'object_count': 0, 'account': 'a', 'container': 'c1',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
'path': brokers[1].db_file, 'root': 'a/c1',
'node_index': 1,
'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
'state': 'sharding', 'db_state': 'unsharded',
'error': None}]}
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
# one more cycle at recon_sharded_timeout seconds into the
# future to check that the completed broker is still reported
ts_now = Timestamp(ts_now.timestamp +
sharder.recon_sharded_timeout)
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._one_shard_cycle(Everything(), Everything())
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
# when we move recon_sharded_timeout + 1 seconds into the future,
# broker 1 will be removed from the progress report
ts_now = Timestamp(ts_now.timestamp +
sharder.recon_sharded_timeout + 1)
with mock.patch('eventlet.sleep'), \
mock.patch.object(sharder, '_process_broker') \
as mock_process_broker, mock_timestamp_now(ts_now):
sharder._local_device_ids = {999}
sharder._one_shard_cycle(Everything(), Everything())
expected_in_progress_stats = {
'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
'meta_timestamp': mock.ANY,
'file_size': os.stat(brokers[1].db_file).st_size,
'path': brokers[1].db_file, 'root': 'a/c1',
'node_index': 1,
'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
'state': 'sharding', 'db_state': 'unsharded',
'error': None}]}
self._assert_stats(
expected_in_progress_stats, sharder, 'sharding_in_progress')
def test_ratelimited_roundrobin(self):
n_databases = 100
@ -5357,21 +5468,21 @@ class TestSharder(BaseTestSharder):
def test_audit_cleave_contexts(self):
def add_cleave_context(id, last_modified):
def add_cleave_context(id, last_modified, cleaving_done):
params = {'ref': id,
'cursor': 'curs',
'max_row': 2,
'cleave_to_row': 2,
'last_cleave_to_row': 1,
'cleaving_done': False,
'cleaving_done': cleaving_done,
'misplaced_done': True,
'ranges_done': 2,
'ranges_todo': 4}
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
with mock_timestamp_now(Timestamp(last_modified)):
with mock_timestamp_now(last_modified):
broker.update_metadata(
{key: (json.dumps(params),
Timestamp(last_modified).internal)})
last_modified.internal)})
def get_context(id, broker):
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
@ -5380,6 +5491,7 @@ class TestSharder(BaseTestSharder):
return data
reclaim_age = 100
recon_sharded_timeout = 50
broker = self._make_broker()
# sanity check
@ -5387,25 +5499,43 @@ class TestSharder(BaseTestSharder):
self.assertEqual(UNSHARDED, broker.get_db_state())
# Setup some cleaving contexts
id_old, id_newish = [str(uuid4()) for _ in range(2)]
contexts = ((id_old, 1),
(id_newish, reclaim_age // 2))
for id, last_modified in contexts:
add_cleave_context(id, last_modified)
id_old, id_newish, id_complete = [str(uuid4()) for _ in range(3)]
ts_old, ts_newish, ts_complete = (
Timestamp(1),
Timestamp(reclaim_age // 2),
Timestamp(reclaim_age - recon_sharded_timeout))
contexts = ((id_old, ts_old, False),
(id_newish, ts_newish, False),
(id_complete, ts_complete, True))
for id, last_modified, cleaving_done in contexts:
add_cleave_context(id, last_modified, cleaving_done)
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
sharder_conf = {'reclaim_age': str(reclaim_age),
'recon_sharded_timeout': str(recon_sharded_timeout)}
with self._mock_sharder(sharder_conf) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
sharder._audit_cleave_contexts(broker)
# old context is stale, ie last modified reached reclaim_age and was
# never completed (done).
old_ctx = get_context(id_old, broker)
self.assertEqual(old_ctx, "")
# Newish context is almost stale, as in it's been 1/2 reclaim age since
# it was last modified yet it's not completed. So it hasn't been
# cleaned up.
newish_ctx = get_context(id_newish, broker)
self.assertEqual(newish_ctx.ref, id_newish)
# If we push time another reclaim age later, they will all be removed
# except id_missing_lm as it has a later last_modified.
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
# Complete context is complete (done) and it's been
# recon_sharded_timeout time since it was marked completed so it's
# been removed
complete_ctx = get_context(id_complete, broker)
self.assertEqual(complete_ctx, "")
# If we push time another reclaim age later, they are all removed
with self._mock_sharder(sharder_conf) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
sharder._audit_cleave_contexts(broker)
@ -5561,6 +5691,7 @@ class TestCleavingContext(BaseTestSharder):
# Now let's delete it. When deleted the metadata key will exist, but
# the value will be "" as this means it'll be reaped later.
ctx.delete(broker)
sysmeta = broker.get_sharding_sysmeta()
for key, val in sysmeta.items():
if key == "Context-%s" % db_id:

View File

@ -40,3 +40,4 @@
- nose
- pyeclib
- python-swiftclient
- mock