More Test[Account|Container]Controller test updates

Added another test that tries to replicate using an unknown sync method,
with and without an existing DB file.

Change-Id: I09a86615f012e4341268ea30cbc06099528f896b
This commit is contained in:
Christian Schwede 2016-08-31 12:49:07 +00:00
parent eadab846b3
commit 2a58b1efa4
2 changed files with 53 additions and 2 deletions

View File

@ -33,7 +33,8 @@ from swift import __version__ as swift_version
from swift.common.swob import (Request, WsgiBytesIO, HTTPNoContent) from swift.common.swob import (Request, WsgiBytesIO, HTTPNoContent)
from swift.common import constraints from swift.common import constraints
from swift.account.server import AccountController from swift.account.server import AccountController
from swift.common.utils import (normalize_timestamp, replication, public) from swift.common.utils import (normalize_timestamp, replication, public,
mkdirs, storage_directory)
from swift.common.request_helpers import get_sys_meta_prefix from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies, debug_logger from test.unit import patch_policies, debug_logger
from swift.common.storage_policy import StoragePolicy, POLICIES from swift.common.storage_policy import StoragePolicy, POLICIES
@ -212,6 +213,31 @@ class TestAccountController(unittest.TestCase):
resp = req.get_response(self.controller) resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 400) self.assertEqual(resp.status_int, 400)
def test_REPLICATE_unknown_sync(self):
# First without existing DB file
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["unknown_sync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
mkdirs(os.path.join(self.testdir, 'sda1', 'accounts', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('accounts', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["unknown_sync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 500)
def test_HEAD_not_found(self): def test_HEAD_not_found(self):
# Test the case in which account does not exist (can be recreated) # Test the case in which account does not exist (can be recreated)
req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'}) req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})

View File

@ -40,7 +40,7 @@ import swift.container
from swift.container import server as container_server from swift.container import server as container_server
from swift.common import constraints from swift.common import constraints
from swift.common.utils import (Timestamp, mkdirs, public, replication, from swift.common.utils import (Timestamp, mkdirs, public, replication,
lock_parent_directory) storage_directory, lock_parent_directory)
from test.unit import fake_http_connect, debug_logger from test.unit import fake_http_connect, debug_logger
from swift.common.storage_policy import (POLICIES, StoragePolicy) from swift.common.storage_policy import (POLICIES, StoragePolicy)
from swift.common.request_helpers import get_sys_meta_prefix from swift.common.request_helpers import get_sys_meta_prefix
@ -1308,6 +1308,31 @@ class TestContainerController(unittest.TestCase):
resp = req.get_response(self.controller) resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 400) self.assertEqual(resp.status_int, 400)
def test_REPLICATE_unknown_sync(self):
# First without existing DB file
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["unknown_sync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 404)
mkdirs(os.path.join(self.testdir, 'sda1', 'containers', 'p', 'a', 'a'))
db_file = os.path.join(self.testdir, 'sda1',
storage_directory('containers', 'p', 'a'),
'a' + '.db')
open(db_file, 'w')
req = Request.blank('/sda1/p/a/',
environ={'REQUEST_METHOD': 'REPLICATE'},
headers={})
json_string = '["unknown_sync", "a.db"]'
inbuf = WsgiBytesIO(json_string)
req.environ['wsgi.input'] = inbuf
resp = req.get_response(self.controller)
self.assertEqual(resp.status_int, 500)
def test_DELETE(self): def test_DELETE(self):
req = Request.blank( req = Request.blank(
'/sda1/p/a/c', '/sda1/p/a/c',