py3: port ssync

Change-Id: I63a502be13f5dcda2a457d38f2fc5f1ca469d562
parent 23eca56ce0
commit d9cafca246
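
The port follows one pattern throughout: ssync speaks a line-oriented
protocol inside a chunked HTTP transfer, and on py3 everything that
touches the wire must be bytes. So protocol literals become b'...'
strings, outbound messages are encoded before they are framed, and
inbound lines are decoded only where text is genuinely needed (object
hashes as ascii, subrequest lines via swob.bytes_to_wsgi). A minimal
sketch of the framing pattern the diff applies repeatedly (frame() and
send_line() are illustrative names, not part of this commit):

    def frame(msg):
        # One HTTP chunk: hex length, CRLF, payload, CRLF. The payload
        # must already be bytes; bytes %-formatting needs Python 3.5+.
        assert isinstance(msg, bytes)
        return b'%x\r\n%s\r\n' % (len(msg), msg)

    def send_line(connection, text):
        # Build protocol lines as text, encode once at the boundary.
        connection.send(frame(('%s\r\n' % text).encode('utf8')))

The receiving side does the reverse: self.fp.readline() now yields
bytes, so the handshake comparisons use bytes literals such as
b':MISSING_CHECK: START'.
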
@@ -463,7 +463,7 @@ class ObjectReconstructor(Daemon):
             if resp_frag_index not in buckets[timestamp]:
                 buckets[timestamp][resp_frag_index] = resp
                 if len(buckets[timestamp]) >= job['policy'].ec_ndata:
-                    responses = buckets[timestamp].values()
+                    responses = list(buckets[timestamp].values())
                     self.logger.debug(
                         'Reconstruct frag #%s with frag indexes %s'
                         % (fi_to_rebuild, list(buckets[timestamp])))
@@ -509,15 +509,15 @@ class ObjectReconstructor(Daemon):
         """

         def _get_one_fragment(resp):
-            buff = ''
+            buff = []
             remaining_bytes = policy.fragment_size
             while remaining_bytes:
                 chunk = resp.read(remaining_bytes)
                 if not chunk:
                     break
                 remaining_bytes -= len(chunk)
-                buff += chunk
-            return buff
+                buff.append(chunk)
+            return b''.join(buff)

         def fragment_payload_iter():
             # We need a fragment from each connections, so best to
@@ -15,6 +15,7 @@


 import eventlet.greenio
+import eventlet.wsgi
 from six.moves import urllib

 from swift.common import exceptions
@@ -35,7 +36,7 @@ def decode_missing(line):
         :py:func:`~swift.obj.ssync_sender.encode_missing`
     """
     result = {}
-    parts = line.split()
+    parts = line.decode('ascii').split()
     result['object_hash'] = urllib.parse.unquote(parts[0])
     t_data = urllib.parse.unquote(parts[1])
     result['ts_data'] = Timestamp(t_data)
@@ -129,7 +130,17 @@ class Receiver(object):
         # raised during processing because otherwise the sender could send for
         # quite some time before realizing it was all in vain.
         self.disconnect = True
-        self.initialize_request()
+        try:
+            self.initialize_request()
+        except swob.HTTPException:
+            # Old (pre-0.18.0) eventlet would try to drain the request body
+            # in a way that's prone to blowing up when the client has
+            # disconnected. Trick it into skipping that so we don't trip
+            # ValueError: invalid literal for int() with base 16
+            # in tests. Note we disconnect shortly after receiving a non-200
+            # response in the sender code, so this is not *so* crazy to do.
+            request.environ['wsgi.input'].chunked_input = False
+            raise

     def __call__(self):
         """
@@ -151,7 +162,7 @@ class Receiver(object):
         try:
             # Need to send something to trigger wsgi to return response
             # headers and kick off the ssync exchange.
-            yield '\r\n'
+            yield b'\r\n'
             # If semaphore is in use, try to acquire it, non-blocking, and
             # return a 503 if it fails.
             if self.app.replication_semaphore:
@@ -176,21 +187,22 @@ class Receiver(object):
                     '%s/%s/%s SSYNC LOCK TIMEOUT: %s' % (
                         self.request.remote_addr, self.device, self.partition,
                         err))
-                yield ':ERROR: %d %r\n' % (0, str(err))
+                yield (':ERROR: %d %r\n' % (0, str(err))).encode('utf8')
             except exceptions.MessageTimeout as err:
                 self.app.logger.error(
                     '%s/%s/%s TIMEOUT in ssync.Receiver: %s' % (
                         self.request.remote_addr, self.device, self.partition,
                         err))
-                yield ':ERROR: %d %r\n' % (408, str(err))
+                yield (':ERROR: %d %r\n' % (408, str(err))).encode('utf8')
             except swob.HTTPException as err:
-                body = ''.join(err({}, lambda *args: None))
-                yield ':ERROR: %d %r\n' % (err.status_int, body)
+                body = b''.join(err({}, lambda *args: None))
+                yield (':ERROR: %d %r\n' % (
+                    err.status_int, body)).encode('utf8')
             except Exception as err:
                 self.app.logger.exception(
                     '%s/%s/%s EXCEPTION in ssync.Receiver' %
                     (self.request.remote_addr, self.device, self.partition))
-                yield ':ERROR: %d %r\n' % (0, str(err))
+                yield (':ERROR: %d %r\n' % (0, str(err))).encode('utf8')
         except Exception:
             self.app.logger.exception('EXCEPTION in ssync.Receiver')
         if self.disconnect:
@@ -335,7 +347,7 @@ class Receiver(object):
         with exceptions.MessageTimeout(
                 self.app.client_timeout, 'missing_check start'):
             line = self.fp.readline(self.app.network_chunk_size)
-        if line.strip() != ':MISSING_CHECK: START':
+        if line.strip() != b':MISSING_CHECK: START':
             raise Exception(
                 'Looking for :MISSING_CHECK: START got %r' % line[:1024])
         object_hashes = []
@@ -343,16 +355,16 @@ class Receiver(object):
             with exceptions.MessageTimeout(
                     self.app.client_timeout, 'missing_check line'):
                 line = self.fp.readline(self.app.network_chunk_size)
-            if not line or line.strip() == ':MISSING_CHECK: END':
+            if not line or line.strip() == b':MISSING_CHECK: END':
                 break
             want = self._check_missing(line)
             if want:
                 object_hashes.append(want)
-        yield ':MISSING_CHECK: START\r\n'
+        yield b':MISSING_CHECK: START\r\n'
         if object_hashes:
-            yield '\r\n'.join(object_hashes)
-        yield '\r\n'
-        yield ':MISSING_CHECK: END\r\n'
+            yield b'\r\n'.join(hsh.encode('ascii') for hsh in object_hashes)
+        yield b'\r\n'
+        yield b':MISSING_CHECK: END\r\n'

     def updates(self):
         """
@@ -395,7 +407,7 @@ class Receiver(object):
         with exceptions.MessageTimeout(
                 self.app.client_timeout, 'updates start'):
             line = self.fp.readline(self.app.network_chunk_size)
-        if line.strip() != ':UPDATES: START':
+        if line.strip() != b':UPDATES: START':
             raise Exception('Looking for :UPDATES: START got %r' % line[:1024])
         successes = 0
         failures = 0
@@ -403,10 +415,10 @@ class Receiver(object):
             with exceptions.MessageTimeout(
                     self.app.client_timeout, 'updates line'):
                 line = self.fp.readline(self.app.network_chunk_size)
-            if not line or line.strip() == ':UPDATES: END':
+            if not line or line.strip() == b':UPDATES: END':
                 break
             # Read first line METHOD PATH of subrequest.
-            method, path = line.strip().split(' ', 1)
+            method, path = swob.bytes_to_wsgi(line.strip()).split(' ', 1)
             subreq = swob.Request.blank(
                 '/%s/%s%s' % (self.device, self.partition, path),
                 environ={'REQUEST_METHOD': method})
@@ -422,7 +434,7 @@ class Receiver(object):
                 line = line.strip()
                 if not line:
                     break
-                header, value = line.split(':', 1)
+                header, value = swob.bytes_to_wsgi(line).split(':', 1)
                 header = header.strip().lower()
                 value = value.strip()
                 subreq.headers[header] = value
@@ -495,5 +507,5 @@ class Receiver(object):
             raise swob.HTTPInternalServerError(
                 'ERROR: With :UPDATES: %d failures to %d successes' %
                 (failures, successes))
-        yield ':UPDATES: START\r\n'
-        yield ':UPDATES: END\r\n'
+        yield b':UPDATES: START\r\n'
+        yield b':UPDATES: END\r\n'
@@ -40,7 +40,7 @@ def encode_missing(object_hash, ts_data, ts_meta=None, ts_ctype=None):
         if ts_ctype and ts_ctype != ts_data:
             delta = ts_ctype.raw - ts_data.raw
             msg = '%s,t:%x' % (msg, delta)
-    return msg
+    return msg.encode('ascii')


 def decode_wanted(parts):
@@ -52,7 +52,7 @@ def decode_wanted(parts):
         :py:func:`~swift.obj.ssync_receiver.encode_wanted`
     """
     wanted = {}
-    key_map = dict(d='data', m='meta')
+    key_map = {'d': 'data', 'm': 'meta'}
     if parts:
         # receiver specified data and/or meta wanted, so use those as
         # conditions for sending PUT and/or POST subrequests
@@ -72,7 +72,7 @@ def decode_wanted(parts):
 class SsyncBufferedHTTPResponse(bufferedhttp.BufferedHTTPResponse, object):
     def __init__(self, *args, **kwargs):
         super(SsyncBufferedHTTPResponse, self).__init__(*args, **kwargs)
-        self.ssync_response_buffer = ''
+        self.ssync_response_buffer = b''
         self.ssync_response_chunk_left = 0

     def readline(self, size=1024):
@@ -84,13 +84,13 @@ class SsyncBufferedHTTPResponse(bufferedhttp.BufferedHTTPResponse, object):
         taken from Python's httplib itself.
         """
         data = self.ssync_response_buffer
-        self.ssync_response_buffer = ''
-        while '\n' not in data and len(data) < size:
+        self.ssync_response_buffer = b''
+        while b'\n' not in data and len(data) < size:
             if self.ssync_response_chunk_left == -1:  # EOF-already indicator
                 break
             if self.ssync_response_chunk_left == 0:
                 line = self.fp.readline()
-                i = line.find(';')
+                i = line.find(b';')
                 if i >= 0:
                     line = line[:i]  # strip chunk-extensions
                 try:
@@ -114,9 +114,9 @@ class SsyncBufferedHTTPResponse(bufferedhttp.BufferedHTTPResponse, object):
             if self.ssync_response_chunk_left == 0:
                 self.fp.read(2)  # discard the trailing \r\n
             data += chunk
-        if '\n' in data:
-            data, self.ssync_response_buffer = data.split('\n', 1)
-            data += '\n'
+        if b'\n' in data:
+            data, self.ssync_response_buffer = data.split(b'\n', 1)
+            data += b'\n'
         return data


@@ -263,8 +263,8 @@ class Sender(object):
         # First, send our list.
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'missing_check start'):
-            msg = ':MISSING_CHECK: START\r\n'
-            connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+            msg = b':MISSING_CHECK: START\r\n'
+            connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
         hash_gen = self.df_mgr.yield_hashes(
             self.job['device'], self.job['partition'],
             self.job['policy'], self.suffixes,
@@ -279,12 +279,12 @@ class Sender(object):
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout,
                     'missing_check send line'):
-                msg = '%s\r\n' % encode_missing(object_hash, **timestamps)
-                connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+                msg = b'%s\r\n' % encode_missing(object_hash, **timestamps)
+                connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'missing_check end'):
-            msg = ':MISSING_CHECK: END\r\n'
-            connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+            msg = b':MISSING_CHECK: END\r\n'
+            connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
         # Now, retrieve the list of what they want.
         while True:
             with exceptions.MessageTimeout(
@@ -293,9 +293,14 @@ class Sender(object):
             if not line:
                 raise exceptions.ReplicationException('Early disconnect')
             line = line.strip()
-            if line == ':MISSING_CHECK: START':
+            if line == b':MISSING_CHECK: START':
                 break
             elif line:
+                if not six.PY2:
+                    try:
+                        line = line.decode('ascii')
+                    except UnicodeDecodeError:
+                        pass
                 raise exceptions.ReplicationException(
                     'Unexpected response: %r' % line[:1024])
         while True:
@@ -305,9 +310,9 @@ class Sender(object):
             if not line:
                 raise exceptions.ReplicationException('Early disconnect')
             line = line.strip()
-            if line == ':MISSING_CHECK: END':
+            if line == b':MISSING_CHECK: END':
                 break
-            parts = line.split()
+            parts = line.decode('ascii').split()
             if parts:
                 send_map[parts[0]] = decode_wanted(parts[1:])
         return available_map, send_map
@@ -323,8 +328,8 @@ class Sender(object):
         # First, send all our subrequests based on the send_map.
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'updates start'):
-            msg = ':UPDATES: START\r\n'
-            connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+            msg = b':UPDATES: START\r\n'
+            connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
         for object_hash, want in send_map.items():
             object_hash = urllib.parse.unquote(object_hash)
             try:
@@ -360,8 +365,8 @@ class Sender(object):
                 pass
         with exceptions.MessageTimeout(
                 self.daemon.node_timeout, 'updates end'):
-            msg = ':UPDATES: END\r\n'
-            connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+            msg = b':UPDATES: END\r\n'
+            connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))
         # Now, read their response for any issues.
         while True:
             with exceptions.MessageTimeout(
@@ -370,9 +375,14 @@ class Sender(object):
             if not line:
                 raise exceptions.ReplicationException('Early disconnect')
             line = line.strip()
-            if line == ':UPDATES: START':
+            if line == b':UPDATES: START':
                 break
             elif line:
+                if not six.PY2:
+                    try:
+                        line = line.decode('ascii')
+                    except UnicodeDecodeError:
+                        pass
                 raise exceptions.ReplicationException(
                     'Unexpected response: %r' % line[:1024])
         while True:
@@ -382,20 +392,30 @@ class Sender(object):
             if not line:
                 raise exceptions.ReplicationException('Early disconnect')
             line = line.strip()
-            if line == ':UPDATES: END':
+            if line == b':UPDATES: END':
                 break
             elif line:
+                if not six.PY2:
+                    try:
+                        line = line.decode('ascii')
+                    except UnicodeDecodeError:
+                        pass
                 raise exceptions.ReplicationException(
                     'Unexpected response: %r' % line[:1024])

     def send_subrequest(self, connection, method, url_path, headers, df):
-        msg = ['%s %s' % (method, url_path)]
+        msg = [b'%s %s' % (method.encode('ascii'), url_path.encode('utf8'))]
         for key, value in sorted(headers.items()):
-            msg.append('%s: %s' % (key, value))
-        msg = '\r\n'.join(msg) + '\r\n\r\n'
+            if six.PY2:
+                msg.append(b'%s: %s' % (key, value))
+            else:
+                msg.append(b'%s: %s' % (
+                    key.encode('utf8', 'surrogateescape'),
+                    str(value).encode('utf8', 'surrogateescape')))
+        msg = b'\r\n'.join(msg) + b'\r\n\r\n'
         with exceptions.MessageTimeout(self.daemon.node_timeout,
                                        'send_%s' % method.lower()):
-            connection.send('%x\r\n%s\r\n' % (len(msg), msg))
+            connection.send(b'%x\r\n%s\r\n' % (len(msg), msg))

         if df:
             bytes_read = 0
@@ -404,7 +424,7 @@ class Sender(object):
                 with exceptions.MessageTimeout(self.daemon.node_timeout,
                                                'send_%s chunk' %
                                                method.lower()):
-                    connection.send('%x\r\n%s\r\n' % (len(chunk), chunk))
+                    connection.send(b'%x\r\n%s\r\n' % (len(chunk), chunk))
             if bytes_read != df.content_length:
                 # Since we may now have partial state on the receiver we have
                 # to prevent the receiver finalising what may well be a bad or
@@ -450,7 +470,7 @@ class Sender(object):
         try:
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout, 'disconnect'):
-                connection.send('0\r\n\r\n')
+                connection.send(b'0\r\n\r\n')
         except (Exception, exceptions.Timeout):
             pass  # We're okay with the above failing.
         connection.close()
@@ -25,6 +25,7 @@ from six.moves import urllib

 from swift.common.exceptions import DiskFileNotExist, DiskFileError, \
     DiskFileDeleted, DiskFileExpired
+from swift.common import swob
 from swift.common import utils
 from swift.common.storage_policy import POLICIES, EC_POLICY
 from swift.common.utils import Timestamp
@@ -92,8 +93,8 @@ class TestBaseSsync(BaseTest):

         def make_send_wrapper(send):
             def wrapped_send(msg):
-                _msg = msg.split('\r\n', 1)[1]
-                _msg = _msg.rsplit('\r\n', 1)[0]
+                _msg = msg.split(b'\r\n', 1)[1]
+                _msg = _msg.rsplit(b'\r\n', 1)[0]
                 add_trace('tx', _msg)
                 send(msg)
             return wrapped_send
@@ -118,7 +119,7 @@ class TestBaseSsync(BaseTest):
     def _get_object_data(self, path, **kwargs):
         # return data for given path
         if path not in self.obj_data:
-            self.obj_data[path] = '%s___data' % path
+            self.obj_data[path] = b'%s___data' % path.encode('ascii')
         return self.obj_data[path]

     def _create_ondisk_files(self, df_mgr, obj_name, policy, timestamp,
@@ -162,7 +163,7 @@ class TestBaseSsync(BaseTest):
         for k, v in tx_df.get_metadata().items():
             if k == 'X-Object-Sysmeta-Ec-Frag-Index':
                 # if tx_df had a frag_index then rx_df should also have one
-                self.assertTrue(k in rx_metadata)
+                self.assertIn(k, rx_metadata)
                 self.assertEqual(frag_index, int(rx_metadata.pop(k)))
             elif k == 'ETag' and not same_etag:
                 self.assertNotEqual(v, rx_metadata.pop(k, None))
@@ -174,7 +175,7 @@ class TestBaseSsync(BaseTest):
         self.assertFalse(rx_metadata)
         expected_body = self._get_object_data(tx_df._name,
                                               frag_index=frag_index)
-        actual_body = ''.join([chunk for chunk in rx_df.reader()])
+        actual_body = b''.join([chunk for chunk in rx_df.reader()])
         self.assertEqual(expected_body, actual_body)

     def _analyze_trace(self, trace):
@@ -194,22 +195,22 @@ class TestBaseSsync(BaseTest):

         def rx_missing(results, line):
             self.assertEqual('rx', line[0])
-            parts = line[1].split('\r\n')
+            parts = line[1].split(b'\r\n')
             for part in parts:
                 results['rx_missing'].append(part)

         def tx_updates(results, line):
             self.assertEqual('tx', line[0])
             subrequests = results['tx_updates']
-            if line[1].startswith(('PUT', 'DELETE', 'POST')):
-                parts = line[1].split('\r\n')
+            if line[1].startswith((b'PUT', b'DELETE', b'POST')):
+                parts = [swob.bytes_to_wsgi(l) for l in line[1].split(b'\r\n')]
                 method, path = parts[0].split()
                 subreq = {'method': method, 'path': path, 'req': line[1],
                           'headers': parts[1:]}
                 subrequests.append(subreq)
             else:
                 self.assertTrue(subrequests)
-                body = (subrequests[-1]).setdefault('body', '')
+                body = (subrequests[-1]).setdefault('body', b'')
                 body += line[1]
                 subrequests[-1]['body'] = body

@@ -221,14 +222,14 @@ class TestBaseSsync(BaseTest):
             results.setdefault('unexpected', []).append(line)

         # each trace line is a tuple of ([tx|rx], msg)
-        handshakes = iter([(('tx', ':MISSING_CHECK: START'), tx_missing),
-                           (('tx', ':MISSING_CHECK: END'), unexpected),
-                           (('rx', ':MISSING_CHECK: START'), rx_missing),
-                           (('rx', ':MISSING_CHECK: END'), unexpected),
-                           (('tx', ':UPDATES: START'), tx_updates),
-                           (('tx', ':UPDATES: END'), unexpected),
-                           (('rx', ':UPDATES: START'), rx_updates),
-                           (('rx', ':UPDATES: END'), unexpected)])
+        handshakes = iter([(('tx', b':MISSING_CHECK: START'), tx_missing),
+                           (('tx', b':MISSING_CHECK: END'), unexpected),
+                           (('rx', b':MISSING_CHECK: START'), rx_missing),
+                           (('rx', b':MISSING_CHECK: END'), unexpected),
+                           (('tx', b':UPDATES: START'), tx_updates),
+                           (('tx', b':UPDATES: END'), unexpected),
+                           (('rx', b':UPDATES: START'), rx_updates),
+                           (('rx', b':UPDATES: END'), unexpected)])
         expect_handshake = next(handshakes)
         phases = ('tx_missing', 'rx_missing', 'tx_updates', 'rx_updates')
         results = dict((k, []) for k in phases)
@@ -319,7 +320,8 @@ class TestBaseSsyncEC(TestBaseSsync):
         # for EC policies obj_data maps obj path -> list of frag archives
         if path not in self.obj_data:
             # make unique frag archives for each object name
-            data = path * 2 * (self.policy.ec_ndata + self.policy.ec_nparity)
+            data = path.encode('ascii') * 2 * (
+                self.policy.ec_ndata + self.policy.ec_nparity)
             self.obj_data[path] = encode_frag_archive_bodies(
                 self.policy, data)
         return self.obj_data[path][frag_index]
@@ -740,7 +742,7 @@ class TestSsyncEC(TestBaseSsyncEC):
             self.device, self.partition, suffixes, policy)
         rx_hashes = rx_df_mgr.get_hashes(
             self.device, self.partition, suffixes, policy)
-        self.assertEqual(suffixes, tx_hashes.keys())  # sanity
+        self.assertEqual(suffixes, list(tx_hashes.keys()))  # sanity
         self.assertEqual(tx_hashes, rx_hashes)

         # sanity check - run ssync again and expect no sync activity
@@ -763,7 +765,7 @@ class FakeResponse(object):
         }
         self.frag_index = frag_index
         self.obj_data = obj_data
-        self.data = ''
+        self.data = b''
         self.length = length

     def init(self, path):
@@ -779,7 +781,7 @@ class FakeResponse(object):
         if isinstance(self.data, Exception):
             raise self.data
         val = self.data
-        self.data = ''
+        self.data = b''
         return val if self.length is None else val[:self.length]


@@ -1011,7 +1013,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
                 self.assertEqual(
                     self._get_object_data(synced_obj_path,
                                           frag_index=self.rx_node_index),
-                    ''.join([d for d in df.reader()]))
+                    b''.join([d for d in df.reader()]))
             except DiskFileNotExist:
                 msgs.append('Missing rx diskfile for %r' % obj_name)

@@ -1057,7 +1059,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
                 self.assertEqual(
                     self._get_object_data(df._name,
                                           frag_index=self.rx_node_index),
-                    ''.join([d for d in df.reader()]))
+                    b''.join([d for d in df.reader()]))
             except DiskFileNotExist:
                 msgs.append('Missing rx diskfile for %r' % obj_name)
         if msgs:
@@ -1499,7 +1501,7 @@ class TestSsyncReplication(TestBaseSsync):

     def _legacy_check_missing(self, line):
         # reproduces behavior of 'legacy' ssync receiver missing_checks()
-        parts = line.split()
+        parts = line.decode('ascii').split()
         object_hash = urllib.parse.unquote(parts[0])
         timestamp = urllib.parse.unquote(parts[1])
         want = False
@@ -1562,14 +1564,14 @@ class TestSsyncReplication(TestBaseSsync):

         # o1 on tx only with two meta files
         name = 'o1'
-        t1 = self.ts_iter.next()
+        t1 = next(self.ts_iter)
         tx_objs[name] = self._create_ondisk_files(tx_df_mgr, name, policy, t1)
-        t1_type = self.ts_iter.next()
+        t1_type = next(self.ts_iter)
         metadata_1 = {'X-Timestamp': t1_type.internal,
                       'Content-Type': 'text/test',
                       'Content-Type-Timestamp': t1_type.internal}
         tx_objs[name][0].write_metadata(metadata_1)
-        t1_meta = self.ts_iter.next()
+        t1_meta = next(self.ts_iter)
         metadata_2 = {'X-Timestamp': t1_meta.internal,
                       'X-Object-Meta-Test': name}
         tx_objs[name][0].write_metadata(metadata_2)
@@ -1579,14 +1581,14 @@ class TestSsyncReplication(TestBaseSsync):
         # o2 on tx with two meta files, rx has .data and newest .meta but is
         # missing latest content-type
         name = 'o2'
-        t2 = self.ts_iter.next()
+        t2 = next(self.ts_iter)
         tx_objs[name] = self._create_ondisk_files(tx_df_mgr, name, policy, t2)
-        t2_type = self.ts_iter.next()
+        t2_type = next(self.ts_iter)
         metadata_1 = {'X-Timestamp': t2_type.internal,
                       'Content-Type': 'text/test',
                       'Content-Type-Timestamp': t2_type.internal}
         tx_objs[name][0].write_metadata(metadata_1)
-        t2_meta = self.ts_iter.next()
+        t2_meta = next(self.ts_iter)
         metadata_2 = {'X-Timestamp': t2_meta.internal,
                       'X-Object-Meta-Test': name}
         tx_objs[name][0].write_metadata(metadata_2)
@@ -1597,14 +1599,14 @@ class TestSsyncReplication(TestBaseSsync):
         # o3 on tx with two meta files, rx has .data and one .meta but does
         # have latest content-type so nothing to sync
         name = 'o3'
-        t3 = self.ts_iter.next()
+        t3 = next(self.ts_iter)
         tx_objs[name] = self._create_ondisk_files(tx_df_mgr, name, policy, t3)
-        t3_type = self.ts_iter.next()
+        t3_type = next(self.ts_iter)
         metadata_1 = {'X-Timestamp': t3_type.internal,
                       'Content-Type': 'text/test',
                       'Content-Type-Timestamp': t3_type.internal}
         tx_objs[name][0].write_metadata(metadata_1)
-        t3_meta = self.ts_iter.next()
+        t3_meta = next(self.ts_iter)
         metadata_2 = {'X-Timestamp': t3_meta.internal,
                       'X-Object-Meta-Test': name}
         tx_objs[name][0].write_metadata(metadata_2)
@@ -1619,10 +1621,10 @@ class TestSsyncReplication(TestBaseSsync):
         # .data and two .meta having latest content-type so nothing to sync
         # i.e. o4 is the reverse of o3 scenario
         name = 'o4'
-        t4 = self.ts_iter.next()
+        t4 = next(self.ts_iter)
         tx_objs[name] = self._create_ondisk_files(tx_df_mgr, name, policy, t4)
-        t4_type = self.ts_iter.next()
-        t4_meta = self.ts_iter.next()
+        t4_type = next(self.ts_iter)
+        t4_meta = next(self.ts_iter)
         metadata_2b = {'X-Timestamp': t4_meta.internal,
                        'X-Object-Meta-Test': name,
                        'Content-Type': 'text/test',
@@ -1640,10 +1642,10 @@ class TestSsyncReplication(TestBaseSsync):
         # o5 on tx with one meta file having latest content-type, rx has
         # .data and no .meta
         name = 'o5'
-        t5 = self.ts_iter.next()
+        t5 = next(self.ts_iter)
         tx_objs[name] = self._create_ondisk_files(tx_df_mgr, name, policy, t5)
-        t5_type = self.ts_iter.next()
-        t5_meta = self.ts_iter.next()
+        t5_type = next(self.ts_iter)
+        t5_meta = next(self.ts_iter)
         metadata = {'X-Timestamp': t5_meta.internal,
                     'X-Object-Meta-Test': name,
                     'Content-Type': 'text/test',
File diff suppressed because it is too large
@@ -57,14 +57,16 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
     def __init__(self, chunk_body=''):
         self.status = 200
         self.close_called = False
+        if not six.PY2:
+            chunk_body = chunk_body.encode('ascii')
         if chunk_body:
-            self.fp = six.StringIO(
-                '%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body))
-        self.ssync_response_buffer = ''
+            self.fp = six.BytesIO(
+                b'%x\r\n%s\r\n0\r\n\r\n' % (len(chunk_body), chunk_body))
+        self.ssync_response_buffer = b''
         self.ssync_response_chunk_left = 0

     def read(self, *args, **kwargs):
-        return ''
+        return b''

     def close(self):
         self.close_called = True
@@ -487,9 +489,9 @@ class TestSender(BaseTest):
         self.assertTrue(success)
         found_post = found_put = False
         for chunk in connection.sent:
-            if 'POST' in chunk:
+            if b'POST' in chunk:
                 found_post = True
-            if 'PUT' in chunk:
+            if b'PUT' in chunk:
                 found_put = True
         self.assertFalse(found_post)
         self.assertTrue(found_put)
@@ -677,56 +679,56 @@ class TestSender(BaseTest):

     def test_readline_newline_in_buffer(self):
         response = FakeResponse()
-        response.ssync_response_buffer = 'Has a newline already.\r\nOkay.'
-        self.assertEqual(response.readline(), 'Has a newline already.\r\n')
-        self.assertEqual(response.ssync_response_buffer, 'Okay.')
+        response.ssync_response_buffer = b'Has a newline already.\r\nOkay.'
+        self.assertEqual(response.readline(), b'Has a newline already.\r\n')
+        self.assertEqual(response.ssync_response_buffer, b'Okay.')

     def test_readline_buffer_exceeds_network_chunk_size_somehow(self):
         response = FakeResponse()
-        response.ssync_response_buffer = '1234567890'
-        self.assertEqual(response.readline(size=2), '1234567890')
-        self.assertEqual(response.ssync_response_buffer, '')
+        response.ssync_response_buffer = b'1234567890'
+        self.assertEqual(response.readline(size=2), b'1234567890')
+        self.assertEqual(response.ssync_response_buffer, b'')

     def test_readline_at_start_of_chunk(self):
         response = FakeResponse()
-        response.fp = six.StringIO('2\r\nx\n\r\n')
-        self.assertEqual(response.readline(), 'x\n')
+        response.fp = six.BytesIO(b'2\r\nx\n\r\n')
+        self.assertEqual(response.readline(), b'x\n')

     def test_readline_chunk_with_extension(self):
         response = FakeResponse()
-        response.fp = six.StringIO(
-            '2 ; chunk=extension\r\nx\n\r\n')
-        self.assertEqual(response.readline(), 'x\n')
+        response.fp = six.BytesIO(
+            b'2 ; chunk=extension\r\nx\n\r\n')
+        self.assertEqual(response.readline(), b'x\n')

     def test_readline_broken_chunk(self):
         response = FakeResponse()
-        response.fp = six.StringIO('q\r\nx\n\r\n')
+        response.fp = six.BytesIO(b'q\r\nx\n\r\n')
         self.assertRaises(
             exceptions.ReplicationException, response.readline)
         self.assertTrue(response.close_called)

     def test_readline_terminated_chunk(self):
         response = FakeResponse()
-        response.fp = six.StringIO('b\r\nnot enough')
+        response.fp = six.BytesIO(b'b\r\nnot enough')
         self.assertRaises(
             exceptions.ReplicationException, response.readline)
         self.assertTrue(response.close_called)

     def test_readline_all(self):
         response = FakeResponse()
-        response.fp = six.StringIO('2\r\nx\n\r\n0\r\n\r\n')
-        self.assertEqual(response.readline(), 'x\n')
-        self.assertEqual(response.readline(), '')
-        self.assertEqual(response.readline(), '')
+        response.fp = six.BytesIO(b'2\r\nx\n\r\n0\r\n\r\n')
+        self.assertEqual(response.readline(), b'x\n')
+        self.assertEqual(response.readline(), b'')
+        self.assertEqual(response.readline(), b'')

     def test_readline_all_trailing_not_newline_termed(self):
         response = FakeResponse()
-        response.fp = six.StringIO(
-            '2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n')
-        self.assertEqual(response.readline(), 'x\n')
-        self.assertEqual(response.readline(), '123')
-        self.assertEqual(response.readline(), '')
-        self.assertEqual(response.readline(), '')
+        response.fp = six.BytesIO(
+            b'2\r\nx\n\r\n3\r\n123\r\n0\r\n\r\n')
+        self.assertEqual(response.readline(), b'x\n')
+        self.assertEqual(response.readline(), b'123')
+        self.assertEqual(response.readline(), b'')
+        self.assertEqual(response.readline(), b'')

     def test_missing_check_timeout(self):
         connection = FakeConnection()
@@ -761,9 +763,9 @@ class TestSender(BaseTest):
         available_map, send_map = self.sender.missing_check(connection,
                                                             response)
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(send_map, {})
         self.assertEqual(available_map, {})

@@ -804,14 +806,14 @@ class TestSender(BaseTest):
         available_map, send_map = self.sender.missing_check(connection,
                                                             response)
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
-            '3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
-            'm:186a0\r\n\r\n'
-            '3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
-            'm:186a0,t:4\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'3b\r\n9d41d8cd98f00b204e9800998ecf0def 1380144472.22222 '
+            b'm:186a0\r\n\r\n'
+            b'3f\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444 '
+            b'm:186a0,t:4\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(send_map, {})
         candidates = [('9d41d8cd98f00b204e9800998ecf0abc',
                        dict(ts_data=Timestamp(1380144470.00000))),
@@ -853,10 +855,10 @@ class TestSender(BaseTest):
                 exc = err
         self.assertEqual(str(exc), 'Early disconnect')
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')

     def test_missing_check_far_end_disconnect2(self):
         def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -888,10 +890,10 @@ class TestSender(BaseTest):
                 exc = err
         self.assertEqual(str(exc), 'Early disconnect')
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')

     def test_missing_check_far_end_unexpected(self):
         def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -922,10 +924,10 @@ class TestSender(BaseTest):
                 exc = err
         self.assertEqual(str(exc), "Unexpected response: 'OH HAI'")
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')

     def test_missing_check_send_map(self):
         def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
@@ -956,10 +958,10 @@ class TestSender(BaseTest):
         available_map, send_map = self.sender.missing_check(connection,
                                                             response)
         self.assertEqual(
-            ''.join(connection.sent),
-            '17\r\n:MISSING_CHECK: START\r\n\r\n'
-            '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
-            '15\r\n:MISSING_CHECK: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'17\r\n:MISSING_CHECK: START\r\n\r\n'
+            b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
+            b'15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(send_map, {'0123abc': {'data': True, 'meta': True}})
         self.assertEqual(available_map,
                          dict([('9d41d8cd98f00b204e9800998ecf0abc',
@@ -1016,9 +1018,9 @@ class TestSender(BaseTest):
             ':UPDATES: END\r\n'))
         self.sender.updates(connection, response, {})
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_unexpected_response_lines1(self):
         connection = FakeConnection()
@@ -1034,9 +1036,9 @@ class TestSender(BaseTest):
                 exc = err
         self.assertEqual(str(exc), "Unexpected response: 'abc'")
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_unexpected_response_lines2(self):
         connection = FakeConnection()
@@ -1052,9 +1054,9 @@ class TestSender(BaseTest):
                 exc = err
         self.assertEqual(str(exc), "Unexpected response: 'abc'")
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_is_deleted(self):
         device = 'dev'
@@ -1086,9 +1088,9 @@ class TestSender(BaseTest):
         # note that the delete line isn't actually sent since we mock
         # send_delete; send_delete is tested separately.
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_update_send_delete(self):
         device = 'dev'
@@ -1113,13 +1115,13 @@ class TestSender(BaseTest):
             ':UPDATES: END\r\n'))
         self.sender.updates(connection, response, send_map)
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            '30\r\n'
-            'DELETE /a/c/o\r\n'
-            'X-Timestamp: %s\r\n\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n'
-            % delete_timestamp
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'30\r\n'
+            b'DELETE /a/c/o\r\n'
+            b'X-Timestamp: %s\r\n\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n'
+            % delete_timestamp.encode('ascii')
         )

     def test_updates_put(self):
@@ -1166,9 +1168,9 @@ class TestSender(BaseTest):
         # note that the put line isn't actually sent since we mock send_put;
         # send_put is tested separately.
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_post(self):
         ts_iter = make_timestamp_iter()
@@ -1213,9 +1215,9 @@ class TestSender(BaseTest):
         # note that the post line isn't actually sent since we mock send_post;
         # send_post is tested separately.
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_put_and_post(self):
         ts_iter = make_timestamp_iter()
@@ -1265,9 +1267,9 @@ class TestSender(BaseTest):
         self.assertIsInstance(df, diskfile.DiskFile)
         self.assertEqual(expected, df.get_metadata())
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_storage_policy_index(self):
         device = 'dev'
@ -1328,9 +1330,9 @@ class TestSender(BaseTest):
|
|||||||
exc = err
|
exc = err
|
||||||
self.assertEqual(str(exc), 'Early disconnect')
|
self.assertEqual(str(exc), 'Early disconnect')
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
''.join(connection.sent),
|
b''.join(connection.sent),
|
||||||
'11\r\n:UPDATES: START\r\n\r\n'
|
b'11\r\n:UPDATES: START\r\n\r\n'
|
||||||
'f\r\n:UPDATES: END\r\n\r\n')
|
b'f\r\n:UPDATES: END\r\n\r\n')
|
||||||
|
|
||||||
def test_updates_read_response_unexp_start(self):
|
def test_updates_read_response_unexp_start(self):
|
||||||
connection = FakeConnection()
|
connection = FakeConnection()
|
||||||
@ -1346,9 +1348,9 @@ class TestSender(BaseTest):
|
|||||||
exc = err
|
exc = err
|
||||||
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
|
self.assertEqual(str(exc), "Unexpected response: 'anything else'")
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
''.join(connection.sent),
|
b''.join(connection.sent),
|
||||||
'11\r\n:UPDATES: START\r\n\r\n'
|
b'11\r\n:UPDATES: START\r\n\r\n'
|
||||||
'f\r\n:UPDATES: END\r\n\r\n')
|
b'f\r\n:UPDATES: END\r\n\r\n')
|
||||||
|
|
||||||
def test_updates_read_response_timeout_end(self):
|
def test_updates_read_response_timeout_end(self):
|
||||||
connection = FakeConnection()
|
connection = FakeConnection()
|
||||||
@@ -1360,7 +1362,7 @@

         def delayed_readline(*args, **kwargs):
             rv = orig_readline(*args, **kwargs)
-            if rv == ':UPDATES: END\r\n':
+            if rv == b':UPDATES: END\r\n':
                 eventlet.sleep(1)
             return rv

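Note: the `b':UPDATES: END\r\n'` literal above matters more than it looks. On py3, `readline()` from the wire yields `bytes`, and comparing `bytes` to `str` never raises; it is simply always False, so the unported comparison would silently skip the `eventlet.sleep(1)` meant to provoke the timeout. A short illustration:

    line = b':UPDATES: END\r\n'                    # what readline() yields on py3
    assert (line == ':UPDATES: END\r\n') is False  # silent mismatch, no error
    assert line == b':UPDATES: END\r\n'            # the ported check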
@@ -1382,9 +1384,9 @@
             exc = err
         self.assertEqual(str(exc), 'Early disconnect')
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_updates_read_response_unexp_end(self):
         connection = FakeConnection()
@@ -1400,9 +1402,9 @@
             exc = err
         self.assertEqual(str(exc), "Unexpected response: 'anything else'")
         self.assertEqual(
-            ''.join(connection.sent),
-            '11\r\n:UPDATES: START\r\n\r\n'
-            'f\r\n:UPDATES: END\r\n\r\n')
+            b''.join(connection.sent),
+            b'11\r\n:UPDATES: START\r\n\r\n'
+            b'f\r\n:UPDATES: END\r\n\r\n')

     def test_send_delete_timeout(self):
         connection = FakeConnection()
@@ -1421,11 +1423,11 @@
         self.sender.send_delete(connection, '/a/c/o',
                                 utils.Timestamp('1381679759.90941'))
         self.assertEqual(
-            ''.join(connection.sent),
-            '30\r\n'
-            'DELETE /a/c/o\r\n'
-            'X-Timestamp: 1381679759.90941\r\n'
-            '\r\n\r\n')
+            b''.join(connection.sent),
+            b'30\r\n'
+            b'DELETE /a/c/o\r\n'
+            b'X-Timestamp: 1381679759.90941\r\n'
+            b'\r\n\r\n')

     def test_send_put_initial_timeout(self):
         df = self._make_open_diskfile()
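Note: the expected bytes above are one chunked transfer-encoding frame: ssync ships each subrequest as a single HTTP chunk, with the payload length in lowercase hex (0x30 = 48 bytes here) before the CRLF-delimited body. A sketch of the framing with a hypothetical helper (the sender's real code path is shaped differently):

    def make_chunk(payload):
        # hex length, CRLF, payload, CRLF -- the shape asserted above
        return b'%x\r\n%s\r\n' % (len(payload), payload)

    subrequest = (b'DELETE /a/c/o\r\n'
                  b'X-Timestamp: 1381679759.90941\r\n'
                  b'\r\n')
    assert len(subrequest) == 0x30  # 48 bytes, hence the leading '30'
    assert make_chunk(subrequest) == (b'30\r\n'
                                      b'DELETE /a/c/o\r\n'
                                      b'X-Timestamp: 1381679759.90941\r\n'
                                      b'\r\n\r\n')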
@@ -1465,19 +1467,20 @@
     def _check_send_put(self, obj_name, meta_value):
         ts_iter = make_timestamp_iter()
         t1 = next(ts_iter)
-        body = 'test'
+        body = b'test'
         extra_metadata = {'Some-Other-Header': 'value',
                           u'Unicode-Meta-Name': meta_value}
         df = self._make_open_diskfile(obj=obj_name, body=body,
                                       timestamp=t1,
                                       extra_metadata=extra_metadata)
         expected = dict(df.get_metadata())
-        expected['body'] = body
+        expected['body'] = body if six.PY2 else body.decode('ascii')
         expected['chunk_size'] = len(body)
         expected['meta'] = meta_value
+        wire_meta = meta_value if six.PY2 else meta_value.encode('utf8')
         path = six.moves.urllib.parse.quote(expected['name'])
         expected['path'] = path
-        expected['length'] = format(145 + len(path) + len(meta_value), 'x')
+        expected['length'] = format(145 + len(path) + len(wire_meta), 'x')
         # .meta file metadata is not included in expected for data only PUT
         t2 = next(ts_iter)
         metadata = {'X-Timestamp': t2.internal, 'X-Object-Meta-Fruit': 'kiwi'}
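Note: `wire_meta` exists because the chunk length counts bytes on the wire, not characters: unicode metadata is sent UTF-8-encoded, so `u'm\u00e8ta'` is 4 characters but 5 bytes, and `len(meta_value)` would undercount on py3. The arithmetic, with the 145-byte fixed overhead taken from the test and an illustrative path:

    meta_value = u'm\u00e8ta'              # 4 characters...
    wire_meta = meta_value.encode('utf8')  # ...but 5 bytes on the wire
    path = '/a/c/o'
    # expected chunk length, rendered in lowercase hex as it appears on the wire
    length = format(145 + len(path) + len(wire_meta), 'x')
    assert length == format(156, 'x') == '9c'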
@@ -1485,8 +1488,7 @@
         df.open()
         connection = FakeConnection()
         self.sender.send_put(connection, path, df)
-        self.assertEqual(
-            ''.join(connection.sent),
+        expected = (
             '%(length)s\r\n'
             'PUT %(path)s\r\n'
             'Content-Length: %(Content-Length)s\r\n'
@@ -1498,13 +1500,20 @@
             '\r\n'
             '%(chunk_size)s\r\n'
             '%(body)s\r\n' % expected)
+        if not six.PY2:
+            expected = expected.encode('utf8')
+        self.assertEqual(b''.join(connection.sent), expected)

     def test_send_put(self):
         self._check_send_put('o', 'meta')

     def test_send_put_unicode(self):
-        self._check_send_put(
-            'o_with_caract\xc3\xa8res_like_in_french', 'm\xc3\xa8ta')
+        if six.PY2:
+            self._check_send_put(
+                'o_with_caract\xc3\xa8res_like_in_french', 'm\xc3\xa8ta')
+        else:
+            self._check_send_put(
+                'o_with_caract\u00e8res_like_in_french', 'm\u00e8ta')

     def _check_send_post(self, obj_name, meta_value):
         ts_iter = make_timestamp_iter()
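Note: two porting idioms meet in this hunk. The expected wire image is still composed as a native str, so the `%(...)s` dict template stays readable on both interpreters, and is encoded to UTF-8 only at the comparison point on py3; and the unicode fixtures are spelled per interpreter, as the UTF-8 byte pair `'\xc3\xa8'` on py2 versus the code point `'\u00e8'` on py3, both naming the same object. A minimal sketch of the encode-at-the-edge pattern (names and values illustrative):

    import six

    # compose as a native str so the template reads the same everywhere...
    expected = ('%(length)s\r\n'
                'PUT %(path)s\r\n') % {'length': '10', 'path': '/a/c/o'}
    # ...and encode only at the boundary where it meets bytes off the wire
    if not six.PY2:
        expected = expected.encode('utf8')
    wire = b'10\r\nPUT /a/c/o\r\n'
    assert expected == wire   # bytes == bytes on py3, str == str on py2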
@@ -1522,39 +1531,46 @@
                           'X-Timestamp': ts_1.internal}
         df.write_metadata(newer_metadata)
         path = six.moves.urllib.parse.quote(df.read_metadata()['name'])
-        length = format(61 + len(path) + len(meta_value), 'x')
+        wire_meta = meta_value if six.PY2 else meta_value.encode('utf8')
+        length = format(61 + len(path) + len(wire_meta), 'x')

         connection = FakeConnection()
         with df.open():
             self.sender.send_post(connection, path, df)
         self.assertEqual(
-            ''.join(connection.sent),
-            '%s\r\n'
-            'POST %s\r\n'
-            'X-Object-Meta-Foo: %s\r\n'
-            'X-Timestamp: %s\r\n'
-            '\r\n'
-            '\r\n' % (length, path, meta_value, ts_1.internal))
+            b''.join(connection.sent),
+            b'%s\r\n'
+            b'POST %s\r\n'
+            b'X-Object-Meta-Foo: %s\r\n'
+            b'X-Timestamp: %s\r\n'
+            b'\r\n'
+            b'\r\n' % (length.encode('ascii'), path.encode('ascii'),
+                       wire_meta,
+                       ts_1.internal.encode('ascii')))

     def test_send_post(self):
         self._check_send_post('o', 'meta')

     def test_send_post_unicode(self):
-        self._check_send_post(
-            'o_with_caract\xc3\xa8res_like_in_french', 'm\xc3\xa8ta')
+        if six.PY2:
+            self._check_send_post(
+                'o_with_caract\xc3\xa8res_like_in_french', 'm\xc3\xa8ta')
+        else:
+            self._check_send_post(
+                'o_with_caract\u00e8res_like_in_french', 'm\u00e8ta')

     def test_disconnect_timeout(self):
         connection = FakeConnection()
         connection.send = lambda d: eventlet.sleep(1)
         self.sender.daemon.node_timeout = 0.01
         self.sender.disconnect(connection)
-        self.assertEqual(''.join(connection.sent), '')
+        self.assertEqual(b''.join(connection.sent), b'')
         self.assertTrue(connection.closed)

     def test_disconnect(self):
         connection = FakeConnection()
         self.sender.disconnect(connection)
-        self.assertEqual(''.join(connection.sent), '0\r\n\r\n')
+        self.assertEqual(b''.join(connection.sent), b'0\r\n\r\n')
         self.assertTrue(connection.closed)

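Note: the rewritten send_post expectation relies on bytes %-interpolation, which PEP 461 restored in Python 3.5: `b'%s' % ...` works, but every interpolated value must itself be bytes, hence the explicit `.encode('ascii')` on the length, path and timestamp (wire_meta is already encoded). Briefly:

    length, path = u'42', u'/a/c/o'
    # py3.5+ bytes interpolation: mixing in a str would raise
    #   TypeError: %b requires a bytes-like object ...
    frame = b'%s\r\nPOST %s\r\n' % (length.encode('ascii'),
                                    path.encode('ascii'))
    assert frame == b'42\r\nPOST /a/c/o\r\n'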
@@ -1571,34 +1587,34 @@ class TestModuleMethods(unittest.TestCase):
         # equal data and meta timestamps -> legacy single timestamp string
         expected = '%s %s' % (object_hash, t_data.internal)
         self.assertEqual(
-            expected,
+            expected.encode('ascii'),
             ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_data))

         # newer meta timestamp -> hex data delta encoded as extra message part
         expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
         self.assertEqual(
-            expected,
+            expected.encode('ascii'),
             ssync_sender.encode_missing(object_hash, t_data, ts_meta=t_meta))

         # newer meta timestamp -> hex data delta encoded as extra message part
         # content type timestamp equals data timestamp -> no delta
         expected = '%s %s m:%x' % (object_hash, t_data.internal, d_meta_data)
         self.assertEqual(
-            expected,
+            expected.encode('ascii'),
             ssync_sender.encode_missing(object_hash, t_data, t_meta, t_data))

         # content type timestamp newer data timestamp -> delta encoded
         expected = ('%s %s m:%x,t:%x'
                     % (object_hash, t_data.internal, d_meta_data, d_type_data))
         self.assertEqual(
-            expected,
+            expected.encode('ascii'),
             ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type))

         # content type timestamp equal to meta timestamp -> delta encoded
         expected = ('%s %s m:%x,t:%x'
                     % (object_hash, t_data.internal, d_meta_data, d_type_data))
         self.assertEqual(
-            expected,
+            expected.encode('ascii'),
             ssync_sender.encode_missing(object_hash, t_data, t_meta, t_type))

         # test encode and decode functions invert
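Note: these assertions pin down that `encode_missing` now returns `bytes`: each missing-check line travels as URL-quoted ASCII, so the sender must encode it and the receiver can split and unquote it again. A toy round trip under that framing (simplified stand-ins, not the module's real functions, and ignoring the `m:`/`t:` delta suffixes):

    from six.moves import urllib

    def toy_encode(object_hash, ts_internal):
        # wire lines are pure ASCII: quoted hash, space-joined fields
        return ('%s %s' % (urllib.parse.quote(object_hash),
                           ts_internal)).encode('ascii')

    def toy_decode(line):
        parts = line.decode('ascii').split()
        return urllib.parse.unquote(parts[0]), parts[1]

    line = toy_encode('9d41d8cd98f00b204e9800998ecf0abc', '1380144470.00000')
    assert isinstance(line, bytes)
    assert toy_decode(line) == ('9d41d8cd98f00b204e9800998ecf0abc',
                                '1380144470.00000')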
 tox.ini | 3 +++
@@ -99,6 +99,9 @@ commands =
         test/unit/obj/test_expirer.py \
         test/unit/obj/test_replicator.py \
         test/unit/obj/test_server.py \
+        test/unit/obj/test_ssync.py \
+        test/unit/obj/test_ssync_receiver.py \
+        test/unit/obj/test_ssync_sender.py \
         test/unit/obj/test_updater.py \
         test/unit/proxy}

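Note: this `commands` list enumerates the test modules gated into the python3 unit-test run, so adding the three ssync modules here is what makes the ported tests actually execute under py3 in the gate. Assuming the list lives under the py3 testenv in this tree (e.g. `py35`; the env name is not shown in the hunk), the gated subset runs with:

    tox -e py35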