SSYNC: Stop sharing a global response

Change-Id: Ia431d20e1132cc139ac067d66d5d1626ec07117f
This commit is contained in:
Romain LE DISEZ 2018-10-22 11:32:36 +02:00
parent e4ad56abb1
commit 6b94cf204a
3 changed files with 196 additions and 176 deletions

View File

@ -69,6 +69,61 @@ def decode_wanted(parts):
return wanted
class SsyncBufferedHTTPResponse(bufferedhttp.BufferedHTTPResponse, object):
def __init__(self, *args, **kwargs):
super(SsyncBufferedHTTPResponse, self).__init__(*args, **kwargs)
self.ssync_response_buffer = ''
self.ssync_response_chunk_left = 0
def readline(self, size=1024):
"""
Reads a line from the SSYNC response body.
httplib has no readline and will block on read(x) until x is
read, so we have to do the work ourselves. A bit of this is
taken from Python's httplib itself.
"""
data = self.ssync_response_buffer
self.ssync_response_buffer = ''
while '\n' not in data and len(data) < size:
if self.ssync_response_chunk_left == -1: # EOF-already indicator
break
if self.ssync_response_chunk_left == 0:
line = self.fp.readline()
i = line.find(';')
if i >= 0:
line = line[:i] # strip chunk-extensions
try:
self.ssync_response_chunk_left = int(line.strip(), 16)
except ValueError:
# close the connection as protocol synchronisation is
# probably lost
self.close()
raise exceptions.ReplicationException('Early disconnect')
if self.ssync_response_chunk_left == 0:
self.ssync_response_chunk_left = -1
break
chunk = self.fp.read(min(self.ssync_response_chunk_left,
size - len(data)))
if not chunk:
# close the connection as protocol synchronisation is
# probably lost
self.close()
raise exceptions.ReplicationException('Early disconnect')
self.ssync_response_chunk_left -= len(chunk)
if self.ssync_response_chunk_left == 0:
self.fp.read(2) # discard the trailing \r\n
data += chunk
if '\n' in data:
data, self.ssync_response_buffer = data.split('\n', 1)
data += '\n'
return data
class SsyncBufferedHTTPConnection(bufferedhttp.BufferedHTTPConnection):
response_class = SsyncBufferedHTTPResponse
class Sender(object):
"""
Sends SSYNC requests to the object server.
@ -84,9 +139,6 @@ class Sender(object):
self.node = node
self.job = job
self.suffixes = suffixes
self.response = None
self.response_buffer = ''
self.response_chunk_left = 0
# available_map has an entry for each object in given suffixes that
# is available to be sync'd; each entry is a hash => dict of timestamps
# of data file or tombstone file and/or meta file
@ -110,7 +162,7 @@ class Sender(object):
"""
if not self.suffixes:
return True, {}
connection = None
connection = response = None
try:
# Double try blocks in case our main error handler fails.
try:
@ -119,10 +171,10 @@ class Sender(object):
# exceptions.ReplicationException for common issues that will
# abort the replication attempt and log a simple error. All
# other exceptions will be logged with a full stack trace.
connection = self.connect()
self.missing_check(connection)
connection, response = self.connect()
self.missing_check(connection, response)
if self.remote_check_objs is None:
self.updates(connection)
self.updates(connection, response)
can_delete_obj = self.available_map
else:
# when we are initialized with remote_check_objs we don't
@ -167,10 +219,10 @@ class Sender(object):
Establishes a connection and starts an SSYNC request
with the object server.
"""
connection = None
connection = response = None
with exceptions.MessageTimeout(
self.daemon.conn_timeout, 'connect send'):
connection = bufferedhttp.BufferedHTTPConnection(
connection = SsyncBufferedHTTPConnection(
'%s:%s' % (self.node['replication_ip'],
self.node['replication_port']))
connection.putrequest('SSYNC', '/%s/%s' % (
@ -196,60 +248,15 @@ class Sender(object):
connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):
self.response = connection.getresponse()
if self.response.status != http.HTTP_OK:
err_msg = self.response.read()[:1024]
response = connection.getresponse()
if response.status != http.HTTP_OK:
err_msg = response.read()[:1024]
raise exceptions.ReplicationException(
'Expected status %s; got %s (%s)' %
(http.HTTP_OK, self.response.status, err_msg))
return connection
(http.HTTP_OK, response.status, err_msg))
return connection, response
def readline(self):
"""
Reads a line from the SSYNC response body.
httplib has no readline and will block on read(x) until x is
read, so we have to do the work ourselves. A bit of this is
taken from Python's httplib itself.
"""
data = self.response_buffer
self.response_buffer = ''
while '\n' not in data and len(data) < self.daemon.network_chunk_size:
if self.response_chunk_left == -1: # EOF-already indicator
break
if self.response_chunk_left == 0:
line = self.response.fp.readline()
i = line.find(';')
if i >= 0:
line = line[:i] # strip chunk-extensions
try:
self.response_chunk_left = int(line.strip(), 16)
except ValueError:
# close the connection as protocol synchronisation is
# probably lost
self.response.close()
raise exceptions.ReplicationException('Early disconnect')
if self.response_chunk_left == 0:
self.response_chunk_left = -1
break
chunk = self.response.fp.read(min(
self.response_chunk_left,
self.daemon.network_chunk_size - len(data)))
if not chunk:
# close the connection as protocol synchronisation is
# probably lost
self.response.close()
raise exceptions.ReplicationException('Early disconnect')
self.response_chunk_left -= len(chunk)
if self.response_chunk_left == 0:
self.response.fp.read(2) # discard the trailing \r\n
data += chunk
if '\n' in data:
data, self.response_buffer = data.split('\n', 1)
data += '\n'
return data
def missing_check(self, connection):
def missing_check(self, connection, response):
"""
Handles the sender-side of the MISSING_CHECK step of a
SSYNC request.
@ -286,7 +293,7 @@ class Sender(object):
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'missing_check start wait'):
line = self.readline()
line = response.readline(size=self.daemon.network_chunk_size)
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
@ -298,7 +305,7 @@ class Sender(object):
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'missing_check line wait'):
line = self.readline()
line = response.readline(size=self.daemon.network_chunk_size)
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
@ -308,7 +315,7 @@ class Sender(object):
if parts:
self.send_map[parts[0]] = decode_wanted(parts[1:])
def updates(self, connection):
def updates(self, connection, response):
"""
Handles the sender-side of the UPDATES step of an SSYNC
request.
@ -362,7 +369,7 @@ class Sender(object):
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'updates start wait'):
line = self.readline()
line = response.readline(size=self.daemon.network_chunk_size)
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()
@ -374,7 +381,7 @@ class Sender(object):
while True:
with exceptions.MessageTimeout(
self.daemon.http_timeout, 'updates line wait'):
line = self.readline()
line = response.readline(size=self.daemon.network_chunk_size)
if not line:
raise exceptions.ReplicationException('Early disconnect')
line = line.strip()

View File

@ -99,8 +99,8 @@ class TestBaseSsync(BaseTest):
return wrapped_send
def make_readline_wrapper(readline):
def wrapped_readline():
data = readline()
def wrapped_readline(size=1024):
data = readline(size=size)
add_trace('rx', data)
bytes_read = trace.setdefault('readline_bytes', 0)
trace['readline_bytes'] = bytes_read + len(data)
@ -108,11 +108,11 @@ class TestBaseSsync(BaseTest):
return wrapped_readline
def wrapped_connect():
connection = orig_connect()
connection, response = orig_connect()
connection.send = make_send_wrapper(
connection.send)
sender.readline = make_readline_wrapper(sender.readline)
return connection
response.readline = make_readline_wrapper(response.readline)
return connection, response
return wrapped_connect, trace
def _get_object_data(self, path, **kwargs):

View File

@ -52,7 +52,7 @@ class NullBufferedHTTPConnection(object):
pass
class FakeResponse(object):
class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
def __init__(self, chunk_body=''):
self.status = 200
@ -60,6 +60,8 @@ class FakeResponse(object):
if chunk_body:
self.fp = six.StringIO(
'%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body))
self.ssync_response_buffer = ''
self.ssync_response_chunk_left = 0
def read(self, *args, **kwargs):
return ''
@ -163,8 +165,10 @@ class TestSender(BaseTest):
def test_call_calls_others(self):
connection = FakeConnection()
response = FakeResponse()
self.sender.suffixes = ['abc']
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.missing_check = mock.MagicMock()
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
@ -172,8 +176,8 @@ class TestSender(BaseTest):
self.assertTrue(success)
self.assertEqual(candidates, {})
self.sender.connect.assert_called_once_with()
self.sender.missing_check.assert_called_once_with(connection)
self.sender.updates.assert_called_once_with(connection)
self.sender.missing_check.assert_called_once_with(connection, response)
self.sender.updates.assert_called_once_with(connection, response)
self.sender.disconnect.assert_called_once_with(connection)
def test_connect(self):
@ -183,7 +187,7 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
@ -217,7 +221,7 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
@ -251,7 +255,7 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
@ -285,7 +289,7 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
@ -320,7 +324,7 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
self.sender.suffixes = ['abc']
with mock.patch(
'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection'
'swift.obj.ssync_sender.SsyncBufferedHTTPConnection'
) as mock_conn_class:
mock_conn = mock_conn_class.return_value
mock_resp = mock.MagicMock()
@ -350,7 +354,9 @@ class TestSender(BaseTest):
def test_call(self):
def patch_sender(sender):
connection = FakeConnection()
sender.connect = mock.MagicMock(return_value=connection)
response = FakeResponse()
sender.connect = mock.MagicMock(return_value=(connection,
response))
sender.missing_check = mock.MagicMock()
sender.updates = mock.MagicMock()
sender.disconnect = mock.MagicMock()
@ -439,7 +445,7 @@ class TestSender(BaseTest):
'frag_index': 0,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc\r\n'
@ -448,7 +454,8 @@ class TestSender(BaseTest):
':UPDATES: END\r\n'
))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
df = mock.MagicMock()
df.content_length = 0
self.sender.df_mgr.get_diskfile_from_hash = mock.MagicMock(
@ -485,13 +492,14 @@ class TestSender(BaseTest):
'frag_index': 0,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@ -519,12 +527,13 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, None, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
connection = FakeConnection()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@ -552,13 +561,14 @@ class TestSender(BaseTest):
self.sender = ssync_sender.Sender(self.daemon, {}, job, ['abc'],
['9d41d8cd98f00b204e9800998ecf0abc'])
connection = FakeConnection()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.connect = mock.MagicMock(return_value=connection)
self.sender.connect = mock.MagicMock(return_value=(connection,
response))
self.sender.updates = mock.MagicMock()
self.sender.disconnect = mock.MagicMock()
success, candidates = self.sender()
@ -602,7 +612,7 @@ class TestSender(BaseTest):
eventlet.sleep(0.1)
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
ssync_sender, 'SsyncBufferedHTTPConnection',
FakeBufferedHTTPConnection):
success, candidates = self.sender()
self.assertFalse(success)
@ -628,7 +638,7 @@ class TestSender(BaseTest):
missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check'
with mock.patch(missing_check_fn) as mock_missing_check:
with mock.patch.object(
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
ssync_sender, 'SsyncBufferedHTTPConnection',
FakeBufferedHTTPConnection):
self.sender = ssync_sender.Sender(
self.daemon, node, job, ['abc'])
@ -644,63 +654,65 @@ class TestSender(BaseTest):
self.assertFalse(mock_missing_check.called)
def test_readline_newline_in_buffer(self):
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
self.assertEqual(self.sender.readline(), 'Has a newline already.\r\n')
self.assertEqual(self.sender.response_buffer, 'Okay.')
response = FakeResponse()
response.ssync_response_buffer = 'Has a newline already.\r\nOkay.'
self.assertEqual(response.readline(), 'Has a newline already.\r\n')
self.assertEqual(response.ssync_response_buffer, 'Okay.')
def test_readline_buffer_exceeds_network_chunk_size_somehow(self):
self.daemon.network_chunk_size = 2
self.sender.response_buffer = '1234567890'
self.assertEqual(self.sender.readline(), '1234567890')
self.assertEqual(self.sender.response_buffer, '')
response = FakeResponse()
response.ssync_response_buffer = '1234567890'
self.assertEqual(response.readline(size=2), '1234567890')
self.assertEqual(response.ssync_response_buffer, '')
def test_readline_at_start_of_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO('2\r\nx\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
response = FakeResponse()
response.fp = six.StringIO('2\r\nx\n\r\n')
self.assertEqual(response.readline(), 'x\n')
def test_readline_chunk_with_extension(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO(
response = FakeResponse()
response.fp = six.StringIO(
'2 ; chunk=extension\r\nx\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
self.assertEqual(response.readline(), 'x\n')
def test_readline_broken_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO('q\r\nx\n\r\n')
response = FakeResponse()
response.fp = six.StringIO('q\r\nx\n\r\n')
self.assertRaises(
exceptions.ReplicationException, self.sender.readline)
self.assertTrue(self.sender.response.close_called)
exceptions.ReplicationException, response.readline)
self.assertTrue(response.close_called)
def test_readline_terminated_chunk(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO('b\r\nnot enough')
response = FakeResponse()
response.fp = six.StringIO('b\r\nnot enough')
self.assertRaises(
exceptions.ReplicationException, self.sender.readline)
self.assertTrue(self.sender.response.close_called)
exceptions.ReplicationException, response.readline)
self.assertTrue(response.close_called)
def test_readline_all(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO('2\r\nx\n\r\n0\r\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
self.assertEqual(self.sender.readline(), '')
self.assertEqual(self.sender.readline(), '')
response = FakeResponse()
response.fp = six.StringIO('2\r\nx\n\r\n0\r\n\r\n')
self.assertEqual(response.readline(), 'x\n')
self.assertEqual(response.readline(), '')
self.assertEqual(response.readline(), '')
def test_readline_all_trailing_not_newline_termed(self):
self.sender.response = FakeResponse()
self.sender.response.fp = six.StringIO(
response = FakeResponse()
response.fp = six.StringIO(
'2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n')
self.assertEqual(self.sender.readline(), 'x\n')
self.assertEqual(self.sender.readline(), '123')
self.assertEqual(self.sender.readline(), '')
self.assertEqual(self.sender.readline(), '')
self.assertEqual(response.readline(), 'x\n')
self.assertEqual(response.readline(), '123')
self.assertEqual(response.readline(), '')
self.assertEqual(response.readline(), '')
def test_missing_check_timeout(self):
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
response = FakeResponse()
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.missing_check,
connection)
connection, response)
def test_missing_check_has_empty_suffixes(self):
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@ -719,12 +731,12 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
@ -761,12 +773,12 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc', 'def']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
@ -809,10 +821,10 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='\r\n')
response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -846,11 +858,11 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=':MISSING_CHECK: START\r\n')
exc = None
try:
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -884,10 +896,10 @@ class TestSender(BaseTest):
}
self.sender.suffixes = ['abc']
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
response = FakeResponse(chunk_body='OH HAI\r\n')
exc = None
try:
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'OH HAI'")
@ -920,13 +932,13 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'0123abc dm\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
self.assertEqual(
''.join(connection.sent),
'17\r\n:MISSING_CHECK: START\r\n\r\n'
@ -960,13 +972,13 @@ class TestSender(BaseTest):
'policy': POLICIES.legacy,
}
self.sender.suffixes = ['abc']
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':MISSING_CHECK: START\r\n'
'0123abc d extra response parts\r\n'
':MISSING_CHECK: END\r\n'))
self.sender.df_mgr.yield_hashes = yield_hashes
self.sender.missing_check(connection)
self.sender.missing_check(connection, response)
self.assertEqual(self.sender.send_map,
{'0123abc': {'data': True}})
self.assertEqual(self.sender.available_map,
@ -976,18 +988,19 @@ class TestSender(BaseTest):
def test_updates_timeout(self):
connection = FakeConnection()
connection.send = lambda d: eventlet.sleep(1)
response = FakeResponse()
self.sender.daemon.node_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
connection, response)
def test_updates_empty_send_map(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.assertEqual(
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
@ -996,14 +1009,14 @@ class TestSender(BaseTest):
def test_updates_unexpected_response_lines1(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
'abc\r\n'
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
@ -1015,14 +1028,14 @@ class TestSender(BaseTest):
def test_updates_unexpected_response_lines2(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'abc\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'abc'")
@ -1050,11 +1063,11 @@ class TestSender(BaseTest):
self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.sender.send_delete.assert_called_once_with(
connection, '/a/c/o', delete_timestamp)
self.assertEqual(self.sender.send_put.mock_calls, [])
@ -1082,11 +1095,11 @@ class TestSender(BaseTest):
}
self.sender.node = {}
self.sender.send_map = {object_hash: {'data': True}}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.assertEqual(
''.join(connection.sent),
'11\r\n:UPDATES: START\r\n\r\n'
@ -1125,11 +1138,11 @@ class TestSender(BaseTest):
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_post.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
@ -1172,11 +1185,11 @@ class TestSender(BaseTest):
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(self.sender.send_put.mock_calls, [])
self.assertEqual(1, len(self.sender.send_post.mock_calls))
@ -1219,11 +1232,11 @@ class TestSender(BaseTest):
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.send_post = mock.MagicMock()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
self.assertEqual(self.sender.send_delete.mock_calls, [])
self.assertEqual(1, len(self.sender.send_put.mock_calls))
self.assertEqual(1, len(self.sender.send_post.mock_calls))
@ -1262,11 +1275,11 @@ class TestSender(BaseTest):
self.sender.send_map = {object_hash: {'data': True}}
self.sender.send_delete = mock.MagicMock()
self.sender.send_put = mock.MagicMock()
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
self.sender.updates(connection)
self.sender.updates(connection, response)
args, _kwargs = self.sender.send_put.call_args
connection, path, df = args
self.assertEqual(path, '/a/c/o')
@ -1279,28 +1292,28 @@ class TestSender(BaseTest):
def test_updates_read_response_timeout_start(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
orig_readline = self.sender.readline
orig_readline = response.readline
def delayed_readline():
def delayed_readline(*args, **kwargs):
eventlet.sleep(1)
return orig_readline()
return orig_readline(*args, **kwargs)
self.sender.readline = delayed_readline
response.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
connection, response)
def test_updates_read_response_disconnect_start(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(chunk_body='\r\n')
response = FakeResponse(chunk_body='\r\n')
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -1312,14 +1325,14 @@ class TestSender(BaseTest):
def test_updates_read_response_unexp_start(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
'anything else\r\n'
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
@ -1331,33 +1344,33 @@ class TestSender(BaseTest):
def test_updates_read_response_timeout_end(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
':UPDATES: END\r\n'))
orig_readline = self.sender.readline
orig_readline = response.readline
def delayed_readline():
rv = orig_readline()
def delayed_readline(*args, **kwargs):
rv = orig_readline(*args, **kwargs)
if rv == ':UPDATES: END\r\n':
eventlet.sleep(1)
return rv
self.sender.readline = delayed_readline
response.readline = delayed_readline
self.sender.daemon.http_timeout = 0.01
self.assertRaises(exceptions.MessageTimeout, self.sender.updates,
connection)
connection, response)
def test_updates_read_response_disconnect_end(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'\r\n'))
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), 'Early disconnect')
@ -1369,14 +1382,14 @@ class TestSender(BaseTest):
def test_updates_read_response_unexp_end(self):
connection = FakeConnection()
self.sender.send_map = {}
self.sender.response = FakeResponse(
response = FakeResponse(
chunk_body=(
':UPDATES: START\r\n'
'anything else\r\n'
':UPDATES: END\r\n'))
exc = None
try:
self.sender.updates(connection)
self.sender.updates(connection, response)
except exceptions.ReplicationException as err:
exc = err
self.assertEqual(str(exc), "Unexpected response: 'anything else'")