Merge "proxy: don't send multi-part terminator when no parts sent"
This commit is contained in:
commit
0cb02a6ce5
@ -1370,7 +1370,7 @@ class GetOrHeadHandler(GetterBase):
|
|||||||
except ChunkReadTimeout:
|
except ChunkReadTimeout:
|
||||||
if not self._replace_source(
|
if not self._replace_source(
|
||||||
'Trying to read object during GET (retrying)'):
|
'Trying to read object during GET (retrying)'):
|
||||||
raise StopIteration()
|
raise
|
||||||
|
|
||||||
def _iter_bytes_from_response_part(self, part_file, nbytes):
|
def _iter_bytes_from_response_part(self, part_file, nbytes):
|
||||||
# yield chunks of bytes from a single response part; if an error
|
# yield chunks of bytes from a single response part; if an error
|
||||||
@ -1446,10 +1446,6 @@ class GetOrHeadHandler(GetterBase):
|
|||||||
if part_iter:
|
if part_iter:
|
||||||
part_iter.close()
|
part_iter.close()
|
||||||
|
|
||||||
except ChunkReadTimeout:
|
|
||||||
self.app.exception_occurred(self.source.node, 'Object',
|
|
||||||
'Trying to read during GET')
|
|
||||||
raise
|
|
||||||
except ChunkWriteTimeout:
|
except ChunkWriteTimeout:
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Client did not read from proxy within %ss',
|
'Client did not read from proxy within %ss',
|
||||||
|
@ -1497,3 +1497,35 @@ class FakeSource(object):
|
|||||||
def getheaders(self):
|
def getheaders(self):
|
||||||
return [('content-length', self.getheader('content-length'))] + \
|
return [('content-length', self.getheader('content-length'))] + \
|
||||||
[(k, v) for k, v in self.headers.items()]
|
[(k, v) for k, v in self.headers.items()]
|
||||||
|
|
||||||
|
|
||||||
|
def get_node_error_stats(proxy_app, ring_node):
|
||||||
|
node_key = proxy_app.error_limiter.node_key(ring_node)
|
||||||
|
return proxy_app.error_limiter.stats.get(node_key) or {}
|
||||||
|
|
||||||
|
|
||||||
|
def node_error_count(proxy_app, ring_node):
|
||||||
|
# Reach into the proxy's internals to get the error count for a
|
||||||
|
# particular node
|
||||||
|
return get_node_error_stats(proxy_app, ring_node).get('errors', 0)
|
||||||
|
|
||||||
|
|
||||||
|
def node_error_counts(proxy_app, ring_nodes):
|
||||||
|
# Reach into the proxy's internals to get the error counts for a
|
||||||
|
# list of nodes
|
||||||
|
return sorted([get_node_error_stats(proxy_app, node).get('errors', 0)
|
||||||
|
for node in ring_nodes], reverse=True)
|
||||||
|
|
||||||
|
|
||||||
|
def node_last_error(proxy_app, ring_node):
|
||||||
|
# Reach into the proxy's internals to get the last error for a
|
||||||
|
# particular node
|
||||||
|
return get_node_error_stats(proxy_app, ring_node).get('last_error')
|
||||||
|
|
||||||
|
|
||||||
|
def set_node_errors(proxy_app, ring_node, value, last_error):
|
||||||
|
# Set the node's error count to value
|
||||||
|
node_key = proxy_app.error_limiter.node_key(ring_node)
|
||||||
|
stats = {'errors': value,
|
||||||
|
'last_error': last_error}
|
||||||
|
proxy_app.error_limiter.stats[node_key] = stats
|
||||||
|
@ -56,8 +56,8 @@ from test.unit import (
|
|||||||
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
|
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
|
||||||
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub,
|
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub,
|
||||||
fake_ec_node_response, StubResponse, mocked_http_conn,
|
fake_ec_node_response, StubResponse, mocked_http_conn,
|
||||||
quiet_eventlet_exceptions, FakeSource, make_timestamp_iter, FakeMemcache)
|
quiet_eventlet_exceptions, FakeSource, make_timestamp_iter, FakeMemcache,
|
||||||
from test.unit.proxy.test_server import node_error_count
|
node_error_count, node_error_counts)
|
||||||
|
|
||||||
|
|
||||||
def unchunk_body(chunked_body):
|
def unchunk_body(chunked_body):
|
||||||
@ -1773,13 +1773,15 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
|
|
||||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||||
'Range': 'bytes=0-49,100-104'})
|
'Range': 'bytes=0-49,100-104'})
|
||||||
with capture_http_requests(get_response) as log:
|
with capture_http_requests(get_response) as captured_requests:
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
self.assertEqual(resp.status_int, 206)
|
self.assertEqual(resp.status_int, 206)
|
||||||
actual_body = resp.body
|
actual_body = resp.body
|
||||||
|
|
||||||
self.assertEqual(resp.status_int, 206)
|
self.assertEqual(resp.status_int, 206)
|
||||||
self.assertEqual(2, len(log))
|
self.assertEqual(2, len(captured_requests))
|
||||||
|
self.assertEqual([1] + [0] * (self.replicas() - 1),
|
||||||
|
node_error_counts(self.app, self.obj_ring.devs))
|
||||||
# note: client response uses boundary from first backend response
|
# note: client response uses boundary from first backend response
|
||||||
self.assertEqual(resp_body1, actual_body)
|
self.assertEqual(resp_body1, actual_body)
|
||||||
error_lines = self.app.logger.get_lines_for_level('error')
|
error_lines = self.app.logger.get_lines_for_level('error')
|
||||||
@ -1815,7 +1817,6 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
|
|
||||||
def test_GET_with_multirange_slow_body_unable_to_resume(self):
|
def test_GET_with_multirange_slow_body_unable_to_resume(self):
|
||||||
self.app.recoverable_node_timeout = 0.01
|
self.app.recoverable_node_timeout = 0.01
|
||||||
self.app.object_chunk_size = 10
|
|
||||||
obj_data = b'testing' * 100
|
obj_data = b'testing' * 100
|
||||||
etag = md5(obj_data, usedforsecurity=False).hexdigest()
|
etag = md5(obj_data, usedforsecurity=False).hexdigest()
|
||||||
boundary = b'81eb9c110b32ced5fe'
|
boundary = b'81eb9c110b32ced5fe'
|
||||||
@ -1851,15 +1852,18 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
|
|
||||||
req = swob.Request.blank('/v1/a/c/o', headers={
|
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||||
'Range': 'bytes=0-49,100-104'})
|
'Range': 'bytes=0-49,100-104'})
|
||||||
|
response_chunks = []
|
||||||
with capture_http_requests(get_response) as log:
|
with capture_http_requests(get_response) as log:
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
|
with self.assertRaises(ChunkReadTimeout):
|
||||||
|
# note: the error is raised while the resp_iter is read...
|
||||||
|
for chunk in resp.app_iter:
|
||||||
|
response_chunks.append(chunk)
|
||||||
|
self.assertEqual(response_chunks, [])
|
||||||
self.assertEqual(resp.status_int, 206)
|
self.assertEqual(resp.status_int, 206)
|
||||||
actual_body = resp.body
|
self.assertEqual([1, 1, 1],
|
||||||
|
node_error_counts(self.app, self.obj_ring.devs))
|
||||||
self.assertEqual(resp.status_int, 206)
|
|
||||||
self.assertEqual(6, len(log))
|
self.assertEqual(6, len(log))
|
||||||
resp_boundary = resp.headers['content-type'].rsplit('=', 1)[1].encode()
|
|
||||||
self.assertEqual(b'--%s--' % resp_boundary, actual_body)
|
|
||||||
error_lines = self.app.logger.get_lines_for_level('error')
|
error_lines = self.app.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(3, len(error_lines))
|
self.assertEqual(3, len(error_lines))
|
||||||
for line in error_lines:
|
for line in error_lines:
|
||||||
@ -1895,6 +1899,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
_ = resp.body
|
_ = resp.body
|
||||||
self.assertEqual(resp.status_int, 200)
|
self.assertEqual(resp.status_int, 200)
|
||||||
self.assertEqual(etag, resp.headers.get('ETag'))
|
self.assertEqual(etag, resp.headers.get('ETag'))
|
||||||
|
self.assertEqual([1] * self.replicas(),
|
||||||
|
node_error_counts(self.app, self.obj_ring.devs))
|
||||||
|
|
||||||
error_lines = self.app.logger.get_lines_for_level('error')
|
error_lines = self.app.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(3, len(error_lines))
|
self.assertEqual(3, len(error_lines))
|
||||||
@ -2024,6 +2030,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
log.requests[2]['headers']['Range'])
|
log.requests[2]['headers']['Range'])
|
||||||
self.assertNotIn('X-Backend-Ignore-Range-If-Metadata-Present',
|
self.assertNotIn('X-Backend-Ignore-Range-If-Metadata-Present',
|
||||||
log.requests[2]['headers'])
|
log.requests[2]['headers'])
|
||||||
|
self.assertEqual([1, 1] + [0] * (self.replicas() - 2),
|
||||||
|
node_error_counts(self.app, self.obj_ring.devs))
|
||||||
|
|
||||||
def test_GET_transfer_encoding_chunked(self):
|
def test_GET_transfer_encoding_chunked(self):
|
||||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||||
@ -2088,6 +2096,8 @@ class TestReplicatedObjController(CommonObjectControllerMixin,
|
|||||||
with set_http_connect(*codes):
|
with set_http_connect(*codes):
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
self.assertEqual(resp.status_int, 503)
|
self.assertEqual(resp.status_int, 503)
|
||||||
|
self.assertEqual([1] * self.replicas(),
|
||||||
|
node_error_counts(self.app, self.obj_ring.devs))
|
||||||
|
|
||||||
def test_HEAD_error_limit_supression_count(self):
|
def test_HEAD_error_limit_supression_count(self):
|
||||||
def do_test(primary_codes, expected, clear_stats=True):
|
def do_test(primary_codes, expected, clear_stats=True):
|
||||||
@ -4873,7 +4883,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
'Range': 'bytes=1000-2000,14000-15000'})
|
'Range': 'bytes=1000-2000,14000-15000'})
|
||||||
with capture_http_requests(get_response) as log:
|
with capture_http_requests(get_response) as log:
|
||||||
resp = req.get_response(self.app)
|
resp = req.get_response(self.app)
|
||||||
_ = resp.body
|
# note: the error is raised before the resp_iter is read
|
||||||
|
self.assertIn(b'Internal Error', resp.body)
|
||||||
self.assertEqual(resp.status_int, 500)
|
self.assertEqual(resp.status_int, 500)
|
||||||
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
||||||
log_lines = self.app.logger.get_lines_for_level('error')
|
log_lines = self.app.logger.get_lines_for_level('error')
|
||||||
@ -4886,6 +4897,84 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
|||||||
self.assertIn('Unhandled exception in request: ChunkReadTimeout',
|
self.assertIn('Unhandled exception in request: ChunkReadTimeout',
|
||||||
log_lines[2])
|
log_lines[2])
|
||||||
|
|
||||||
|
def test_GET_with_multirange_unable_to_resume_body_started(self):
|
||||||
|
self.app.object_chunk_size = 256
|
||||||
|
self.app.recoverable_node_timeout = 0.01
|
||||||
|
test_body = b'test' * self.policy.ec_segment_size
|
||||||
|
ec_stub = make_ec_object_stub(test_body, self.policy, None)
|
||||||
|
frag_archives = ec_stub['frags']
|
||||||
|
self.assertEqual(len(frag_archives[0]), 1960)
|
||||||
|
boundary = b'81eb9c110b32ced5fe'
|
||||||
|
|
||||||
|
def make_mime_body(frag_archive):
|
||||||
|
return b'\r\n'.join([
|
||||||
|
b'--' + boundary,
|
||||||
|
b'Content-Type: application/octet-stream',
|
||||||
|
b'Content-Range: bytes 0-489/1960',
|
||||||
|
b'',
|
||||||
|
frag_archive[0:490],
|
||||||
|
b'--' + boundary,
|
||||||
|
b'Content-Type: application/octet-stream',
|
||||||
|
b'Content-Range: bytes 1470-1959/1960',
|
||||||
|
b'',
|
||||||
|
frag_archive[1470:],
|
||||||
|
b'--' + boundary + b'--',
|
||||||
|
])
|
||||||
|
|
||||||
|
obj_resp_bodies = [make_mime_body(fa) for fa
|
||||||
|
# no extra good responses
|
||||||
|
in ec_stub['frags'][:self.policy.ec_ndata]]
|
||||||
|
|
||||||
|
headers = {
|
||||||
|
'Content-Type': b'multipart/byteranges;boundary=' + boundary,
|
||||||
|
'Content-Length': len(obj_resp_bodies[0]),
|
||||||
|
'X-Object-Sysmeta-Ec-Content-Length': len(ec_stub['body']),
|
||||||
|
'X-Object-Sysmeta-Ec-Etag': ec_stub['etag'],
|
||||||
|
'X-Timestamp': Timestamp(self.ts()).normal,
|
||||||
|
}
|
||||||
|
|
||||||
|
responses = [
|
||||||
|
StubResponse(206, body, headers, i,
|
||||||
|
# make the first one slow
|
||||||
|
slowdown=0.1 if i == 0 else None)
|
||||||
|
for i, body in enumerate(obj_resp_bodies)
|
||||||
|
]
|
||||||
|
# the first response serves some bytes before slowing down
|
||||||
|
responses[0].slowdown_after = 1000
|
||||||
|
|
||||||
|
def get_response(req):
|
||||||
|
return responses.pop(0) if responses else StubResponse(404)
|
||||||
|
|
||||||
|
req = swob.Request.blank('/v1/a/c/o', headers={
|
||||||
|
'Range': 'bytes=1000-2000,14000-15000'})
|
||||||
|
response_chunks = []
|
||||||
|
with capture_http_requests(get_response) as log:
|
||||||
|
resp = req.get_response(self.app)
|
||||||
|
with self.assertRaises(ChunkReadTimeout):
|
||||||
|
# note: the error is raised while the resp_iter is read
|
||||||
|
for chunk in resp.app_iter:
|
||||||
|
response_chunks.append(chunk)
|
||||||
|
boundary = resp.headers['Content-Type'].split('=', 1)[1]
|
||||||
|
self.assertEqual(response_chunks, [
|
||||||
|
b'\r\n'.join([
|
||||||
|
b'--' + boundary.encode('ascii'),
|
||||||
|
b'Content-Type: application/octet-stream',
|
||||||
|
b'Content-Range: bytes 1000-2000/16384',
|
||||||
|
b'',
|
||||||
|
b'',
|
||||||
|
]),
|
||||||
|
test_body[0:1001],
|
||||||
|
b'\r\n',
|
||||||
|
])
|
||||||
|
self.assertEqual(resp.status_int, 206)
|
||||||
|
self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2)
|
||||||
|
log_lines = self.app.logger.get_lines_for_level('error')
|
||||||
|
self.assertEqual(2, len(log_lines), log_lines)
|
||||||
|
self.assertIn('Trying to read next part of EC multi-part GET',
|
||||||
|
log_lines[0])
|
||||||
|
self.assertIn('Trying to read during GET: ChunkReadTimeout',
|
||||||
|
log_lines[1])
|
||||||
|
|
||||||
def test_GET_with_multirange_short_resume_body(self):
|
def test_GET_with_multirange_short_resume_body(self):
|
||||||
self.app.object_chunk_size = 256
|
self.app.object_chunk_size = 256
|
||||||
self.app.recoverable_node_timeout = 0.01
|
self.app.recoverable_node_timeout = 0.01
|
||||||
|
@ -58,7 +58,7 @@ from test.unit import (
|
|||||||
connect_tcp, readuntil2crlfs, fake_http_connect, FakeRing,
|
connect_tcp, readuntil2crlfs, fake_http_connect, FakeRing,
|
||||||
FakeMemcache, patch_policies, write_fake_ring, mocked_http_conn,
|
FakeMemcache, patch_policies, write_fake_ring, mocked_http_conn,
|
||||||
DEFAULT_TEST_EC_TYPE, make_timestamp_iter, skip_if_no_xattrs,
|
DEFAULT_TEST_EC_TYPE, make_timestamp_iter, skip_if_no_xattrs,
|
||||||
FakeHTTPResponse)
|
FakeHTTPResponse, node_error_count, node_last_error, set_node_errors)
|
||||||
from test.unit.helpers import setup_servers, teardown_servers
|
from test.unit.helpers import setup_servers, teardown_servers
|
||||||
from swift.proxy import server as proxy_server
|
from swift.proxy import server as proxy_server
|
||||||
from swift.proxy.controllers.obj import ReplicatedObjectController
|
from swift.proxy.controllers.obj import ReplicatedObjectController
|
||||||
@ -154,31 +154,6 @@ def parse_headers_string(headers_str):
|
|||||||
return headers_dict
|
return headers_dict
|
||||||
|
|
||||||
|
|
||||||
def get_node_error_stats(proxy_app, ring_node):
|
|
||||||
node_key = proxy_app.error_limiter.node_key(ring_node)
|
|
||||||
return proxy_app.error_limiter.stats.get(node_key) or {}
|
|
||||||
|
|
||||||
|
|
||||||
def node_error_count(proxy_app, ring_node):
|
|
||||||
# Reach into the proxy's internals to get the error count for a
|
|
||||||
# particular node
|
|
||||||
return get_node_error_stats(proxy_app, ring_node).get('errors', 0)
|
|
||||||
|
|
||||||
|
|
||||||
def node_last_error(proxy_app, ring_node):
|
|
||||||
# Reach into the proxy's internals to get the last error for a
|
|
||||||
# particular node
|
|
||||||
return get_node_error_stats(proxy_app, ring_node).get('last_error')
|
|
||||||
|
|
||||||
|
|
||||||
def set_node_errors(proxy_app, ring_node, value, last_error):
|
|
||||||
# Set the node's error count to value
|
|
||||||
node_key = proxy_app.error_limiter.node_key(ring_node)
|
|
||||||
stats = {'errors': value,
|
|
||||||
'last_error': last_error}
|
|
||||||
proxy_app.error_limiter.stats[node_key] = stats
|
|
||||||
|
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def save_globals():
|
def save_globals():
|
||||||
orig_http_connect = getattr(swift.proxy.controllers.base, 'http_connect',
|
orig_http_connect = getattr(swift.proxy.controllers.base, 'http_connect',
|
||||||
|
Loading…
Reference in New Issue
Block a user