From b9ae377eab9c7ceba4f5909cd1f4e804bf3a5b8f Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Tue, 2 Sep 2014 15:46:16 +0100 Subject: [PATCH] Check for change before container replicator updates db As described in the related bug report, unnecessary updates to the container db during replication can impact object GET performance in certain circumstances. This patch changes swift/container/replicator.py so that calls to merge_timestamps and update_reconciler_sync are conditional on values having actually changed. Related-Bug: 1332025 Change-Id: If498251656500ed7a3d7ca4b109ea1079b8513c2 --- swift/container/replicator.py | 13 ++-- test/unit/container/test_replicator.py | 92 ++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 5 deletions(-) diff --git a/swift/container/replicator.py b/swift/container/replicator.py index aed6042637..39cfc39c74 100644 --- a/swift/container/replicator.py +++ b/swift/container/replicator.py @@ -63,15 +63,18 @@ class ContainerReplicator(db_replicator.Replicator): broker.set_storage_policy_index( remote_info['storage_policy_index'], timestamp=status_changed_at.internal) - broker.merge_timestamps(*(remote_info[key] for key in ( - 'created_at', 'put_timestamp', 'delete_timestamp'))) + sync_timestamps = ('created_at', 'put_timestamp', + 'delete_timestamp') + if any(info[key] != remote_info[key] for key in sync_timestamps): + broker.merge_timestamps(*(remote_info[key] for key in + sync_timestamps)) rv = parent._handle_sync_response( node, response, info, broker, http) return rv def find_local_handoff_for_part(self, part): """ - Look through devices in the ring for the first handoff devie that was + Look through devices in the ring for the first handoff device that was identified during job creation as available on this node. 
:returns: a node entry from the ring @@ -179,10 +182,10 @@ class ContainerReplicator(db_replicator.Replicator): def _post_replicate_hook(self, broker, info, responses): if info['account'] == MISPLACED_OBJECTS_ACCOUNT: return - if not broker.has_multiple_policies(): + point = broker.get_reconciler_sync() + if not broker.has_multiple_policies() and info['max_row'] != point: broker.update_reconciler_sync(info['max_row']) return - point = broker.get_reconciler_sync() max_sync = self.dump_to_reconciler(broker, point) success = responses.count(True) >= quorum_size(len(responses)) if max_sync > point and success: diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py index 74cf7734a4..6438dc288a 100644 --- a/test/unit/container/test_replicator.py +++ b/test/unit/container/test_replicator.py @@ -31,6 +31,7 @@ from swift.common.storage_policy import POLICIES from test.unit.common import test_db_replicator from test.unit import patch_policies +from contextlib import contextmanager class TestReplicator(unittest.TestCase): @@ -341,6 +342,62 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): remote_info['status_changed_at'], info['status_changed_at'])) + @contextmanager + def _wrap_merge_timestamps(self, broker, calls): + def fake_merge_timestamps(*args, **kwargs): + calls.append(args[0]) + orig_merge_timestamps(*args, **kwargs) + + orig_merge_timestamps = broker.merge_timestamps + broker.merge_timestamps = fake_merge_timestamps + try: + yield True + finally: + broker.merge_timestamps = orig_merge_timestamps + + def test_sync_merge_timestamps(self): + ts = (Timestamp(t).internal for t in + itertools.count(int(time.time()))) + # setup a local container + broker = self._get_broker('a', 'c', node_index=0) + put_timestamp = ts.next() + broker.initialize(put_timestamp, POLICIES.default.idx) + # setup remote container + remote_broker = self._get_broker('a', 'c', node_index=1) + remote_put_timestamp = ts.next() + 
remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx) + # replicate, expect call to merge_timestamps on remote and local + daemon = replicator.ContainerReplicator({}) + part, node = self._get_broker_part_node(remote_broker) + info = broker.get_replication_info() + local_calls = [] + remote_calls = [] + with self._wrap_merge_timestamps(broker, local_calls): + with self._wrap_merge_timestamps(broker, remote_calls): + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + self.assertEqual(1, len(remote_calls)) + self.assertEqual(1, len(local_calls)) + self.assertEqual(remote_put_timestamp, + broker.get_info()['put_timestamp']) + self.assertEqual(remote_put_timestamp, + remote_broker.get_info()['put_timestamp']) + + # replicate again, no changes so expect no calls to merge_timestamps + info = broker.get_replication_info() + local_calls = [] + remote_calls = [] + with self._wrap_merge_timestamps(broker, local_calls): + with self._wrap_merge_timestamps(broker, remote_calls): + success = daemon._repl_to_node(node, broker, part, info) + self.assertTrue(success) + self.assertEqual(0, len(remote_calls)) + self.assertEqual(0, len(local_calls)) + self.assertEqual(remote_put_timestamp, + broker.get_info()['put_timestamp']) + self.assertEqual(remote_put_timestamp, + remote_broker.get_info()['put_timestamp']) + def test_sync_bogus_db_quarantines(self): ts = (Timestamp(t).internal for t in itertools.count(int(time.time()))) @@ -851,6 +908,41 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync): a, c, name = path.lstrip('/').split('/') self.assertEqual(most_recent_items[name], timestamp) + @contextmanager + def _wrap_update_reconciler_sync(self, broker, calls): + def wrapper_function(*args, **kwargs): + calls.append(args) + orig_function(*args, **kwargs) + + orig_function = broker.update_reconciler_sync + broker.update_reconciler_sync = wrapper_function + try: + yield True + finally: + broker.update_reconciler_sync = 
orig_function + + def test_post_replicate_hook(self): + ts = (Timestamp(t).internal for t in + itertools.count(int(time.time()))) + broker = self._get_broker('a', 'c', node_index=0) + broker.initialize(ts.next(), 0) + broker.put_object('foo', ts.next(), 0, 'text/plain', 'xyz', deleted=0, + storage_policy_index=0) + info = broker.get_replication_info() + self.assertEqual(1, info['max_row']) + self.assertEqual(-1, broker.get_reconciler_sync()) + daemon = replicator.ContainerReplicator({}) + calls = [] + with self._wrap_update_reconciler_sync(broker, calls): + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(1, len(calls)) + # repeated call to _post_replicate_hook with no change to info + # should not call update_reconciler_sync + calls = [] + with self._wrap_update_reconciler_sync(broker, calls): + daemon._post_replicate_hook(broker, info, []) + self.assertEqual(0, len(calls)) + if __name__ == '__main__': unittest.main()