Merge "Allow per-policy overrides in object replicator."

This commit is contained in:
Jenkins 2015-02-03 21:58:32 +00:00 committed by Gerrit Code Review
commit 830315efec
3 changed files with 41 additions and 9 deletions

View File

@ -27,5 +27,8 @@ if __name__ == '__main__':
parser.add_option('-p', '--partitions', parser.add_option('-p', '--partitions',
help='Replicate only given partitions. ' help='Replicate only given partitions. '
'Comma-separated list') 'Comma-separated list')
parser.add_option('-i', '--policies',
help='Replicate only given policy indices. '
'Comma-separated list')
conf_file, options = parse_options(parser=parser, once=True) conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ObjectReplicator, conf_file, **options) run_daemon(ObjectReplicator, conf_file, **options)

View File

@ -456,7 +456,8 @@ class ObjectReplicator(Daemon):
continue continue
return jobs return jobs
def collect_jobs(self, override_devices=None, override_partitions=None): def collect_jobs(self, override_devices=None, override_partitions=None,
override_policies=None):
""" """
Returns a sorted list of jobs (dictionaries) that specify the Returns a sorted list of jobs (dictionaries) that specify the
partitions, nodes, etc to be rsynced. partitions, nodes, etc to be rsynced.
@ -465,11 +466,15 @@ class ObjectReplicator(Daemon):
will be returned will be returned
:param override_partitions: if set, only jobs on these partitions :param override_partitions: if set, only jobs on these partitions
will be returned will be returned
:param override_policies: if set, only jobs in these storage
policies will be returned
""" """
jobs = [] jobs = []
ips = whataremyips() ips = whataremyips()
for policy in POLICIES: for policy in POLICIES:
if (override_policies is not None
and str(policy.idx) not in override_policies):
continue
# may need to branch here for future policy types # may need to branch here for future policy types
jobs += self.process_repl(policy, ips, jobs += self.process_repl(policy, ips,
override_devices=override_devices, override_devices=override_devices,
@ -481,7 +486,8 @@ class ObjectReplicator(Daemon):
self.job_count = len(jobs) self.job_count = len(jobs)
return jobs return jobs
def replicate(self, override_devices=None, override_partitions=None): def replicate(self, override_devices=None, override_partitions=None,
override_policies=None):
"""Run a replication pass""" """Run a replication pass"""
self.start = time.time() self.start = time.time()
self.suffix_count = 0 self.suffix_count = 0
@ -498,7 +504,8 @@ class ObjectReplicator(Daemon):
try: try:
self.run_pool = GreenPool(size=self.concurrency) self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs(override_devices=override_devices, jobs = self.collect_jobs(override_devices=override_devices,
override_partitions=override_partitions) override_partitions=override_partitions,
override_policies=override_policies)
for job in jobs: for job in jobs:
dev_path = join(self.devices_dir, job['device']) dev_path = join(self.devices_dir, job['device'])
if self.mount_check and not ismount(dev_path): if self.mount_check and not ismount(dev_path):
@ -539,14 +546,18 @@ class ObjectReplicator(Daemon):
override_devices = list_from_csv(kwargs.get('devices')) override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = list_from_csv(kwargs.get('partitions')) override_partitions = list_from_csv(kwargs.get('partitions'))
override_policies = list_from_csv(kwargs.get('policies'))
if not override_devices: if not override_devices:
override_devices = None override_devices = None
if not override_partitions: if not override_partitions:
override_partitions = None override_partitions = None
if not override_policies:
override_policies = None
self.replicate( self.replicate(
override_devices=override_devices, override_devices=override_devices,
override_partitions=override_partitions) override_partitions=override_partitions,
override_policies=override_policies)
total = (time.time() - start) / 60 total = (time.time() - start) / 60
self.logger.info( self.logger.info(
_("Object replication complete (once). (%.02f minutes)"), total) _("Object replication complete (once). (%.02f minutes)"), total)

View File

@ -261,8 +261,7 @@ class TestObjectReplicator(unittest.TestCase):
def blowup_mkdirs(path): def blowup_mkdirs(path):
raise OSError('Ow!') raise OSError('Ow!')
mkdirs_orig = object_replicator.mkdirs with mock.patch.object(object_replicator, 'mkdirs', blowup_mkdirs):
try:
rmtree(self.objects, ignore_errors=1) rmtree(self.objects, ignore_errors=1)
object_replicator.mkdirs = blowup_mkdirs object_replicator.mkdirs = blowup_mkdirs
self.replicator.collect_jobs() self.replicator.collect_jobs()
@ -275,8 +274,6 @@ class TestObjectReplicator(unittest.TestCase):
self.assertTrue(exc_args[0].startswith('ERROR creating ')) self.assertTrue(exc_args[0].startswith('ERROR creating '))
self.assertEquals(exc_kwargs, {}) self.assertEquals(exc_kwargs, {})
self.assertEquals(exc_str, 'Ow!') self.assertEquals(exc_str, 'Ow!')
finally:
object_replicator.mkdirs = mkdirs_orig
def test_collect_jobs(self): def test_collect_jobs(self):
jobs = self.replicator.collect_jobs() jobs = self.replicator.collect_jobs()
@ -546,6 +543,27 @@ class TestObjectReplicator(unittest.TestCase):
override_partitions=['1']) override_partitions=['1'])
self.assertFalse(os.access(part_path, os.F_OK)) self.assertFalse(os.access(part_path, os.F_OK))
def test_delete_policy_override_params(self):
df0 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o')
df1 = self.df_mgr.get_diskfile('sda', '99', 'a', 'c', 'o',
policy_idx=1)
mkdirs(df0._datadir)
mkdirs(df1._datadir)
pol0_part_path = os.path.join(self.objects, '99')
pol1_part_path = os.path.join(self.objects_1, '99')
# sanity checks
self.assertTrue(os.access(pol0_part_path, os.F_OK))
self.assertTrue(os.access(pol1_part_path, os.F_OK))
# a bogus policy index doesn't bother the replicator any more than a
# bogus device or partition does
self.replicator.run_once(policies='1,2,5')
self.assertFalse(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.access(pol0_part_path, os.F_OK))
def test_run_once_recover_from_failure(self): def test_run_once_recover_from_failure(self):
conf = dict(swift_dir=self.testdir, devices=self.devices, conf = dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1') mount_check='false', timeout='300', stats_interval='1')