Merge "use diskfile in ssync_sender tests"
This commit is contained in:
commit
30c0c086f9
@ -13,26 +13,36 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import os
|
||||||
import StringIO
|
import StringIO
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from swift.common import exceptions
|
from swift.common import exceptions, utils
|
||||||
from swift.obj import ssync_sender
|
from swift.obj import ssync_sender, diskfile
|
||||||
|
|
||||||
|
from test.unit import DebugLogger
|
||||||
|
|
||||||
|
|
||||||
class FakeReplicator(object):
|
class FakeReplicator(object):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self, testdir):
|
||||||
self.logger = mock.MagicMock()
|
self.logger = mock.MagicMock()
|
||||||
self.conn_timeout = 1
|
self.conn_timeout = 1
|
||||||
self.node_timeout = 2
|
self.node_timeout = 2
|
||||||
self.http_timeout = 3
|
self.http_timeout = 3
|
||||||
self.network_chunk_size = 65536
|
self.network_chunk_size = 65536
|
||||||
self.disk_chunk_size = 4096
|
self.disk_chunk_size = 4096
|
||||||
self._diskfile_mgr = mock.MagicMock()
|
conf = {
|
||||||
|
'devices': testdir,
|
||||||
|
'mount_check': 'false',
|
||||||
|
}
|
||||||
|
self._diskfile_mgr = diskfile.DiskFileManager(conf, DebugLogger())
|
||||||
|
|
||||||
|
|
||||||
class NullBufferedHTTPConnection(object):
|
class NullBufferedHTTPConnection(object):
|
||||||
@ -82,9 +92,33 @@ class FakeConnection(object):
|
|||||||
class TestSender(unittest.TestCase):
|
class TestSender(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.replicator = FakeReplicator()
|
self.testdir = os.path.join(
|
||||||
|
tempfile.mkdtemp(), 'tmp_test_ssync_sender')
|
||||||
|
self.replicator = FakeReplicator(self.testdir)
|
||||||
self.sender = ssync_sender.Sender(self.replicator, None, None, None)
|
self.sender = ssync_sender.Sender(self.replicator, None, None, None)
|
||||||
|
|
||||||
|
def _make_open_diskfile(self, device='dev', partition='9',
|
||||||
|
account='a', container='c', obj='o', body='test',
|
||||||
|
extra_metadata=None):
|
||||||
|
object_parts = account, container, obj
|
||||||
|
req_timestamp = utils.normalize_timestamp(time.time())
|
||||||
|
df = self.sender.daemon._diskfile_mgr.get_diskfile(device, partition,
|
||||||
|
*object_parts)
|
||||||
|
content_length = len(body)
|
||||||
|
etag = hashlib.md5(body).hexdigest()
|
||||||
|
with df.create() as writer:
|
||||||
|
writer.write(body)
|
||||||
|
metadata = {
|
||||||
|
'X-Timestamp': req_timestamp,
|
||||||
|
'Content-Length': content_length,
|
||||||
|
'ETag': etag,
|
||||||
|
}
|
||||||
|
if extra_metadata:
|
||||||
|
metadata.update(extra_metadata)
|
||||||
|
writer.put(metadata)
|
||||||
|
df.open()
|
||||||
|
return df
|
||||||
|
|
||||||
def test_call_catches_MessageTimeout(self):
|
def test_call_catches_MessageTimeout(self):
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
@ -527,26 +561,26 @@ class TestSender(unittest.TestCase):
|
|||||||
'f\r\n:UPDATES: END\r\n\r\n')
|
'f\r\n:UPDATES: END\r\n\r\n')
|
||||||
|
|
||||||
def test_updates_is_deleted(self):
|
def test_updates_is_deleted(self):
|
||||||
|
device = 'dev'
|
||||||
|
part = '9'
|
||||||
|
object_parts = ('a', 'c', 'o')
|
||||||
|
df = self._make_open_diskfile(device, part, *object_parts)
|
||||||
|
object_hash = utils.hash_path(*object_parts)
|
||||||
|
delete_timestamp = utils.normalize_timestamp(time.time())
|
||||||
|
df.delete(delete_timestamp)
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
self.sender.job = {'device': 'dev', 'partition': '9'}
|
self.sender.job = {'device': device, 'partition': part}
|
||||||
self.sender.node = {}
|
self.sender.node = {}
|
||||||
self.sender.send_list = ['0123abc']
|
self.sender.send_list = [object_hash]
|
||||||
self.sender.send_delete = mock.MagicMock()
|
self.sender.send_delete = mock.MagicMock()
|
||||||
self.sender.send_put = mock.MagicMock()
|
self.sender.send_put = mock.MagicMock()
|
||||||
self.sender.response = FakeResponse(
|
self.sender.response = FakeResponse(
|
||||||
chunk_body=(
|
chunk_body=(
|
||||||
':UPDATES: START\r\n'
|
':UPDATES: START\r\n'
|
||||||
':UPDATES: END\r\n'))
|
':UPDATES: END\r\n'))
|
||||||
df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash()
|
|
||||||
df.account = 'a'
|
|
||||||
df.container = 'c'
|
|
||||||
df.obj = 'o'
|
|
||||||
dfdel = exceptions.DiskFileDeleted()
|
|
||||||
dfdel.timestamp = '1381679759.90941'
|
|
||||||
df.open.side_effect = dfdel
|
|
||||||
self.sender.updates()
|
self.sender.updates()
|
||||||
self.sender.send_delete.assert_called_once_with(
|
self.sender.send_delete.assert_called_once_with(
|
||||||
'/a/c/o', '1381679759.90941')
|
'/a/c/o', delete_timestamp)
|
||||||
self.assertEqual(self.sender.send_put.mock_calls, [])
|
self.assertEqual(self.sender.send_put.mock_calls, [])
|
||||||
# note that the delete line isn't actually sent since we mock
|
# note that the delete line isn't actually sent since we mock
|
||||||
# send_delete; send_delete is tested separately.
|
# send_delete; send_delete is tested separately.
|
||||||
@ -556,25 +590,30 @@ class TestSender(unittest.TestCase):
|
|||||||
'f\r\n:UPDATES: END\r\n\r\n')
|
'f\r\n:UPDATES: END\r\n\r\n')
|
||||||
|
|
||||||
def test_updates_put(self):
|
def test_updates_put(self):
|
||||||
|
device = 'dev'
|
||||||
|
part = '9'
|
||||||
|
object_parts = ('a', 'c', 'o')
|
||||||
|
df = self._make_open_diskfile(device, part, *object_parts)
|
||||||
|
object_hash = utils.hash_path(*object_parts)
|
||||||
|
expected = df.get_metadata()
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
self.sender.job = {'device': 'dev', 'partition': '9'}
|
self.sender.job = {'device': device, 'partition': part}
|
||||||
self.sender.node = {}
|
self.sender.node = {}
|
||||||
self.sender.send_list = ['0123abc']
|
self.sender.send_list = [object_hash]
|
||||||
df = mock.MagicMock()
|
|
||||||
df.get_metadata.return_value = {'Content-Length': 123}
|
|
||||||
self.sender.send_delete = mock.MagicMock()
|
self.sender.send_delete = mock.MagicMock()
|
||||||
self.sender.send_put = mock.MagicMock()
|
self.sender.send_put = mock.MagicMock()
|
||||||
self.sender.response = FakeResponse(
|
self.sender.response = FakeResponse(
|
||||||
chunk_body=(
|
chunk_body=(
|
||||||
':UPDATES: START\r\n'
|
':UPDATES: START\r\n'
|
||||||
':UPDATES: END\r\n'))
|
':UPDATES: END\r\n'))
|
||||||
df = self.sender.daemon._diskfile_mgr.get_diskfile_from_hash()
|
|
||||||
df.account = 'a'
|
|
||||||
df.container = 'c'
|
|
||||||
df.obj = 'o'
|
|
||||||
self.sender.updates()
|
self.sender.updates()
|
||||||
self.assertEqual(self.sender.send_delete.mock_calls, [])
|
self.assertEqual(self.sender.send_delete.mock_calls, [])
|
||||||
self.sender.send_put.assert_called_once_with('/a/c/o', df)
|
self.assertEqual(1, len(self.sender.send_put.mock_calls))
|
||||||
|
args, _kwargs = self.sender.send_put.call_args
|
||||||
|
path, df = args
|
||||||
|
self.assertEqual(path, '/a/c/o')
|
||||||
|
self.assert_(isinstance(df, diskfile.DiskFile))
|
||||||
|
self.assertEqual(expected, df.get_metadata())
|
||||||
# note that the put line isn't actually sent since we mock send_put;
|
# note that the put line isn't actually sent since we mock send_put;
|
||||||
# send_put is tested separately.
|
# send_put is tested separately.
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -711,18 +750,11 @@ class TestSender(unittest.TestCase):
|
|||||||
'\r\n\r\n')
|
'\r\n\r\n')
|
||||||
|
|
||||||
def test_send_put_initial_timeout(self):
|
def test_send_put_initial_timeout(self):
|
||||||
|
df = self._make_open_diskfile()
|
||||||
|
df._disk_chunk_size = 2
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
df = mock.MagicMock()
|
|
||||||
self.sender.connection.send = lambda d: eventlet.sleep(1)
|
self.sender.connection.send = lambda d: eventlet.sleep(1)
|
||||||
self.sender.daemon.node_timeout = 0.01
|
self.sender.daemon.node_timeout = 0.01
|
||||||
df.get_metadata.return_value = {
|
|
||||||
'name': '/a/c/o',
|
|
||||||
'X-Timestamp': '1381679759.90941',
|
|
||||||
'Content-Length': '3',
|
|
||||||
'Etag': '900150983cd24fb0d6963f7d28e17f72',
|
|
||||||
'Some-Other-Header': 'value'}
|
|
||||||
df.content_length = 3
|
|
||||||
df.__iter__ = lambda x: iter(['ab', 'c'])
|
|
||||||
exc = None
|
exc = None
|
||||||
try:
|
try:
|
||||||
self.sender.send_put('/a/c/o', df)
|
self.sender.send_put('/a/c/o', df)
|
||||||
@ -731,22 +763,20 @@ class TestSender(unittest.TestCase):
|
|||||||
self.assertEqual(str(exc), '0.01 seconds: send_put')
|
self.assertEqual(str(exc), '0.01 seconds: send_put')
|
||||||
|
|
||||||
def test_send_put_chunk_timeout(self):
|
def test_send_put_chunk_timeout(self):
|
||||||
|
df = self._make_open_diskfile()
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
df = mock.MagicMock()
|
|
||||||
self.sender.daemon.node_timeout = 0.01
|
self.sender.daemon.node_timeout = 0.01
|
||||||
df.get_metadata.return_value = {
|
|
||||||
'name': '/a/c/o',
|
|
||||||
'X-Timestamp': '1381679759.90941',
|
|
||||||
'Content-Length': '3',
|
|
||||||
'Etag': '900150983cd24fb0d6963f7d28e17f72',
|
|
||||||
'Some-Other-Header': 'value'}
|
|
||||||
|
|
||||||
def iterator(dontcare):
|
one_shot = [None]
|
||||||
self.sender.connection.send = lambda d: eventlet.sleep(1)
|
|
||||||
return iter(['ab', 'c'])
|
def mock_send(data):
|
||||||
|
try:
|
||||||
|
one_shot.pop()
|
||||||
|
except IndexError:
|
||||||
|
eventlet.sleep(1)
|
||||||
|
|
||||||
|
self.sender.connection.send = mock_send
|
||||||
|
|
||||||
df.content_length = 3
|
|
||||||
df.reader().__iter__ = iterator
|
|
||||||
exc = None
|
exc = None
|
||||||
try:
|
try:
|
||||||
self.sender.send_put('/a/c/o', df)
|
self.sender.send_put('/a/c/o', df)
|
||||||
@ -755,31 +785,27 @@ class TestSender(unittest.TestCase):
|
|||||||
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
|
self.assertEqual(str(exc), '0.01 seconds: send_put chunk')
|
||||||
|
|
||||||
def test_send_put(self):
|
def test_send_put(self):
|
||||||
|
body = 'test'
|
||||||
|
extra_metadata = {'Some-Other-Header': 'value'}
|
||||||
|
df = self._make_open_diskfile(body=body,
|
||||||
|
extra_metadata=extra_metadata)
|
||||||
|
expected = dict(df.get_metadata())
|
||||||
|
expected['body'] = body
|
||||||
|
expected['chunk_size'] = len(body)
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
df = mock.MagicMock()
|
|
||||||
df.get_metadata.return_value = {
|
|
||||||
'name': '/a/c/o',
|
|
||||||
'X-Timestamp': '1381679759.90941',
|
|
||||||
'Content-Length': '3',
|
|
||||||
'Etag': '900150983cd24fb0d6963f7d28e17f72',
|
|
||||||
'Some-Other-Header': 'value'}
|
|
||||||
df.content_length = 3
|
|
||||||
df.reader().__iter__ = lambda x: iter(['ab', 'c'])
|
|
||||||
self.sender.send_put('/a/c/o', df)
|
self.sender.send_put('/a/c/o', df)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
''.join(self.sender.connection.sent),
|
''.join(self.sender.connection.sent),
|
||||||
'82\r\n'
|
'82\r\n'
|
||||||
'PUT /a/c/o\r\n'
|
'PUT /a/c/o\r\n'
|
||||||
'Content-Length: 3\r\n'
|
'Content-Length: %(Content-Length)s\r\n'
|
||||||
'Etag: 900150983cd24fb0d6963f7d28e17f72\r\n'
|
'ETag: %(ETag)s\r\n'
|
||||||
'Some-Other-Header: value\r\n'
|
'Some-Other-Header: value\r\n'
|
||||||
'X-Timestamp: 1381679759.90941\r\n'
|
'X-Timestamp: %(X-Timestamp)s\r\n'
|
||||||
'\r\n'
|
'\r\n'
|
||||||
'\r\n'
|
'\r\n'
|
||||||
'2\r\n'
|
'%(chunk_size)s\r\n'
|
||||||
'ab\r\n'
|
'%(body)s\r\n' % expected)
|
||||||
'1\r\n'
|
|
||||||
'c\r\n')
|
|
||||||
|
|
||||||
def test_disconnect_timeout(self):
|
def test_disconnect_timeout(self):
|
||||||
self.sender.connection = FakeConnection()
|
self.sender.connection = FakeConnection()
|
||||||
|
Loading…
Reference in New Issue
Block a user