Merge "Get better at closing WSGI iterables."
This commit is contained in:
commit
5370526b57
@ -22,7 +22,8 @@ from swift.common.http import is_success
|
||||
from swift.common.swob import Request, Response, \
|
||||
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict
|
||||
from swift.common.utils import get_logger, json, \
|
||||
RateLimitedIterator, read_conf_dir, quote
|
||||
RateLimitedIterator, read_conf_dir, quote, close_if_possible, \
|
||||
closing_if_possible
|
||||
from swift.common.request_helpers import SegmentedIterable
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
from urllib import unquote
|
||||
@ -48,7 +49,8 @@ class GetContext(WSGIContext):
|
||||
con_resp = con_req.get_response(self.dlo.app)
|
||||
if not is_success(con_resp.status_int):
|
||||
return con_resp, None
|
||||
return None, json.loads(''.join(con_resp.app_iter))
|
||||
with closing_if_possible(con_resp.app_iter):
|
||||
return None, json.loads(''.join(con_resp.app_iter))
|
||||
|
||||
def _segment_listing_iterator(self, req, version, account, container,
|
||||
prefix, segments, first_byte=None,
|
||||
@ -107,6 +109,7 @@ class GetContext(WSGIContext):
|
||||
# we've already started sending the response body to the
|
||||
# client, so all we can do is raise an exception to make the
|
||||
# WSGI server close the connection early
|
||||
close_if_possible(error_response.app_iter)
|
||||
raise ListingIterError(
|
||||
"Got status %d listing container /%s/%s" %
|
||||
(error_response.status_int, account, container))
|
||||
@ -233,6 +236,7 @@ class GetContext(WSGIContext):
|
||||
# make sure this response is for a dynamic large object manifest
|
||||
for header, value in self._response_headers:
|
||||
if (header.lower() == 'x-object-manifest'):
|
||||
close_if_possible(resp_iter)
|
||||
response = self.get_or_head_response(req, value)
|
||||
return response(req.environ, start_response)
|
||||
else:
|
||||
|
@ -159,9 +159,9 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
||||
Response
|
||||
from swift.common.utils import json, get_logger, config_true_value, \
|
||||
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
|
||||
register_swift_info, RateLimitedIterator, quote
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
closing_if_possible, close_if_possible
|
||||
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
|
||||
closing_if_possible
|
||||
from swift.common.request_helpers import SegmentedIterable
|
||||
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
@ -239,6 +239,7 @@ class SloGetContext(WSGIContext):
|
||||
sub_resp = sub_req.get_response(self.slo.app)
|
||||
|
||||
if not is_success(sub_resp.status_int):
|
||||
close_if_possible(sub_resp.app_iter)
|
||||
raise ListingIterError(
|
||||
'ERROR: while fetching %s, GET of submanifest %s '
|
||||
'failed with status %d' % (req.path, sub_req.path,
|
||||
@ -412,7 +413,8 @@ class SloGetContext(WSGIContext):
|
||||
return response(req.environ, start_response)
|
||||
|
||||
def get_or_head_response(self, req, resp_headers, resp_iter):
|
||||
resp_body = ''.join(resp_iter)
|
||||
with closing_if_possible(resp_iter):
|
||||
resp_body = ''.join(resp_iter)
|
||||
try:
|
||||
segments = json.loads(resp_body)
|
||||
except ValueError:
|
||||
|
@ -23,7 +23,6 @@ from swob in here without creating circular imports.
|
||||
import hashlib
|
||||
import itertools
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from urllib import unquote
|
||||
from swift import gettext_ as _
|
||||
from swift.common.storage_policy import POLICIES
|
||||
@ -32,7 +31,8 @@ from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.http import is_success
|
||||
from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
|
||||
HTTPServiceUnavailable)
|
||||
from swift.common.utils import split_path, validate_device_partition
|
||||
from swift.common.utils import split_path, validate_device_partition, \
|
||||
close_if_possible
|
||||
from swift.common.wsgi import make_subrequest
|
||||
|
||||
|
||||
@ -249,26 +249,6 @@ def copy_header_subset(from_r, to_r, condition):
|
||||
to_r.headers[k] = v
|
||||
|
||||
|
||||
def close_if_possible(maybe_closable):
|
||||
close_method = getattr(maybe_closable, 'close', None)
|
||||
if callable(close_method):
|
||||
return close_method()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def closing_if_possible(maybe_closable):
|
||||
"""
|
||||
Like contextlib.closing(), but doesn't crash if the object lacks a close()
|
||||
method.
|
||||
|
||||
PEP 333 (WSGI) says: "If the iterable returned by the application has a
|
||||
close() method, the server or gateway must call that method upon
|
||||
completion of the current request[.]" This function makes that easier.
|
||||
"""
|
||||
yield maybe_closable
|
||||
close_if_possible(maybe_closable)
|
||||
|
||||
|
||||
class SegmentedIterable(object):
|
||||
"""
|
||||
Iterable that returns the object contents for a large object.
|
||||
@ -304,6 +284,7 @@ class SegmentedIterable(object):
|
||||
self.peeked_chunk = None
|
||||
self.app_iter = self._internal_iter()
|
||||
self.validated_first_segment = False
|
||||
self.current_resp = None
|
||||
|
||||
def _internal_iter(self):
|
||||
start_time = time.time()
|
||||
@ -360,6 +341,8 @@ class SegmentedIterable(object):
|
||||
'r_size': seg_resp.content_length,
|
||||
's_etag': seg_etag,
|
||||
's_size': seg_size})
|
||||
else:
|
||||
self.current_resp = seg_resp
|
||||
|
||||
seg_hash = hashlib.md5()
|
||||
for chunk in seg_resp.app_iter:
|
||||
@ -431,3 +414,11 @@ class SegmentedIterable(object):
|
||||
return itertools.chain([pc], self.app_iter)
|
||||
else:
|
||||
return self.app_iter
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Called when the client disconnect. Ensure that the connection to the
|
||||
backend server is closed.
|
||||
"""
|
||||
if self.current_resp:
|
||||
close_if_possible(self.current_resp.app_iter)
|
||||
|
@ -49,7 +49,8 @@ import random
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
from swift.common.utils import reiterate, split_path, Timestamp, pairs
|
||||
from swift.common.utils import reiterate, split_path, Timestamp, pairs, \
|
||||
close_if_possible
|
||||
from swift.common.exceptions import InvalidTimestamp
|
||||
|
||||
|
||||
@ -1220,12 +1221,14 @@ class Response(object):
|
||||
etag in self.request.if_none_match:
|
||||
self.status = 304
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if etag and self.request.if_match and \
|
||||
etag not in self.request.if_match:
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.status_int == 404 and self.request.if_match \
|
||||
@ -1236,18 +1239,21 @@ class Response(object):
|
||||
# Failed) response. [RFC 2616 section 14.24]
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.last_modified and self.request.if_modified_since \
|
||||
and self.last_modified <= self.request.if_modified_since:
|
||||
self.status = 304
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.last_modified and self.request.if_unmodified_since \
|
||||
and self.last_modified > self.request.if_unmodified_since:
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.request and self.request.method == 'HEAD':
|
||||
@ -1261,6 +1267,7 @@ class Response(object):
|
||||
if ranges == []:
|
||||
self.status = 416
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
elif ranges:
|
||||
range_size = len(ranges)
|
||||
|
@ -3161,6 +3161,28 @@ def ismount_raw(path):
|
||||
return False
|
||||
|
||||
|
||||
def close_if_possible(maybe_closable):
|
||||
close_method = getattr(maybe_closable, 'close', None)
|
||||
if callable(close_method):
|
||||
return close_method()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def closing_if_possible(maybe_closable):
|
||||
"""
|
||||
Like contextlib.closing(), but doesn't crash if the object lacks a close()
|
||||
method.
|
||||
|
||||
PEP 333 (WSGI) says: "If the iterable returned by the application has a
|
||||
close() method, the server or gateway must call that method upon
|
||||
completion of the current request[.]" This function makes that easier.
|
||||
"""
|
||||
try:
|
||||
yield maybe_closable
|
||||
finally:
|
||||
close_if_possible(maybe_closable)
|
||||
|
||||
|
||||
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
|
||||
_rfc_extension_pattern = re.compile(
|
||||
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
|
||||
|
@ -44,7 +44,7 @@ from swift.common.utils import (
|
||||
GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
|
||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||
document_iters_to_http_response_body, parse_content_range,
|
||||
quorum_size, reiterate)
|
||||
quorum_size, reiterate, close_if_possible)
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
check_copy_from_header, check_destination_header, \
|
||||
@ -70,7 +70,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
||||
HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \
|
||||
HTTPRequestedRangeNotSatisfiable, Range
|
||||
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
|
||||
remove_items, copy_header_subset, close_if_possible
|
||||
remove_items, copy_header_subset
|
||||
|
||||
|
||||
def copy_headers_into(from_r, to_r):
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
# This stuff can't live in test/unit/__init__.py due to its swob dependency.
|
||||
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from hashlib import md5
|
||||
from swift.common import swob
|
||||
@ -23,6 +24,20 @@ from swift.common.utils import split_path
|
||||
from test.unit import FakeLogger, FakeRing
|
||||
|
||||
|
||||
class LeakTrackingIter(object):
|
||||
def __init__(self, inner_iter, fake_swift, path):
|
||||
self.inner_iter = inner_iter
|
||||
self.fake_swift = fake_swift
|
||||
self.path = path
|
||||
|
||||
def __iter__(self):
|
||||
for x in self.inner_iter:
|
||||
yield x
|
||||
|
||||
def close(self):
|
||||
self.fake_swift.mark_closed(self.path)
|
||||
|
||||
|
||||
class FakeSwift(object):
|
||||
"""
|
||||
A good-enough fake Swift proxy server to use in testing middleware.
|
||||
@ -30,6 +45,7 @@ class FakeSwift(object):
|
||||
|
||||
def __init__(self):
|
||||
self._calls = []
|
||||
self._unclosed_req_paths = defaultdict(int)
|
||||
self.req_method_paths = []
|
||||
self.swift_sources = []
|
||||
self.uploaded = {}
|
||||
@ -105,7 +121,21 @@ class FakeSwift(object):
|
||||
req = swob.Request(env)
|
||||
resp = resp_class(req=req, headers=headers, body=body,
|
||||
conditional_response=True)
|
||||
return resp(env, start_response)
|
||||
wsgi_iter = resp(env, start_response)
|
||||
self.mark_opened(path)
|
||||
return LeakTrackingIter(wsgi_iter, self, path)
|
||||
|
||||
def mark_opened(self, path):
|
||||
self._unclosed_req_paths[path] += 1
|
||||
|
||||
def mark_closed(self, path):
|
||||
self._unclosed_req_paths[path] -= 1
|
||||
|
||||
@property
|
||||
def unclosed_requests(self):
|
||||
return {path: count
|
||||
for path, count in self._unclosed_req_paths.items()
|
||||
if count > 0}
|
||||
|
||||
@property
|
||||
def calls(self):
|
||||
|
@ -26,6 +26,7 @@ import unittest
|
||||
|
||||
from swift.common import exceptions, swob
|
||||
from swift.common.middleware import dlo
|
||||
from swift.common.utils import closing_if_possible
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
@ -54,8 +55,10 @@ class DloTestCase(unittest.TestCase):
|
||||
body = ''
|
||||
caught_exc = None
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
# appease the close-checker
|
||||
with closing_if_possible(body_iter):
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
except Exception as exc:
|
||||
if expect_exception:
|
||||
caught_exc = exc
|
||||
@ -279,6 +282,9 @@ class TestDloHeadManifest(DloTestCase):
|
||||
|
||||
|
||||
class TestDloGetManifest(DloTestCase):
|
||||
def tearDown(self):
|
||||
self.assertEqual(self.app.unclosed_requests, {})
|
||||
|
||||
def test_get_manifest(self):
|
||||
expected_etag = '"%s"' % md5hex(
|
||||
md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") +
|
||||
|
@ -24,7 +24,7 @@ from swift.common import swob, utils
|
||||
from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.middleware import slo
|
||||
from swift.common.swob import Request, Response, HTTPException
|
||||
from swift.common.utils import quote, json
|
||||
from swift.common.utils import quote, json, closing_if_possible
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
@ -74,8 +74,10 @@ class SloTestCase(unittest.TestCase):
|
||||
body = ''
|
||||
caught_exc = None
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
# appease the close-checker
|
||||
with closing_if_possible(body_iter):
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
except Exception as exc:
|
||||
if expect_exception:
|
||||
caught_exc = exc
|
||||
@ -232,7 +234,7 @@ class TestSloPutManifest(SloTestCase):
|
||||
'/?multipart-manifest=put',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data)
|
||||
self.assertEquals(
|
||||
self.slo.handle_multipart_put(req, fake_start_response),
|
||||
list(self.slo.handle_multipart_put(req, fake_start_response)),
|
||||
['passed'])
|
||||
|
||||
def test_handle_multipart_put_success(self):
|
||||
@ -949,6 +951,9 @@ class TestSloGetManifest(SloTestCase):
|
||||
'X-Object-Meta-Fish': 'Bass'},
|
||||
"[not {json (at ++++all")
|
||||
|
||||
def tearDown(self):
|
||||
self.assertEqual(self.app.unclosed_requests, {})
|
||||
|
||||
def test_get_manifest_passthrough(self):
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/gettest/manifest-bc?multipart-manifest=get',
|
||||
|
Loading…
x
Reference in New Issue
Block a user