Merge "early quorum responses"

This commit is contained in:
Jenkins 2013-11-08 00:35:15 +00:00 committed by Gerrit Code Review
commit c8b6bc7bd5
6 changed files with 179 additions and 44 deletions

View File

@@ -52,6 +52,7 @@ import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
greenio, event
from eventlet.green import socket, threading
import eventlet.queue
import netifaces
import codecs
utf8_decoder = codecs.getdecoder('utf-8')
@@ -1591,6 +1592,51 @@ class ContextPool(GreenPool):
coro.kill()
class GreenAsyncPile(object):
    """
    Runs jobs in a pool of green threads, and the results can be retrieved by
    using this object as an iterator.

    This is very similar in principle to eventlet.GreenPile, except it returns
    results as they become available rather than in the order they were
    launched.

    Correlating results with jobs (if necessary) is left to the caller.
    """
    def __init__(self, size):
        """
        :param size: size pool of green threads to use
        """
        self._pool = GreenPool(size)
        self._responses = eventlet.queue.LightQueue(size)
        # number of spawned jobs whose result has not yet been queued
        self._inflight = 0

    def _run_func(self, func, args, kwargs):
        # Always decrement the in-flight counter, even when func raises, so
        # that iteration can terminate.
        # NOTE(review): if func raises, no result is queued; a consumer
        # already blocked in next()'s get() could then wait forever --
        # presumably callers' funcs catch their own exceptions; verify.
        try:
            self._responses.put(func(*args, **kwargs))
        finally:
            self._inflight -= 1

    def spawn(self, func, *args, **kwargs):
        """
        Spawn a job in a green thread on the pile.
        """
        self._inflight += 1
        self._pool.spawn(self._run_func, func, args, kwargs)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next available result, blocking if jobs are still in
        flight; raise StopIteration once the pile is drained.
        """
        try:
            return self._responses.get_nowait()
        # Use the fully qualified exception: only ``eventlet.queue`` is
        # imported here, so a bare ``Empty`` would be a NameError.
        except eventlet.queue.Empty:
            if self._inflight == 0:
                raise StopIteration()
            else:
                return self._responses.get()

    # Python 3 spells the iterator protocol method __next__.
    __next__ = next
class ModifiedParseResult(ParseResult):
"Parse results class for urlparse."

View File

@@ -32,13 +32,13 @@ import itertools
from swift import gettext_ as _
from urllib import quote
from eventlet import sleep, GreenPile
from eventlet import sleep
from eventlet.timeout import Timeout
from swift.common.wsgi import make_pre_authed_env
from swift.common.utils import normalize_timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
quorum_size
quorum_size, GreenAsyncPile
from swift.common.bufferedhttp import http_connect
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
ConnectionTimeout
@@ -836,11 +836,19 @@
"""
start_nodes = ring.get_part_nodes(part)
nodes = GreenthreadSafeIterator(self.iter_nodes(ring, part))
pile = GreenPile(len(start_nodes))
pile = GreenAsyncPile(len(start_nodes))
for head in headers:
pile.spawn(self._make_request, nodes, part, method, path,
head, query_string, self.app.logger.thread_locals)
response = [resp for resp in pile if resp]
response = []
statuses = []
for resp in pile:
if not resp:
continue
response.append(resp)
statuses.append(resp[0])
if self.have_quorum(statuses, len(start_nodes)):
break
while len(response) < len(start_nodes):
response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
statuses, reasons, resp_headers, bodies = zip(*response)
@@ -848,6 +856,23 @@
'%s %s' % (self.server_type, req.method),
headers=resp_headers)
def have_quorum(self, statuses, node_count):
    """
    Given a list of statuses from several requests, determine if
    a quorum response can already be decided.

    :param statuses: list of statuses returned
    :param node_count: number of nodes being queried (basically ring count)
    :returns: True or False, depending on if quorum is established
    """
    needed = quorum_size(node_count)
    if len(statuses) < needed:
        return False
    # Quorum exists as soon as any single status class (2xx, 3xx or 4xx)
    # accounts for at least a quorum of the responses seen so far.
    return any(
        sum(1 for status in statuses
            if base <= status < base + 100) >= needed
        for base in (HTTP_OK, HTTP_MULTIPLE_CHOICES, HTTP_BAD_REQUEST))
