Fixes to prevent socket hoarding...

Main thing is the addition of a Queue between reading from the backend
server's httplib.Response and the frontend client's webob.Response,
allowing timeouts on either end to tear down the sockets.

Also, Connection: close headers were added to backend requests since
we only ever do one request per connection; this will tear down those
connections more quickly after the request is complete.

Finally, the eventlet.wsgi.WRITE_TIMEOUT is set in case the Eventlet
version supports it, timing out writes to client connections if they
take too long to read.

Change-Id: I18c7559442cf17a47ff30690ffc75010a7f003c4
This commit is contained in:
gholt 2011-11-19 18:03:38 +00:00
parent e5b3e437ad
commit f643a58d32
2 changed files with 116 additions and 30 deletions

View File

@ -132,6 +132,7 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
# Redirect logging other messages by the underlying WSGI software.
wsgi.HttpProtocol.log_message = \
lambda s, f, *a: logger.error('ERROR WSGI: ' + f % a)
wsgi.WRITE_TIMEOUT = int(conf.get('client_timeout') or 60)
eventlet.hubs.use_hub('poll')
eventlet.patcher.monkey_patch(all=False, socket=True)
monkey_patch_mimetools()

View File

@ -42,7 +42,8 @@ import functools
from hashlib import md5
from random import shuffle
from eventlet import sleep, GreenPile, Queue, Timeout
from eventlet import sleep, spawn_n, GreenPile, Timeout
from eventlet.queue import Queue, Empty, Full
from eventlet.timeout import Timeout
from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPMethodNotAllowed, \
HTTPNotFound, HTTPPreconditionFailed, \
@ -389,7 +390,7 @@ class Controller(object):
result_code = 0
attempts_left = self.app.account_ring.replica_count
path = '/%s' % account
headers = {'x-trans-id': self.trans_id}
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(partition, nodes, self.app.account_ring):
try:
with ConnectionTimeout(self.app.conn_timeout):
@ -421,7 +422,8 @@ class Controller(object):
if len(account) > MAX_ACCOUNT_NAME_LENGTH:
return None, None
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-Trans-Id': self.trans_id}
'X-Trans-Id': self.trans_id,
'Connection': 'close'}
resp = self.make_requests(Request.blank('/v1' + path),
self.app.account_ring, partition, 'PUT',
path, [headers] * len(nodes))
@ -474,7 +476,7 @@ class Controller(object):
sync_key = None
container_size = None
attempts_left = self.app.container_ring.replica_count
headers = {'x-trans-id': self.trans_id}
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
for node in self.iter_nodes(partition, nodes, self.app.container_ring):
try:
with ConnectionTimeout(self.app.conn_timeout):
@ -624,6 +626,94 @@ class Controller(object):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _make_app_iter_reader(self, node, source, queue):
"""
Reads from the source and places data in the queue. It expects
something else be reading from the queue and, if nothing does within
self.app.client_timeout seconds, the process will be aborted.
:param node: The node dict that the source is connected to, for
logging/error-limiting purposes.
:param source: The httplib.Response object to read from.
:param queue: The eventlet.queue.Queue to place read source data into.
"""
try:
try:
while True:
with ChunkReadTimeout(self.app.node_timeout):
chunk = source.read(self.app.object_chunk_size)
if not chunk:
break
queue.put(chunk, timeout=self.app.client_timeout)
except Full:
self.app.logger.warn(
_('Client did not read from queue within %ss') %
self.app.client_timeout)
except (Exception, Timeout):
self.exception_occurred(node, _('Object'),
_('Trying to read during GET'))
finally:
# Ensure the queue getter gets an empty-string-terminator.
queue.resize(2)
queue.put('')
# Close-out the connection as best as possible.
if getattr(source, 'swift_conn', None):
try:
source.swift_conn.close()
except Exception:
pass
source.swift_conn = None
try:
while source.read(self.app.object_chunk_size):
pass
except Exception:
pass
try:
source.close()
except Exception:
pass
def _make_app_iter(self, node, source, response):
"""
Returns an iterator over the contents of the source (via its read
func). The response.bytes_transferred will be incremented as the
iterator is read so as to measure how much the client is actually sent.
response.client_disconnect will be set to true if the GeneratorExit
occurs before all the source is read. There is also quite a bit of
cleanup to ensure garbage collection works and the underlying socket of
the source is closed.
:param response: The webob.Response object this iterator should be
assigned to via response.app_iter.
:param source: The httplib.Response object this iterator should read
from.
:param node: The node the source is reading from, for logging purposes.
"""
try:
try:
# Spawn reader to read from the source and place in the queue.
# We then drop any reference to the source or node, for garbage
# collection purposes.
queue = Queue(1)
spawn_n(self._make_app_iter_reader, node, source, queue)
source = node = None
while True:
chunk = queue.get(timeout=self.app.node_timeout)
if not chunk:
break
yield chunk
response.bytes_transferred += len(chunk)
except Empty:
raise ChunkReadTimeout()
except (GeneratorExit, Timeout):
response.client_disconnect = True
self.app.logger.warn(_('Client disconnected on read'))
except Exception:
self.app.logger.exception(_('Trying to send to client'))
raise
finally:
response.app_iter = None
def GETorHEAD_base(self, req, server_type, partition, nodes, path,
attempts):
"""
@ -649,9 +739,11 @@ class Controller(object):
continue
try:
with ConnectionTimeout(self.app.conn_timeout):
headers = dict(req.headers)
headers['Connection'] = 'close'
conn = http_connect(node['ip'], node['port'],
node['device'], partition, req.method, path,
headers=req.headers,
headers=headers,
query_string=req.query_string)
with Timeout(self.app.node_timeout):
possible_source = conn.getresponse()
@ -703,24 +795,7 @@ class Controller(object):
if req.method == 'GET' and source.status in (200, 206):
res = Response(request=req, conditional_response=True)
res.bytes_transferred = 0
def file_iter():
try:
while True:
with ChunkReadTimeout(self.app.node_timeout):
chunk = source.read(self.app.object_chunk_size)
if not chunk:
break
yield chunk
res.bytes_transferred += len(chunk)
except GeneratorExit:
res.client_disconnect = True
self.app.logger.warn(_('Client disconnected on read'))
except (Exception, Timeout):
self.exception_occurred(node, _('Object'),
_('Trying to read during GET of %s') % req.path)
raise
res.app_iter = file_iter()
res.app_iter = self._make_app_iter(node, source, res)
# See NOTE: swift_conn at top of file about this.
res.swift_conn = source.swift_conn
update_headers(res, source.getheaders())
@ -976,6 +1051,7 @@ class ObjectController(Controller):
headers = []
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['Connection'] = 'close'
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
@ -1150,6 +1226,7 @@ class ObjectController(Controller):
pile = GreenPile(len(nodes))
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['Connection'] = 'close'
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
@ -1291,6 +1368,7 @@ class ObjectController(Controller):
headers = []
for container in containers:
nheaders = dict(req.headers.iteritems())
nheaders['Connection'] = 'close'
nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container
nheaders['X-Container-Partition'] = container_partition
nheaders['X-Container-Device'] = container['device']
@ -1423,7 +1501,8 @@ class ContainerController(Controller):
'x-trans-id': self.trans_id,
'X-Account-Host': '%(ip)s:%(port)s' % account,
'X-Account-Partition': account_partition,
'X-Account-Device': account['device']}
'X-Account-Device': account['device'],
'Connection': 'close'}
nheaders.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-'))
@ -1449,7 +1528,8 @@ class ContainerController(Controller):
container_partition, containers = self.app.container_ring.get_nodes(
self.account_name, self.container_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'x-trans-id': self.trans_id}
'x-trans-id': self.trans_id,
'Connection': 'close'}
headers.update(value for value in req.headers.iteritems()
if value[0].lower() in self.pass_through_headers or
value[0].lower().startswith('x-container-meta-'))
@ -1475,7 +1555,8 @@ class ContainerController(Controller):
'X-Trans-Id': self.trans_id,
'X-Account-Host': '%(ip)s:%(port)s' % account,
'X-Account-Partition': account_partition,
'X-Account-Device': account['device']})
'X-Account-Device': account['device'],
'Connection': 'close'})
if self.app.memcache:
cache_key = get_container_memcache_key(self.account_name,
self.container_name)
@ -1508,7 +1589,8 @@ class AccountController(Controller):
(len(self.account_name), MAX_ACCOUNT_NAME_LENGTH)
return resp
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-Trans-Id': self.trans_id}
'X-Trans-Id': self.trans_id,
'Connection': 'close'}
resp = self.make_requests(
Request.blank('/v1/' + self.account_name),
self.app.account_ring, partition, 'PUT',
@ -1536,7 +1618,8 @@ class AccountController(Controller):
account_partition, accounts = \
self.app.account_ring.get_nodes(self.account_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'x-trans-id': self.trans_id}
'x-trans-id': self.trans_id,
'Connection': 'close'}
headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-'))
if self.app.memcache:
@ -1553,7 +1636,8 @@ class AccountController(Controller):
account_partition, accounts = \
self.app.account_ring.get_nodes(self.account_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-Trans-Id': self.trans_id}
'X-Trans-Id': self.trans_id,
'Connection': 'close'}
headers.update(value for value in req.headers.iteritems()
if value[0].lower().startswith('x-account-meta-'))
if self.app.memcache:
@ -1584,7 +1668,8 @@ class AccountController(Controller):
account_partition, accounts = \
self.app.account_ring.get_nodes(self.account_name)
headers = {'X-Timestamp': normalize_timestamp(time.time()),
'X-Trans-Id': self.trans_id}
'X-Trans-Id': self.trans_id,
'Connection': 'close'}
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.make_requests(req, self.app.account_ring,