Make db_replicator usync smaller containers

The current rule inside the db_replicator is to rsync+merge
containers during replication if the difference between rowids
differ by more than 50%:

  # if the difference in rowids between the two differs by
  # more than 50%, rsync then do a remote merge.
  if rinfo['max_row'] / float(info['max_row']) < 0.5:

This mean on smaller containers, that only have few rows, and differ
by a small number still rsync+merge rather then copying rows.

This change adds a new condition, the difference in the rowids must
be greater than the defined per_diff otherwise usync will be used:

  # if the difference in rowids between the two differs by
  # more than 50% and the difference is greater than per_diff,
  # rsync then do a remote merge.
  # NOTE: difference > per_diff stops us from dropping to rsync
  # on smaller containers, who have only a few rows to sync.
  if rinfo['max_row'] / float(info['max_row']) < 0.5 and \
          info['max_row'] - rinfo['max_row'] > self.per_diff:

Change-Id: I9e779f71bf37714919a525404565dd075762b0d4
Closes-bug: #1019712
This commit is contained in:
Matthew Oliver 2015-10-09 17:25:08 +11:00 committed by Alistair Coles
parent 70a52e6149
commit 4a13dcc4a8
4 changed files with 33 additions and 9 deletions

View File

@ -434,8 +434,12 @@ class Replicator(Daemon):
if self._in_sync(rinfo, info, broker, local_sync): if self._in_sync(rinfo, info, broker, local_sync):
return True return True
# if the difference in rowids between the two differs by # if the difference in rowids between the two differs by
# more than 50%, rsync then do a remote merge. # more than 50% and the difference is greater than per_diff,
if rinfo['max_row'] / float(info['max_row']) < 0.5: # rsync then do a remote merge.
# NOTE: difference > per_diff stops us from dropping to rsync
# on smaller containers, who have only a few rows to sync.
if rinfo['max_row'] / float(info['max_row']) < 0.5 and \
info['max_row'] - rinfo['max_row'] > self.per_diff:
self.stats['remote_merge'] += 1 self.stats['remote_merge'] += 1
self.logger.increment('remote_merges') self.logger.increment('remote_merges')
return self._rsync_db(broker, node, http, info['id'], return self._rsync_db(broker, node, http, info['id'],

View File

@ -78,7 +78,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
broker.put_container('/a/c', time.time(), 0, 0, 0, broker.put_container('/a/c', time.time(), 0, 0, 0,
POLICIES.default.idx) POLICIES.default.idx)
# replicate # replicate
daemon = replicator.AccountReplicator({}) daemon = replicator.AccountReplicator({'per_diff': 1})
def _rsync_file(db_file, remote_file, **kwargs): def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1) remote_server, remote_path = remote_file.split('/', 1)

View File

@ -1187,9 +1187,9 @@ class TestReplToNode(unittest.TestCase):
db_replicator.ring = FakeRing() db_replicator.ring = FakeRing()
self.delete_db_calls = [] self.delete_db_calls = []
self.broker = FakeBroker() self.broker = FakeBroker()
self.replicator = TestReplicator({}) self.replicator = TestReplicator({'per_diff': 10})
self.fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000} self.fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
self.fake_info = {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'b', self.fake_info = {'id': 'a', 'point': -1, 'max_row': 20, 'hash': 'b',
'created_at': 100, 'put_timestamp': 0, 'created_at': 100, 'put_timestamp': 0,
'delete_timestamp': 0, 'count': 0, 'delete_timestamp': 0, 'count': 0,
'metadata': { 'metadata': {
@ -1201,7 +1201,7 @@ class TestReplToNode(unittest.TestCase):
self.replicator._http_connect = lambda *args: self.http self.replicator._http_connect = lambda *args: self.http
def test_repl_to_node_usync_success(self): def test_repl_to_node_usync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 5, "hash": "c"} rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo)) self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync() local_sync = self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node( self.assertEqual(self.replicator._repl_to_node(
@ -1212,7 +1212,7 @@ class TestReplToNode(unittest.TestCase):
]) ])
def test_repl_to_node_rsync_success(self): def test_repl_to_node_rsync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 4, "hash": "c"} rinfo = {"id": 3, "point": -1, "max_row": 9, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo)) self.http = ReplHttp(simplejson.dumps(rinfo))
self.broker.get_sync() self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node( self.assertEqual(self.replicator._repl_to_node(
@ -1229,7 +1229,7 @@ class TestReplToNode(unittest.TestCase):
]) ])
def test_repl_to_node_already_in_sync(self): def test_repl_to_node_already_in_sync(self):
rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "b"} rinfo = {"id": 3, "point": -1, "max_row": 20, "hash": "b"}
self.http = ReplHttp(simplejson.dumps(rinfo)) self.http = ReplHttp(simplejson.dumps(rinfo))
self.broker.get_sync() self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node( self.assertEqual(self.replicator._repl_to_node(
@ -1266,6 +1266,26 @@ class TestReplToNode(unittest.TestCase):
self.assertEqual(self.replicator._repl_to_node( self.assertEqual(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info), False) self.fake_node, FakeBroker(), '0', self.fake_info), False)
def test_repl_to_node_small_container_always_usync(self):
# Tests that a small container that is > 50% out of sync will
# still use usync.
rinfo = {"id": 3, "point": -1, "hash": "c"}
# Turn per_diff back to swift's default.
self.replicator.per_diff = 1000
for r, l in ((5, 20), (40, 100), (450, 1000), (550, 1500)):
rinfo['max_row'] = r
self.fake_info['max_row'] = l
self.replicator._usync_db = mock.Mock(return_value=True)
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEqual(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.replicator._usync_db.assert_has_calls([
mock.call(max(rinfo['point'], local_sync), self.broker,
self.http, rinfo['id'], self.fake_info['id'])
])
class FakeHTTPResponse(object): class FakeHTTPResponse(object):

View File

@ -192,7 +192,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
storage_policy_index=broker.storage_policy_index) storage_policy_index=broker.storage_policy_index)
# replicate # replicate
node = {'device': 'sdc', 'replication_ip': '127.0.0.1'} node = {'device': 'sdc', 'replication_ip': '127.0.0.1'}
daemon = replicator.ContainerReplicator({}) daemon = replicator.ContainerReplicator({'per_diff': 1})
def _rsync_file(db_file, remote_file, **kwargs): def _rsync_file(db_file, remote_file, **kwargs):
remote_server, remote_path = remote_file.split('/', 1) remote_server, remote_path = remote_file.split('/', 1)