Merge "Proxy: use just one greenthread per client"
This commit is contained in:
commit
16ddd3c53f
@ -32,8 +32,7 @@ import itertools
|
||||
from swift import gettext_ as _
|
||||
from urllib import quote
|
||||
|
||||
from eventlet import spawn_n, GreenPile
|
||||
from eventlet.queue import Queue, Empty, Full
|
||||
from eventlet import sleep, GreenPile
|
||||
from eventlet.timeout import Timeout
|
||||
|
||||
from swift.common.wsgi import make_pre_authed_env
|
||||
@ -41,7 +40,8 @@ from swift.common.utils import normalize_timestamp, config_true_value, \
|
||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||
quorum_size
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout
|
||||
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||
ConnectionTimeout
|
||||
from swift.common.http import is_informational, is_success, is_redirection, \
|
||||
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
|
||||
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
|
||||
@ -903,48 +903,6 @@ class Controller(object):
|
||||
"""
|
||||
return self.GETorHEAD(req)
|
||||
|
||||
def _make_app_iter_reader(self, node, source, queue, logger_thread_locals):
|
||||
"""
|
||||
Reads from the source and places data in the queue. It expects
|
||||
something else be reading from the queue and, if nothing does within
|
||||
self.app.client_timeout seconds, the process will be aborted.
|
||||
|
||||
:param node: The node dict that the source is connected to, for
|
||||
logging/error-limiting purposes.
|
||||
:param source: The httplib.Response object to read from.
|
||||
:param queue: The eventlet.queue.Queue to place read source data into.
|
||||
:param logger_thread_locals: The thread local values to be set on the
|
||||
self.app.logger to retain transaction
|
||||
logging information.
|
||||
"""
|
||||
self.app.logger.thread_locals = logger_thread_locals
|
||||
success = True
|
||||
try:
|
||||
try:
|
||||
while True:
|
||||
with ChunkReadTimeout(self.app.node_timeout):
|
||||
chunk = source.read(self.app.object_chunk_size)
|
||||
if not chunk:
|
||||
break
|
||||
queue.put(chunk, timeout=self.app.client_timeout)
|
||||
except Full:
|
||||
self.app.logger.warn(
|
||||
_('Client did not read from queue within %ss') %
|
||||
self.app.client_timeout)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
success = False
|
||||
except (Exception, Timeout):
|
||||
self.exception_occurred(node, _('Object'),
|
||||
_('Trying to read during GET'))
|
||||
success = False
|
||||
finally:
|
||||
# Ensure the queue getter gets a terminator.
|
||||
queue.resize(2)
|
||||
queue.put(success)
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(source, 'swift_conn', None):
|
||||
self.close_swift_conn(source)
|
||||
|
||||
def _make_app_iter(self, node, source):
|
||||
"""
|
||||
Returns an iterator over the contents of the source (via its read
|
||||
@ -956,29 +914,49 @@ class Controller(object):
|
||||
:param node: The node the source is reading from, for logging purposes.
|
||||
"""
|
||||
try:
|
||||
# Spawn reader to read from the source and place in the queue.
|
||||
# We then drop any reference to the source or node, for garbage
|
||||
# collection purposes.
|
||||
queue = Queue(1)
|
||||
spawn_n(self._make_app_iter_reader, node, source, queue,
|
||||
self.app.logger.thread_locals)
|
||||
source = node = None
|
||||
nchunks = 0
|
||||
while True:
|
||||
chunk = queue.get(timeout=self.app.node_timeout)
|
||||
if isinstance(chunk, bool): # terminator
|
||||
success = chunk
|
||||
if not success:
|
||||
raise Exception(_('Failed to read all data'
|
||||
' from the source'))
|
||||
with ChunkReadTimeout(self.app.node_timeout):
|
||||
chunk = source.read(self.app.object_chunk_size)
|
||||
nchunks += 1
|
||||
if not chunk:
|
||||
break
|
||||
yield chunk
|
||||
except Empty:
|
||||
raise ChunkReadTimeout()
|
||||
except (GeneratorExit, Timeout):
|
||||
with ChunkWriteTimeout(self.app.client_timeout):
|
||||
yield chunk
|
||||
# This is for fairness; if the network is outpacing the CPU,
|
||||
# we'll always be able to read and write data without
|
||||
# encountering an EWOULDBLOCK, and so eventlet will not switch
|
||||
# greenthreads on its own. We do it manually so that clients
|
||||
# don't starve.
|
||||
#
|
||||
# The number 5 here was chosen by making stuff up. It's not
|
||||
# every single chunk, but it's not too big either, so it seemed
|
||||
# like it would probably be an okay choice.
|
||||
#
|
||||
# Note that we may trampoline to other greenthreads more often
|
||||
# than once every 5 chunks, depending on how blocking our
|
||||
# network IO is; the explicit sleep here simply provides a
|
||||
# lower bound on the rate of trampolining.
|
||||
if nchunks % 5 == 0:
|
||||
sleep()
|
||||
except ChunkReadTimeout:
|
||||
self.exception_occurred(node, _('Object'),
|
||||
_('Trying to read during GET'))
|
||||
raise
|
||||
except ChunkWriteTimeout:
|
||||
self.app.logger.warn(
|
||||
_('Client did not read from proxy within %ss') %
|
||||
self.app.client_timeout)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
except GeneratorExit:
|
||||
self.app.logger.warn(_('Client disconnected on read'))
|
||||
except Exception:
|
||||
self.app.logger.exception(_('Trying to send to client'))
|
||||
raise
|
||||
finally:
|
||||
# Close-out the connection as best as possible.
|
||||
if getattr(source, 'swift_conn', None):
|
||||
self.close_swift_conn(source)
|
||||
|
||||
def close_swift_conn(self, src):
|
||||
"""
|
||||
|
Loading…
x
Reference in New Issue
Block a user