expirer: bad config should not loop forever
Change-Id: I9413c72f41465fb8026848f71ec3b39fa990c3b7
This commit is contained in:
parent
a2df74ffe2
commit
f3adce1375
@ -29,7 +29,7 @@ from swift.common.daemon import Daemon
|
|||||||
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
from swift.common.internal_client import InternalClient, UnexpectedResponse
|
||||||
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
from swift.common.utils import get_logger, dump_recon_cache, split_path, \
|
||||||
Timestamp, config_true_value, normalize_delete_at_timestamp, \
|
Timestamp, config_true_value, normalize_delete_at_timestamp, \
|
||||||
RateLimitedIterator, md5, non_negative_float
|
RateLimitedIterator, md5, non_negative_float, non_negative_int
|
||||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
from swift.common.http import HTTP_NOT_FOUND, HTTP_CONFLICT, \
|
||||||
HTTP_PRECONDITION_FAILED
|
HTTP_PRECONDITION_FAILED
|
||||||
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
|
from swift.common.recon import RECON_OBJECT_FILE, DEFAULT_RECON_CACHE_PATH
|
||||||
@ -174,8 +174,9 @@ class ObjectExpirer(Daemon):
|
|||||||
global_conf={'log_name': '%s-ic' % self.conf.get(
|
global_conf={'log_name': '%s-ic' % self.conf.get(
|
||||||
'log_name', self.log_route)})
|
'log_name', self.log_route)})
|
||||||
|
|
||||||
self.processes = int(self.conf.get('processes', 0))
|
self.processes = non_negative_int(self.conf.get('processes', 0))
|
||||||
self.process = int(self.conf.get('process', 0))
|
self.process = non_negative_int(self.conf.get('process', 0))
|
||||||
|
self._validate_processes_config()
|
||||||
|
|
||||||
def report(self, final=False):
|
def report(self, final=False):
|
||||||
"""
|
"""
|
||||||
@ -330,7 +331,7 @@ class ObjectExpirer(Daemon):
|
|||||||
# we shouldn't yield the object during the delay
|
# we shouldn't yield the object during the delay
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Only one expirer daemon assigned for one task
|
# Only one expirer daemon assigned for each task
|
||||||
if self.hash_mod('%s/%s' % (task_container, task_object),
|
if self.hash_mod('%s/%s' % (task_container, task_object),
|
||||||
divisor) != my_index:
|
divisor) != my_index:
|
||||||
continue
|
continue
|
||||||
@ -365,6 +366,9 @@ class ObjectExpirer(Daemon):
|
|||||||
These will override the values from the config file if
|
These will override the values from the config file if
|
||||||
provided.
|
provided.
|
||||||
"""
|
"""
|
||||||
|
# these config values are available to override at the command line,
|
||||||
|
# blow-up now if they're wrong
|
||||||
|
self.override_proceses_config_from_command_line(**kwargs)
|
||||||
# This if-clause will be removed when general task queue feature is
|
# This if-clause will be removed when general task queue feature is
|
||||||
# implemented.
|
# implemented.
|
||||||
if not self.dequeue_from_legacy:
|
if not self.dequeue_from_legacy:
|
||||||
@ -375,7 +379,6 @@ class ObjectExpirer(Daemon):
|
|||||||
'with dequeue_from_legacy == true.')
|
'with dequeue_from_legacy == true.')
|
||||||
return
|
return
|
||||||
|
|
||||||
self.get_process_values(kwargs)
|
|
||||||
pool = GreenPool(self.concurrency)
|
pool = GreenPool(self.concurrency)
|
||||||
self.report_first_time = self.report_last_time = time()
|
self.report_first_time = self.report_last_time = time()
|
||||||
self.report_objects = 0
|
self.report_objects = 0
|
||||||
@ -430,6 +433,9 @@ class ObjectExpirer(Daemon):
|
|||||||
:param kwargs: Extra keyword args to fulfill the Daemon interface; this
|
:param kwargs: Extra keyword args to fulfill the Daemon interface; this
|
||||||
daemon has no additional keyword args.
|
daemon has no additional keyword args.
|
||||||
"""
|
"""
|
||||||
|
# these config values are available to override at the command line
|
||||||
|
# blow-up now if they're wrong
|
||||||
|
self.override_proceses_config_from_command_line(**kwargs)
|
||||||
sleep(random() * self.interval)
|
sleep(random() * self.interval)
|
||||||
while True:
|
while True:
|
||||||
begin = time()
|
begin = time()
|
||||||
@ -441,7 +447,7 @@ class ObjectExpirer(Daemon):
|
|||||||
if elapsed < self.interval:
|
if elapsed < self.interval:
|
||||||
sleep(random() * (self.interval - elapsed))
|
sleep(random() * (self.interval - elapsed))
|
||||||
|
|
||||||
def get_process_values(self, kwargs):
|
def override_proceses_config_from_command_line(self, **kwargs):
|
||||||
"""
|
"""
|
||||||
Sets self.processes and self.process from the kwargs if those
|
Sets self.processes and self.process from the kwargs if those
|
||||||
values exist, otherwise, leaves those values as they were set in
|
values exist, otherwise, leaves those values as they were set in
|
||||||
@ -452,19 +458,20 @@ class ObjectExpirer(Daemon):
|
|||||||
line when the daemon is run.
|
line when the daemon is run.
|
||||||
"""
|
"""
|
||||||
if kwargs.get('processes') is not None:
|
if kwargs.get('processes') is not None:
|
||||||
self.processes = int(kwargs['processes'])
|
self.processes = non_negative_int(kwargs['processes'])
|
||||||
|
|
||||||
if kwargs.get('process') is not None:
|
if kwargs.get('process') is not None:
|
||||||
self.process = int(kwargs['process'])
|
self.process = non_negative_int(kwargs['process'])
|
||||||
|
|
||||||
if self.process < 0:
|
self._validate_processes_config()
|
||||||
raise ValueError(
|
|
||||||
'process must be an integer greater than or equal to 0')
|
|
||||||
|
|
||||||
if self.processes < 0:
|
def _validate_processes_config(self):
|
||||||
raise ValueError(
|
"""
|
||||||
'processes must be an integer greater than or equal to 0')
|
Used in constructor and in override_proceses_config_from_command_line
|
||||||
|
to validate the processes configuration requirements.
|
||||||
|
|
||||||
|
:raiess: ValueError if processes config is invalid
|
||||||
|
"""
|
||||||
if self.processes and self.process >= self.processes:
|
if self.processes and self.process >= self.processes:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'process must be less than processes')
|
'process must be less than processes')
|
||||||
|
@ -195,94 +195,87 @@ class TestObjectExpirer(TestCase):
|
|||||||
_do_test_init_ic_log_name({'log_name': 'my-object-expirer'},
|
_do_test_init_ic_log_name({'log_name': 'my-object-expirer'},
|
||||||
'my-object-expirer-ic')
|
'my-object-expirer-ic')
|
||||||
|
|
||||||
def test_get_process_values_from_kwargs(self):
|
def test_set_process_values_from_kwargs(self):
|
||||||
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
||||||
vals = {
|
vals = {
|
||||||
'processes': 5,
|
'processes': 5,
|
||||||
'process': 1,
|
'process': 1,
|
||||||
}
|
}
|
||||||
x.get_process_values(vals)
|
x.override_proceses_config_from_command_line(**vals)
|
||||||
self.assertEqual(x.processes, 5)
|
self.assertEqual(x.processes, 5)
|
||||||
self.assertEqual(x.process, 1)
|
self.assertEqual(x.process, 1)
|
||||||
|
|
||||||
def test_get_process_values_from_config(self):
|
def test_set_process_values_from_config(self):
|
||||||
vals = {
|
conf = {
|
||||||
'processes': 5,
|
'processes': 5,
|
||||||
'process': 1,
|
'process': 1,
|
||||||
}
|
}
|
||||||
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
x = expirer.ObjectExpirer(conf, swift=self.fake_swift)
|
||||||
x.get_process_values({})
|
|
||||||
self.assertEqual(x.processes, 5)
|
self.assertEqual(x.processes, 5)
|
||||||
self.assertEqual(x.process, 1)
|
self.assertEqual(x.process, 1)
|
||||||
|
|
||||||
def test_get_process_values_negative_process(self):
|
def test_set_process_values_negative_process(self):
|
||||||
vals = {
|
vals = {
|
||||||
'processes': 5,
|
'processes': 5,
|
||||||
'process': -1,
|
'process': -1,
|
||||||
}
|
}
|
||||||
# from config
|
# from config
|
||||||
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
expected_msg = 'must be a non-negative integer'
|
||||||
expected_msg = 'process must be an integer greater' \
|
|
||||||
' than or equal to 0'
|
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values({})
|
expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertIn(expected_msg, str(ctx.exception))
|
||||||
# from kwargs
|
# from kwargs
|
||||||
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values(vals)
|
x.override_proceses_config_from_command_line(**vals)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertIn(expected_msg, str(ctx.exception))
|
||||||
|
|
||||||
def test_get_process_values_negative_processes(self):
|
def test_set_process_values_negative_processes(self):
|
||||||
vals = {
|
vals = {
|
||||||
'processes': -5,
|
'processes': -5,
|
||||||
'process': 1,
|
'process': 1,
|
||||||
}
|
}
|
||||||
# from config
|
# from config
|
||||||
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
expected_msg = 'must be a non-negative integer'
|
||||||
expected_msg = 'processes must be an integer greater' \
|
|
||||||
' than or equal to 0'
|
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values({})
|
expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertIn(expected_msg, str(ctx.exception))
|
||||||
# from kwargs
|
# from kwargs
|
||||||
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values(vals)
|
x.override_proceses_config_from_command_line(**vals)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertIn(expected_msg, str(ctx.exception))
|
||||||
|
|
||||||
def test_get_process_values_process_greater_than_processes(self):
|
def test_set_process_values_process_greater_than_processes(self):
|
||||||
vals = {
|
vals = {
|
||||||
'processes': 5,
|
'processes': 5,
|
||||||
'process': 7,
|
'process': 7,
|
||||||
}
|
}
|
||||||
# from config
|
# from config
|
||||||
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
|
||||||
expected_msg = 'process must be less than processes'
|
expected_msg = 'process must be less than processes'
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values({})
|
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertEqual(str(ctx.exception), expected_msg)
|
||||||
# from kwargs
|
# from kwargs
|
||||||
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values(vals)
|
x.override_proceses_config_from_command_line(**vals)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertEqual(str(ctx.exception), expected_msg)
|
||||||
|
|
||||||
def test_get_process_values_process_equal_to_processes(self):
|
def test_set_process_values_process_equal_to_processes(self):
|
||||||
vals = {
|
vals = {
|
||||||
'processes': 5,
|
'processes': 5,
|
||||||
'process': 5,
|
'process': 5,
|
||||||
}
|
}
|
||||||
# from config
|
# from config
|
||||||
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
|
||||||
expected_msg = 'process must be less than processes'
|
expected_msg = 'process must be less than processes'
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values({})
|
expirer.ObjectExpirer(vals, swift=self.fake_swift)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertEqual(str(ctx.exception), expected_msg)
|
||||||
# from kwargs
|
# from kwargs
|
||||||
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
|
||||||
with self.assertRaises(ValueError) as ctx:
|
with self.assertRaises(ValueError) as ctx:
|
||||||
x.get_process_values(vals)
|
x.override_proceses_config_from_command_line(**vals)
|
||||||
self.assertEqual(str(ctx.exception), expected_msg)
|
self.assertEqual(str(ctx.exception), expected_msg)
|
||||||
|
|
||||||
def test_valid_delay_reaping(self):
|
def test_valid_delay_reaping(self):
|
||||||
@ -1375,6 +1368,61 @@ class TestObjectExpirer(TestCase):
|
|||||||
self.assertEqual(str(log_kwargs['exc_info'][1]),
|
self.assertEqual(str(log_kwargs['exc_info'][1]),
|
||||||
'exception 1')
|
'exception 1')
|
||||||
|
|
||||||
|
def test_run_forever_bad_process_values_config(self):
|
||||||
|
conf = {
|
||||||
|
'processes': -1,
|
||||||
|
'process': -2,
|
||||||
|
'interval': 1,
|
||||||
|
}
|
||||||
|
iterations = [0]
|
||||||
|
|
||||||
|
def wrap_with_exit(orig_f, exit_after_count=3):
|
||||||
|
def wrapped_f(*args, **kwargs):
|
||||||
|
iterations[0] += 1
|
||||||
|
if iterations[0] > exit_after_count:
|
||||||
|
raise SystemExit('that is enough for now')
|
||||||
|
return orig_f(*args, **kwargs)
|
||||||
|
return wrapped_f
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError) as ctx:
|
||||||
|
# we should blow up here
|
||||||
|
x = expirer.ObjectExpirer(conf, logger=self.logger,
|
||||||
|
swift=self.fake_swift)
|
||||||
|
x.pop_queue = lambda a, c, o: None
|
||||||
|
x.run_once = wrap_with_exit(x.run_once)
|
||||||
|
# at least we should hopefully we blow up here?
|
||||||
|
x.run_forever()
|
||||||
|
|
||||||
|
# bad config should exit immediately with ValueError
|
||||||
|
self.assertIn('must be a non-negative integer', str(ctx.exception))
|
||||||
|
|
||||||
|
def test_run_forever_bad_process_values_command_line(self):
|
||||||
|
conf = {
|
||||||
|
'interval': 1,
|
||||||
|
}
|
||||||
|
bad_kwargs = {
|
||||||
|
'processes': -1,
|
||||||
|
'process': -2,
|
||||||
|
}
|
||||||
|
iterations = [0]
|
||||||
|
|
||||||
|
def wrap_with_exit(orig_f, exit_after_count=3):
|
||||||
|
def wrapped_f(*args, **kwargs):
|
||||||
|
iterations[0] += 1
|
||||||
|
if iterations[0] > exit_after_count:
|
||||||
|
raise SystemExit('that is enough for now')
|
||||||
|
return orig_f(*args, **kwargs)
|
||||||
|
return wrapped_f
|
||||||
|
|
||||||
|
with self.assertRaises(ValueError) as ctx:
|
||||||
|
x = expirer.ObjectExpirer(conf, logger=self.logger,
|
||||||
|
swift=self.fake_swift)
|
||||||
|
x.run_once = wrap_with_exit(x.run_once)
|
||||||
|
x.run_forever(**bad_kwargs)
|
||||||
|
|
||||||
|
# bad command args should exit immediately with ValueError
|
||||||
|
self.assertIn('must be a non-negative integer', str(ctx.exception))
|
||||||
|
|
||||||
def test_delete_actual_object(self):
|
def test_delete_actual_object(self):
|
||||||
got_env = [None]
|
got_env = [None]
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user