Merge "slightly less early quorum"

This commit is contained in:
Jenkins 2013-11-27 15:59:37 +00:00 committed by Gerrit Code Review
commit d8e46eba47
6 changed files with 54 additions and 0 deletions

View File

@ -74,6 +74,9 @@ use = egg:swift#proxy
# node_timeout = 10 # node_timeout = 10
# conn_timeout = 0.5 # conn_timeout = 0.5
# #
# How long to wait for requests to finish after a quorum has been established.
# post_quorum_timeout = 0.5
#
# How long without an error before a node's error count is reset. This will # How long without an error before a node's error count is reset. This will
# also be how long before a node is reenabled after suppression is triggered. # also be how long before a node is reenabled after suppression is triggered.
# error_suppression_interval = 60 # error_suppression_interval = 60

View File

@ -1595,6 +1595,10 @@ class ContextPool(GreenPool):
coro.kill() coro.kill()
class GreenAsyncPileWaitallTimeout(Timeout):
pass
class GreenAsyncPile(object): class GreenAsyncPile(object):
""" """
Runs jobs in a pool of green threads, and the results can be retrieved by Runs jobs in a pool of green threads, and the results can be retrieved by
@ -1627,6 +1631,22 @@ class GreenAsyncPile(object):
self._inflight += 1 self._inflight += 1
self._pool.spawn(self._run_func, func, args, kwargs) self._pool.spawn(self._run_func, func, args, kwargs)
def waitall(self, timeout):
"""
Wait timeout seconds for any results to come in.
:param timeout: seconds to wait for results
:returns: list of results accrued in that time
"""
results = []
try:
with GreenAsyncPileWaitallTimeout(timeout):
while True:
results.append(self.next())
except (GreenAsyncPileWaitallTimeout, StopIteration):
pass
return results
def __iter__(self): def __iter__(self):
return self return self

View File

@ -1012,6 +1012,8 @@ class Controller(object):
statuses.append(resp[0]) statuses.append(resp[0])
if self.have_quorum(statuses, len(start_nodes)): if self.have_quorum(statuses, len(start_nodes)):
break break
# give any pending requests *some* chance to finish
pile.waitall(self.app.post_quorum_timeout)
while len(response) < len(start_nodes): while len(response) < len(start_nodes):
response.append((HTTP_SERVICE_UNAVAILABLE, '', '', '')) response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
statuses, reasons, resp_headers, bodies = zip(*response) statuses, reasons, resp_headers, bodies = zip(*response)

View File

@ -859,6 +859,8 @@ class ObjectController(Controller):
etags.add(response.getheader('etag').strip('"')) etags.add(response.getheader('etag').strip('"'))
if self.have_quorum(statuses, len(nodes)): if self.have_quorum(statuses, len(nodes)):
break break
# give any pending requests *some* chance to finish
pile.waitall(self.app.post_quorum_timeout)
while len(statuses) < len(nodes): while len(statuses) < len(nodes):
statuses.append(HTTP_SERVICE_UNAVAILABLE) statuses.append(HTTP_SERVICE_UNAVAILABLE)
reasons.append('') reasons.append('')

View File

@ -55,6 +55,7 @@ class Application(object):
self.object_chunk_size = int(conf.get('object_chunk_size', 65536)) self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
self.client_chunk_size = int(conf.get('client_chunk_size', 65536)) self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
self.trans_id_suffix = conf.get('trans_id_suffix', '') self.trans_id_suffix = conf.get('trans_id_suffix', '')
self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
self.error_suppression_interval = \ self.error_suppression_interval = \
int(conf.get('error_suppression_interval', 60)) int(conf.get('error_suppression_interval', 60))
self.error_suppression_limit = \ self.error_suppression_limit = \

View File

@ -2567,6 +2567,32 @@ class TestGreenAsyncPile(unittest.TestCase):
self.assertEqual(next(pile), None) self.assertEqual(next(pile), None)
self.assertRaises(StopIteration, lambda: next(pile)) self.assertRaises(StopIteration, lambda: next(pile))
def test_waitall_timeout_timesout(self):
def run_test(sleep_duration):
eventlet.sleep(sleep_duration)
completed[0] += 1
return sleep_duration
completed = [0]
pile = utils.GreenAsyncPile(3)
pile.spawn(run_test, 0.1)
pile.spawn(run_test, 1.0)
self.assertEqual(pile.waitall(0.2), [0.1])
self.assertEqual(completed[0], 1)
def test_waitall_timeout_completes(self):
def run_test(sleep_duration):
eventlet.sleep(sleep_duration)
completed[0] += 1
return sleep_duration
completed = [0]
pile = utils.GreenAsyncPile(3)
pile.spawn(run_test, 0.1)
pile.spawn(run_test, 0.1)
self.assertEqual(pile.waitall(0.5), [0.1, 0.1])
self.assertEqual(completed[0], 2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()