Merge "Deprecate broken handoffs_first in favor of handoffs_only"
This commit is contained in:
commit
cdd72dd34f
@ -298,7 +298,18 @@ use = egg:swift#recon
|
||||
# lockup_timeout = 1800
|
||||
# ring_check_interval = 15
|
||||
# recon_cache_path = /var/cache/swift
|
||||
# handoffs_first = False
|
||||
# The handoffs_only mode option is for special case emergency situations during
|
||||
# rebalance such as disk full in the cluster. This option SHOULD NOT BE
|
||||
# CHANGED, except for extreme situations. When handoffs_only mode is enabled
|
||||
# the reconstructor will *only* revert rebalance fragments to primaries and not
|
||||
# attempt to sync any primary parts with neighbor primaries. This will force
|
||||
# the reconstructor to sync and delete handoffs fragments more quickly and
|
||||
# minimize the time of the rebalance by limiting the number of rebuilds. The
|
||||
# handoffs_only option is only for temporary use, it should be disabled as soon
|
||||
# as the emergency situation is resolved. When handoffs_only is not set, the
|
||||
# deprecated handoffs_first option will be honored as a synonym, but may be
|
||||
# ignored in a future release.
|
||||
# handoffs_only = False
|
||||
#
|
||||
# You can set scheduling priority of processes. Niceness values range from -20
|
||||
# (most favorable to the process) to 19 (least favorable to the process).
|
||||
|
@ -149,8 +149,24 @@ class ObjectReconstructor(Daemon):
|
||||
self.headers = {
|
||||
'Content-Length': '0',
|
||||
'user-agent': 'obj-reconstructor %s' % os.getpid()}
|
||||
self.handoffs_first = config_true_value(conf.get('handoffs_first',
|
||||
False))
|
||||
if 'handoffs_first' in conf:
|
||||
self.logger.warning(
|
||||
'The handoffs_first option is deprecated in favor '
|
||||
'of handoffs_only. This option may be ignored in a '
|
||||
'future release.')
|
||||
# honor handoffs_first for backwards compatibility
|
||||
default_handoffs_only = config_true_value(conf['handoffs_first'])
|
||||
else:
|
||||
default_handoffs_only = False
|
||||
self.handoffs_only = config_true_value(
|
||||
conf.get('handoffs_only', default_handoffs_only))
|
||||
if self.handoffs_only:
|
||||
self.logger.warning(
|
||||
'Handoff only mode is not intended for normal '
|
||||
'operation, use handoffs_only with care.')
|
||||
elif default_handoffs_only:
|
||||
self.logger.warning('Ignored handoffs_first option in favor '
|
||||
'of handoffs_only.')
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
|
||||
def load_object_ring(self, policy):
|
||||
@ -656,6 +672,8 @@ class ObjectReconstructor(Daemon):
|
||||
if syncd_with >= len(job['sync_to']):
|
||||
self.delete_reverted_objs(
|
||||
job, reverted_objs, job['frag_index'])
|
||||
else:
|
||||
self.handoffs_remaining += 1
|
||||
self.logger.timing_since('partition.delete.timing', begin)
|
||||
|
||||
def _get_part_jobs(self, local_dev, part_path, partition, policy):
|
||||
@ -684,6 +702,9 @@ class ObjectReconstructor(Daemon):
|
||||
:param policy: the policy
|
||||
|
||||
:returns: a list of dicts of job info
|
||||
|
||||
N.B. If this function ever returns an empty list of jobs the entire
|
||||
partition will be deleted.
|
||||
"""
|
||||
# find all the fi's in the part, and which suffixes have them
|
||||
try:
|
||||
@ -876,12 +897,12 @@ class ObjectReconstructor(Daemon):
|
||||
"""
|
||||
Helper function for collect_jobs to build jobs for reconstruction
|
||||
using EC style storage policy
|
||||
|
||||
N.B. If this function ever returns an empty list of jobs the entire
|
||||
partition will be deleted.
|
||||
"""
|
||||
jobs = self._get_part_jobs(**part_info)
|
||||
random.shuffle(jobs)
|
||||
if self.handoffs_first:
|
||||
# Move the handoff revert jobs to the front of the list
|
||||
jobs.sort(key=lambda job: job['job_type'], reverse=True)
|
||||
self.job_count += len(jobs)
|
||||
return jobs
|
||||
|
||||
@ -897,6 +918,7 @@ class ObjectReconstructor(Daemon):
|
||||
self.reconstruction_part_count = 0
|
||||
self.reconstruction_device_count = 0
|
||||
self.last_reconstruction_count = -1
|
||||
self.handoffs_remaining = 0
|
||||
|
||||
def delete_partition(self, path):
|
||||
self.logger.info(_("Removing partition: %s"), path)
|
||||
@ -932,6 +954,11 @@ class ObjectReconstructor(Daemon):
|
||||
self.run_pool.spawn(self.delete_partition,
|
||||
part_info['part_path'])
|
||||
for job in jobs:
|
||||
if (self.handoffs_only and job['job_type'] != REVERT):
|
||||
self.logger.debug('Skipping %s job for %s '
|
||||
'while in handoffs_only mode.',
|
||||
job['job_type'], job['path'])
|
||||
continue
|
||||
self.run_pool.spawn(self.process_job, job)
|
||||
with Timeout(self.lockup_timeout):
|
||||
self.run_pool.waitall()
|
||||
@ -943,6 +970,16 @@ class ObjectReconstructor(Daemon):
|
||||
stats.kill()
|
||||
lockup_detector.kill()
|
||||
self.stats_line()
|
||||
if self.handoffs_only:
|
||||
if self.handoffs_remaining > 0:
|
||||
self.logger.info(_(
|
||||
"Handoffs only mode still has handoffs remaining. "
|
||||
"Next pass will continue to revert handoffs."))
|
||||
else:
|
||||
self.logger.warning(_(
|
||||
"Handoffs only mode found no handoffs remaining. "
|
||||
"You should disable handoffs_only once all nodes "
|
||||
"are reporting no handoffs remaining."))
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
start = time.time()
|
||||
|
@ -719,7 +719,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
rmtree(testring, ignore_errors=1)
|
||||
|
||||
def test_build_reconstruction_jobs(self):
|
||||
self.reconstructor.handoffs_first = False
|
||||
self.reconstructor._reset_stats()
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
|
||||
@ -728,13 +727,40 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
object_reconstructor.REVERT))
|
||||
self.assert_expected_jobs(part_info['partition'], jobs)
|
||||
|
||||
self.reconstructor.handoffs_first = True
|
||||
self.reconstructor._reset_stats()
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
|
||||
self.assertTrue(jobs[0]['job_type'] ==
|
||||
object_reconstructor.REVERT)
|
||||
self.assert_expected_jobs(part_info['partition'], jobs)
|
||||
def test_handoffs_only(self):
|
||||
self.reconstructor.handoffs_only = True
|
||||
|
||||
found_job_types = set()
|
||||
|
||||
def fake_process_job(job):
|
||||
# increment failure counter
|
||||
self.reconstructor.handoffs_remaining += 1
|
||||
found_job_types.add(job['job_type'])
|
||||
|
||||
self.reconstructor.process_job = fake_process_job
|
||||
|
||||
# only revert jobs
|
||||
self.reconstructor.reconstruct()
|
||||
self.assertEqual(found_job_types, {object_reconstructor.REVERT})
|
||||
# but failures keep handoffs remaining
|
||||
msgs = self.reconstructor.logger.get_lines_for_level('info')
|
||||
self.assertIn('Next pass will continue to revert handoffs', msgs[-1])
|
||||
self.logger._clear()
|
||||
|
||||
found_job_types = set()
|
||||
|
||||
def fake_process_job(job):
|
||||
# success does not increment failure counter
|
||||
found_job_types.add(job['job_type'])
|
||||
|
||||
self.reconstructor.process_job = fake_process_job
|
||||
|
||||
# only revert jobs ... but all handoffs cleared out successfully
|
||||
self.reconstructor.reconstruct()
|
||||
self.assertEqual(found_job_types, {object_reconstructor.REVERT})
|
||||
# it's time to turn off handoffs_only
|
||||
msgs = self.reconstructor.logger.get_lines_for_level('warning')
|
||||
self.assertIn('You should disable handoffs_only', msgs[-1])
|
||||
|
||||
def test_get_partners(self):
|
||||
# we're going to perform an exhaustive test of every possible
|
||||
@ -1156,6 +1182,57 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
def ts(self):
|
||||
return next(self.ts_iter)
|
||||
|
||||
def test_handoffs_only_default(self):
|
||||
# sanity neither option added to default conf
|
||||
self.conf.pop('handoffs_only', None)
|
||||
self.conf.pop('handoffs_first', None)
|
||||
self.reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
self.conf, logger=self.logger)
|
||||
self.assertFalse(self.reconstructor.handoffs_only)
|
||||
|
||||
def test_handoffs_first_enables_handoffs_only(self):
|
||||
self.conf.pop('handoffs_only', None) # sanity
|
||||
self.conf['handoffs_first'] = True
|
||||
self.reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
self.conf, logger=self.logger)
|
||||
self.assertTrue(self.reconstructor.handoffs_only)
|
||||
warnings = self.logger.get_lines_for_level('warning')
|
||||
expected = [
|
||||
'The handoffs_first option is deprecated in favor '
|
||||
'of handoffs_only. This option may be ignored in a '
|
||||
'future release.',
|
||||
'Handoff only mode is not intended for normal operation, '
|
||||
'use handoffs_only with care.',
|
||||
]
|
||||
self.assertEqual(expected, warnings)
|
||||
|
||||
def test_handoffs_only_ignores_handoffs_first(self):
|
||||
self.conf['handoffs_first'] = True
|
||||
self.conf['handoffs_only'] = False
|
||||
self.reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
self.conf, logger=self.logger)
|
||||
self.assertFalse(self.reconstructor.handoffs_only)
|
||||
warnings = self.logger.get_lines_for_level('warning')
|
||||
expected = [
|
||||
'The handoffs_first option is deprecated in favor of '
|
||||
'handoffs_only. This option may be ignored in a future release.',
|
||||
'Ignored handoffs_first option in favor of handoffs_only.',
|
||||
]
|
||||
self.assertEqual(expected, warnings)
|
||||
|
||||
def test_handoffs_only_enabled(self):
|
||||
self.conf.pop('handoffs_first', None) # sanity
|
||||
self.conf['handoffs_only'] = True
|
||||
self.reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
self.conf, logger=self.logger)
|
||||
self.assertTrue(self.reconstructor.handoffs_only)
|
||||
warnings = self.logger.get_lines_for_level('warning')
|
||||
expected = [
|
||||
'Handoff only mode is not intended for normal operation, '
|
||||
'use handoffs_only with care.',
|
||||
]
|
||||
self.assertEqual(expected, warnings)
|
||||
|
||||
def test_two_ec_policies(self):
|
||||
with patch_policies([
|
||||
StoragePolicy(0, name='zero', is_deprecated=True),
|
||||
@ -2345,7 +2422,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
self.assertEqual(call['node'], sync_to[0])
|
||||
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
|
||||
|
||||
def test_process_job_revert_is_handoff(self):
|
||||
def test_process_job_revert_is_handoff_fails(self):
|
||||
replicas = self.policy.object_ring.replicas
|
||||
frag_index = random.randint(0, replicas - 1)
|
||||
sync_to = [random.choice([n for n in self.policy.object_ring.devs
|
||||
@ -2375,7 +2452,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
|
||||
def ssync_response_callback(*args):
|
||||
# in this test ssync always fails, until we encounter ourselves in
|
||||
# the list of possible handoff's to sync to
|
||||
# the list of possible handoff's to sync to, so handoffs_remaining
|
||||
# should increment
|
||||
return False, {}
|
||||
|
||||
expected_suffix_calls = set([
|
||||
@ -2401,6 +2479,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
call = ssync_calls[0]
|
||||
self.assertEqual(call['node'], sync_to[0])
|
||||
self.assertEqual(set(call['suffixes']), set(['123', 'abc']))
|
||||
self.assertEqual(self.reconstructor.handoffs_remaining, 1)
|
||||
|
||||
def test_process_job_revert_cleanup(self):
|
||||
replicas = self.policy.object_ring.replicas
|
||||
@ -2446,6 +2525,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
}
|
||||
|
||||
def ssync_response_callback(*args):
|
||||
# success should not increment handoffs_remaining
|
||||
return True, {ohash: {'ts_data': ts}}
|
||||
|
||||
ssync_calls = []
|
||||
@ -2469,6 +2549,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
df_mgr.get_hashes(self.local_dev['device'], str(partition), [],
|
||||
self.policy)
|
||||
self.assertFalse(os.access(df._datadir, os.F_OK))
|
||||
self.assertEqual(self.reconstructor.handoffs_remaining, 0)
|
||||
|
||||
def test_process_job_revert_cleanup_tombstone(self):
|
||||
sync_to = [random.choice([n for n in self.policy.object_ring.devs
|
||||
|
Loading…
x
Reference in New Issue
Block a user