Merge "Move EC-specific unit test to EC Test class"
This commit is contained in:
commit
dd3bc8fe61
@ -4097,127 +4097,6 @@ class TestReplicatedObjectController(
|
||||
finally:
|
||||
time.time = orig_time
|
||||
|
||||
@unpatch_policies
|
||||
def test_ec_client_disconnect(self):
|
||||
prolis = _test_sockets[0]
|
||||
|
||||
# create connection
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
|
||||
# create container
|
||||
fd.write('PUT /v1/a/ec-discon HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: 0\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'X-Storage-Policy: ec\r\n'
|
||||
'\r\n')
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 2'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# create object
|
||||
obj = 'a' * 4 * 64 * 2 ** 10
|
||||
fd.write('PUT /v1/a/ec-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: donuts\r\n'
|
||||
'\r\n%s' % (len(obj), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 201'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
class WrappedTimeout(ChunkWriteTimeout):
|
||||
def __enter__(self):
|
||||
timeouts[self] = traceback.extract_stack()
|
||||
return super(WrappedTimeout, self).__enter__()
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
timeouts[self] = None
|
||||
return super(WrappedTimeout, self).__exit__(typ, value, tb)
|
||||
|
||||
timeouts = {}
|
||||
with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout',
|
||||
WrappedTimeout):
|
||||
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
|
||||
# get object
|
||||
fd.write('GET /v1/a/ec-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'\r\n')
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 200'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# read most of the object, and disconnect
|
||||
fd.read(10)
|
||||
sock.fd._sock.close()
|
||||
self._sleep_enough(
|
||||
lambda:
|
||||
_test_servers[0].logger.get_lines_for_level('warning'))
|
||||
|
||||
# check for disconnect message!
|
||||
expected = ['Client disconnected on read'] * 2
|
||||
self.assertEqual(
|
||||
_test_servers[0].logger.get_lines_for_level('warning'),
|
||||
expected)
|
||||
# check that no coro was left waiting to write
|
||||
self.assertTrue(timeouts) # sanity - WrappedTimeout did get called
|
||||
missing_exits = [tb for tb in timeouts.values() if tb is not None]
|
||||
self.assertFalse(
|
||||
missing_exits, 'Failed to exit all ChunkWriteTimeouts.\n' +
|
||||
''.join(['No exit from ChunkWriteTimeout entered at:\n' +
|
||||
''.join(traceback.format_list(tb)[:-1])
|
||||
for tb in missing_exits]))
|
||||
# and check that the ChunkWriteTimeouts did not raise Exceptions
|
||||
self.assertFalse(_test_servers[0].logger.get_lines_for_level('error'))
|
||||
|
||||
@unpatch_policies
|
||||
def test_ec_client_put_disconnect(self):
|
||||
prolis = _test_sockets[0]
|
||||
|
||||
# create connection
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
|
||||
# create container
|
||||
fd.write('PUT /v1/a/ec-discon HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: 0\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'X-Storage-Policy: ec\r\n'
|
||||
'\r\n')
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 2'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# create object
|
||||
obj = 'a' * 4 * 64 * 2 ** 10
|
||||
fd.write('PUT /v1/a/ec-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: donuts\r\n'
|
||||
'\r\n%s' % (len(obj), obj[:-10]))
|
||||
fd.flush()
|
||||
fd.close()
|
||||
sock.close()
|
||||
# sleep to trampoline enough
|
||||
condition = \
|
||||
lambda: _test_servers[0].logger.get_lines_for_level('warning')
|
||||
self._sleep_enough(condition)
|
||||
expected = ['Client disconnected without sending enough data']
|
||||
warns = _test_servers[0].logger.get_lines_for_level('warning')
|
||||
self.assertEqual(expected, warns)
|
||||
errors = _test_servers[0].logger.get_lines_for_level('error')
|
||||
self.assertEqual([], errors)
|
||||
|
||||
@unpatch_policies
|
||||
def test_leak_1(self):
|
||||
_request_instances = weakref.WeakKeyDictionary()
|
||||
@ -5615,6 +5494,125 @@ class BaseTestECObjectController(BaseTestObjectController):
|
||||
os.rename(self.ec_policy.object_ring.serialized_path + '.bak',
|
||||
self.ec_policy.object_ring.serialized_path)
|
||||
|
||||
def test_ec_client_disconnect(self):
|
||||
prolis = _test_sockets[0]
|
||||
|
||||
# create connection
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
|
||||
# create container
|
||||
fd.write('PUT /v1/a/%s-discon HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: 0\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'X-Storage-Policy: %s\r\n'
|
||||
'\r\n' % (self.ec_policy.name, self.ec_policy.name))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 2'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# create object
|
||||
obj = 'a' * 4 * 64 * 2 ** 10
|
||||
fd.write('PUT /v1/a/%s-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: donuts\r\n'
|
||||
'\r\n%s' % (self.ec_policy.name, len(obj), obj))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 201'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
class WrappedTimeout(ChunkWriteTimeout):
|
||||
def __enter__(self):
|
||||
timeouts[self] = traceback.extract_stack()
|
||||
return super(WrappedTimeout, self).__enter__()
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
timeouts[self] = None
|
||||
return super(WrappedTimeout, self).__exit__(typ, value, tb)
|
||||
|
||||
timeouts = {}
|
||||
with mock.patch('swift.proxy.controllers.base.ChunkWriteTimeout',
|
||||
WrappedTimeout):
|
||||
with mock.patch.object(_test_servers[0], 'client_timeout', new=5):
|
||||
# get object
|
||||
fd.write('GET /v1/a/%s-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Connection: close\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'\r\n' % self.ec_policy.name)
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 200'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# read most of the object, and disconnect
|
||||
fd.read(10)
|
||||
sock.fd._sock.close()
|
||||
self._sleep_enough(
|
||||
lambda:
|
||||
_test_servers[0].logger.get_lines_for_level('warning'))
|
||||
|
||||
# check for disconnect message!
|
||||
expected = ['Client disconnected on read'] * 2
|
||||
self.assertEqual(
|
||||
_test_servers[0].logger.get_lines_for_level('warning'),
|
||||
expected)
|
||||
# check that no coro was left waiting to write
|
||||
self.assertTrue(timeouts) # sanity - WrappedTimeout did get called
|
||||
missing_exits = [tb for tb in timeouts.values() if tb is not None]
|
||||
self.assertFalse(
|
||||
missing_exits, 'Failed to exit all ChunkWriteTimeouts.\n' +
|
||||
''.join(['No exit from ChunkWriteTimeout entered at:\n' +
|
||||
''.join(traceback.format_list(tb)[:-1])
|
||||
for tb in missing_exits]))
|
||||
# and check that the ChunkWriteTimeouts did not raise Exceptions
|
||||
self.assertFalse(_test_servers[0].logger.get_lines_for_level('error'))
|
||||
|
||||
def test_ec_client_put_disconnect(self):
|
||||
prolis = _test_sockets[0]
|
||||
|
||||
# create connection
|
||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||
fd = sock.makefile()
|
||||
|
||||
# create container
|
||||
fd.write('PUT /v1/a/%s-discon HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: 0\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'X-Storage-Policy: %s\r\n'
|
||||
'\r\n' % (self.ec_policy.name, self.ec_policy.name))
|
||||
fd.flush()
|
||||
headers = readuntil2crlfs(fd)
|
||||
exp = 'HTTP/1.1 2'
|
||||
self.assertEqual(headers[:len(exp)], exp)
|
||||
|
||||
# create object
|
||||
obj = 'a' * 4 * 64 * 2 ** 10
|
||||
fd.write('PUT /v1/a/%s-discon/test HTTP/1.1\r\n'
|
||||
'Host: localhost\r\n'
|
||||
'Content-Length: %d\r\n'
|
||||
'X-Storage-Token: t\r\n'
|
||||
'Content-Type: donuts\r\n'
|
||||
'\r\n%s' % (self.ec_policy.name, len(obj), obj[:-10]))
|
||||
fd.flush()
|
||||
fd.close()
|
||||
sock.close()
|
||||
# sleep to trampoline enough
|
||||
condition = \
|
||||
lambda: _test_servers[0].logger.get_lines_for_level('warning')
|
||||
self._sleep_enough(condition)
|
||||
expected = ['Client disconnected without sending enough data']
|
||||
warns = _test_servers[0].logger.get_lines_for_level('warning')
|
||||
self.assertEqual(expected, warns)
|
||||
errors = _test_servers[0].logger.get_lines_for_level('error')
|
||||
self.assertEqual([], errors)
|
||||
|
||||
|
||||
class TestECObjectController(BaseTestECObjectController, unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
Loading…
Reference in New Issue
Block a user