diff --git a/bin/swift-account-replicator b/bin/swift-account-replicator index fec6d49ef7..072b6e0315 100755 --- a/bin/swift-account-replicator +++ b/bin/swift-account-replicator @@ -14,10 +14,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import optparse + from swift.account.replicator import AccountReplicator from swift.common.utils import parse_options from swift.common.daemon import run_daemon if __name__ == '__main__': - conf_file, options = parse_options(once=True) + parser = optparse.OptionParser("%prog CONFIG [options]") + parser.add_option('-d', '--devices', + help=('Replicate only given devices. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.')) + parser.add_option('-p', '--partitions', + help=('Replicate only given partitions. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.')) + conf_file, options = parse_options(parser=parser, once=True) run_daemon(AccountReplicator, conf_file, **options) diff --git a/bin/swift-container-replicator b/bin/swift-container-replicator index b3b235ef9a..d1990216c5 100755 --- a/bin/swift-container-replicator +++ b/bin/swift-container-replicator @@ -14,10 +14,21 @@ # See the License for the specific language governing permissions and # limitations under the License. +import optparse + from swift.container.replicator import ContainerReplicator from swift.common.utils import parse_options from swift.common.daemon import run_daemon if __name__ == '__main__': - conf_file, options = parse_options(once=True) + parser = optparse.OptionParser("%prog CONFIG [options]") + parser.add_option('-d', '--devices', + help=('Replicate only given devices. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.')) + parser.add_option('-p', '--partitions', + help=('Replicate only given partitions. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.')) + conf_file, options = parse_options(parser=parser, once=True) run_daemon(ContainerReplicator, conf_file, **options) diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 8002ff0e96..93c94537e6 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -33,7 +33,7 @@ from swift.common.direct_client import quote from swift.common.utils import get_logger, whataremyips, storage_directory, \ renamer, mkdirs, lock_parent_directory, config_true_value, \ unlink_older_than, dump_recon_cache, rsync_module_interpolation, \ - json, Timestamp + json, Timestamp, list_from_csv from swift.common import ring from swift.common.ring.utils import is_local_device from swift.common.http import HTTP_NOT_FOUND, HTTP_INSUFFICIENT_STORAGE @@ -47,6 +47,20 @@ from swift.common.swob import Response, HTTPNotFound, HTTPNoContent, \ DEBUG_TIMINGS_THRESHOLD = 10 +def parse_overrides(daemon_kwargs): + devices = list_from_csv(daemon_kwargs.get('devices', '')) + if not devices: + devices = Everything() + + partitions = [ + int(part) for part in + list_from_csv(daemon_kwargs.get('partitions', ''))] + if not partitions: + partitions = Everything() + + return devices, partitions + + def quarantine_db(object_file, server_type): """ In the case that a corrupt file is found, move it to a quarantined area to @@ -93,8 +107,7 @@ def roundrobin_datadirs(datadirs): def walk_datadir(datadir, node_id, part_filter): partitions = [pd for pd in os.listdir(datadir) - if looks_like_partition(pd) - and (part_filter is None or part_filter(pd))] + if looks_like_partition(pd) and part_filter(pd)] random.shuffle(partitions) for partition in partitions: part_dir = os.path.join(datadir, partition) @@ -136,6 +149,15 @@ def roundrobin_datadirs(datadirs): its.remove(it) +class Everything(object): + """ + A container that contains everything. If "e" is an instance of + Everything, then "x in e" is true for all x. + """ + def __contains__(self, element): + return True + + class ReplConnection(BufferedHTTPConnection): """ Helper to simplify REPLICATEing to a remote server. @@ -636,12 +658,22 @@ class Replicator(Daemon): return match.groups()[0] return "UNKNOWN" - def handoffs_only_filter(self, device_id): + def _partition_dir_filter(self, device_id, handoffs_only, + partitions_to_replicate): + def filt(partition_dir): partition = int(partition_dir) - primary_node_ids = [ - d['id'] for d in self.ring.get_part_nodes(partition)] - return device_id not in primary_node_ids + if handoffs_only: + primary_node_ids = [ + d['id'] for d in self.ring.get_part_nodes(partition)] + if device_id in primary_node_ids: + return False + + if partition not in partitions_to_replicate: + return False + + return True + return filt def report_up_to_date(self, full_info): @@ -649,6 +681,8 @@ class Replicator(Daemon): def run_once(self, *args, **kwargs): """Run a replication pass once.""" + devices_to_replicate, partitions_to_replicate = parse_overrides(kwargs) + self._zero_stats() dirs = [] ips = whataremyips(self.bind_ip) @@ -678,15 +712,21 @@ class Replicator(Daemon): self.logger.warning( _('Skipping %(device)s as it is not mounted') % node) continue + if node['device'] not in devices_to_replicate: + self.logger.debug( + 'Skipping device %s due to given arguments', + node['device']) + continue unlink_older_than( os.path.join(self.root, node['device'], 'tmp'), time.time() - self.reclaim_age) datadir = os.path.join(self.root, node['device'], self.datadir) if os.path.isdir(datadir): self._local_device_ids.add(node['id']) - filt = (self.handoffs_only_filter(node['id']) - if self.handoffs_only else None) - dirs.append((datadir, node['id'], filt)) + part_filt = self._partition_dir_filter( + node['id'], self.handoffs_only, + partitions_to_replicate) + dirs.append((datadir, node['id'], part_filt)) if not found_local: self.logger.error("Can't find itself %s with port %s in ring " "file, not replicating", diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index c2a12576e3..7c4143d641 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -1221,7 +1221,7 @@ class TestDBReplicator(unittest.TestCase): node_id = 1 results = list(db_replicator.roundrobin_datadirs( - [(datadir, node_id, None)])) + [(datadir, node_id, lambda p: True)])) expected = [ ('450', os.path.join(datadir, db_path), node_id), ] @@ -1243,13 +1243,13 @@ class TestDBReplicator(unittest.TestCase): set(os.listdir(datadir))) results = list(db_replicator.roundrobin_datadirs( - [(datadir, node_id, None)])) + [(datadir, node_id, lambda p: True)])) self.assertEqual(results, expected) self.assertEqual({'1054', '1060', '450'}, set(os.listdir(datadir))) results = list(db_replicator.roundrobin_datadirs( - [(datadir, node_id, None)])) + [(datadir, node_id, lambda p: True)])) self.assertEqual(results, expected) # non db file in '1060' dir is not deleted and exception is handled self.assertEqual({'1060', '450'}, @@ -1336,8 +1336,8 @@ class TestDBReplicator(unittest.TestCase): mock.patch(base + 'random.shuffle', _shuffle), \ mock.patch(base + 'os.rmdir', _rmdir): - datadirs = [('/srv/node/sda/containers', 1, None), - ('/srv/node/sdb/containers', 2, None)] + datadirs = [('/srv/node/sda/containers', 1, lambda p: True), + ('/srv/node/sdb/containers', 2, lambda p: True)] results = list(db_replicator.roundrobin_datadirs(datadirs)) # The results show that the .db files are returned, the devices # interleaved. @@ -1584,6 +1584,71 @@ class TestHandoffsOnly(unittest.TestCase): 'bcbcbcbc15d3835053d568c57e2c83b5', 'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)]) + def test_override_partitions(self): + replicator = TestReplicator({ + 'devices': self.root, + 'bind_port': 6201, + 'mount_check': 'no', + }) + + with patch.object(db_replicator, 'whataremyips', + return_value=['10.0.0.1']), \ + patch.object(replicator, '_replicate_object') as mock_repl, \ + patch.object(replicator, 'ring', self.FakeRing3Nodes()): + replicator.run_once(partitions="0,2") + + self.assertEqual(sorted(mock_repl.mock_calls), [ + mock.call('0', os.path.join( + self.root, 'sdp', 'containers', '0', '220', + '010101013cf2b7979af9eaa71cb67220', + '010101013cf2b7979af9eaa71cb67220.db'), 0), + mock.call('2', os.path.join( + self.root, 'sdq', 'containers', '2', '3b5', + 'bcbcbcbc15d3835053d568c57e2c83b5', + 'bcbcbcbc15d3835053d568c57e2c83b5.db'), 1)]) + + def test_override_devices(self): + replicator = TestReplicator({ + 'devices': self.root, + 'bind_port': 6201, + 'mount_check': 'no', + }) + + with patch.object(db_replicator, 'whataremyips', + return_value=['10.0.0.1']), \ + patch.object(replicator, '_replicate_object') as mock_repl, \ + patch.object(replicator, 'ring', self.FakeRing3Nodes()): + replicator.run_once(devices="sdp") + + self.assertEqual(sorted(mock_repl.mock_calls), [ + mock.call('0', os.path.join( + self.root, 'sdp', 'containers', '0', '220', + '010101013cf2b7979af9eaa71cb67220', + '010101013cf2b7979af9eaa71cb67220.db'), 0), + mock.call('1', os.path.join( + self.root, 'sdp', 'containers', '1', '98d', + 'abababab2b5368158355e799323b498d', + 'abababab2b5368158355e799323b498d.db'), 0)]) + + def test_override_devices_and_partitions(self): + replicator = TestReplicator({ + 'devices': self.root, + 'bind_port': 6201, + 'mount_check': 'no', + }) + + with patch.object(db_replicator, 'whataremyips', + return_value=['10.0.0.1']), \ + patch.object(replicator, '_replicate_object') as mock_repl, \ + patch.object(replicator, 'ring', self.FakeRing3Nodes()): + replicator.run_once(partitions="0,2", devices="sdp") + + self.assertEqual(sorted(mock_repl.mock_calls), [ + mock.call('0', os.path.join( + self.root, 'sdp', 'containers', '0', '220', + '010101013cf2b7979af9eaa71cb67220', + '010101013cf2b7979af9eaa71cb67220.db'), 0)]) + class TestReplToNode(unittest.TestCase): def setUp(self):