From f5648638ee6f939556ebfcb40dfdb8a590d3b5ae Mon Sep 17 00:00:00 2001
From: David Goetz <david.goetz@rackspace.com>
Date: Mon, 4 Nov 2013 17:06:06 +0000
Subject: [PATCH] Get retry.

If a source times out on read try another one of them with a
modified range.  There had to be a lot of moved around code
to get this working but it should all make sense.

Change-Id: Ieaf045690a8823927a6f38098a95b37a4d4adb70
---
 swift/proxy/controllers/base.py          | 595 ++++++++++++-----------
 swift/proxy/controllers/obj.py           |  31 +-
 swift/proxy/server.py                    | 121 +++++
 test/unit/__init__.py                    |  13 +-
 test/unit/proxy/controllers/test_base.py |  28 +-
 test/unit/proxy/test_server.py           | 125 +++--
 6 files changed, 557 insertions(+), 356 deletions(-)

diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py
index 8976328cec..35ca457830 100644
--- a/swift/proxy/controllers/base.py
+++ b/swift/proxy/controllers/base.py
@@ -28,7 +28,7 @@ import os
 import time
 import functools
 import inspect
-import itertools
+from sys import exc_info
 from swift import gettext_ as _
 from urllib import quote
 
@@ -46,7 +46,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
     is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
     HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
     HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED
-from swift.common.swob import Request, Response, HeaderKeyDict
+from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
+    HTTPException, HTTPRequestedRangeNotSatisfiable
 
 
 def update_headers(response, headers):
@@ -518,6 +519,286 @@ def _get_object_info(app, env, account, container, obj, swift_source=None):
     return None
 
 
