Optimize obj replicator/reconstructor healthchecks
DaemonStrategy class calls Daemon.is_healthy() method every 0.1 seconds to ensure that all workers are running as wanted. On object replicator/reconstructor daemons, is_healthy() check if the rings changed to decide if workers must be created/killed. With large rings, this operation can be CPU intensive, especially on low-end CPU. This patch: - increases the check interval to 5 seconds by default, because none of these daemons are critical for performance (they are not in the datapath). But it allows each daemon to change this value if necessary - ensures that before doing a computation of all devices in the ring, object replicator/reconstructor checks that the ring really changed (by checking the mtime of the ring.gz files) On an Atom N2800 processor, this patch reduced the CPU usage of the main object replicator/reconstructor from 70% of a core to 0%. Change-Id: I2867e2be539f325778e2f044a151fd0773a7c390
This commit is contained in:
parent
9d4dc29fb3
commit
804776b379
@ -45,6 +45,7 @@ class Daemon(object):
|
||||
multiple daemonized workers, they simply provide the behavior of the daemon
|
||||
and context specific knowledge about how workers should be started.
|
||||
"""
|
||||
WORKERS_HEALTHCHECK_INTERVAL = 5.0
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
@ -239,7 +240,7 @@ class DaemonStrategy(object):
|
||||
if not self.spawned_pids():
|
||||
self.logger.notice('Finished %s', os.getpid())
|
||||
break
|
||||
time.sleep(0.1)
|
||||
time.sleep(self.daemon.WORKERS_HEALTHCHECK_INTERVAL)
|
||||
self.daemon.post_multiprocess_run()
|
||||
return 0
|
||||
|
||||
|
@ -210,6 +210,7 @@ class ObjectReconstructor(Daemon):
|
||||
'rebuild_handoff_node_count', 2))
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
self.all_local_devices = self.get_local_devices()
|
||||
self.rings_mtime = None
|
||||
|
||||
def get_worker_args(self, once=False, **kwargs):
|
||||
"""
|
||||
@ -263,6 +264,11 @@ class ObjectReconstructor(Daemon):
|
||||
if now > self._next_rcache_update:
|
||||
self._next_rcache_update = now + self.stats_interval
|
||||
self.aggregate_recon_update()
|
||||
rings_mtime = [os.path.getmtime(self.load_object_ring(
|
||||
policy).serialized_path) for policy in self.policies]
|
||||
if self.rings_mtime == rings_mtime:
|
||||
return True
|
||||
self.rings_mtime = rings_mtime
|
||||
return self.get_local_devices() == self.all_local_devices
|
||||
|
||||
def aggregate_recon_update(self):
|
||||
|
@ -139,6 +139,8 @@ class ObjectReplicator(Daemon):
|
||||
int(conf.get('bind_port', 6200))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
self.replicator_workers = int(conf.get('replicator_workers', 0))
|
||||
self.policies = [policy for policy in POLICIES
|
||||
if policy.policy_type == REPL_POLICY]
|
||||
self.stats_interval = int(conf.get('stats_interval', '300'))
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
self.next_check = time.time() + self.ring_check_interval
|
||||
@ -187,6 +189,7 @@ class ObjectReplicator(Daemon):
|
||||
self.is_multiprocess_worker = None
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
self._child_process_reaper_queue = queue.LightQueue()
|
||||
self.rings_mtime = None
|
||||
|
||||
def _zero_stats(self):
|
||||
self.stats_for_dev = defaultdict(Stats)
|
||||
@ -204,7 +207,7 @@ class ObjectReplicator(Daemon):
|
||||
def _get_my_replication_ips(self):
|
||||
my_replication_ips = set()
|
||||
ips = whataremyips()
|
||||
for policy in POLICIES:
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
for local_dev in [dev for dev in policy.object_ring.devs
|
||||
if dev and dev['replication_ip'] in ips and
|
||||
@ -291,6 +294,11 @@ class ObjectReplicator(Daemon):
|
||||
if time.time() >= self._next_rcache_update:
|
||||
update = self.aggregate_recon_update()
|
||||
dump_recon_cache(update, self.rcache, self.logger)
|
||||
rings_mtime = [os.path.getmtime(self.load_object_ring(
|
||||
policy).serialized_path) for policy in self.policies]
|
||||
if self.rings_mtime == rings_mtime:
|
||||
return True
|
||||
self.rings_mtime = rings_mtime
|
||||
return self.get_local_devices() == self.all_local_devices
|
||||
|
||||
def get_local_devices(self):
|
||||
@ -303,9 +311,7 @@ class ObjectReplicator(Daemon):
|
||||
"""
|
||||
ips = whataremyips(self.bind_ip)
|
||||
local_devices = set()
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type != REPL_POLICY:
|
||||
continue
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
for device in policy.object_ring.devs:
|
||||
if device and is_local_device(
|
||||
@ -877,7 +883,7 @@ class ObjectReplicator(Daemon):
|
||||
"""
|
||||
jobs = []
|
||||
ips = whataremyips(self.bind_ip)
|
||||
for policy in POLICIES:
|
||||
for policy in self.policies:
|
||||
# Skip replication if next_part_power is set. In this case
|
||||
# every object is hard-linked twice, but the replicator can't
|
||||
# detect them and would create a second copy of the file if not
|
||||
@ -891,15 +897,14 @@ class ObjectReplicator(Daemon):
|
||||
policy.name)
|
||||
continue
|
||||
|
||||
if policy.policy_type == REPL_POLICY:
|
||||
if (override_policies is not None and
|
||||
policy.idx not in override_policies):
|
||||
continue
|
||||
# ensure rings are loaded for policy
|
||||
self.load_object_ring(policy)
|
||||
jobs += self.build_replication_jobs(
|
||||
policy, ips, override_devices=override_devices,
|
||||
override_partitions=override_partitions)
|
||||
if (override_policies is not None and
|
||||
policy.idx not in override_policies):
|
||||
continue
|
||||
# ensure rings are loaded for policy
|
||||
self.load_object_ring(policy)
|
||||
jobs += self.build_replication_jobs(
|
||||
policy, ips, override_devices=override_devices,
|
||||
override_partitions=override_partitions)
|
||||
random.shuffle(jobs)
|
||||
if self.handoffs_first:
|
||||
# Move the handoff parts to the front of the list
|
||||
|
@ -215,6 +215,7 @@ class FakeRing(Ring):
|
||||
|
||||
def __init__(self, replicas=3, max_more_nodes=0, part_power=0,
|
||||
base_port=1000):
|
||||
self.serialized_path = '/foo/bar/object.ring.gz'
|
||||
self._base_port = base_port
|
||||
self.max_more_nodes = max_more_nodes
|
||||
self._part_shift = 32 - part_power
|
||||
|
@ -1811,14 +1811,18 @@ class TestWorkerReconstructor(unittest.TestCase):
|
||||
logger=self.logger)
|
||||
# file does not exist to start
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# ... and isn't created until _next_rcache_update
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
# ... but if we wait 5 mins (by default)
|
||||
orig_next_update = reconstructor._next_rcache_update
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
return_value=now + 301):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
self.assertGreater(reconstructor._next_rcache_update, orig_next_update)
|
||||
# ... it will be created
|
||||
self.assertTrue(os.path.exists(self.rcache))
|
||||
@ -1831,13 +1835,19 @@ class TestWorkerReconstructor(unittest.TestCase):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
reconstructor.get_local_devices = lambda: {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
reconstructor.all_local_devices = {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=12):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
|
||||
def test_is_healthy_detects_ring_change(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
@ -1850,13 +1860,26 @@ class TestWorkerReconstructor(unittest.TestCase):
|
||||
self.assertEqual(14, len(p.object_ring.devs)) # sanity check
|
||||
worker_args = list(reconstructor.get_worker_args())
|
||||
self.assertFalse(worker_args[0]['override_devices']) # no local devs
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# expand ring - now there are local devices
|
||||
p.object_ring.set_replicas(28)
|
||||
self.assertEqual(28, len(p.object_ring.devs)) # sanity check
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
|
||||
# If ring.gz mtime did not change, there is no change to detect
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=10):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# Now, ring.gz mtime changed, so the change will be detected
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=11):
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
|
||||
self.assertNotEqual(worker_args, list(reconstructor.get_worker_args()))
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
with mock.patch('swift.obj.reconstructor.os.path.getmtime',
|
||||
return_value=12):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
|
||||
def test_final_recon_dump(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
|
Loading…
x
Reference in New Issue
Block a user