Merge "Report final in_progress when sharding is complete"
commit 5c3eb488f2
@@ -440,6 +440,17 @@ use = egg:swift#xprofile
# integer value. A negative value implies no limit.
# recon_candidates_limit = 5
#
# As the sharder visits each container that is currently sharding, it dumps
# that container's progress to recon. To be able to mark progress as
# completed, this in-progress check also needs to monitor containers that
# have just completed sharding. The recon_sharded_timeout parameter sets how
# long a container that has just finished sharding is still checked by the
# in-progress check. This allows anything monitoring the sharding recon dump
# enough time to collate the results and see them complete. The time is
# capped at reclaim_age, so this parameter should be less than or equal to
# reclaim_age. The default is 12 hours (12 x 60 x 60 seconds).
# recon_sharded_timeout = 43200
#
# Large databases tend to take a while to work with, but we want to make sure
# we write down our progress. Use a larger-than-normal broker timeout to make
# us less likely to bomb out on a LockTimeout.
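A minimal sketch of the reporting window these new comments describe (illustrative names only, not Swift's actual API; the real gate is added to _record_sharding_progress later in this change): a container that has finished sharding keeps appearing in the recon in-progress dump until recon_sharded_timeout seconds have passed since its newest cleaving context was updated.

# Sketch only: the recon_sharded_timeout window described above.
# `last_context_ts` stands in for the newest CleavingContext timestamp on
# the broker; these names are illustrative, not part of this change.
import time


def still_reported(last_context_ts, recon_sharded_timeout=43200, now=None):
    """True while a just-sharded container should still appear in recon."""
    now = time.time() if now is None else now
    return last_context_ts + recon_sharded_timeout >= now
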
@@ -518,6 +518,8 @@ class ContainerSharder(ContainerReplicator):
        self.shrinking_candidates = []
        self.recon_candidates_limit = int(
            conf.get('recon_candidates_limit', 5))
        self.recon_sharded_timeout = int(
            conf.get('recon_sharded_timeout', 43200))
        self.broker_timeout = config_positive_int_value(
            conf.get('broker_timeout', 60))
        replica_count = self.ring.replica_count
@@ -645,9 +647,19 @@ class ContainerSharder(ContainerReplicator):

    def _record_sharding_progress(self, broker, node, error):
        own_shard_range = broker.get_own_shard_range()
        if (broker.get_db_state() in (UNSHARDED, SHARDING) and
                own_shard_range.state in (ShardRange.SHARDING,
                                          ShardRange.SHARDED)):
        db_state = broker.get_db_state()
        if (db_state in (UNSHARDED, SHARDING, SHARDED)
                and own_shard_range.state in (ShardRange.SHARDING,
                                              ShardRange.SHARDED)):
            if db_state == SHARDED:
                context_ts = max([float(ts) for c, ts in
                                  CleavingContext.load_all(broker)]) or None
                if not context_ts or (context_ts + self.recon_sharded_timeout
                                      < Timestamp.now().timestamp):
                    # no contexts, or the last context timestamp is too old
                    # for the broker to be recorded
                    return

            info = self._make_stats_info(broker, node, own_shard_range)
            info['state'] = own_shard_range.state_text
            info['db_state'] = broker.get_db_state()
@@ -995,8 +1007,11 @@ class ContainerSharder(ContainerReplicator):
    def _audit_cleave_contexts(self, broker):
        now = Timestamp.now()
        for context, last_mod in CleavingContext.load_all(broker):
            if Timestamp(last_mod).timestamp + self.reclaim_age < \
                    now.timestamp:
            last_mod = Timestamp(last_mod)
            is_done = context.done() and last_mod.timestamp + \
                self.recon_sharded_timeout < now.timestamp
            is_stale = last_mod.timestamp + self.reclaim_age < now.timestamp
            if is_done or is_stale:
                context.delete(broker)

    def _audit_container(self, broker):
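A standalone sketch of the two reap conditions introduced in _audit_cleave_contexts above (illustrative names; the defaults shown are the recon_sharded_timeout default from this change and Swift's one-week reclaim_age default): a cleaving context is deleted either once it is done and recon_sharded_timeout has elapsed since its last update, or once reclaim_age has elapsed regardless of completion.

# Sketch only: the is_done / is_stale logic above as a standalone predicate.
def should_reap(done, last_mod_ts, now_ts,
                recon_sharded_timeout=43200, reclaim_age=604800):
    is_done = done and last_mod_ts + recon_sharded_timeout < now_ts
    is_stale = last_mod_ts + reclaim_age < now_ts
    return is_done or is_stale


# e.g. a completed context older than the timeout is reaped, while an
# incomplete but recent one is kept:
assert should_reap(True, 0, 50000) is True
assert should_reap(False, 0, 50000) is False
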
@@ -1646,7 +1661,6 @@ class ContainerSharder(ContainerReplicator):
            modified_shard_ranges.append(own_shard_range)
            broker.merge_shard_ranges(modified_shard_ranges)
            if broker.set_sharded_state():
                cleaving_context.delete(broker)
                return True
            else:
                self.logger.warning(
@@ -41,6 +41,7 @@ from test.probe.brain import BrainSplitter
from test.probe.common import ReplProbeTest, get_server_number, \
    wait_for_server_to_hangup
from test.debug_logger import debug_logger
import mock


MIN_SHARD_CONTAINER_THRESHOLD = 4
@@ -888,8 +889,10 @@ class TestContainerSharding(BaseTestContainerSharding):
        self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
        self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
                         [sr['state'] for sr in orig_root_shard_ranges])
        contexts = list(CleavingContext.load_all(broker))
        self.assertEqual([], contexts)  # length check
        # Contexts should still be there, and should be complete
        contexts = set([ctx.done()
                        for ctx, _ in CleavingContext.load_all(broker)])
        self.assertEqual({True}, contexts)
        self.direct_delete_container(expect_failure=True)

        self.assertLengthEqual(found['normal_dbs'], 2)
@@ -939,9 +942,10 @@ class TestContainerSharding(BaseTestContainerSharding):
                                orig['state_timestamp'])
        self.assertGreaterEqual(updated.meta_timestamp,
                                orig['meta_timestamp'])

        contexts = list(CleavingContext.load_all(broker))
        self.assertEqual([], contexts)  # length check
        # Contexts should still be there, and should be complete
        contexts = set([ctx.done()
                        for ctx, _ in CleavingContext.load_all(broker)])
        self.assertEqual({True}, contexts)

        # Check that entire listing is available
        headers, actual_listing = self.assert_container_listing(obj_names)
@@ -1197,9 +1201,11 @@ class TestContainerSharding(BaseTestContainerSharding):
            [ShardRange.ACTIVE] * 3,
            [sr.state for sr in broker.get_shard_ranges()])

        # Make sure our cleaving contexts got cleaned up
        contexts = list(CleavingContext.load_all(broker))
        self.assertEqual([], contexts)
        # Contexts should still be there, and should be complete
        contexts = set([ctx.done()
                        for ctx, _
                        in CleavingContext.load_all(broker)])
        self.assertEqual({True}, contexts)

        # check root shard ranges
        root_shard_ranges = self.direct_get_container_shard_ranges()
@@ -2021,6 +2027,26 @@ class TestContainerSharding(BaseTestContainerSharding):
        self.assertEqual([ShardRange.CLEAVED] * 2 + [ShardRange.CREATED] * 2,
                         [sr.state for sr in shard_ranges])

        # Check the current progress. It shouldn't be complete.
        recon = direct_client.direct_get_recon(leader_node, "sharding")
        expected_in_progress = {'all': [{'account': 'AUTH_test',
                                         'active': 0,
                                         'cleaved': 2,
                                         'created': 2,
                                         'found': 0,
                                         'db_state': 'sharding',
                                         'state': 'sharding',
                                         'error': None,
                                         'file_size': mock.ANY,
                                         'meta_timestamp': mock.ANY,
                                         'node_index': 0,
                                         'object_count': len(obj_names),
                                         'container': mock.ANY,
                                         'path': mock.ANY,
                                         'root': mock.ANY}]}
        actual = recon['sharding_stats']['sharding']['sharding_in_progress']
        self.assertEqual(expected_in_progress, actual)

        # stop *all* container servers for third shard range
        sr_part, sr_node_nums = self.get_part_and_node_numbers(shard_ranges[2])
        for node_num in sr_node_nums:
@@ -2078,6 +2104,26 @@ class TestContainerSharding(BaseTestContainerSharding):
        self.assertEqual([ShardRange.ACTIVE] * 4,
                         [sr.state for sr in shard_ranges])

        # Check the leader's progress again; this time it should be complete
        recon = direct_client.direct_get_recon(leader_node, "sharding")
        expected_in_progress = {'all': [{'account': 'AUTH_test',
                                         'active': 4,
                                         'cleaved': 0,
                                         'created': 0,
                                         'found': 0,
                                         'db_state': 'sharded',
                                         'state': 'sharded',
                                         'error': None,
                                         'file_size': mock.ANY,
                                         'meta_timestamp': mock.ANY,
                                         'node_index': 0,
                                         'object_count': len(obj_names),
                                         'container': mock.ANY,
                                         'path': mock.ANY,
                                         'root': mock.ANY}]}
        actual = recon['sharding_stats']['sharding']['sharding_in_progress']
        self.assertEqual(expected_in_progress, actual)

    def test_sharded_delete(self):
        all_obj_names = self._make_object_names(self.max_shard_size)
        self.put_objects(all_obj_names)
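The probe assertions above read the sharding progress via direct_client.direct_get_recon. A monitoring script could consume the same sharding_in_progress data from the container recon cache; a minimal sketch, assuming the common default cache location (adjust to match your recon_cache_path setting):

# Sketch only: read the progress the sharder dumps to recon. The cache path
# below is an assumption, not part of this change.
import json


def sharding_in_progress(cache_path='/var/cache/swift/container.recon'):
    with open(cache_path) as f:
        recon = json.load(f)
    sharding = recon.get('sharding_stats', {}).get('sharding', {})
    return sharding.get('sharding_in_progress', {}).get('all', [])


for entry in sharding_in_progress():
    print(entry['path'], entry['db_state'], entry['state'])
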
@@ -170,6 +170,7 @@ class TestSharder(BaseTestSharder):
            'shards_account_prefix': '.shards_',
            'auto_shard': False,
            'recon_candidates_limit': 5,
            'recon_sharded_timeout': 43200,
            'shard_replication_quorum': 2,
            'existing_shard_replication_quorum': 2
        }
@@ -201,6 +202,7 @@ class TestSharder(BaseTestSharder):
            'auto_create_account_prefix': '...',
            'auto_shard': 'yes',
            'recon_candidates_limit': 10,
            'recon_sharded_timeout': 7200,
            'shard_replication_quorum': 1,
            'existing_shard_replication_quorum': 0
        }
@@ -223,6 +225,7 @@ class TestSharder(BaseTestSharder):
            'shards_account_prefix': '...shards_',
            'auto_shard': True,
            'recon_candidates_limit': 10,
            'recon_sharded_timeout': 7200,
            'shard_replication_quorum': 1,
            'existing_shard_replication_quorum': 0
        }
@@ -694,6 +697,114 @@ class TestSharder(BaseTestSharder):
            expected_candidate_stats, sharder, 'sharding_candidates')
        self._assert_recon_stats(None, sharder, 'sharding_progress')

        # let's progress the first broker (brokers[0])
        brokers[0].enable_sharding(next(self.ts_iter))
        brokers[0].set_sharding_state()
        shard_ranges = brokers[0].get_shard_ranges()
        for sr in shard_ranges[:-1]:
            sr.update_state(ShardRange.CLEAVED)
        brokers[0].merge_shard_ranges(shard_ranges)

        with mock.patch('eventlet.sleep'), mock.patch.object(
                sharder, '_process_broker'
        ) as mock_process_broker:
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[0].db_file).st_size,
                     'path': brokers[0].db_file, 'root': 'a/c0',
                     'node_index': 0,
                     'found': 1, 'created': 0, 'cleaved': 3, 'active': 1,
                     'state': 'sharding', 'db_state': 'sharding',
                     'error': None},
                    {'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # Now complete sharding for brokers[0].
        shard_ranges[-1].update_state(ShardRange.CLEAVED)
        own_sr = brokers[0].get_own_shard_range()
        own_sr.update_state(ShardRange.SHARDED)
        brokers[0].merge_shard_ranges(shard_ranges + [own_sr])
        # Make and complete a cleaving context; this is what the
        # recon_sharded_timeout timer is measured against.
        cxt = CleavingContext.load(brokers[0])
        cxt.misplaced_done = cxt.cleaving_done = True
        ts_now = next(self.ts_iter)
        with mock_timestamp_now(ts_now):
            cxt.store(brokers[0])
        self.assertTrue(brokers[0].set_sharded_state())

        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[0].db_file).st_size,
                     'path': brokers[0].db_file, 'root': 'a/c0',
                     'node_index': 0,
                     'found': 0, 'created': 0, 'cleaved': 4, 'active': 1,
                     'state': 'sharded', 'db_state': 'sharded',
                     'error': None},
                    {'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # one more cycle at recon_sharded_timeout seconds into the
        # future to check that the completed broker is still reported
        ts_now = Timestamp(ts_now.timestamp +
                           sharder.recon_sharded_timeout)
        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # when we move another recon_sharded_timeout + 1 seconds into the
        # future, brokers[0] will be removed from the progress report
        ts_now = Timestamp(ts_now.timestamp +
                           sharder.recon_sharded_timeout + 1)
        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

    def test_ratelimited_roundrobin(self):
        n_databases = 100
@@ -5426,21 +5537,21 @@ class TestSharder(BaseTestSharder):

    def test_audit_cleave_contexts(self):

        def add_cleave_context(id, last_modified):
        def add_cleave_context(id, last_modified, cleaving_done):
            params = {'ref': id,
                      'cursor': 'curs',
                      'max_row': 2,
                      'cleave_to_row': 2,
                      'last_cleave_to_row': 1,
                      'cleaving_done': False,
                      'cleaving_done': cleaving_done,
                      'misplaced_done': True,
                      'ranges_done': 2,
                      'ranges_todo': 4}
            key = 'X-Container-Sysmeta-Shard-Context-%s' % id
            with mock_timestamp_now(Timestamp(last_modified)):
            with mock_timestamp_now(last_modified):
                broker.update_metadata(
                    {key: (json.dumps(params),
                           Timestamp(last_modified).internal)})
                           last_modified.internal)})

        def get_context(id, broker):
            data = broker.get_sharding_sysmeta().get('Context-%s' % id)
@@ -5449,6 +5560,7 @@ class TestSharder(BaseTestSharder):
            return data

        reclaim_age = 100
        recon_sharded_timeout = 50
        broker = self._make_broker()

        # sanity check
@@ -5456,25 +5568,43 @@ class TestSharder(BaseTestSharder):
        self.assertEqual(UNSHARDED, broker.get_db_state())

        # Setup some cleaving contexts
        id_old, id_newish = [str(uuid4()) for _ in range(2)]
        contexts = ((id_old, 1),
                    (id_newish, reclaim_age // 2))
        for id, last_modified in contexts:
            add_cleave_context(id, last_modified)
        id_old, id_newish, id_complete = [str(uuid4()) for _ in range(3)]
        ts_old, ts_newish, ts_complete = (
            Timestamp(1),
            Timestamp(reclaim_age // 2),
            Timestamp(reclaim_age - recon_sharded_timeout))
        contexts = ((id_old, ts_old, False),
                    (id_newish, ts_newish, False),
                    (id_complete, ts_complete, True))
        for id, last_modified, cleaving_done in contexts:
            add_cleave_context(id, last_modified, cleaving_done)

        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
        sharder_conf = {'reclaim_age': str(reclaim_age),
                        'recon_sharded_timeout': str(recon_sharded_timeout)}

        with self._mock_sharder(sharder_conf) as sharder:
            with mock_timestamp_now(Timestamp(reclaim_age + 2)):
                sharder._audit_cleave_contexts(broker)

        # The old context is stale: its last-modified time has passed
        # reclaim_age and it was never completed (done).
        old_ctx = get_context(id_old, broker)
        self.assertEqual(old_ctx, "")

        # The newish context is only halfway to stale: it's been 1/2
        # reclaim_age since it was last modified and it's not complete, so it
        # hasn't been cleaned up.
        newish_ctx = get_context(id_newish, broker)
        self.assertEqual(newish_ctx.ref, id_newish)

        # If we push time another reclaim age later, and they all be removed
        # minus id_missing_lm as it has a later last_modified.
        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
        # The complete context is done and recon_sharded_timeout has passed
        # since it was marked complete, so it has been removed.
        complete_ctx = get_context(id_complete, broker)
        self.assertEqual(complete_ctx, "")

        # If we push time another reclaim age later, they are all removed
        with self._mock_sharder(sharder_conf) as sharder:
            with mock_timestamp_now(Timestamp(reclaim_age * 2)):
                sharder._audit_cleave_contexts(broker)
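For reference, with this test's numbers (reclaim_age = 100, recon_sharded_timeout = 50) the first audit runs at t = 102: the old context (last modified at t = 1) is reaped as stale because 1 + 100 < 102; the newish context (t = 50, not done) survives because 50 + 100 > 102; and the complete context (t = 50, done) is reaped because 50 + 50 < 102.
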
@@ -5853,6 +5983,7 @@ class TestCleavingContext(BaseTestSharder):
        # Now let's delete it. When deleted, the metadata key will still
        # exist, but its value will be "", which means it'll be reaped later.
        ctx.delete(broker)

        sysmeta = broker.get_sharding_sysmeta()
        for key, val in sysmeta.items():
            if key == "Context-%s" % db_id:
@@ -40,3 +40,4 @@
- nose
- pyeclib
- python-swiftclient
- mock