+def close_swift_conn(src):
+    """
+    Force close the http connection to the backend.
+
+    :param src: the response from the backend
+    """
+    try:
+        # Since the backends set "Connection: close" in their response
+        # headers, the response object (src) is solely responsible for the
+        # socket. The connection object (src.swift_conn) has no references
+        # to the socket, so calling its close() method does nothing, and
+        # therefore we don't do it.
+        #
+        # Also, since calling the response's close() method might not
+        # close the underlying socket but only decrement some
+        # reference-counter, we have a special method here that really,
+        # really kills the underlying socket with a close() syscall.
+        src.nuke_from_orbit()  # it's the only way to be sure
+    except Exception:
+        pass
+
+
+class GetOrHeadHandler(object):
+
+    def __init__(self, app, req, server_type, ring, partition, path,
+                 backend_headers):
+        self.app = app
+        self.ring = ring
+        self.server_type = server_type
+        self.partition = partition
+        self.path = path
+        self.backend_headers = backend_headers
+        self.used_nodes = []
+        self.used_source_etag = ''
+
+        # stuff from request
+        self.req_method = req.method
+        self.req_path = req.path
+        self.req_query_string = req.query_string
+        self.newest = config_true_value(req.headers.get('x-newest', 'f'))
+
+        # populated when finding source
+        self.statuses = []
+        self.reasons = []
+        self.bodies = []
+        self.source_headers = []
+
+    def fast_forward(self, num_bytes):
+        """
+        Will skip num_bytes into the current ranges.
+        :params num_bytes: the number of bytes that have already been read on
+                           this request. This will change the Range header
+                           so that the next req will start where it left off.
+        :raises NotImplementedError: if this is a multirange request
+        :raises ValueError: if invalid range header
+        :raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes
+                                                  > end of range
+        """
+        if 'Range' in self.backend_headers:
+            req_range = Range(self.backend_headers['Range'])
+
+            if len(req_range.ranges) > 1:
+                raise NotImplementedError()
+            begin, end = req_range.ranges.pop()
+            if begin is None:
+                # this is a -50 range req (last 50 bytes of file)
+                end -= num_bytes
+            else:
+                begin += num_bytes
+            if end and begin > end:
+                raise HTTPRequestedRangeNotSatisfiable()
+            req_range.ranges = [(begin, end)]
+            self.backend_headers['Range'] = str(req_range)
+        else:
+            self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
+
+    def is_good_source(self, src):
+        """
+        Indicates whether or not the request made to the backend found
+        what it was looking for.
+
+        :param src: the response from the backend
+        :returns: True if found, False if not
+        """
+        if self.server_type == 'Object' and src.status == 416:
+            return True
+        return is_success(src.status) or is_redirection(src.status)
+
+    def _make_app_iter(self, node, source):
+        """
+        Returns an iterator over the contents of the source (via its read
+        func).  There is also quite a bit of cleanup to ensure garbage
+        collection works and the underlying socket of the source is closed.
+
+        :param source: The httplib.Response object this iterator should read
+                       from.
+        :param node: The node the source is reading from, for logging purposes.
+        """
+        try:
+            nchunks = 0
+            bytes_read_from_source = 0
+            while True:
+                try:
+                    with ChunkReadTimeout(self.app.node_timeout):
+                        chunk = source.read(self.app.object_chunk_size)
+                        nchunks += 1
+                        bytes_read_from_source += len(chunk)
+                except ChunkReadTimeout:
+                    exc_type, exc_value, exc_traceback = exc_info()
+                    if self.newest:
+                        raise exc_type, exc_value, exc_traceback
+                    try:
+                        self.fast_forward(bytes_read_from_source)
+                    except (NotImplementedError, HTTPException, ValueError):
+                        raise exc_type, exc_value, exc_traceback
+                    new_source, new_node = self._get_source_and_node()
+                    if new_source:
+                        self.app.exception_occurred(
+                            node, _('Object'),
+                            _('Trying to read during GET (retrying)'))
+                        # Close-out the connection as best as possible.
+                        if getattr(source, 'swift_conn', None):
+                            close_swift_conn(source)
+                        source = new_source
+                        node = new_node
+                        bytes_read_from_source = 0
+                        continue
+                    else:
+                        raise exc_type, exc_value, exc_traceback
+                if not chunk:
+                    break
+                with ChunkWriteTimeout(self.app.client_timeout):
+                    yield chunk
+                # This is for fairness; if the network is outpacing the CPU,
+                # we'll always be able to read and write data without
+                # encountering an EWOULDBLOCK, and so eventlet will not switch
+                # greenthreads on its own. We do it manually so that clients
+                # don't starve.
+                #
+                # The number 5 here was chosen by making stuff up. It's not
+                # every single chunk, but it's not too big either, so it seemed
+                # like it would probably be an okay choice.
+                #
+                # Note that we may trampoline to other greenthreads more often
+                # than once every 5 chunks, depending on how blocking our
+                # network IO is; the explicit sleep here simply provides a
+                # lower bound on the rate of trampolining.
+                if nchunks % 5 == 0:
+                    sleep()
+
+        except ChunkReadTimeout:
+            self.app.exception_occurred(node, _('Object'),
+                                        _('Trying to read during GET'))
+            raise
+        except ChunkWriteTimeout:
+            self.app.logger.warn(
+                _('Client did not read from proxy within %ss') %
+                self.app.client_timeout)
+            self.app.logger.increment('client_timeouts')
+        except GeneratorExit:
+            self.app.logger.warn(_('Client disconnected on read'))
+        except Exception:
+            self.app.logger.exception(_('Trying to send to client'))
+            raise
+        finally:
+            # Close-out the connection as best as possible.
+            if getattr(source, 'swift_conn', None):
+                close_swift_conn(source)
+
+    def _get_source_and_node(self):
+
+        self.statuses = []
+        self.reasons = []
+        self.bodies = []
+        self.source_headers = []
+        sources = []
+
+        for node in self.app.iter_nodes(self.ring, self.partition):
+            if node in self.used_nodes:
+                continue
+            start_node_timing = time.time()
+            try:
+                with ConnectionTimeout(self.app.conn_timeout):
+                    conn = http_connect(
+                        node['ip'], node['port'], node['device'],
+                        self.partition, self.req_method, self.path,
+                        headers=self.backend_headers,
+                        query_string=self.req_query_string)
+                self.app.set_node_timing(node, time.time() - start_node_timing)
+
+                with Timeout(self.app.node_timeout):
+                    possible_source = conn.getresponse()
+                    # See NOTE: swift_conn at top of file about this.
+                    possible_source.swift_conn = conn
+            except (Exception, Timeout):
+                self.app.exception_occurred(
+                    node, self.server_type,
+                    _('Trying to %(method)s %(path)s') %
+                    {'method': self.req_method, 'path': self.req_path})
+                continue
+            if self.is_good_source(possible_source):
+                # 404 if we know we don't have a synced copy
+                if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
+                    self.statuses.append(HTTP_NOT_FOUND)
+                    self.reasons.append('')
+                    self.bodies.append('')
+                    self.source_headers.append('')
+                    close_swift_conn(possible_source)
+                else:
+                    if self.used_source_etag:
+                        src_headers = dict(
+                            (k.lower(), v) for k, v in
+                            possible_source.getheaders())
+                        if src_headers.get('etag', '').strip('"') != \
+                                self.used_source_etag:
+                            self.statuses.append(HTTP_NOT_FOUND)
+                            self.reasons.append('')
+                            self.bodies.append('')
+                            self.source_headers.append('')
+                            continue
+
+                    self.statuses.append(possible_source.status)
+                    self.reasons.append(possible_source.reason)
+                    self.bodies.append('')
+                    self.source_headers.append('')
+                    sources.append((possible_source, node))
+                    if not self.newest:  # one good source is enough
+                        break
+            else:
+                self.statuses.append(possible_source.status)
+                self.reasons.append(possible_source.reason)
+                self.bodies.append(possible_source.read())
+                self.source_headers.append(possible_source.getheaders())
+                if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
+                    self.app.error_limit(node, _('ERROR Insufficient Storage'))
+                elif is_server_error(possible_source.status):
+                    self.app.error_occurred(
+                        node, _('ERROR %(status)d %(body)s '
+                                'From %(type)s Server') %
+                        {'status': possible_source.status,
+                         'body': self.bodies[-1][:1024],
+                         'type': self.server_type})
+
+        if sources:
+            sources.sort(key=lambda s: source_key(s[0]))
+            source, node = sources.pop()
+            for src, _junk in sources:
+                close_swift_conn(src)
+            self.used_nodes.append(node)
+            src_headers = dict(
+                (k.lower(), v) for k, v in
+                possible_source.getheaders())
+            self.used_source_etag = src_headers.get('etag', '').strip('"')
+            return source, node
+        return None, None
+
+    def get_working_response(self, req):
+        source, node = self._get_source_and_node()
+        res = None
+        if source:
+            res = Response(request=req)
+            if req.method == 'GET' and \
+                    source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
+                res.app_iter = self._make_app_iter(node, source)
+                # See NOTE: swift_conn at top of file about this.
+                res.swift_conn = source.swift_conn
+            res.status = source.status
+            update_headers(res, source.getheaders())
+            if not res.environ:
+                res.environ = {}
+            res.environ['swift_x_timestamp'] = \
+                source.getheader('x-timestamp')
+            res.accept_ranges = 'bytes'
+            res.content_length = source.getheader('Content-Length')
+            if source.getheader('Content-Type'):
+                res.charset = None
+                res.content_type = source.getheader('Content-Type')
+        return res
+
+
 class Controller(object):
     """Base WSGI controller class for the proxy"""
     server_type = 'Base'
