From a4e25d6da8d007051b3b0575de47338e210d4315 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Mon, 14 Oct 2013 13:12:22 -0700 Subject: [PATCH] Proxy: use just one greenthread per client Instead of using two greenthreads and an eventlet.queue.Queue of size 1 to stream response bodies from backends to clients, just do it with a single greenthread that reads and writes in a loop. This should lower the amount of CPU used by the proxy in its response streaming. Client fairness used to be provided implicitly; since the queue only held 1 item, the read-from-backend greenthread would block after each chunk until the write-to-client greenthread got a chance to run. Now fairness is provided by an explicit eventlet.sleep() call, just as it is in the object server. Change-Id: Iae27109f5a3d109ad21ec9a972e39f22150f6dbb --- swift/proxy/controllers/base.py | 102 +++++++++++++------------------- 1 file changed, 40 insertions(+), 62 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 29a93921fe..afe2eac4ac 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -32,8 +32,7 @@ import itertools from swift import gettext_ as _ from urllib import quote -from eventlet import spawn_n, GreenPile -from eventlet.queue import Queue, Empty, Full +from eventlet import sleep, GreenPile from eventlet.timeout import Timeout from swift.common.wsgi import make_pre_authed_env @@ -41,7 +40,8 @@ from swift.common.utils import normalize_timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ quorum_size from swift.common.bufferedhttp import http_connect -from swift.common.exceptions import ChunkReadTimeout, ConnectionTimeout +from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ + ConnectionTimeout from swift.common.http import is_informational, is_success, is_redirection, \ is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \ HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \ @@ -903,48 +903,6 @@ class Controller(object): """ return self.GETorHEAD(req) - def _make_app_iter_reader(self, node, source, queue, logger_thread_locals): - """ - Reads from the source and places data in the queue. It expects - something else be reading from the queue and, if nothing does within - self.app.client_timeout seconds, the process will be aborted. - - :param node: The node dict that the source is connected to, for - logging/error-limiting purposes. - :param source: The httplib.Response object to read from. - :param queue: The eventlet.queue.Queue to place read source data into. - :param logger_thread_locals: The thread local values to be set on the - self.app.logger to retain transaction - logging information. - """ - self.app.logger.thread_locals = logger_thread_locals - success = True - try: - try: - while True: - with ChunkReadTimeout(self.app.node_timeout): - chunk = source.read(self.app.object_chunk_size) - if not chunk: - break - queue.put(chunk, timeout=self.app.client_timeout) - except Full: - self.app.logger.warn( - _('Client did not read from queue within %ss') % - self.app.client_timeout) - self.app.logger.increment('client_timeouts') - success = False - except (Exception, Timeout): - self.exception_occurred(node, _('Object'), - _('Trying to read during GET')) - success = False - finally: - # Ensure the queue getter gets a terminator. - queue.resize(2) - queue.put(success) - # Close-out the connection as best as possible. - if getattr(source, 'swift_conn', None): - self.close_swift_conn(source) - def _make_app_iter(self, node, source): """ Returns an iterator over the contents of the source (via its read @@ -956,29 +914,49 @@ class Controller(object): :param node: The node the source is reading from, for logging purposes. """ try: - # Spawn reader to read from the source and place in the queue. - # We then drop any reference to the source or node, for garbage - # collection purposes. - queue = Queue(1) - spawn_n(self._make_app_iter_reader, node, source, queue, - self.app.logger.thread_locals) - source = node = None + nchunks = 0 while True: - chunk = queue.get(timeout=self.app.node_timeout) - if isinstance(chunk, bool): # terminator - success = chunk - if not success: - raise Exception(_('Failed to read all data' - ' from the source')) + with ChunkReadTimeout(self.app.node_timeout): + chunk = source.read(self.app.object_chunk_size) + nchunks += 1 + if not chunk: break - yield chunk - except Empty: - raise ChunkReadTimeout() - except (GeneratorExit, Timeout): + with ChunkWriteTimeout(self.app.client_timeout): + yield chunk + # This is for fairness; if the network is outpacing the CPU, + # we'll always be able to read and write data without + # encountering an EWOULDBLOCK, and so eventlet will not switch + # greenthreads on its own. We do it manually so that clients + # don't starve. + # + # The number 5 here was chosen by making stuff up. It's not + # every single chunk, but it's not too big either, so it seemed + # like it would probably be an okay choice. + # + # Note that we may trampoline to other greenthreads more often + # than once every 5 chunks, depending on how blocking our + # network IO is; the explicit sleep here simply provides a + # lower bound on the rate of trampolining. + if nchunks % 5 == 0: + sleep() + except ChunkReadTimeout: + self.exception_occurred(node, _('Object'), + _('Trying to read during GET')) + raise + except ChunkWriteTimeout: + self.app.logger.warn( + _('Client did not read from proxy within %ss') % + self.app.client_timeout) + self.app.logger.increment('client_timeouts') + except GeneratorExit: self.app.logger.warn(_('Client disconnected on read')) except Exception: self.app.logger.exception(_('Trying to send to client')) raise + finally: + # Close-out the connection as best as possible. + if getattr(source, 'swift_conn', None): + self.close_swift_conn(source) def close_swift_conn(self, src): """