diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 4872c479d7..d381d5fc14 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -74,6 +74,9 @@ use = egg:swift#proxy # node_timeout = 10 # conn_timeout = 0.5 # +# How long to wait for requests to finish after a quorum has been established. +# post_quorum_timeout = 0.5 +# # How long without an error before a node's error count is reset. This will # also be how long before a node is reenabled after suppression is triggered. # error_suppression_interval = 60 diff --git a/swift/common/utils.py b/swift/common/utils.py index 1290b94d73..390e57c129 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -1595,6 +1595,10 @@ class ContextPool(GreenPool): coro.kill() +class GreenAsyncPileWaitallTimeout(Timeout): + pass + + class GreenAsyncPile(object): """ Runs jobs in a pool of green threads, and the results can be retrieved by @@ -1627,6 +1631,22 @@ class GreenAsyncPile(object): self._inflight += 1 self._pool.spawn(self._run_func, func, args, kwargs) + def waitall(self, timeout): + """ + Wait timeout seconds for any results to come in. + + :param timeout: seconds to wait for results + :returns: list of results accrued in that time + """ + results = [] + try: + with GreenAsyncPileWaitallTimeout(timeout): + while True: + results.append(self.next()) + except (GreenAsyncPileWaitallTimeout, StopIteration): + pass + return results + def __iter__(self): return self diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 35ca457830..61f695f417 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1012,6 +1012,8 @@ class Controller(object): statuses.append(resp[0]) if self.have_quorum(statuses, len(start_nodes)): break + # give any pending requests *some* chance to finish + pile.waitall(self.app.post_quorum_timeout) while len(response) < len(start_nodes): response.append((HTTP_SERVICE_UNAVAILABLE, '', '', '')) statuses, reasons, resp_headers, bodies = zip(*response) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index da690b4680..8235c54125 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -859,6 +859,8 @@ class ObjectController(Controller): etags.add(response.getheader('etag').strip('"')) if self.have_quorum(statuses, len(nodes)): break + # give any pending requests *some* chance to finish + pile.waitall(self.app.post_quorum_timeout) while len(statuses) < len(nodes): statuses.append(HTTP_SERVICE_UNAVAILABLE) reasons.append('') diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 569fc653c8..b489ed9e7c 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -55,6 +55,7 @@ class Application(object): self.object_chunk_size = int(conf.get('object_chunk_size', 65536)) self.client_chunk_size = int(conf.get('client_chunk_size', 65536)) self.trans_id_suffix = conf.get('trans_id_suffix', '') + self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5)) self.error_suppression_interval = \ int(conf.get('error_suppression_interval', 60)) self.error_suppression_limit = \ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index b262004a08..3ecd175dcb 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -2567,6 +2567,32 @@ class TestGreenAsyncPile(unittest.TestCase): self.assertEqual(next(pile), None) self.assertRaises(StopIteration, lambda: next(pile)) + def test_waitall_timeout_timesout(self): + def run_test(sleep_duration): + eventlet.sleep(sleep_duration) + completed[0] += 1 + return sleep_duration + + completed = [0] + pile = utils.GreenAsyncPile(3) + pile.spawn(run_test, 0.1) + pile.spawn(run_test, 1.0) + self.assertEqual(pile.waitall(0.2), [0.1]) + self.assertEqual(completed[0], 1) + + def test_waitall_timeout_completes(self): + def run_test(sleep_duration): + eventlet.sleep(sleep_duration) + completed[0] += 1 + return sleep_duration + + completed = [0] + pile = utils.GreenAsyncPile(3) + pile.spawn(run_test, 0.1) + pile.spawn(run_test, 0.1) + self.assertEqual(pile.waitall(0.5), [0.1, 0.1]) + self.assertEqual(completed[0], 2) + if __name__ == '__main__': unittest.main()