@@ -602,71 +883,6 @@ class Controller(object):
         headers['referer'] = referer
         return headers
 
-    def error_occurred(self, node, msg):
-        """
-        Handle logging, and handling of errors.
-
-        :param node: dictionary of node to handle errors for
-        :param msg: error message
-        """
-        node['errors'] = node.get('errors', 0) + 1
-        node['last_error'] = time.time()
-        self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
-                              {'msg': msg, 'ip': node['ip'],
-                              'port': node['port'], 'device': node['device']})
-
-    def exception_occurred(self, node, typ, additional_info):
-        """
-        Handle logging of generic exceptions.
-
-        :param node: dictionary of node to log the error for
-        :param typ: server type
-        :param additional_info: additional information to log
-        """
-        self.app.logger.exception(
-            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
-              '%(info)s'),
-            {'type': typ, 'ip': node['ip'], 'port': node['port'],
-             'device': node['device'], 'info': additional_info})
-
-    def error_limited(self, node):
-        """
-        Check if the node is currently error limited.
-
-        :param node: dictionary of node to check
-        :returns: True if error limited, False otherwise
-        """
-        now = time.time()
-        if 'errors' not in node:
-            return False
-        if 'last_error' in node and node['last_error'] < \
-                now - self.app.error_suppression_interval:
-            del node['last_error']
-            if 'errors' in node:
-                del node['errors']
-            return False
-        limited = node['errors'] > self.app.error_suppression_limit
-        if limited:
-            self.app.logger.debug(
-                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
-        return limited
-
-    def error_limit(self, node, msg):
-        """
-        Mark a node as error limited. This immediately pretends the
-        node received enough errors to trigger error suppression. Use
-        this for errors like Insufficient Storage. For other errors
-        use :func:`error_occurred`.
-
-        :param node: dictionary of node to error limit
-        :param msg: error message
-        """
-        node['errors'] = self.app.error_suppression_limit + 1
-        node['last_error'] = time.time()
-        self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
-                              {'msg': msg, 'ip': node['ip'],
-                              'port': node['port'], 'device': node['device']})
-
     def account_info(self, account, req=None):
         """
         Get account information, and also verify that the account exists.
@@ -719,62 +935,6 @@ class Controller(object):
             info['nodes'] = nodes
         return info
 
-    def iter_nodes(self, ring, partition, node_iter=None):
-        """
-        Yields nodes for a ring partition, skipping over error
-        limited nodes and stopping at the configurable number of
-        nodes. If a node yielded subsequently gets error limited, an
-        extra node will be yielded to take its place.
-
-        Note that if you're going to iterate over this concurrently from
-        multiple greenthreads, you'll want to use a
-        swift.common.utils.GreenthreadSafeIterator to serialize access.
-        Otherwise, you may get ValueErrors from concurrent access. (You also
-        may not, depending on how logging is configured, the vagaries of
-        socket IO and eventlet, and the phase of the moon.)
-
-        :param ring: ring to get yield nodes from
-        :param partition: ring partition to yield nodes for
-        :param node_iter: optional iterable of nodes to try. Useful if you
-            want to filter or reorder the nodes.
-        """
-        part_nodes = ring.get_part_nodes(partition)
-        if node_iter is None:
-            node_iter = itertools.chain(part_nodes,
-                                        ring.get_more_nodes(partition))
-        num_primary_nodes = len(part_nodes)
-
-        # Use of list() here forcibly yanks the first N nodes (the primary
-        # nodes) from node_iter, so the rest of its values are handoffs.
-        primary_nodes = self.app.sort_nodes(
-            list(itertools.islice(node_iter, num_primary_nodes)))
-        handoff_nodes = node_iter
-        nodes_left = self.app.request_node_count(ring)
-
-        for node in primary_nodes:
-            if not self.error_limited(node):
-                yield node
-                if not self.error_limited(node):
-                    nodes_left -= 1
-                    if nodes_left <= 0:
-                        return
-
-        handoffs = 0
-        for node in handoff_nodes:
-            if not self.error_limited(node):
-                handoffs += 1
-                if self.app.log_handoffs:
-                    self.app.logger.increment('handoff_count')
-                    self.app.logger.warning(
-                        'Handoff requested (%d)' % handoffs)
-                    if handoffs == len(primary_nodes):
-                        self.app.logger.increment('handoff_all_count')
-                yield node
-                if not self.error_limited(node):
-                    nodes_left -= 1
-                    if nodes_left <= 0:
-                        return
-
     def _make_request(self, nodes, part, method, path, headers, query,
                       logger_thread_locals):
         """
@@ -812,11 +972,13 @@ class Controller(object):
                         return resp.status, resp.reason, resp.getheaders(), \
                             resp.read()
                     elif resp.status == HTTP_INSUFFICIENT_STORAGE:
-                        self.error_limit(node, _('ERROR Insufficient Storage'))
+                        self.app.error_limit(node,
+                                             _('ERROR Insufficient Storage'))
             except (Exception, Timeout):
