Sharder: add timing metrics for individual steps and total time spent.
Change-Id: Ie2a8e4eced6688e5a98aa37c3c7b0c13fd2ddeee
This commit is contained in:
parent
f99a6e5762
commit
744e9a94af
@ -2331,9 +2331,13 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
return
|
return
|
||||||
|
|
||||||
# now look and deal with misplaced objects.
|
# now look and deal with misplaced objects.
|
||||||
|
move_start_ts = time.time()
|
||||||
self._move_misplaced_objects(broker)
|
self._move_misplaced_objects(broker)
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.move_misplaced', move_start_ts)
|
||||||
|
|
||||||
is_leader = node['index'] == 0 and self.auto_shard and not is_deleted
|
is_leader = node['index'] == 0 and self.auto_shard and not is_deleted
|
||||||
|
|
||||||
if state in (UNSHARDED, COLLAPSED):
|
if state in (UNSHARDED, COLLAPSED):
|
||||||
if is_leader and broker.is_root_container():
|
if is_leader and broker.is_root_container():
|
||||||
# bootstrap sharding of root container
|
# bootstrap sharding of root container
|
||||||
@ -2348,11 +2352,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# container has been given shard ranges rather than
|
# container has been given shard ranges rather than
|
||||||
# found them e.g. via replication or a shrink event,
|
# found them e.g. via replication or a shrink event,
|
||||||
# or manually triggered cleaving.
|
# or manually triggered cleaving.
|
||||||
|
db_start_ts = time.time()
|
||||||
if broker.set_sharding_state():
|
if broker.set_sharding_state():
|
||||||
state = SHARDING
|
state = SHARDING
|
||||||
self.info(broker, 'Kick off container cleaving, '
|
self.info(broker, 'Kick off container cleaving, '
|
||||||
'own shard range in state %r',
|
'own shard range in state %r',
|
||||||
own_shard_range.state_text)
|
own_shard_range.state_text)
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.set_state', db_start_ts)
|
||||||
elif is_leader:
|
elif is_leader:
|
||||||
if broker.set_sharding_state():
|
if broker.set_sharding_state():
|
||||||
state = SHARDING
|
state = SHARDING
|
||||||
@ -2363,6 +2370,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
own_shard_range.state_text)
|
own_shard_range.state_text)
|
||||||
|
|
||||||
if state == SHARDING:
|
if state == SHARDING:
|
||||||
|
cleave_start_ts = time.time()
|
||||||
if is_leader:
|
if is_leader:
|
||||||
num_found = self._find_shard_ranges(broker)
|
num_found = self._find_shard_ranges(broker)
|
||||||
else:
|
else:
|
||||||
@ -2377,6 +2385,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
|
|
||||||
# always try to cleave any pending shard ranges
|
# always try to cleave any pending shard ranges
|
||||||
cleave_complete = self._cleave(broker)
|
cleave_complete = self._cleave(broker)
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.cleave', cleave_start_ts)
|
||||||
|
|
||||||
if cleave_complete:
|
if cleave_complete:
|
||||||
if self._complete_sharding(broker):
|
if self._complete_sharding(broker):
|
||||||
@ -2384,6 +2394,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
self._increment_stat('visited', 'completed', statsd=True)
|
self._increment_stat('visited', 'completed', statsd=True)
|
||||||
self.info(broker, 'Completed cleaving, DB set to sharded '
|
self.info(broker, 'Completed cleaving, DB set to sharded '
|
||||||
'state')
|
'state')
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.completed',
|
||||||
|
broker.get_own_shard_range().epoch)
|
||||||
else:
|
else:
|
||||||
self.info(broker, 'Completed cleaving, DB remaining in '
|
self.info(broker, 'Completed cleaving, DB remaining in '
|
||||||
'sharding state')
|
'sharding state')
|
||||||
@ -2391,6 +2404,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
if not broker.is_deleted():
|
if not broker.is_deleted():
|
||||||
if state == SHARDED and broker.is_root_container():
|
if state == SHARDED and broker.is_root_container():
|
||||||
# look for shrink stats
|
# look for shrink stats
|
||||||
|
send_start_ts = time.time()
|
||||||
self._identify_shrinking_candidate(broker, node)
|
self._identify_shrinking_candidate(broker, node)
|
||||||
if is_leader:
|
if is_leader:
|
||||||
self._find_and_enable_shrinking_candidates(broker)
|
self._find_and_enable_shrinking_candidates(broker)
|
||||||
@ -2400,6 +2414,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
self._send_shard_ranges(broker, shard_range.account,
|
self._send_shard_ranges(broker, shard_range.account,
|
||||||
shard_range.container,
|
shard_range.container,
|
||||||
[shard_range])
|
[shard_range])
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.send_sr', send_start_ts)
|
||||||
|
|
||||||
if not broker.is_root_container():
|
if not broker.is_root_container():
|
||||||
# Update the root container with this container's shard range
|
# Update the root container with this container's shard range
|
||||||
@ -2408,7 +2424,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# sharding a shard, this is when the root will see the new
|
# sharding a shard, this is when the root will see the new
|
||||||
# shards move to ACTIVE state and the sharded shard
|
# shards move to ACTIVE state and the sharded shard
|
||||||
# simultaneously become deleted.
|
# simultaneously become deleted.
|
||||||
|
update_start_ts = time.time()
|
||||||
self._update_root_container(broker)
|
self._update_root_container(broker)
|
||||||
|
self.logger.timing_since(
|
||||||
|
'sharder.sharding.update_root', update_start_ts)
|
||||||
|
|
||||||
self.debug(broker,
|
self.debug(broker,
|
||||||
'Finished processing, state %s%s',
|
'Finished processing, state %s%s',
|
||||||
|
@ -2466,6 +2466,16 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertEqual('', context.cursor)
|
self.assertEqual('', context.cursor)
|
||||||
self.assertEqual(10, context.cleave_to_row)
|
self.assertEqual(10, context.cleave_to_row)
|
||||||
self.assertEqual(12, context.max_row) # note that max row increased
|
self.assertEqual(12, context.max_row) # note that max row increased
|
||||||
|
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||||
|
self.assertEqual('sharder.sharding.move_misplaced',
|
||||||
|
self.logger.log_dict['timing_since'][-3][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.set_state',
|
||||||
|
self.logger.log_dict['timing_since'][-2][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.cleave',
|
||||||
|
self.logger.log_dict['timing_since'][-1][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
|
||||||
lines = sharder.logger.get_lines_for_level('info')
|
lines = sharder.logger.get_lines_for_level('info')
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
["Kick off container cleaving, own shard range in state "
|
["Kick off container cleaving, own shard range in state "
|
||||||
@ -2511,6 +2521,80 @@ class TestSharder(BaseTestSharder):
|
|||||||
'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
|
'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
|
||||||
% broker.db_file, lines[1:])
|
% broker.db_file, lines[1:])
|
||||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||||
|
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||||
|
self.assertEqual('sharder.sharding.move_misplaced',
|
||||||
|
self.logger.log_dict['timing_since'][-4][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-4][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.cleave',
|
||||||
|
self.logger.log_dict['timing_since'][-3][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.completed',
|
||||||
|
self.logger.log_dict['timing_since'][-2][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.send_sr',
|
||||||
|
self.logger.log_dict['timing_since'][-1][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
|
||||||
|
|
||||||
|
def test_cleave_timing_metrics(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
objects = [{'name': 'obj_%03d' % i,
|
||||||
|
'created_at': Timestamp.now().normal,
|
||||||
|
'content_type': 'text/plain',
|
||||||
|
'etag': 'etag_%d' % i,
|
||||||
|
'size': 1024 * i,
|
||||||
|
'deleted': i % 2,
|
||||||
|
'storage_policy_index': 0,
|
||||||
|
} for i in range(1, 8)]
|
||||||
|
broker.merge_items([dict(obj) for obj in objects])
|
||||||
|
broker.enable_sharding(Timestamp.now())
|
||||||
|
shard_ranges = self._make_shard_ranges(
|
||||||
|
(('', 'obj_004'), ('obj_004', '')), state=ShardRange.CREATED)
|
||||||
|
expected_shard_dbs = []
|
||||||
|
for shard_range in shard_ranges:
|
||||||
|
db_hash = hash_path(shard_range.account, shard_range.container)
|
||||||
|
expected_shard_dbs.append(
|
||||||
|
os.path.join(self.tempdir, 'sda', 'containers', '0',
|
||||||
|
db_hash[-3:], db_hash, db_hash + '.db'))
|
||||||
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
|
self.assertTrue(broker.set_sharding_state())
|
||||||
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
|
'index': 0}
|
||||||
|
|
||||||
|
with self._mock_sharder() as sharder:
|
||||||
|
sharder._audit_container = mock.MagicMock()
|
||||||
|
sharder._process_broker(broker, node, 99)
|
||||||
|
|
||||||
|
lines = sharder.logger.get_lines_for_level('info')
|
||||||
|
self.assertEqual(
|
||||||
|
'Starting to cleave (2 todo), path: a/c, db: %s'
|
||||||
|
% broker.db_file, lines[0])
|
||||||
|
self.assertIn(
|
||||||
|
'Completed cleaving, DB set to sharded state, path: a/c, db: %s'
|
||||||
|
% broker.db_file, lines[1:])
|
||||||
|
|
||||||
|
self.assertTrue(self.logger.log_dict['timing_since'])
|
||||||
|
self.assertEqual('sharder.sharding.move_misplaced',
|
||||||
|
self.logger.log_dict['timing_since'][-4][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-4][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.cleave',
|
||||||
|
self.logger.log_dict['timing_since'][-3][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-3][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.completed',
|
||||||
|
self.logger.log_dict['timing_since'][-2][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-2][0][1], 0)
|
||||||
|
self.assertEqual('sharder.sharding.send_sr',
|
||||||
|
self.logger.log_dict['timing_since'][-1][0][0])
|
||||||
|
self.assertGreater(self.logger.log_dict['timing_since'][-1][0][1], 0)
|
||||||
|
|
||||||
|
# check shard ranges were updated to ACTIVE
|
||||||
|
self.assertEqual([ShardRange.ACTIVE] * 2,
|
||||||
|
[sr.state for sr in broker.get_shard_ranges()])
|
||||||
|
shard_broker = ContainerBroker(expected_shard_dbs[0])
|
||||||
|
actual_objects = shard_broker.get_objects()
|
||||||
|
self.assertEqual(objects[:4], actual_objects)
|
||||||
|
shard_broker = ContainerBroker(expected_shard_dbs[1])
|
||||||
|
actual_objects = shard_broker.get_objects()
|
||||||
|
self.assertEqual(objects[4:], actual_objects)
|
||||||
|
|
||||||
def test_cleave_multiple_storage_policies(self):
|
def test_cleave_multiple_storage_policies(self):
|
||||||
# verify that objects in all storage policies are cleaved
|
# verify that objects in all storage policies are cleaved
|
||||||
|
Loading…
Reference in New Issue
Block a user