Merge "Refactor and add tests for db_replicator"

This commit is contained in:
Jenkins 2013-07-19 01:21:54 +00:00 committed by Gerrit Code Review
commit bbaf490887

View File

@ -22,10 +22,13 @@ import math
from mock import patch
from shutil import rmtree
from tempfile import mkdtemp, NamedTemporaryFile
import mock
import simplejson
from swift.common import db_replicator
from swift.common.utils import normalize_timestamp
from swift.container import server as container_server
from swift.common.exceptions import DriveNotMounted
from test.unit import FakeLogger
@ -74,6 +77,12 @@ class FakeRingWithNodes:
meta=''
), dict(
id=4, weight=10.0, zone=4, ip='1.1.1.4', port=6000, device='sdb',
meta=''
), dict(
id=5, weight=10.0, zone=5, ip='1.1.1.5', port=6000, device='sdb',
meta=''
), dict(
id=6, weight=10.0, zone=6, ip='1.1.1.6', port=6000, device='sdb',
meta='')]
def __init__(self, path, reload_time=15, ring_name=None):
@ -118,8 +127,9 @@ def _mock_process(*args):
class ReplHttp:
def __init__(self, response=None):
def __init__(self, response=None, set_status=200):
self.response = response
self.set_status = set_status
replicated = False
host = 'localhost'
@ -127,7 +137,7 @@ class ReplHttp:
self.replicated = True
class Response:
status = 200
status = self.set_status
data = self.response
def read(innerself):
@ -381,7 +391,7 @@ class TestDBReplicator(unittest.TestCase):
def test_in_sync(self):
replicator = TestReplicator({})
self.assertEquals(replicator._in_sync(
{'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b'},
{'id': 'a', 'point': 0, 'max_row': 0, 'hash': 'b'},
{'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b'},
FakeBroker(), -1), True)
self.assertEquals(replicator._in_sync(
@ -402,25 +412,12 @@ class TestDBReplicator(unittest.TestCase):
replicator = TestReplicator({})
replicator._usync_db(0, FakeBroker(), fake_http, '12345', '67890')
def test_repl_to_node(self):
replicator = TestReplicator({})
fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
fake_info = {'id': 'a', 'point': -1, 'max_row': 0, 'hash': 'b',
'created_at': 100, 'put_timestamp': 0,
'delete_timestamp': 0,
'metadata': {'Test': ('Value', normalize_timestamp(1))}}
replicator._http_connect = lambda *args: ReplHttp(
'{"id": 3, "point": -1}')
self.assertEquals(replicator._repl_to_node(
fake_node, FakeBroker(), '0', fake_info), True)
def test_stats(self):
# I'm not sure how to test that this logs the right thing,
# but we can at least make sure it gets covered.
replicator = TestReplicator({})
replicator._zero_stats()
replicator._report_stats()
def test_replicate_object(self):
db_replicator.ring = FakeRingWithNodes()
replicator = TestReplicator({})
@ -503,6 +500,7 @@ class TestDBReplicator(unittest.TestCase):
[(('Found /path/to/file for /a%20c%20t/c%20o%20n when it should '
'be on partition 0; will replicate out and remove.',), {})])
def test_delete_db(self):
db_replicator.lock_parent_directory = lock_parent_directory
replicator = TestReplicator({})
@ -753,6 +751,105 @@ class TestDBReplicator(unittest.TestCase):
db_replicator.os.path.exists = orig_exists
db_replicator.random.shuffle = orig_shuffle
@mock.patch("swift.common.db_replicator.ReplConnection", mock.Mock())
def test_http_connect(self):
node = "node"
partition = "partition"
db_file = __file__
replicator = TestReplicator({})
replicator._http_connect(node, partition, db_file)
db_replicator.ReplConnection.assert_has_calls(
mock.call(node, partition,
os.path.basename(db_file).split('.', 1)[0],
replicator.logger))
class TestReplToNode(unittest.TestCase):
def setUp(self):
db_replicator.ring = FakeRing()
self.delete_db_calls = []
self.broker = FakeBroker()
self.replicator = TestReplicator({})
self.fake_node = {'ip': '127.0.0.1', 'device': 'sda1', 'port': 1000}
self.fake_info = {'id': 'a', 'point': -1, 'max_row': 10, 'hash': 'b',
'created_at': 100, 'put_timestamp': 0,
'delete_timestamp': 0, 'count': 0,
'metadata': {'Test': ('Value', normalize_timestamp(1))}}
self.replicator.logger= mock.Mock()
self.replicator._rsync_db = mock.Mock(return_value=True)
self.replicator._usync_db = mock.Mock(return_value=True)
self.http = ReplHttp('{"id": 3, "point": -1}')
self.replicator._http_connect = lambda *args: self.http
def test_repl_to_node_usync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 5, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.replicator._usync_db.assert_has_calls([
mock.call(max(rinfo['point'], local_sync), self.broker,
self.http, rinfo['id'], self.fake_info['id'])
])
def test_repl_to_node_rsync_success(self):
rinfo = {"id": 3, "point": -1, "max_row": 4, "hash": "c"}
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.replicator.logger.increment.assert_has_calls([
mock.call.increment('remote_merges')
])
self.replicator._rsync_db.assert_has_calls([
mock.call(self.broker, self.fake_node, self.http, self.fake_info['id'],
replicate_method='rsync_then_merge',
replicate_timeout=(self.fake_info['count'] / 2000))
])
def test_repl_to_node_already_in_sync(self):
rinfo = {"id": 3, "point": -1, "max_row": 10, "hash": "b"}
self.http = ReplHttp(simplejson.dumps(rinfo))
local_sync = self.broker.get_sync()
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.assertEquals(self.replicator._rsync_db.call_count, 0)
self.assertEquals(self.replicator._usync_db.call_count, 0)
def test_repl_to_node_not_found(self):
self.http = ReplHttp('{"id": 3, "point": -1}', set_status=404)
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, self.broker, '0', self.fake_info), True)
self.replicator.logger.increment.assert_has_calls([
mock.call.increment('rsyncs')
])
self.replicator._rsync_db.assert_has_calls([
mock.call(self.broker, self.fake_node, self.http, self.fake_info['id'])
])
def test_repl_to_node_drive_not_mounted(self):
self.http = ReplHttp('{"id": 3, "point": -1}', set_status=507)
self.assertRaises(DriveNotMounted, self.replicator._repl_to_node,
self.fake_node, FakeBroker(), '0', self.fake_info)
def test_repl_to_node_300_status(self):
self.http = ReplHttp('{"id": 3, "point": -1}', set_status=300)
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info), None)
def test_repl_to_node_http_connect_fails(self):
self.replicator._http_connect = lambda *args: None
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info), False)
def test_repl_to_node_not_response(self):
self.http = mock.Mock(replicate=mock.Mock(return_value=None))
self.assertEquals(self.replicator._repl_to_node(
self.fake_node, FakeBroker(), '0', self.fake_info), False)
if __name__ == '__main__':
unittest.main()