Merge "Use bigger GreenPool for concurrent EC"
This commit is contained in:
commit
4236a6efa3
@ -2909,9 +2909,10 @@ class ECObjectController(BaseObjectController):
|
||||
safe_iter = GreenthreadSafeIterator(node_iter)
|
||||
|
||||
policy_options = self.app.get_policy_options(policy)
|
||||
ec_request_count = policy.ec_ndata + \
|
||||
policy_options.concurrent_ec_extra_requests
|
||||
with ContextPool(ec_request_count) as pool:
|
||||
ec_request_count = policy.ec_ndata
|
||||
if policy_options.concurrent_gets:
|
||||
ec_request_count += policy_options.concurrent_ec_extra_requests
|
||||
with ContextPool(policy.ec_n_unique_fragments) as pool:
|
||||
pile = GreenAsyncPile(pool)
|
||||
buckets = ECGetResponseCollection(policy)
|
||||
node_iter.set_node_provider(buckets.provide_alternate_node)
|
||||
@ -2923,7 +2924,7 @@ class ECObjectController(BaseObjectController):
|
||||
self.app.logger.thread_locals)
|
||||
|
||||
feeder_q = None
|
||||
if self.app.get_policy_options(policy).concurrent_gets:
|
||||
if policy_options.concurrent_gets:
|
||||
feeder_q = Queue()
|
||||
pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req,
|
||||
partition, policy, buckets, feeder_q,
|
||||
|
@ -838,6 +838,11 @@ class FakeStatus(object):
|
||||
self.expect_sleep_list.append(None)
|
||||
self.response_sleep = response_sleep
|
||||
|
||||
def __repr__(self):
|
||||
return '%s(%s, expect_status=%r, response_sleep=%s)' % (
|
||||
self.__class__.__name__, self.status,
|
||||
self.expect_status, self.response_sleep)
|
||||
|
||||
def get_response_status(self):
|
||||
if self.response_sleep is not None:
|
||||
eventlet.sleep(self.response_sleep)
|
||||
@ -1078,7 +1083,7 @@ def fake_http_connect(*code_iter, **kwargs):
|
||||
# the code under test may swallow the StopIteration, so by logging
|
||||
# unexpected requests here we allow the test framework to check for
|
||||
# them after the connect function has been used.
|
||||
unexpected_requests.append((args, kwargs))
|
||||
unexpected_requests.append((args, ckwargs))
|
||||
raise
|
||||
|
||||
if 'give_connect' in kwargs:
|
||||
@ -1142,8 +1147,8 @@ def mocked_http_conn(*args, **kwargs):
|
||||
if left_over_status:
|
||||
raise AssertionError('left over status %r' % left_over_status)
|
||||
if fake_conn.unexpected_requests:
|
||||
raise AssertionError('unexpected requests %r' %
|
||||
fake_conn.unexpected_requests)
|
||||
raise AssertionError('unexpected requests:\n%s' % '\n '.join(
|
||||
'%r' % (req,) for req in fake_conn.unexpected_requests))
|
||||
|
||||
|
||||
def make_timestamp_iter(offset=0):
|
||||
|
@ -2602,6 +2602,50 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
self.assertEqual(len(log.requests),
|
||||
self.policy.ec_n_unique_fragments)
|
||||
|
||||
def test_ec_concurrent_GET_with_slow_leaders(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-289]
|
||||
etag = md5(test_data).hexdigest()
|
||||
ec_archive_bodies = self._make_ec_archive_bodies(test_data)
|
||||
ts = self.ts()
|
||||
headers = []
|
||||
for i, body in enumerate(ec_archive_bodies):
|
||||
headers.append({
|
||||
'X-Object-Sysmeta-Ec-Etag': etag,
|
||||
'X-Object-Sysmeta-Ec-Content-Length': len(body),
|
||||
'X-Object-Sysmeta-Ec-Frag-Index':
|
||||
self.policy.get_backend_index(i),
|
||||
'X-Backend-Timestamp': ts.internal,
|
||||
'X-Timestamp': ts.normal,
|
||||
'X-Backend-Durable-Timestamp': ts.internal,
|
||||
'X-Backend-Data-Timestamp': ts.internal,
|
||||
})
|
||||
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_gets = True
|
||||
policy_opts.concurrency_timeout = 0.0
|
||||
|
||||
slow_count = 4
|
||||
status_codes = ([
|
||||
FakeStatus(200, response_sleep=0.2),
|
||||
] * slow_count) + ([
|
||||
FakeStatus(200, response_sleep=0.1),
|
||||
] * (self.policy.ec_n_unique_fragments - slow_count))
|
||||
for i in range(slow_count):
|
||||
# poison the super slow requests
|
||||
ec_archive_bodies[i] = ''
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.body, test_data, '%r != %r' % (
|
||||
resp.body if len(resp.body) < 60 else '%s...' % resp.body[:60],
|
||||
test_data if len(test_data) < 60 else '%s...' % test_data[:60],
|
||||
))
|
||||
self.assertEqual(len(log.requests), self.policy.ec_n_unique_fragments)
|
||||
|
||||
def test_GET_with_slow_nodes_and_failures(self):
|
||||
segment_size = self.policy.ec_segment_size
|
||||
test_data = (b'test' * segment_size)[:-289]
|
||||
@ -2722,6 +2766,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
|
||||
policy_opts = self.app.get_policy_options(self.policy)
|
||||
policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1
|
||||
req = swift.common.swob.Request.blank('/v1/a/c/o')
|
||||
# w/o concurrent_gets ec_extra_requests has no effect
|
||||
status_codes = [200] * self.policy.ec_ndata
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
resp = req.get_response(self.app)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(len(log.requests), self.policy.ec_ndata)
|
||||
self.assertEqual(resp.body, test_data)
|
||||
|
||||
policy_opts.concurrent_gets = True
|
||||
status_codes = [200] * (self.policy.object_ring.replicas - 1)
|
||||
with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies,
|
||||
headers=headers) as log:
|
||||
|
Loading…
Reference in New Issue
Block a user