slightly less early quorum
The early quorum change has maybe added a little bit too much eventual to the consistency of requests in Swift, and users can sometimes get unexpected results. This change gives us a knob to turn in finding the right balance, by adding a timeout where pending requests can finish after quorum is achieved. Change-Id: Ife91aaa8653e75b01313bbcf19072181739e932c
This commit is contained in:
parent
91deed871b
commit
7207926cff
@ -74,6 +74,9 @@ use = egg:swift#proxy
|
|||||||
# node_timeout = 10
|
# node_timeout = 10
|
||||||
# conn_timeout = 0.5
|
# conn_timeout = 0.5
|
||||||
#
|
#
|
||||||
|
# How long to wait for requests to finish after a quorum has been established.
|
||||||
|
# post_quorum_timeout = 0.5
|
||||||
|
#
|
||||||
# How long without an error before a node's error count is reset. This will
|
# How long without an error before a node's error count is reset. This will
|
||||||
# also be how long before a node is reenabled after suppression is triggered.
|
# also be how long before a node is reenabled after suppression is triggered.
|
||||||
# error_suppression_interval = 60
|
# error_suppression_interval = 60
|
||||||
|
@ -1595,6 +1595,10 @@ class ContextPool(GreenPool):
|
|||||||
coro.kill()
|
coro.kill()
|
||||||
|
|
||||||
|
|
||||||
|
class GreenAsyncPileWaitallTimeout(Timeout):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class GreenAsyncPile(object):
|
class GreenAsyncPile(object):
|
||||||
"""
|
"""
|
||||||
Runs jobs in a pool of green threads, and the results can be retrieved by
|
Runs jobs in a pool of green threads, and the results can be retrieved by
|
||||||
@ -1627,6 +1631,22 @@ class GreenAsyncPile(object):
|
|||||||
self._inflight += 1
|
self._inflight += 1
|
||||||
self._pool.spawn(self._run_func, func, args, kwargs)
|
self._pool.spawn(self._run_func, func, args, kwargs)
|
||||||
|
|
||||||
|
def waitall(self, timeout):
|
||||||
|
"""
|
||||||
|
Wait timeout seconds for any results to come in.
|
||||||
|
|
||||||
|
:param timeout: seconds to wait for results
|
||||||
|
:returns: list of results accrued in that time
|
||||||
|
"""
|
||||||
|
results = []
|
||||||
|
try:
|
||||||
|
with GreenAsyncPileWaitallTimeout(timeout):
|
||||||
|
while True:
|
||||||
|
results.append(self.next())
|
||||||
|
except (GreenAsyncPileWaitallTimeout, StopIteration):
|
||||||
|
pass
|
||||||
|
return results
|
||||||
|
|
||||||
def __iter__(self):
|
def __iter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
@ -1012,6 +1012,8 @@ class Controller(object):
|
|||||||
statuses.append(resp[0])
|
statuses.append(resp[0])
|
||||||
if self.have_quorum(statuses, len(start_nodes)):
|
if self.have_quorum(statuses, len(start_nodes)):
|
||||||
break
|
break
|
||||||
|
# give any pending requests *some* chance to finish
|
||||||
|
pile.waitall(self.app.post_quorum_timeout)
|
||||||
while len(response) < len(start_nodes):
|
while len(response) < len(start_nodes):
|
||||||
response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
|
response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
|
||||||
statuses, reasons, resp_headers, bodies = zip(*response)
|
statuses, reasons, resp_headers, bodies = zip(*response)
|
||||||
|
@ -859,6 +859,8 @@ class ObjectController(Controller):
|
|||||||
etags.add(response.getheader('etag').strip('"'))
|
etags.add(response.getheader('etag').strip('"'))
|
||||||
if self.have_quorum(statuses, len(nodes)):
|
if self.have_quorum(statuses, len(nodes)):
|
||||||
break
|
break
|
||||||
|
# give any pending requests *some* chance to finish
|
||||||
|
pile.waitall(self.app.post_quorum_timeout)
|
||||||
while len(statuses) < len(nodes):
|
while len(statuses) < len(nodes):
|
||||||
statuses.append(HTTP_SERVICE_UNAVAILABLE)
|
statuses.append(HTTP_SERVICE_UNAVAILABLE)
|
||||||
reasons.append('')
|
reasons.append('')
|
||||||
|
@ -55,6 +55,7 @@ class Application(object):
|
|||||||
self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
|
self.object_chunk_size = int(conf.get('object_chunk_size', 65536))
|
||||||
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
|
self.client_chunk_size = int(conf.get('client_chunk_size', 65536))
|
||||||
self.trans_id_suffix = conf.get('trans_id_suffix', '')
|
self.trans_id_suffix = conf.get('trans_id_suffix', '')
|
||||||
|
self.post_quorum_timeout = float(conf.get('post_quorum_timeout', 0.5))
|
||||||
self.error_suppression_interval = \
|
self.error_suppression_interval = \
|
||||||
int(conf.get('error_suppression_interval', 60))
|
int(conf.get('error_suppression_interval', 60))
|
||||||
self.error_suppression_limit = \
|
self.error_suppression_limit = \
|
||||||
|
@ -2567,6 +2567,32 @@ class TestGreenAsyncPile(unittest.TestCase):
|
|||||||
self.assertEqual(next(pile), None)
|
self.assertEqual(next(pile), None)
|
||||||
self.assertRaises(StopIteration, lambda: next(pile))
|
self.assertRaises(StopIteration, lambda: next(pile))
|
||||||
|
|
||||||
|
def test_waitall_timeout_timesout(self):
|
||||||
|
def run_test(sleep_duration):
|
||||||
|
eventlet.sleep(sleep_duration)
|
||||||
|
completed[0] += 1
|
||||||
|
return sleep_duration
|
||||||
|
|
||||||
|
completed = [0]
|
||||||
|
pile = utils.GreenAsyncPile(3)
|
||||||
|
pile.spawn(run_test, 0.1)
|
||||||
|
pile.spawn(run_test, 1.0)
|
||||||
|
self.assertEqual(pile.waitall(0.2), [0.1])
|
||||||
|
self.assertEqual(completed[0], 1)
|
||||||
|
|
||||||
|
def test_waitall_timeout_completes(self):
|
||||||
|
def run_test(sleep_duration):
|
||||||
|
eventlet.sleep(sleep_duration)
|
||||||
|
completed[0] += 1
|
||||||
|
return sleep_duration
|
||||||
|
|
||||||
|
completed = [0]
|
||||||
|
pile = utils.GreenAsyncPile(3)
|
||||||
|
pile.spawn(run_test, 0.1)
|
||||||
|
pile.spawn(run_test, 0.1)
|
||||||
|
self.assertEqual(pile.waitall(0.5), [0.1, 0.1])
|
||||||
|
self.assertEqual(completed[0], 2)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user