-                self.exception_occurred(node, self.server_type,
-                                        _('Trying to %(method)s %(path)s') %
-                                        {'method': method, 'path': path})
+                self.app.exception_occurred(
+                    node, self.server_type,
+                    _('Trying to %(method)s %(path)s') %
+                    {'method': method, 'path': path})
 
     def make_requests(self, req, ring, part, method, path, headers,
                       query_string=''):
@@ -836,7 +998,7 @@ class Controller(object):
         :returns: a swob.Response object
         """
         start_nodes = ring.get_part_nodes(part)
-        nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part))
+        nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
         pile = GreenAsyncPile(len(start_nodes))
         for head in headers:
             pile.spawn(self._make_request, nodes, part, method, path,
@@ -929,92 +1091,6 @@ class Controller(object):
         """
         return self.GETorHEAD(req)
 
-    def _make_app_iter(self, node, source):
-        """
-        Returns an iterator over the contents of the source (via its read
-        func).  There is also quite a bit of cleanup to ensure garbage
-        collection works and the underlying socket of the source is closed.
-
-        :param source: The bufferedhttp.Response object this iterator should
-                       read from.
-        :param node: The node the source is reading from, for logging purposes.
-        """
-        try:
-            nchunks = 0
-            while True:
-                with ChunkReadTimeout(self.app.node_timeout):
-                    chunk = source.read(self.app.object_chunk_size)
-                    nchunks += 1
-                if not chunk:
-                    break
-                with ChunkWriteTimeout(self.app.client_timeout):
-                    yield chunk
-                # This is for fairness; if the network is outpacing the CPU,
-                # we'll always be able to read and write data without
-                # encountering an EWOULDBLOCK, and so eventlet will not switch
-                # greenthreads on its own. We do it manually so that clients
-                # don't starve.
-                #
-                # The number 5 here was chosen by making stuff up. It's not
-                # every single chunk, but it's not too big either, so it seemed
-                # like it would probably be an okay choice.
-                #
-                # Note that we may trampoline to other greenthreads more often
-                # than once every 5 chunks, depending on how blocking our
-                # network IO is; the explicit sleep here simply provides a
-                # lower bound on the rate of trampolining.
-                if nchunks % 5 == 0:
-                    sleep()
-        except ChunkReadTimeout:
-            self.exception_occurred(node, _('Object'),
-                                    _('Trying to read during GET'))
-            raise
-        except ChunkWriteTimeout:
-            self.app.logger.warn(
-                _('Client did not read from proxy within %ss') %
-                self.app.client_timeout)
-            self.app.logger.increment('client_timeouts')
-        except GeneratorExit:
-            self.app.logger.warn(_('Client disconnected on read'))
-        except Exception:
-            self.app.logger.exception(_('Trying to send to client'))
-            raise
-        finally:
-            # Close-out the connection as best as possible.
-            if getattr(source, 'swift_conn', None):
-                self.close_swift_conn(source)
-
-    def close_swift_conn(self, src):
-        """
-        Force close the http connection to the backend.
-
-        :param src: the response from the backend
-        """
-        try:
-            # Since the backends set "Connection: close" in their response
-            # headers, the response object (src) is solely responsible for the
-            # socket. The connection object (src.swift_conn) has no references
-            # to the socket, so calling its close() method does nothing, and
-            # therefore we don't do it.
-            #
-            # Also, since calling the response's close() method might not
-            # close the underlying socket but only decrement some
-            # reference-counter, we have a special method here that really,
-            # really kills the underlying socket with a close() syscall.
-            src.nuke_from_orbit()  # it's the only way to be sure
-        except Exception:
-            pass
-
-    def is_good_source(self, src):
-        """
-        Indicates whether or not the request made to the backend found
-        what it was looking for.
-
-        :param src: the response from the backend
-        :returns: True if found, False if not
-        """
-        return is_success(src.status) or is_redirection(src.status)
-
     def autocreate_account(self, env, account):
         """
         Autocreate an account
@@ -1047,87 +1123,18 @@ class Controller(object):
         :param path: path for the request
         :returns: swob.Response object
         """