def best_response(self, req, statuses, reasons, bodies, server_type,
etag=None, headers=None):
"""

View File

@@ -41,7 +41,7 @@ from eventlet.timeout import Timeout
from swift.common.utils import ContextPool, normalize_timestamp, \
config_true_value, public, json, csv_append, GreenthreadSafeIterator, \
quorum_size, split_path, override_bytes_from_content_type, \
get_valid_utf8_str
get_valid_utf8_str, GreenAsyncPile
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
@@ -848,6 +848,48 @@ class ObjectController(Controller):
self.exception_occurred(node, _('Object'),
_('Expect: 100-continue on %s') % path)
def _get_put_responses(self, req, conns, nodes):
    """
    Collect the final object-server responses for an in-flight PUT,
    stopping early once a quorum of statuses has been seen.

    :param req: the client request (used for logging its path)
    :param conns: open connections with an outstanding PUT
    :param nodes: ring nodes the PUT was sent to; the returned lists are
                  padded with 503s out to len(nodes)
    :returns: (statuses, reasons, bodies, etags) -- three parallel lists
              plus a set of etags from the successful responses
    """
    statuses = []
    reasons = []
    bodies = []
    etags = set()

    def get_conn_response(conn):
        # Return (conn, response) pairs so that errors below can be
        # attributed to the node that actually produced the response;
        # results arrive out of order, so the spawn-loop variable would
        # otherwise always name the *last* connection spawned.
        try:
            with Timeout(self.app.node_timeout):
                if conn.resp:
                    return (conn, conn.resp)
                else:
                    return (conn, conn.getresponse())
        except (Exception, Timeout):
            self.exception_occurred(
                conn.node, _('Object'),
                _('Trying to get final status of PUT to %s') % req.path)
        return (None, None)

    pile = GreenAsyncPile(len(conns))
    for conn in conns:
        pile.spawn(get_conn_response, conn)
    for (conn, response) in pile:
        if response:
            statuses.append(response.status)
            reasons.append(response.reason)
            bodies.append(response.read())
            if response.status >= HTTP_INTERNAL_SERVER_ERROR:
                self.error_occurred(
                    conn.node,
                    _('ERROR %(status)d %(body)s From Object Server '
                      're: %(path)s') %
                    {'status': response.status,
                     'body': bodies[-1][:1024], 'path': req.path})
            elif is_success(response.status):
                etags.add(response.getheader('etag').strip('"'))
            # Stop waiting on the slow nodes once quorum is decided.
            if self.have_quorum(statuses, len(nodes)):
                break
    # Treat any node we stopped waiting on as unavailable.
    while len(statuses) < len(nodes):
        statuses.append(HTTP_SERVICE_UNAVAILABLE)
        reasons.append('')
        bodies.append('')
    return statuses, reasons, bodies, etags
@public
@cors_validation
@delay_denial
@@ -1124,42 +1166,15 @@
_('Client disconnected without sending enough data'))
self.app.logger.increment('client_disconnects')
return HTTPClientDisconnect(request=req)
statuses = []
reasons = []
bodies = []
etags = set()
for conn in conns:
try:
with Timeout(self.app.node_timeout):
if conn.resp:
response = conn.resp
else:
response = conn.getresponse()
statuses.append(response.status)
reasons.append(response.reason)
bodies.append(response.read())
if response.status >= HTTP_INTERNAL_SERVER_ERROR:
self.error_occurred(
conn.node,
_('ERROR %(status)d %(body)s From Object Server '
're: %(path)s') %
{'status': response.status,
'body': bodies[-1][:1024], 'path': req.path})
elif is_success(response.status):
etags.add(response.getheader('etag').strip('"'))
except (Exception, Timeout):
self.exception_occurred(
conn.node, _('Object'),
_('Trying to get final status of PUT to %s') % req.path)
statuses, reasons, bodies, etags = self._get_put_responses(req, conns,
nodes)
if len(etags) > 1:
self.app.logger.error(
_('Object servers returned %s mismatched etags'), len(etags))
return HTTPServerError(request=req)
etag = etags.pop() if len(etags) else None
while len(statuses) < len(nodes):
statuses.append(HTTP_SERVICE_UNAVAILABLE)
reasons.append('')
bodies.append('')
resp = self.best_response(req, statuses, reasons, bodies,
_('Object PUT'), etag=etag)
if source_header:

View File

@@ -21,6 +21,7 @@ from test.unit import temptree
import ctypes
import errno
import eventlet
import eventlet.event
import logging
import os
import random
@@ -2483,5 +2484,40 @@ class TestAuditLocationGenerator(unittest.TestCase):
[(obj_path, "drive", "partition2")])
class TestGreenAsyncPile(unittest.TestCase):
    def test_runs_everything(self):
        call_count = [0]

        def job():
            call_count[0] += 1
            return call_count[0]

        pile = utils.GreenAsyncPile(3)
        for _ in xrange(3):
            pile.spawn(job)
        # All three jobs ran; results may arrive in any order.
        self.assertEqual(sorted(result for result in pile), [1, 2, 3])

    def test_is_asynchronous(self):
        def job(index):
            events[index].wait()
            return index

        pile = utils.GreenAsyncPile(3)
        for order in ((1, 2, 0), (0, 1, 2), (2, 1, 0), (0, 2, 1)):
            events = [eventlet.event.Event() for _ in xrange(3)]
            for index in xrange(3):
                pile.spawn(job, index)
            # Results come back in completion order, not spawn order.
            for index in order:
                events[index].send()
                self.assertEqual(next(pile), index)

    def test_next_when_empty(self):
        def job():
            pass

        pile = utils.GreenAsyncPile(3)
        pile.spawn(job)
        # A job with no return value yields None ...
        self.assertEqual(next(pile), None)
        # ... and a drained pile raises StopIteration.
        self.assertRaises(StopIteration, lambda: next(pile))
if __name__ == '__main__':
unittest.main()

View File

@@ -433,3 +433,16 @@ class TestFuncs(unittest.TestCase):
self.assertEquals(
resp,
headers_to_object_info(headers.items(), 200))
def test_have_quorum(self):
    base = Controller(self.app)
    # just throw a bunch of test cases at it
    cases = [
        (([201, 404], 3), False),
        (([201, 201], 4), False),
        (([201, 201, 404, 404], 4), False),
        (([201, 503, 503, 201], 4), False),
        (([201, 201], 3), True),
        (([404, 404], 3), True),
        (([201, 201], 2), True),
        (([404, 404], 2), True),
        (([201, 404, 201, 201], 4), True),
    ]
    for (statuses, node_count), expected in cases:
        self.assertEqual(base.have_quorum(statuses, node_count), expected)

View File

@@ -2907,7 +2907,7 @@ class TestObjectController(unittest.TestCase):
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'COPY'},
headers={'Destination': 'c/o'})
req.account = 'a'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
@@ -2919,7 +2919,7 @@
headers={'Destination': 'c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
@@ -2930,7 +2930,7 @@
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
@@ -2942,7 +2942,7 @@
headers={'Destination': '/c/o'})
req.account = 'a'
controller.object_name = 'o/o2'
set_http_connect(200, 200, 200, 200, 200, 200, 200, 201, 201, 201)
set_http_connect(200, 200, 200, 200, 200, 201, 201, 201, 200, 200)
# acct cont acct cont objc objc objc obj obj obj
self.app.memcache.store = {}
resp = controller.COPY(req)
@@ -5151,7 +5151,7 @@ class TestContainerController(unittest.TestCase):
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.PUT,
(200, 200, 200, 201, 201, 201), 201,
(200, 201, 201, 201), 201,
missing_container=True)
self.app.max_containers_per_account = 12345
@@ -5165,7 +5165,7 @@
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.assert_status_map(controller.PUT,
(200, 200, 200, 201, 201, 201), 201,
(200, 201, 201, 201), 201,
missing_container=True)
def test_PUT_max_container_name_length(self):
@@ -5174,7 +5174,7 @@
controller = proxy_server.ContainerController(self.app, 'account',
'1' * limit)
self.assert_status_map(controller.PUT,
(200, 200, 200, 201, 201, 201), 201,
(200, 201, 201, 201), 201,
missing_container=True)
controller = proxy_server.ContainerController(self.app, 'account',
'2' * (limit + 1))
@@ -5259,7 +5259,7 @@
controller = proxy_server.ContainerController(self.app, 'account',
'container')
self.app.memcache = MockMemcache(allow_lock=True)
set_http_connect(200, 200, 200, 201, 201, 201,
set_http_connect(200, 201, 201, 201,
missing_container=True)
req = Request.blank('/a/c', environ={'REQUEST_METHOD': 'PUT'})
self.app.update_request(req)