Add unit case

"op" has three judgments in ReplicatorRpc.dispatch, rsync_then_merge,complete_rsync
and other. But it only has two times judgments,including rsync_then_merge and other
in test_REPLICATE_works, so I add complete_rsync in test_account and test_container

Change-Id: I8277b556062dd6b30bf85dedd636d56517f10d8d
This commit is contained in:
zheng yin 2016-08-03 17:11:25 +08:00
parent d819ae00a5
commit b1b410902d
2 changed files with 52 additions and 2 deletions

View File

@ -171,7 +171,7 @@ class TestAccountController(unittest.TestCase):
resp = req.get_response(self.account_controller)
self.assertEqual(resp.status_int, 507)
def test_REPLICATE_works(self):
def test_REPLICATE_rsync_then_merge_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'account', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('account', 'p', 'a'),
@ -192,6 +192,32 @@ class TestAccountController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
def test_REPLICATE_complete_rsync_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'account', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('account', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
def fake_complete_rsync(self, drive, db_file, args):
return HTTPNoContent()
# check complete_rsync
with mock.patch("swift.common.db_replicator.ReplicatorRpc."
"complete_rsync", fake_complete_rsync):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["complete_rsync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
def test_REPLICATE_value_error_works(self):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
# check valuerror
wsgi_input_valuerror = '["sync" : sync, "-1"]'
inbuf1 = WsgiBytesIO(wsgi_input_valuerror)

View File

@ -1268,7 +1268,7 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.container_controller)
self.assertEqual(resp.status_int, 507)
def test_REPLICATE_works(self):
def test_REPLICATE_rsync_then_merge_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'containers', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('containers', 'p', 'a'),
@ -1289,6 +1289,30 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
def test_REPLICATE_complete_rsync_works(self):
mkdirs(os.path.join(self.testdir, 'sda1', 'containers', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('containers', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
def fake_complete_rsync(self, drive, db_file, args):
return HTTPNoContent()
with mock.patch("swift.container.replicator.ContainerReplicatorRpc."
"complete_rsync", fake_complete_rsync):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["complete_rsync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 204)
def test_REPLICATE_value_error_works(self):
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
# check valuerror
wsgi_input_valuerror = '["sync" : sync, "-1"]'
inbuf1 = WsgiBytesIO(wsgi_input_valuerror)