-        statuses = []
-        reasons = []
-        bodies = []
-        source_headers = []
-        sources = []
-        newest = config_true_value(req.headers.get('x-newest', 'f'))
-        headers = self.generate_request_headers(req, additional=req.headers)
-        for node in self.iter_nodes(ring, partition):
-            start_node_timing = time.time()
-            try:
-                with ConnectionTimeout(self.app.conn_timeout):
-                    conn = http_connect(
-                        node['ip'], node['port'], node['device'], partition,
-                        req.method, path, headers=headers,
-                        query_string=req.query_string)
-                self.app.set_node_timing(node, time.time() - start_node_timing)
-                with Timeout(self.app.node_timeout):
-                    possible_source = conn.getresponse()
-                    # See NOTE: swift_conn at top of file about this.
-                    possible_source.swift_conn = conn
-            except (Exception, Timeout):
-                self.exception_occurred(
-                    node, server_type, _('Trying to %(method)s %(path)s') %
-                    {'method': req.method, 'path': req.path})
-                continue
-            if self.is_good_source(possible_source):
-                # 404 if we know we don't have a synced copy
-                if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
-                    statuses.append(HTTP_NOT_FOUND)
-                    reasons.append('')
-                    bodies.append('')
-                    source_headers.append('')
-                    self.close_swift_conn(possible_source)
-                else:
-                    statuses.append(possible_source.status)
-                    reasons.append(possible_source.reason)
-                    bodies.append('')
-                    source_headers.append('')
-                    sources.append((possible_source, node))
-                    if not newest:  # one good source is enough
-                        break
-            else:
-                statuses.append(possible_source.status)
-                reasons.append(possible_source.reason)
-                bodies.append(possible_source.read())
-                source_headers.append(possible_source.getheaders())
-                if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
-                    self.error_limit(node, _('ERROR Insufficient Storage'))
-                elif is_server_error(possible_source.status):
-                    self.error_occurred(node, _('ERROR %(status)d %(body)s '
-                                                'From %(type)s Server') %
-                                        {'status': possible_source.status,
-                                         'body': bodies[-1][:1024],
-                                         'type': server_type})
-        res = None
-        if sources:
-            sources.sort(key=lambda s: source_key(s[0]))
-            source, node = sources.pop()
-            for src, _junk in sources:
-                self.close_swift_conn(src)
-            res = Response(request=req)
-            if req.method == 'GET' and \
-                    source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
-                res.app_iter = self._make_app_iter(node, source)
-                # See NOTE: swift_conn at top of file about this.
-                res.swift_conn = source.swift_conn
-            res.status = source.status
-            update_headers(res, source.getheaders())
-            if not res.environ:
-                res.environ = {}
-            res.environ['swift_x_timestamp'] = \
-                source.getheader('x-timestamp')
-            res.accept_ranges = 'bytes'
-            res.content_length = source.getheader('Content-Length')
-            if source.getheader('Content-Type'):
-                res.charset = None
-                res.content_type = source.getheader('Content-Type')
+        backend_headers = self.generate_request_headers(
+            req, additional=req.headers)
+
+        handler = GetOrHeadHandler(self.app, req, server_type, ring,
+                                   partition, path, backend_headers)
+        res = handler.get_working_response(req)
+
         if not res:
-            res = self.best_response(req, statuses, reasons, bodies,
-                                     '%s %s' % (server_type, req.method),
-                                     headers=source_headers)
+            res = self.best_response(
+                req, handler.statuses, handler.reasons, handler.bodies,
+                '%s %s' % (server_type, req.method),
+                headers=handler.source_headers)
         try:
             (account, container) = split_path(req.path_info, 1, 2)
             _set_info_cache(self.app, req.environ, account, container, res)
diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 5d6f4a0cb5..da690b4680 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -504,7 +504,7 @@ class ObjectController(Controller):
         is_local = self.app.write_affinity_is_local_fn
 
         if is_local is None:
-            return self.iter_nodes(ring, partition)
+            return self.app.iter_nodes(ring, partition)
 
         all_nodes = itertools.chain(primary_nodes,
                                     ring.get_more_nodes(partition))
@@ -519,20 +519,9 @@ class ObjectController(Controller):
             itertools.ifilter(lambda node: node not in first_n_local_nodes,
                               all_nodes))
 
