diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 8eaf029833..5cad5d4cf0 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -406,16 +406,21 @@ class ObjectReplicator(Daemon): self.kill_coros() self.last_replication_count = self.replication_count - def process_repl(self, policy, jobs, ips): + def process_repl(self, policy, ips, override_devices=None, + override_partitions=None): """ Helper function for collect_jobs to build jobs for replication using replication style storage policy """ + jobs = [] obj_ring = self.get_object_ring(policy.idx) data_dir = get_data_dir(policy.idx) for local_dev in [dev for dev in obj_ring.devs - if dev and dev['replication_ip'] in ips and - dev['replication_port'] == self.port]: + if (dev + and dev['replication_ip'] in ips + and dev['replication_port'] == self.port + and (override_devices is None + or dev['device'] in override_devices))]: dev_path = join(self.devices_dir, local_dev['device']) obj_path = join(dev_path, data_dir) tmp_path = join(dev_path, get_tmp_dir(int(policy))) @@ -430,6 +435,10 @@ class ObjectReplicator(Daemon): self.logger.exception('ERROR creating %s' % obj_path) continue for partition in os.listdir(obj_path): + if (override_partitions is not None + and partition not in override_partitions): + continue + try: job_path = join(obj_path, partition) part_nodes = obj_ring.get_part_nodes(int(partition)) @@ -445,17 +454,26 @@ class ObjectReplicator(Daemon): object_ring=obj_ring)) except ValueError: continue + return jobs - def collect_jobs(self): + def collect_jobs(self, override_devices=None, override_partitions=None): """ Returns a sorted list of jobs (dictionaries) that specify the partitions, nodes, etc to be rsynced. + + :param override_devices: if set, only jobs on these devices + will be returned + :param override_partitions: if set, only jobs on these partitions + will be returned + """ jobs = [] ips = whataremyips() for policy in POLICIES: # may need to branch here for future policy types - self.process_repl(policy, jobs, ips) + jobs += self.process_repl(policy, ips, + override_devices=override_devices, + override_partitions=override_partitions) random.shuffle(jobs) if self.handoffs_first: # Move the handoff parts to the front of the list @@ -473,24 +491,15 @@ class ObjectReplicator(Daemon): self.last_replication_count = -1 self.partition_times = [] - if override_devices is None: - override_devices = [] - if override_partitions is None: - override_partitions = [] - stats = eventlet.spawn(self.heartbeat) lockup_detector = eventlet.spawn(self.detect_lockups) eventlet.sleep() # Give spawns a cycle try: self.run_pool = GreenPool(size=self.concurrency) - jobs = self.collect_jobs() + jobs = self.collect_jobs(override_devices=override_devices, + override_partitions=override_partitions) for job in jobs: - if override_devices and job['device'] not in override_devices: - continue - if override_partitions and \ - job['partition'] not in override_partitions: - continue dev_path = join(self.devices_dir, job['device']) if self.mount_check and not ismount(dev_path): self.logger.warn(_('%s is not mounted'), job['device']) @@ -527,8 +536,14 @@ class ObjectReplicator(Daemon): def run_once(self, *args, **kwargs): start = time.time() self.logger.info(_("Running object replicator in script mode.")) + override_devices = list_from_csv(kwargs.get('devices')) override_partitions = list_from_csv(kwargs.get('partitions')) + if not override_devices: + override_devices = None + if not override_partitions: + override_partitions = None + self.replicate( override_devices=override_devices, override_partitions=override_partitions)