Get retry.

If a source times out on read try another one of them with a
modified range.  There had to be a lot of moved around code
to get this working but it should all make sense.

Change-Id: Ieaf045690a8823927a6f38098a95b37a4d4adb70
This commit is contained in:
David Goetz 2013-11-04 17:06:06 +00:00
parent c850580566
commit f5648638ee
6 changed files with 557 additions and 356 deletions

View File

@ -28,7 +28,7 @@ import os
import time
import functools
import inspect
import itertools
from sys import exc_info
from swift import gettext_ as _
from urllib import quote
@ -46,7 +46,8 @@ from swift.common.http import is_informational, is_success, is_redirection, \
is_server_error, HTTP_OK, HTTP_PARTIAL_CONTENT, HTTP_MULTIPLE_CHOICES, \
HTTP_BAD_REQUEST, HTTP_NOT_FOUND, HTTP_SERVICE_UNAVAILABLE, \
HTTP_INSUFFICIENT_STORAGE, HTTP_UNAUTHORIZED
from swift.common.swob import Request, Response, HeaderKeyDict
from swift.common.swob import Request, Response, HeaderKeyDict, Range, \
HTTPException, HTTPRequestedRangeNotSatisfiable
def update_headers(response, headers):
@ -518,6 +519,286 @@ def _get_object_info(app, env, account, container, obj, swift_source=None):
return None
def close_swift_conn(src):
"""
Force close the http connection to the backend.
:param src: the response from the backend
"""
try:
# Since the backends set "Connection: close" in their response
# headers, the response object (src) is solely responsible for the
# socket. The connection object (src.swift_conn) has no references
# to the socket, so calling its close() method does nothing, and
# therefore we don't do it.
#
# Also, since calling the response's close() method might not
# close the underlying socket but only decrement some
# reference-counter, we have a special method here that really,
# really kills the underlying socket with a close() syscall.
src.nuke_from_orbit() # it's the only way to be sure
except Exception:
pass
class GetOrHeadHandler(object):
def __init__(self, app, req, server_type, ring, partition, path,
backend_headers):
self.app = app
self.ring = ring
self.server_type = server_type
self.partition = partition
self.path = path
self.backend_headers = backend_headers
self.used_nodes = []
self.used_source_etag = ''
# stuff from request
self.req_method = req.method
self.req_path = req.path
self.req_query_string = req.query_string
self.newest = config_true_value(req.headers.get('x-newest', 'f'))
# populated when finding source
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
def fast_forward(self, num_bytes):
"""
Will skip num_bytes into the current ranges.
:params num_bytes: the number of bytes that have already been read on
this request. This will change the Range header
so that the next req will start where it left off.
:raises NotImplementedError: if this is a multirange request
:raises ValueError: if invalid range header
:raises HTTPRequestedRangeNotSatisfiable: if begin + num_bytes
> end of range
"""
if 'Range' in self.backend_headers:
req_range = Range(self.backend_headers['Range'])
if len(req_range.ranges) > 1:
raise NotImplementedError()
begin, end = req_range.ranges.pop()
if begin is None:
# this is a -50 range req (last 50 bytes of file)
end -= num_bytes
else:
begin += num_bytes
if end and begin > end:
raise HTTPRequestedRangeNotSatisfiable()
req_range.ranges = [(begin, end)]
self.backend_headers['Range'] = str(req_range)
else:
self.backend_headers['Range'] = 'bytes=%d-' % num_bytes
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param src: the response from the backend
:returns: True if found, False if not
"""
if self.server_type == 'Object' and src.status == 416:
return True
return is_success(src.status) or is_redirection(src.status)
def _make_app_iter(self, node, source):
"""
Returns an iterator over the contents of the source (via its read
func). There is also quite a bit of cleanup to ensure garbage
collection works and the underlying socket of the source is closed.
:param source: The httplib.Response object this iterator should read
from.
:param node: The node the source is reading from, for logging purposes.
"""
try:
nchunks = 0
bytes_read_from_source = 0
while True:
try:
with ChunkReadTimeout(self.app.node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
bytes_read_from_source += len(chunk)
except ChunkReadTimeout:
exc_type, exc_value, exc_traceback = exc_info()
if self.newest:
raise exc_type, exc_value, exc_traceback
try:
self.fast_forward(bytes_read_from_source)
except (NotImplementedError, HTTPException, ValueError):
raise exc_type, exc_value, exc_traceback
new_source, new_node = self._get_source_and_node()
if new_source:
self.app.exception_occurred(
node, _('Object'),
_('Trying to read during GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
source = new_source
node = new_node
bytes_read_from_source = 0
continue
else:
raise exc_type, exc_value, exc_traceback
if not chunk:
break
with ChunkWriteTimeout(self.app.client_timeout):
yield chunk
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
# greenthreads on its own. We do it manually so that clients
# don't starve.
#
# The number 5 here was chosen by making stuff up. It's not
# every single chunk, but it's not too big either, so it seemed
# like it would probably be an okay choice.
#
# Note that we may trampoline to other greenthreads more often
# than once every 5 chunks, depending on how blocking our
# network IO is; the explicit sleep here simply provides a
# lower bound on the rate of trampolining.
if nchunks % 5 == 0:
sleep()
except ChunkReadTimeout:
self.app.exception_occurred(node, _('Object'),
_('Trying to read during GET'))
raise
except ChunkWriteTimeout:
self.app.logger.warn(
_('Client did not read from proxy within %ss') %
self.app.client_timeout)
self.app.logger.increment('client_timeouts')
except GeneratorExit:
self.app.logger.warn(_('Client disconnected on read'))
except Exception:
self.app.logger.exception(_('Trying to send to client'))
raise
finally:
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
close_swift_conn(source)
def _get_source_and_node(self):
self.statuses = []
self.reasons = []
self.bodies = []
self.source_headers = []
sources = []
for node in self.app.iter_nodes(self.ring, self.partition):
if node in self.used_nodes:
continue
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
node['ip'], node['port'], node['device'],
self.partition, self.req_method, self.path,
headers=self.backend_headers,
query_string=self.req_query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.
possible_source.swift_conn = conn
except (Exception, Timeout):
self.app.exception_occurred(
node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': self.req_method, 'path': self.req_path})
continue
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append('')
close_swift_conn(possible_source)
else:
if self.used_source_etag:
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
if src_headers.get('etag', '').strip('"') != \
self.used_source_etag:
self.statuses.append(HTTP_NOT_FOUND)
self.reasons.append('')
self.bodies.append('')
self.source_headers.append('')
continue
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append('')
self.source_headers.append('')
sources.append((possible_source, node))
if not self.newest: # one good source is enough
break
else:
self.statuses.append(possible_source.status)
self.reasons.append(possible_source.reason)
self.bodies.append(possible_source.read())
self.source_headers.append(possible_source.getheaders())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.app.error_occurred(
node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
{'status': possible_source.status,
'body': self.bodies[-1][:1024],
'type': self.server_type})
if sources:
sources.sort(key=lambda s: source_key(s[0]))
source, node = sources.pop()
for src, _junk in sources:
close_swift_conn(src)
self.used_nodes.append(node)
src_headers = dict(
(k.lower(), v) for k, v in
possible_source.getheaders())
self.used_source_etag = src_headers.get('etag', '').strip('"')
return source, node
return None, None
def get_working_response(self, req):
source, node = self._get_source_and_node()
res = None
if source:
res = Response(request=req)
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
res.app_iter = self._make_app_iter(node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
res.status = source.status
update_headers(res, source.getheaders())
if not res.environ:
res.environ = {}
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
res.accept_ranges = 'bytes'
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
res.charset = None
res.content_type = source.getheader('Content-Type')
return res
class Controller(object):
"""Base WSGI controller class for the proxy"""
server_type = 'Base'
@ -602,71 +883,6 @@ class Controller(object):
headers['referer'] = referer
return headers
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
:param node: dictionary of node to handle errors for
:param msg: error message
"""
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def exception_occurred(self, node, typ, additional_info):
"""
Handle logging of generic exceptions.
:param node: dictionary of node to log the error for
:param typ: server type
:param additional_info: additional information to log
"""
self.app.logger.exception(
_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
'%(info)s'),
{'type': typ, 'ip': node['ip'], 'port': node['port'],
'device': node['device'], 'info': additional_info})
def error_limited(self, node):
"""
Check if the node is currently error limited.
:param node: dictionary of node to check
:returns: True if error limited, False otherwise
"""
now = time.time()
if 'errors' not in node:
return False
if 'last_error' in node and node['last_error'] < \
now - self.app.error_suppression_interval:
del node['last_error']
if 'errors' in node:
del node['errors']
return False
limited = node['errors'] > self.app.error_suppression_limit
if limited:
self.app.logger.debug(
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
return limited
def error_limit(self, node, msg):
"""
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
use :func:`error_occurred`.
:param node: dictionary of node to error limit
:param msg: error message
"""
node['errors'] = self.app.error_suppression_limit + 1
node['last_error'] = time.time()
self.app.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def account_info(self, account, req=None):
"""
Get account information, and also verify that the account exists.
@ -719,62 +935,6 @@ class Controller(object):
info['nodes'] = nodes
return info
def iter_nodes(self, ring, partition, node_iter=None):
"""
Yields nodes for a ring partition, skipping over error
limited nodes and stopping at the configurable number of
nodes. If a node yielded subsequently gets error limited, an
extra node will be yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
swift.common.utils.GreenthreadSafeIterator to serialize access.
Otherwise, you may get ValueErrors from concurrent access. (You also
may not, depending on how logging is configured, the vagaries of
socket IO and eventlet, and the phase of the moon.)
:param ring: ring to get yield nodes from
:param partition: ring partition to yield nodes for
:param node_iter: optional iterable of nodes to try. Useful if you
want to filter or reorder the nodes.
"""
part_nodes = ring.get_part_nodes(partition)
if node_iter is None:
node_iter = itertools.chain(part_nodes,
ring.get_more_nodes(partition))
num_primary_nodes = len(part_nodes)
# Use of list() here forcibly yanks the first N nodes (the primary
# nodes) from node_iter, so the rest of its values are handoffs.
primary_nodes = self.app.sort_nodes(
list(itertools.islice(node_iter, num_primary_nodes)))
handoff_nodes = node_iter
nodes_left = self.app.request_node_count(ring)
for node in primary_nodes:
if not self.error_limited(node):
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
handoffs = 0
for node in handoff_nodes:
if not self.error_limited(node):
handoffs += 1
if self.app.log_handoffs:
self.app.logger.increment('handoff_count')
self.app.logger.warning(
'Handoff requested (%d)' % handoffs)
if handoffs == len(primary_nodes):
self.app.logger.increment('handoff_all_count')
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
def _make_request(self, nodes, part, method, path, headers, query,
logger_thread_locals):
"""
@ -812,11 +972,13 @@ class Controller(object):
return resp.status, resp.reason, resp.getheaders(), \
resp.read()
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node, _('ERROR Insufficient Storage'))
self.app.error_limit(node,
_('ERROR Insufficient Storage'))
except (Exception, Timeout):
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': method, 'path': path})
self.app.exception_occurred(
node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': method, 'path': path})
def make_requests(self, req, ring, part, method, path, headers,
query_string=''):
@ -836,7 +998,7 @@ class Controller(object):
:returns: a swob.Response object
"""
start_nodes = ring.get_part_nodes(part)
nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part))
nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
pile = GreenAsyncPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
@ -929,92 +1091,6 @@ class Controller(object):
"""
return self.GETorHEAD(req)
def _make_app_iter(self, node, source):
"""
Returns an iterator over the contents of the source (via its read
func). There is also quite a bit of cleanup to ensure garbage
collection works and the underlying socket of the source is closed.
:param source: The bufferedhttp.Response object this iterator should
read from.
:param node: The node the source is reading from, for logging purposes.
"""
try:
nchunks = 0
while True:
with ChunkReadTimeout(self.app.node_timeout):
chunk = source.read(self.app.object_chunk_size)
nchunks += 1
if not chunk:
break
with ChunkWriteTimeout(self.app.client_timeout):
yield chunk
# This is for fairness; if the network is outpacing the CPU,
# we'll always be able to read and write data without
# encountering an EWOULDBLOCK, and so eventlet will not switch
# greenthreads on its own. We do it manually so that clients
# don't starve.
#
# The number 5 here was chosen by making stuff up. It's not
# every single chunk, but it's not too big either, so it seemed
# like it would probably be an okay choice.
#
# Note that we may trampoline to other greenthreads more often
# than once every 5 chunks, depending on how blocking our
# network IO is; the explicit sleep here simply provides a
# lower bound on the rate of trampolining.
if nchunks % 5 == 0:
sleep()
except ChunkReadTimeout:
self.exception_occurred(node, _('Object'),
_('Trying to read during GET'))
raise
except ChunkWriteTimeout:
self.app.logger.warn(
_('Client did not read from proxy within %ss') %
self.app.client_timeout)
self.app.logger.increment('client_timeouts')
except GeneratorExit:
self.app.logger.warn(_('Client disconnected on read'))
except Exception:
self.app.logger.exception(_('Trying to send to client'))
raise
finally:
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
self.close_swift_conn(source)
def close_swift_conn(self, src):
"""
Force close the http connection to the backend.
:param src: the response from the backend
"""
try:
# Since the backends set "Connection: close" in their response
# headers, the response object (src) is solely responsible for the
# socket. The connection object (src.swift_conn) has no references
# to the socket, so calling its close() method does nothing, and
# therefore we don't do it.
#
# Also, since calling the response's close() method might not
# close the underlying socket but only decrement some
# reference-counter, we have a special method here that really,
# really kills the underlying socket with a close() syscall.
src.nuke_from_orbit() # it's the only way to be sure
except Exception:
pass
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
:param src: the response from the backend
:returns: True if found, False if not
"""
return is_success(src.status) or is_redirection(src.status)
def autocreate_account(self, env, account):
"""
Autocreate an account
@ -1047,87 +1123,18 @@ class Controller(object):
:param path: path for the request
:returns: swob.Response object
"""
statuses = []
reasons = []
bodies = []
source_headers = []
sources = []
newest = config_true_value(req.headers.get('x-newest', 'f'))
headers = self.generate_request_headers(req, additional=req.headers)
for node in self.iter_nodes(ring, partition):
start_node_timing = time.time()
try:
with ConnectionTimeout(self.app.conn_timeout):
conn = http_connect(
node['ip'], node['port'], node['device'], partition,
req.method, path, headers=headers,
query_string=req.query_string)
self.app.set_node_timing(node, time.time() - start_node_timing)
with Timeout(self.app.node_timeout):
possible_source = conn.getresponse()
# See NOTE: swift_conn at top of file about this.
possible_source.swift_conn = conn
except (Exception, Timeout):
self.exception_occurred(
node, server_type, _('Trying to %(method)s %(path)s') %
{'method': req.method, 'path': req.path})
continue
if self.is_good_source(possible_source):
# 404 if we know we don't have a synced copy
if not float(possible_source.getheader('X-PUT-Timestamp', 1)):
statuses.append(HTTP_NOT_FOUND)
reasons.append('')
bodies.append('')
source_headers.append('')
self.close_swift_conn(possible_source)
else:
statuses.append(possible_source.status)
reasons.append(possible_source.reason)
bodies.append('')
source_headers.append('')
sources.append((possible_source, node))
if not newest: # one good source is enough
break
else:
statuses.append(possible_source.status)
reasons.append(possible_source.reason)
bodies.append(possible_source.read())
source_headers.append(possible_source.getheaders())
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node, _('ERROR Insufficient Storage'))
elif is_server_error(possible_source.status):
self.error_occurred(node, _('ERROR %(status)d %(body)s '
'From %(type)s Server') %
{'status': possible_source.status,
'body': bodies[-1][:1024],
'type': server_type})
res = None
if sources:
sources.sort(key=lambda s: source_key(s[0]))
source, node = sources.pop()
for src, _junk in sources:
self.close_swift_conn(src)
res = Response(request=req)
if req.method == 'GET' and \
source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT):
res.app_iter = self._make_app_iter(node, source)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
res.status = source.status
update_headers(res, source.getheaders())
if not res.environ:
res.environ = {}
res.environ['swift_x_timestamp'] = \
source.getheader('x-timestamp')
res.accept_ranges = 'bytes'
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
res.charset = None
res.content_type = source.getheader('Content-Type')
backend_headers = self.generate_request_headers(
req, additional=req.headers)
handler = GetOrHeadHandler(self.app, req, server_type, ring,
partition, path, backend_headers)
res = handler.get_working_response(req)
if not res:
res = self.best_response(req, statuses, reasons, bodies,
'%s %s' % (server_type, req.method),
headers=source_headers)
res = self.best_response(
req, handler.statuses, handler.reasons, handler.bodies,
'%s %s' % (server_type, req.method),
headers=handler.source_headers)
try:
(account, container) = split_path(req.path_info, 1, 2)
_set_info_cache(self.app, req.environ, account, container, res)

View File

@ -504,7 +504,7 @@ class ObjectController(Controller):
is_local = self.app.write_affinity_is_local_fn
if is_local is None:
return self.iter_nodes(ring, partition)
return self.app.iter_nodes(ring, partition)
all_nodes = itertools.chain(primary_nodes,
ring.get_more_nodes(partition))
@ -519,20 +519,9 @@ class ObjectController(Controller):
itertools.ifilter(lambda node: node not in first_n_local_nodes,
all_nodes))
return self.iter_nodes(
return self.app.iter_nodes(
ring, partition, node_iter=local_first_node_iter)
def is_good_source(self, src):
"""
Indicates whether or not the request made to the backend found
what it was looking for.
In the case of an object, a 416 indicates that we found a
backend with the object.
"""
return src.status == 416 or \
super(ObjectController, self).is_good_source(src)
def GETorHEAD(self, req):
"""Handle HTTP GET or HEAD requests."""
container_info = self.container_info(
@ -800,8 +789,9 @@ class ObjectController(Controller):
conn.send(chunk)
except (Exception, ChunkWriteTimeout):
conn.failed = True
self.exception_occurred(conn.node, _('Object'),
_('Trying to write to %s') % path)
self.app.exception_occurred(
conn.node, _('Object'),
_('Trying to write to %s') % path)
conn.queue.task_done()
def _connect_put_node(self, nodes, part, path, headers,
@ -827,10 +817,11 @@ class ObjectController(Controller):
conn.node = node
return conn
elif resp.status == HTTP_INSUFFICIENT_STORAGE:
self.error_limit(node, _('ERROR Insufficient Storage'))
self.app.error_limit(node, _('ERROR Insufficient Storage'))
except (Exception, Timeout):
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
self.app.exception_occurred(
node, _('Object'),
_('Expect: 100-continue on %s') % path)
def _get_put_responses(self, req, conns, nodes):
statuses = []
@ -846,7 +837,7 @@ class ObjectController(Controller):
else:
return conn.getresponse()
except (Exception, Timeout):
self.exception_occurred(
self.app.exception_occurred(
conn.node, _('Object'),
_('Trying to get final status of PUT to %s') % req.path)
pile = GreenAsyncPile(len(conns))
@ -858,7 +849,7 @@ class ObjectController(Controller):
reasons.append(response.reason)
bodies.append(response.read())
if response.status >= HTTP_INTERNAL_SERVER_ERROR:
self.error_occurred(
self.app.error_occurred(
conn.node,
_('ERROR %(status)d %(body)s From Object Server '
're: %(path)s') %

View File

@ -19,6 +19,7 @@ import socket
from swift import gettext_ as _
from random import shuffle
from time import time
import itertools
from eventlet import Timeout
@ -332,6 +333,126 @@ class Application(object):
timing = round(timing, 3) # sort timings to the millisecond
self.node_timings[node['ip']] = (timing, now + self.timing_expiry)
def error_limited(self, node):
"""
Check if the node is currently error limited.
:param node: dictionary of node to check
:returns: True if error limited, False otherwise
"""
now = time()
if 'errors' not in node:
return False
if 'last_error' in node and node['last_error'] < \
now - self.error_suppression_interval:
del node['last_error']
if 'errors' in node:
del node['errors']
return False
limited = node['errors'] > self.error_suppression_limit
if limited:
self.logger.debug(
_('Node error limited %(ip)s:%(port)s (%(device)s)'), node)
return limited
def error_limit(self, node, msg):
"""
Mark a node as error limited. This immediately pretends the
node received enough errors to trigger error suppression. Use
this for errors like Insufficient Storage. For other errors
use :func:`error_occurred`.
:param node: dictionary of node to error limit
:param msg: error message
"""
node['errors'] = self.error_suppression_limit + 1
node['last_error'] = time()
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def error_occurred(self, node, msg):
"""
Handle logging, and handling of errors.
:param node: dictionary of node to handle errors for
:param msg: error message
"""
node['errors'] = node.get('errors', 0) + 1
node['last_error'] = time()
self.logger.error(_('%(msg)s %(ip)s:%(port)s/%(device)s'),
{'msg': msg, 'ip': node['ip'],
'port': node['port'], 'device': node['device']})
def iter_nodes(self, ring, partition, node_iter=None):
"""
Yields nodes for a ring partition, skipping over error
limited nodes and stopping at the configurable number of
nodes. If a node yielded subsequently gets error limited, an
extra node will be yielded to take its place.
Note that if you're going to iterate over this concurrently from
multiple greenthreads, you'll want to use a
swift.common.utils.GreenthreadSafeIterator to serialize access.
Otherwise, you may get ValueErrors from concurrent access. (You also
may not, depending on how logging is configured, the vagaries of
socket IO and eventlet, and the phase of the moon.)
:param ring: ring to get yield nodes from
:param partition: ring partition to yield nodes for
:param node_iter: optional iterable of nodes to try. Useful if you
want to filter or reorder the nodes.
"""
part_nodes = ring.get_part_nodes(partition)
if node_iter is None:
node_iter = itertools.chain(part_nodes,
ring.get_more_nodes(partition))
num_primary_nodes = len(part_nodes)
# Use of list() here forcibly yanks the first N nodes (the primary
# nodes) from node_iter, so the rest of its values are handoffs.
primary_nodes = self.sort_nodes(
list(itertools.islice(node_iter, num_primary_nodes)))
handoff_nodes = node_iter
nodes_left = self.request_node_count(ring)
for node in primary_nodes:
if not self.error_limited(node):
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
handoffs = 0
for node in handoff_nodes:
if not self.error_limited(node):
handoffs += 1
if self.log_handoffs:
self.logger.increment('handoff_count')
self.logger.warning(
'Handoff requested (%d)' % handoffs)
if handoffs == len(primary_nodes):
self.logger.increment('handoff_all_count')
yield node
if not self.error_limited(node):
nodes_left -= 1
if nodes_left <= 0:
return
def exception_occurred(self, node, typ, additional_info):
"""
Handle logging of generic exceptions.
:param node: dictionary of node to log the error for
:param typ: server type
:param additional_info: additional information to log
"""
self.logger.exception(
_('ERROR with %(type)s server %(ip)s:%(port)s/%(device)s re: '
'%(info)s'),
{'type': typ, 'ip': node['ip'], 'port': node['port'],
'device': node['device'], 'info': additional_info})
def app_factory(global_conf, **local_conf):
"""paste.deploy app factory for creating WSGI proxy apps."""

View File

@ -426,6 +426,8 @@ def fake_http_connect(*code_iter, **kwargs):
self.body = body
self.headers = headers or {}
self.timestamp = timestamp
if kwargs.get('slow') and isinstance(kwargs['slow'], list):
kwargs['slow'][0] -= 1
def getresponse(self):
if kwargs.get('raise_exc'):
@ -469,13 +471,18 @@ def fake_http_connect(*code_iter, **kwargs):
headers['x-container-timestamp'] = '1'
except StopIteration:
pass
if 'slow' in kwargs:
if self.am_slow():
headers['content-length'] = '4'
headers.update(self.headers)
return headers.items()
def am_slow(self):
if kwargs.get('slow') and isinstance(kwargs['slow'], list):
return kwargs['slow'][0] >= 0
return bool(kwargs.get('slow'))
def read(self, amt=None):
if 'slow' in kwargs:
if self.am_slow():
if self.sent < 4:
self.sent += 1
sleep(0.1)
@ -485,7 +492,7 @@ def fake_http_connect(*code_iter, **kwargs):
return rv
def send(self, amt=None):
if 'slow' in kwargs:
if self.am_slow():
if self.received < 4:
self.received += 1
sleep(0.1)

View File

@ -18,8 +18,9 @@ from mock import patch
from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \
get_container_memcache_key, get_account_info, get_account_memcache_key, \
get_object_env_key, _get_cache_key, get_info, get_object_info, Controller
from swift.common.swob import Request
get_object_env_key, _get_cache_key, get_info, get_object_info, \
Controller, GetOrHeadHandler
from swift.common.swob import Request, HTTPException
from swift.common.utils import split_path
from test.unit import fake_http_connect, FakeRing, FakeMemcache
from swift.proxy import server as proxy_server
@ -446,3 +447,26 @@ class TestFuncs(unittest.TestCase):
self.assertEqual(base.have_quorum([201, 201], 2), True)
self.assertEqual(base.have_quorum([404, 404], 2), True)
self.assertEqual(base.have_quorum([201, 404, 201, 201], 4), True)
def test_range_fast_forward(self):
req = Request.blank('/')
handler = GetOrHeadHandler(None, req, None, None, None, None, {})
handler.fast_forward(50)
self.assertEquals(handler.backend_headers['Range'], 'bytes=50-')
handler = GetOrHeadHandler(None, req, None, None, None, None,
{'Range': 'bytes=23-50'})
handler.fast_forward(20)
self.assertEquals(handler.backend_headers['Range'], 'bytes=43-50')
self.assertRaises(HTTPException,
handler.fast_forward, 80)
handler = GetOrHeadHandler(None, req, None, None, None, None,
{'Range': 'bytes=23-'})
handler.fast_forward(20)
self.assertEquals(handler.backend_headers['Range'], 'bytes=43-')
handler = GetOrHeadHandler(None, req, None, None, None, None,
{'Range': 'bytes=-100'})
handler.fast_forward(20)
self.assertEquals(handler.backend_headers['Range'], 'bytes=-80')

View File

@ -810,7 +810,7 @@ class TestObjectController(unittest.TestCase):
controller = \
proxy_server.ObjectController(self.app, 'a', 'c', 'o.jpg')
controller.error_limit(
self.app.error_limit(
self.app.object_ring.get_part_nodes(1)[0], 'test')
set_http_connect(200, 200, # account, container
201, 201, 201, # 3 working backends
@ -2267,6 +2267,73 @@ class TestObjectController(unittest.TestCase):
got_exc = True
self.assert_(got_exc)
def test_node_read_timeout_retry(self):
with save_globals():
self.app.account_ring.get_nodes('account')
for dev in self.app.account_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1
self.app.container_ring.get_nodes('account')
for dev in self.app.container_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1
self.app.object_ring.get_nodes('account')
for dev in self.app.object_ring.devs.values():
dev['ip'] = '127.0.0.1'
dev['port'] = 1
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
self.app.update_request(req)
self.app.node_timeout = 0.1
set_http_connect(200, 200, 200, slow=[3])
resp = req.get_response(self.app)
got_exc = False
try:
resp.body
except ChunkReadTimeout:
got_exc = True
self.assert_(got_exc)
set_http_connect(200, 200, 200, body='lalala', slow=[2])
resp = req.get_response(self.app)
got_exc = False
try:
self.assertEquals(resp.body, 'lalala')
except ChunkReadTimeout:
got_exc = True
self.assert_(not got_exc)
set_http_connect(200, 200, 200, body='lalala', slow=[2],
etags=['a', 'a', 'a'])
resp = req.get_response(self.app)
got_exc = False
try:
self.assertEquals(resp.body, 'lalala')
except ChunkReadTimeout:
got_exc = True
self.assert_(not got_exc)
set_http_connect(200, 200, 200, body='lalala', slow=[2],
etags=['a', 'b', 'a'])
resp = req.get_response(self.app)
got_exc = False
try:
self.assertEquals(resp.body, 'lalala')
except ChunkReadTimeout:
got_exc = True
self.assert_(not got_exc)
req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
set_http_connect(200, 200, 200, body='lalala', slow=[2],
etags=['a', 'b', 'b'])
resp = req.get_response(self.app)
got_exc = False
try:
resp.body
except ChunkReadTimeout:
got_exc = True
self.assert_(got_exc)
def test_node_write_timeout(self):
with save_globals():
self.app.account_ring.get_nodes('account')
@ -2305,44 +2372,35 @@ class TestObjectController(unittest.TestCase):
with save_globals():
try:
self.app.object_ring.max_more_nodes = 2
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
partition, nodes = self.app.object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(self.app.object_ring,
partition):
for node in self.app.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.app.object_ring.max_more_nodes = 20
self.app.request_node_count = lambda r: 20
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
partition, nodes = self.app.object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(self.app.object_ring,
partition):
for node in self.app.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 9)
self.app.log_handoffs = True
self.app.logger = FakeLogger()
self.app.object_ring.max_more_nodes = 2
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
partition, nodes = self.app.object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(self.app.object_ring,
partition):
for node in self.app.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(
@ -2353,15 +2411,12 @@ class TestObjectController(unittest.TestCase):
self.app.log_handoffs = False
self.app.logger = FakeLogger()
self.app.object_ring.max_more_nodes = 2
controller = proxy_server.ObjectController(self.app, 'account',
'container',
'object')
partition, nodes = self.app.object_ring.get_nodes('account',
'container',
'object')
collected_nodes = []
for node in controller.iter_nodes(self.app.object_ring,
partition):
for node in self.app.iter_nodes(self.app.object_ring,
partition):
collected_nodes.append(node)
self.assertEquals(len(collected_nodes), 5)
self.assertEquals(self.app.logger.log_dict['warning'], [])
@ -2370,21 +2425,19 @@ class TestObjectController(unittest.TestCase):
def test_iter_nodes_calls_sort_nodes(self):
with mock.patch.object(self.app, 'sort_nodes') as sort_nodes:
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
for node in controller.iter_nodes(self.app.object_ring, 0):
for node in self.app.iter_nodes(self.app.object_ring, 0):
pass
sort_nodes.assert_called_once_with(
self.app.object_ring.get_part_nodes(0))
def test_iter_nodes_skips_error_limited(self):
with mock.patch.object(self.app, 'sort_nodes', lambda n: n):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] in second_nodes)
controller.error_limit(first_nodes[0], 'test')
second_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
self.app.error_limit(first_nodes[0], 'test')
second_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
self.assertTrue(first_nodes[0] not in second_nodes)
def test_iter_nodes_gives_extra_if_error_limited_inline(self):
@ -2393,33 +2446,31 @@ class TestObjectController(unittest.TestCase):
mock.patch.object(self.app, 'request_node_count',
lambda r: 6),
mock.patch.object(self.app.object_ring, 'max_more_nodes', 99)):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
first_nodes = list(controller.iter_nodes(self.app.object_ring, 0))
first_nodes = list(self.app.iter_nodes(self.app.object_ring, 0))
second_nodes = []
for node in controller.iter_nodes(self.app.object_ring, 0):
for node in self.app.iter_nodes(self.app.object_ring, 0):
if not second_nodes:
controller.error_limit(node, 'test')
self.app.error_limit(node, 'test')
second_nodes.append(node)
self.assertEquals(len(first_nodes), 6)
self.assertEquals(len(second_nodes), 7)
def test_iter_nodes_with_custom_node_iter(self):
controller = proxy_server.ObjectController(self.app, 'a', 'c', 'o')
node_list = [dict(id=n) for n in xrange(10)]
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
mock.patch.object(self.app, 'request_node_count',
lambda r: 3)):
got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
node_iter=iter(node_list)))
got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0,
node_iter=iter(node_list)))
self.assertEqual(node_list[:3], got_nodes)
with nested(
mock.patch.object(self.app, 'sort_nodes', lambda n: n),
mock.patch.object(self.app, 'request_node_count',
lambda r: 1000000)):
got_nodes = list(controller.iter_nodes(self.app.object_ring, 0,
node_iter=iter(node_list)))
got_nodes = list(self.app.iter_nodes(self.app.object_ring, 0,
node_iter=iter(node_list)))
self.assertEqual(node_list, got_nodes)
def test_best_response_sets_headers(self):