diff --git a/swift/container/replicator.py b/swift/container/replicator.py index aed6042637..39cfc39c74 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -63,15 +63,18 @@ class ContainerReplicator(db_replicator.Replicator): broker.set_storage_policy_index( remote_info['storage_policy_index'], timestamp=status_changed_at.internal) - broker.merge_timestamps(*(remote_info[key] for key in ( - 'created_at', 'put_timestamp', 'delete_timestamp'))) + sync_timestamps = ('created_at', 'put_timestamp', + 'delete_timestamp') + if any(info[key] != remote_info[key] for key in sync_timestamps): + broker.merge_timestamps(*(remote_info[key] for key in + sync_timestamps)) rv = parent._handle_sync_response( node, response, info, broker, http) return rv def find_local_handoff_for_part(self, part): """ - Look through devices in the ring for the first handoff devie that was + Look through devices in the ring for the first handoff device that was identified during job creation as available on this node. :returns: a node entry from the ring @@ -179,10 +182,10 @@ class ContainerReplicator(db_replicator.Replicator): def _post_replicate_hook(self, broker, info, responses): if info['account'] == MISPLACED_OBJECTS_ACCOUNT: return - if not broker.has_multiple_policies(): + point = broker.get_reconciler_sync() + if not broker.has_multiple_policies() and info['max_row'] != point: broker.update_reconciler_sync(info['max_row']) return - point = broker.get_reconciler_sync() max_sync = self.dump_to_reconciler(broker, point) success = responses.count(True) >= quorum_size(len(responses)) if max_sync > point and success: diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 74cf7734a4..6438dc288a 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -31,6 +31,7 @@ from swift.common.storage_policy import POLICIES from test.unit.common import test_db_replicator from test.unit import patch_policies +from contextlib import contextmanager class TestReplicator(unittest.TestCase): @@ -341,6 +342,62 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): remote_info['status_changed_at'], info['status_changed_at'])) + @contextmanager + def _wrap_merge_timestamps(self, broker, calls): + def fake_merge_timestamps(*args, **kwargs): + calls.append(args[0]) + orig_merge_timestamps(*args, **kwargs) + + orig_merge_timestamps = broker.merge_timestamps + broker.merge_timestamps = fake_merge_timestamps + try: + yield True + finally: + broker.merge_timestamps = orig_merge_timestamps + + def test_sync_merge_timestamps(self): + ts = (Timestamp(t).internal for t in + itertools.count(int(time.time()))) + # setup a local container + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = ts.next() + broker.initialize(put_timestamp, POLICIES.default.idx) + # setup remote container + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_put_timestamp = ts.next() + remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx) + # replicate, expect call to merge_timestamps on remote and local + daemon = replicator.ContainerReplicator({}) + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + local_calls = [] + remote_calls = [] + with self._wrap_merge_timestamps(broker, local_calls): + with self._wrap_merge_timestamps(broker, remote_calls): + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + self.assertEqual(1, len(remote_calls)) + self.assertEqual(1, len(local_calls)) + self.assertEqual(remote_put_timestamp, + broker.get_info()['put_timestamp']) + self.assertEqual(remote_put_timestamp, + remote_broker.get_info()['put_timestamp']) + + # replicate again, no changes so expect no calls to merge_timestamps + info = broker.get_replication_info() + local_calls = [] + remote_calls = [] + with self._wrap_merge_timestamps(broker, local_calls): + with self._wrap_merge_timestamps(broker, remote_calls): + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + self.assertEqual(0, len(remote_calls)) + self.assertEqual(0, len(local_calls)) + self.assertEqual(remote_put_timestamp, + broker.get_info()['put_timestamp']) + self.assertEqual(remote_put_timestamp, + remote_broker.get_info()['put_timestamp']) + def test_sync_bogus_db_quarantines(self): ts = (Timestamp(t).internal for t in itertools.count(int(time.time()))) @@ -851,6 +908,41 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): a, c, name = path.lstrip('/').split('/') self.assertEqual(most_recent_items[name], timestamp) + @contextmanager + def _wrap_update_reconciler_sync(self, broker, calls): + def wrapper_function(*args, **kwargs): + calls.append(args) + orig_function(*args, **kwargs) + + orig_function = broker.update_reconciler_sync + broker.update_reconciler_sync = wrapper_function + try: + yield True + finally: + broker.update_reconciler_sync = orig_function + + def test_post_replicate_hook(self): + ts = (Timestamp(t).internal for t in + itertools.count(int(time.time()))) + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(ts.next(), 0) + broker.put_object('foo', ts.next(), 0, 'text/plain', 'xyz', deleted=0, + storage_policy_index=0) + info = broker.get_replication_info() + self.assertEqual(1, info['max_row']) + self.assertEqual(-1, broker.get_reconciler_sync()) + daemon = replicator.ContainerReplicator({}) + calls = [] + with self._wrap_update_reconciler_sync(broker, calls): + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(1, len(calls)) + # repeated call to _post_replicate_hook with no change to info + # should not call update_reconciler_sync + calls = [] + with self._wrap_update_reconciler_sync(broker, calls): + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, len(calls)) + if __name__ == '__main__': unittest.main()