Optimize replication of targeted devices/partitions.

swift-object-replicator lets you specify --devices and --partitions to
perform a single replication pass over just those devices and
partitions. However, it still scans every device and every partition
to build up a list of jobs to do, then throws away the jobs for the
wrong devices and partitions. For partitions this isn't too bad, since
it only wastes some CPU, but for devices it results in unnecessary
disk I/O.

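This commit pushes the device and partition filtering down into
collect_jobs (and its process_repl helper) to avoid the wasted work:
devices that were not requested are skipped before their directories
are listed, and partitions that were not requested are skipped before
a job is built for them.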

Change-Id: Ia711bfc5a86ed4a080d27e08fe923cb4cb92da43
This commit is contained in:
Samuel Merritt 2015-01-22 11:54:43 -08:00
parent 7def1d2921
commit e5ca7e26a3

View File

@ -406,16 +406,21 @@ class ObjectReplicator(Daemon):
self.kill_coros() self.kill_coros()
self.last_replication_count = self.replication_count self.last_replication_count = self.replication_count
def process_repl(self, policy, jobs, ips): def process_repl(self, policy, ips, override_devices=None,
override_partitions=None):
""" """
Helper function for collect_jobs to build jobs for replication Helper function for collect_jobs to build jobs for replication
using replication style storage policy using replication style storage policy
""" """
jobs = []
obj_ring = self.get_object_ring(policy.idx) obj_ring = self.get_object_ring(policy.idx)
data_dir = get_data_dir(policy.idx) data_dir = get_data_dir(policy.idx)
for local_dev in [dev for dev in obj_ring.devs for local_dev in [dev for dev in obj_ring.devs
if dev and dev['replication_ip'] in ips and if (dev
dev['replication_port'] == self.port]: and dev['replication_ip'] in ips
and dev['replication_port'] == self.port
and (override_devices is None
or dev['device'] in override_devices))]:
dev_path = join(self.devices_dir, local_dev['device']) dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, data_dir) obj_path = join(dev_path, data_dir)
tmp_path = join(dev_path, get_tmp_dir(int(policy))) tmp_path = join(dev_path, get_tmp_dir(int(policy)))
@ -430,6 +435,10 @@ class ObjectReplicator(Daemon):
self.logger.exception('ERROR creating %s' % obj_path) self.logger.exception('ERROR creating %s' % obj_path)
continue continue
for partition in os.listdir(obj_path): for partition in os.listdir(obj_path):
if (override_partitions is not None
and partition not in override_partitions):
continue
try: try:
job_path = join(obj_path, partition) job_path = join(obj_path, partition)
part_nodes = obj_ring.get_part_nodes(int(partition)) part_nodes = obj_ring.get_part_nodes(int(partition))
@ -445,17 +454,26 @@ class ObjectReplicator(Daemon):
object_ring=obj_ring)) object_ring=obj_ring))
except ValueError: except ValueError:
continue continue
return jobs
def collect_jobs(self): def collect_jobs(self, override_devices=None, override_partitions=None):
""" """
Returns a sorted list of jobs (dictionaries) that specify the Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced. partitions, nodes, etc to be rsynced.
:param override_devices: if set, only jobs on these devices
will be returned
:param override_partitions: if set, only jobs on these partitions
will be returned
""" """
jobs = [] jobs = []
ips = whataremyips() ips = whataremyips()
for policy in POLICIES: for policy in POLICIES:
# may need to branch here for future policy types # may need to branch here for future policy types
self.process_repl(policy, jobs, ips) jobs += self.process_repl(policy, ips,
override_devices=override_devices,
override_partitions=override_partitions)
random.shuffle(jobs) random.shuffle(jobs)
if self.handoffs_first: if self.handoffs_first:
# Move the handoff parts to the front of the list # Move the handoff parts to the front of the list
@ -473,24 +491,15 @@ class ObjectReplicator(Daemon):
self.last_replication_count = -1 self.last_replication_count = -1
self.partition_times = [] self.partition_times = []
if override_devices is None:
override_devices = []
if override_partitions is None:
override_partitions = []
stats = eventlet.spawn(self.heartbeat) stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups) lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle eventlet.sleep() # Give spawns a cycle
try: try:
self.run_pool = GreenPool(size=self.concurrency) self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs() jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions)
for job in jobs: for job in jobs:
if override_devices and job['device'] not in override_devices:
continue
if override_partitions and \
job['partition'] not in override_partitions:
continue
dev_path = join(self.devices_dir, job['device']) dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path): if self.mount_check and not ismount(dev_path):
self.logger.warn(_('%s is not mounted'), job['device']) self.logger.warn(_('%s is not mounted'), job['device'])
@ -527,8 +536,14 @@ class ObjectReplicator(Daemon):
def run_once(self, *args, **kwargs): def run_once(self, *args, **kwargs):
start = time.time() start = time.time()
self.logger.info(_("Running object replicator in script mode.")) self.logger.info(_("Running object replicator in script mode."))
override_devices = list_from_csv(kwargs.get('devices')) override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions')) override_partitions = list_from_csv(kwargs.get('partitions'))
if not override_devices:
override_devices = None
if not override_partitions:
override_partitions = None
self.replicate( self.replicate(
override_devices=override_devices, override_devices=override_devices,
override_partitions=override_partitions) override_partitions=override_partitions)