-        return self.iter_nodes(
+        return self.app.iter_nodes(
             ring, partition, node_iter=local_first_node_iter)
 
-    def is_good_source(self, src):
-        """
-        Indicates whether or not the request made to the backend found
-        what it was looking for.
-
-        In the case of an object, a 416 indicates that we found a
-        backend with the object.
-        """
-        return src.status == 416 or \
-            super(ObjectController, self).is_good_source(src)
-
     def GETorHEAD(self, req):
         """Handle HTTP GET or HEAD requests."""
         container_info = self.container_info(
@@ -800,8 +789,9 @@ class ObjectController(Controller):
                         conn.send(chunk)
                 except (Exception, ChunkWriteTimeout):
                     conn.failed = True
-                    self.exception_occurred(conn.node, _('Object'),
-                                            _('Trying to write to %s') % path)
+                    self.app.exception_occurred(
+                        conn.node, _('Object'),
+                        _('Trying to write to %s') % path)
             conn.queue.task_done()
 
     def _connect_put_node(self, nodes, part, path, headers,
@@ -827,10 +817,11 @@ class ObjectController(Controller):
                     conn.node = node
                     return conn
                 elif resp.status == HTTP_INSUFFICIENT_STORAGE:
-                    self.error_limit(node, _('ERROR Insufficient Storage'))
+                    self.app.error_limit(node, _('ERROR Insufficient Storage'))
             except (Exception, Timeout):
-                self.exception_occurred(node, _('Object'),
-                                        _('Expect: 100-continue on %s') % path)
+                self.app.exception_occurred(
+                    node, _('Object'),
+                    _('Expect: 100-continue on %s') % path)
 
     def _get_put_responses(self, req, conns, nodes):
         statuses = []
@@ -846,7 +837,7 @@ class ObjectController(Controller):
                     else:
                         return conn.getresponse()
             except (Exception, Timeout):
-                self.exception_occurred(
+                self.app.exception_occurred(
                     conn.node, _('Object'),
                     _('Trying to get final status of PUT to %s') % req.path)
         pile = GreenAsyncPile(len(conns))
@@ -858,7 +849,7 @@ class ObjectController(Controller):
                 reasons.append(response.reason)
                 bodies.append(response.read())
                 if response.status >= HTTP_INTERNAL_SERVER_ERROR:
-                    self.error_occurred(
+                    self.app.error_occurred(
                         conn.node,
                         _('ERROR %(status)d %(body)s From Object Server '
                           're: %(path)s') %
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 691e2d005d..569fc653c8 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -19,6 +19,7 @@ import socket
 from swift import gettext_ as _
 from random import shuffle
 from time import time
+import itertools
 
 from eventlet import Timeout
 
@@ -332,6 +333,126 @@ class Application(object):
         timing = round(timing, 3)  # sort timings to the millisecond
         self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
 
+    def error_limited(self, node):
+        """
+        Check if the node is currently error limited.
+
+        :param node: dictionary of node to check
+        :returns: True if error limited, False otherwise
+        """
+        now = time()
+        if 'errors' not in node:
+            return False
+        if 'last_error' in node and node['last_error'] < \
+                now - self.error_suppression_interval:
+            del node['last_error']
+            if 'errors' in node:
+                del node['errors']
+            return False
+        limited = node['errors'] > self.error_suppression_limit
+        if limited:
+            self.logger.debug(
+                _('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
+        return limited
+
+    def error_limit(self, node, msg):
+        """
+        Mark a node as error limited. This immediately pretends the
+        node received enough errors to trigger error suppression. Use
+        this for errors like Insufficient Storage. For other errors
+        use :func:`error_occurred`.
+
+        :param node: dictionary of node to error limit
+        :param msg: error message
+        """
+        node['errors'] = self.error_suppression_limit + 1
+        node['last_error'] = time()
+        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
+                          {'msg': msg, 'ip': node['ip'],
+                          'port': node['port'], 'device': node['device']})
+
+    def error_occurred(self, node, msg):
+        """
+        Handle logging, and handling of errors.
+
+        :param node: dictionary of node to handle errors for
+        :param msg: error message
+        """
+        node['errors'] = node.get('errors', 0) + 1
+        node['last_error'] = time()
+        self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
+                          {'msg': msg, 'ip': node['ip'],
+                          'port': node['port'], 'device': node['device']})
+
+    def iter_nodes(self, ring, partition, node_iter=None):
+        """
+        Yields nodes for a ring partition, skipping over error
+        limited nodes and stopping at the configurable number of
+        nodes. If a node yielded subsequently gets error limited, an
+        extra node will be yielded to take its place.
+
+        Note that if you're going to iterate over this concurrently from
+        multiple greenthreads, you'll want to use a
+        swift.common.utils.GreenthreadSafeIterator to serialize access.
+        Otherwise, you may get ValueErrors from concurrent access. (You also
+        may not, depending on how logging is configured, the vagaries of
+        socket IO and eventlet, and the phase of the moon.)
+
+        :param ring: ring to get yield nodes from
+        :param partition: ring partition to yield nodes for
+        :param node_iter: optional iterable of nodes to try. Useful if you
+            want to filter or reorder the nodes.
+        """
+        part_nodes = ring.get_part_nodes(partition)
+        if node_iter is None:
+            node_iter = itertools.chain(part_nodes,
+                                        ring.get_more_nodes(partition))
+        num_primary_nodes = len(part_nodes)
+
+        # Use of list() here forcibly yanks the first N nodes (the primary
+        # nodes) from node_iter, so the rest of its values are handoffs.
+        primary_nodes = self.sort_nodes(
+            list(itertools.islice(node_iter, num_primary_nodes)))
+        handoff_nodes = node_iter
+        nodes_left = self.request_node_count(ring)
+
+        for node in primary_nodes:
+            if not self.error_limited(node):
+                yield node
+                if not self.error_limited(node):
+                    nodes_left -= 1
+                    if nodes_left <= 0:
+                        return
+        handoffs = 0
+        for node in handoff_nodes:
+            if not self.error_limited(node):
+                handoffs += 1
+                if self.log_handoffs:
+                    self.logger.increment('handoff_count')
+                    self.logger.warning(
+                        'Handoff requested (%d)' % handoffs)
+                    if handoffs == len(primary_nodes):
+                        self.logger.increment('handoff_all_count')
+                yield node
+                if not self.error_limited(node):
+                    nodes_left -= 1
+                    if nodes_left <= 0:
+                        return
+
+    def exception_occurred(self, node, typ, additional_info):
+        """
+        Handle logging of generic exceptions.
+
+        :param node: dictionary of node to log the error for
+        :param typ: server type
+        :param additional_info: additional information to log
+        """
+        self.logger.exception(
+            _('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
+              '%(info)s'),
+            {'type': typ, 'ip': node['ip'], 'port': node['port'],
+             'device': node['device'], 'info': additional_info})
+
 
 def app_factory(global_conf, **local_conf):
     """paste.deploy app factory for creating WSGI proxy apps."""
diff --git a/test/unit/__init__.py b/test/unit/__init__.py
index 0d07493b04..8ad04cbc5e 100644
--- a/test/unit/__init__.py
+++ b/test/unit/__init__.py
@@ -426,6 +426,8 @@ def fake_http_connect(*code_iter, **kwargs):
             self.body = body
             self.headers = headers or {}
             self.timestamp = timestamp
+            if kwargs.get('slow') and isinstance(kwargs['slow'], list):
+                kwargs['slow'][0] -= 1
 
         def getresponse(self):
             if kwargs.get('raise_exc'):
@@ -469,13 +471,18 @@ def fake_http_connect(*code_iter, **kwargs):
                     headers['x-container-timestamp'] = '1'
             except StopIteration:
                 pass
-            if 'slow' in kwargs:
+            if self.am_slow():
                 headers['content-length'] = '4'
             headers.update(self.headers)
             return headers.items()
 
+        def am_slow(self):
+            if kwargs.get('slow') and isinstance(kwargs['slow'], list):
+                return kwargs['slow'][0] >= 0
+            return bool(kwargs.get('slow'))
+
         def read(self, amt=None):
-            if 'slow' in kwargs:
+            if self.am_slow():
                 if self.sent < 4:
                     self.sent += 1
                     sleep(0.1)
@@ -485,7 +492,7 @@ def fake_http_connect(*code_iter, **kwargs):
             return rv
 
         def send(self, amt=None):
-            if 'slow' in kwargs:
+            if self.am_slow():
                 if self.received < 4:
                     self.received += 1
                     sleep(0.1)
diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py
index 1fe8880dd1..403bf3d40e 100644
--- a/test/unit/proxy/controllers/test_base.py
+++ b/test/unit/proxy/controllers/test_base.py
@@ -18,8 +18,9 @@ from mock import patch
 from swift.proxy.controllers.base import headers_to_container_info, \
     headers_to_account_info, headers_to_object_info, get_container_info, \
     get_container_memcache_key, get_account_info, get_account_memcache_key, \
-    get_object_env_key, _get_cache_key, get_info, get_object_info, Controller
-from swift.common.swob import Request
+    get_object_env_key, _get_cache_key, get_info, get_object_info, \
+    Controller, GetOrHeadHandler
+from swift.common.swob import Request, HTTPException
 from swift.common.utils import split_path
 from test.unit import fake_http_connect, FakeRing, FakeMemcache
 from swift.proxy import server as proxy_server
@@ -446,3 +447,26 @@ class TestFuncs(unittest.TestCase):
         self.assertEqual(base.have_quorum([201, 201], 2), True)
         self.assertEqual(base.have_quorum([404, 404], 2), True)
         self.assertEqual(base.have_quorum([201, 404, 201, 201], 4), True)
+
+    def test_range_fast_forward(self):
+        req = Request.blank('/')
+        handler = GetOrHeadHandler(None, req, None, None, None, None, {})
+        handler.fast_forward(50)
+        self.assertEquals(handler.backend_headers['Range'], 'bytes=50-')
+
+        handler = GetOrHeadHandler(None, req, None, None, None, None,
+                                   {'Range': 'bytes=23-50'})
+        handler.fast_forward(20)
+        self.assertEquals(handler.backend_headers['Range'], 'bytes=43-50')
+        self.assertRaises(HTTPException,
+                          handler.fast_forward, 80)
+
+        handler = GetOrHeadHandler(None, req, None, None, None, None,
+                                   {'Range': 'bytes=23-'})
+        handler.fast_forward(20)
+        self.assertEquals(handler.backend_headers['Range'], 'bytes=43-')
+
+        handler = GetOrHeadHandler(None, req, None, None, None, None,
+                                   {'Range': 'bytes=-100'})
+        handler.fast_forward(20)
+        self.assertEquals(handler.backend_headers['Range'], 'bytes=-80')
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index 72a813cf92..010934e100 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -810,7 +810,7 @@ class TestObjectController(unittest.TestCase):
 
             controller = \
                 proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
-            controller.error_limit(
+            self.app.error_limit(
                 self.app.object_ring.get_part_nodes(1)[0], 'test')
             set_http_connect(200, 200,        # account, container
                              201, 201, 201,   # 3 working backends
@@ -2267,6 +2267,73 @@ class TestObjectController(unittest.TestCase):
                 got_exc = True
             self.assert_(got_exc)
 
+    def test_node_read_timeout_retry(self):
+        with save_globals():
+            self.app.account_ring.get_nodes('account')
+            for dev in self.app.account_ring.devs.values():
+                dev['ip'] = '127.0.0.1'
+                dev['port'] = 1
+            self.app.container_ring.get_nodes('account')
+            for dev in self.app.container_ring.devs.values():
+                dev['ip'] = '127.0.0.1'
+                dev['port'] = 1
+            self.app.object_ring.get_nodes('account')
+            for dev in self.app.object_ring.devs.values():
+                dev['ip'] = '127.0.0.1'
+                dev['port'] = 1
+            req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
+            self.app.update_request(req)
+
+            self.app.node_timeout = 0.1
+            set_http_connect(200, 200, 200, slow=[3])
+            resp = req.get_response(self.app)
+            got_exc = False
+            try:
+                resp.body
+            except ChunkReadTimeout:
+                got_exc = True
+            self.assert_(got_exc)
+
+            set_http_connect(200, 200, 200, body='lalala', slow=[2])
+            resp = req.get_response(self.app)
+            got_exc = False
+            try:
+                self.assertEquals(resp.body, 'lalala')
+            except ChunkReadTimeout:
+                got_exc = True
+            self.assert_(not got_exc)
+
+            set_http_connect(200, 200, 200, body='lalala', slow=[2],
+                             etags=['a', 'a', 'a'])
+            resp = req.get_response(self.app)
+            got_exc = False
+            try:
+                self.assertEquals(resp.body, 'lalala')
+            except ChunkReadTimeout:
+                got_exc = True
+            self.assert_(not got_exc)
+
+            set_http_connect(200, 200, 200, body='lalala', slow=[2],
+                             etags=['a', 'b', 'a'])
+            resp = req.get_response(self.app)
+            got_exc = False
+            try:
+                self.assertEquals(resp.body, 'lalala')
+            except ChunkReadTimeout:
+                got_exc = True
+            self.assert_(not got_exc)
+
+            req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
+            set_http_connect(200, 200, 200, body='lalala', slow=[2],
+                             etags=['a', 'b', 'b'])
+            resp = req.get_response(self.app)
+            got_exc = False
+            try:
+                resp.body
+            except ChunkReadTimeout:
+                got_exc = True
+            self.assert_(got_exc)
+
     def test_node_write_timeout(self):
         with save_globals():
             self.app.account_ring.get_nodes('account')
@@ -2305,44 +2372,35 @@ class TestObjectController(unittest.TestCase):
         with save_globals():
             try:
                 self.app.object_ring.max_more_nodes = 2
-                controller = proxy_server.ObjectController(self.app, 'account',
-                                                           'container',
-                                                           'object')
                 partition, nodes = self.app.object_ring.get_nodes('account',
                                                                   'container',
                                                                   'object')
                 collected_nodes = []
-                for node in controller.iter_nodes(self.app.object_ring,
-                                                  partition):
+                for node in self.app.iter_nodes(self.app.object_ring,
+                                                partition):
                     collected_nodes.append(node)
                 self.assertEquals(len(collected_nodes), 5)
 
                 self.app.object_ring.max_more_nodes = 20
                 self.app.request_node_count = lambda r: 20
-                controller = proxy_server.ObjectController(self.app, 'account',
-                                                           'container',
-                                                           'object')
                 partition, nodes = self.app.object_ring.get_nodes('account',
                                                                   'container',
                                                                   'object')
                 collected_nodes = []
-                for node in controller.iter_nodes(self.app.object_ring,
-                                                  partition):
+                for node in self.app.iter_nodes(self.app.object_ring,
+                                                partition):
                     collected_nodes.append(node)
                 self.assertEquals(len(collected_nodes), 9)
 
                 self.app.log_handoffs = True
                 self.app.logger = FakeLogger()
                 self.app.object_ring.max_more_nodes = 2
-                controller = proxy_server.ObjectController(self.app, 'account',
-                                                           'container',
-                                                           'object')
                 partition, nodes = self.app.object_ring.get_nodes('account',
                                                                   'container',
                                                                   'object')
                 collected_nodes = []
-                for node in controller.iter_nodes(self.app.object_ring,
-                                                  partition):
+                for node in self.app.iter_nodes(self.app.object_ring,
+                                                partition):
                     collected_nodes.append(node)
                 self.assertEquals(len(collected_nodes), 5)
                 self.assertEquals(
@@ -2353,15 +2411,12 @@ class TestObjectController(unittest.TestCase):
                 self.app.log_handoffs = False
                 self.app.logger = FakeLogger()
                 self.app.object_ring.max_more_nodes = 2
-                controller = proxy_server.ObjectController(self.app, 'account',
-                                                           'container',
-                                                           'object')
                 partition, nodes = self.app.object_ring.get_nodes('account',
                                                                   'container',
                                                                   'object')
                 collected_nodes = []
-                for node in controller.iter_nodes(self.app.object_ring,
-                                                  partition):
+                for node in self.app.iter_nodes(self.app.object_ring,
+                                                partition):
                     collected_nodes.append(node)
                 self.assertEquals(len(collected_nodes), 5)
                 self.assertEquals(self.app.logger.log_dict['warning'], [])
@@ -2370,21 +2425,19 @@ class TestObjectController(unittest.TestCase):
 
     def test_iter_nodes_calls_sort_nodes(self):
         with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
-            controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
-            for node in controller.iter_nodes(self.app.object_ring, 0):
+            for node in self.app.iter_nodes(self.app.object_ring, 0):
                 pass
             sort_nodes.assert_called_once_with(
                 self.app.object_ring.get_part_nodes(0))
 
     def test_iter_nodes_skips_error_limited(self):
         with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
-            controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
-            first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
-            second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
+            first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
+            second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
             self.assertTrue(first_nodes[0] in second_nodes)
 
-            controller.error_limit(first_nodes[0], 'test')
-            second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
+            self.app.error_limit(first_nodes[0], 'test')
+            second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
             self.assertTrue(first_nodes[0] not in second_nodes)
 
     def test_iter_nodes_gives_extra_if_error_limited_inline(self):
@@ -2393,33 +2446,31 @@ class TestObjectController(unittest.TestCase):
                 mock.patch.object(self.app, 'request_node_count',
                                   lambda r: 6),
                 mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)):
-            controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
-            first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
+            first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
             second_nodes = []
-            for node in controller.iter_nodes(self.app.object_ring, 0):
+            for node in self.app.iter_nodes(self.app.object_ring, 0):
                 if not second_nodes:
-                    controller.error_limit(node, 'test')
+                    self.app.error_limit(node, 'test')
                 second_nodes.append(node)
             self.assertEquals(len(first_nodes), 6)
             self.assertEquals(len(second_nodes), 7)
 
     def test_iter_nodes_with_custom_node_iter(self):
-        controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
         node_list = [dict(id=n) for n in xrange(10)]
         with nested(
                 mock.patch.object(self.app, 'sort_nodes', lambda n: n),
                 mock.patch.object(self.app, 'request_node_count',
                                   lambda r: 3)):
-            got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
-                                                   node_iter=iter(node_list)))
+            got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0,
+                                                 node_iter=iter(node_list)))
         self.assertEqual(node_list[:3], got_nodes)
 
         with nested(
                 mock.patch.object(self.app, 'sort_nodes', lambda n: n),
                 mock.patch.object(self.app, 'request_node_count',
                                   lambda r: 1000000)):
-            got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
-                                                   node_iter=iter(node_list)))
+            got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0,
+                                                 node_iter=iter(node_list)))
         self.assertEqual(node_list, got_nodes)
 
     def test_best_response_sets_headers(self):