diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 53ae1856f2..cd35973634 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2909,9 +2909,10 @@ class ECObjectController(BaseObjectController): safe_iter = GreenthreadSafeIterator(node_iter) policy_options = self.app.get_policy_options(policy) - ec_request_count = policy.ec_ndata + \ - policy_options.concurrent_ec_extra_requests - with ContextPool(ec_request_count) as pool: + ec_request_count = policy.ec_ndata + if policy_options.concurrent_gets: + ec_request_count += policy_options.concurrent_ec_extra_requests + with ContextPool(policy.ec_n_unique_fragments) as pool: pile = GreenAsyncPile(pool) buckets = ECGetResponseCollection(policy) node_iter.set_node_provider(buckets.provide_alternate_node) @@ -2923,7 +2924,7 @@ class ECObjectController(BaseObjectController): self.app.logger.thread_locals) feeder_q = None - if self.app.get_policy_options(policy).concurrent_gets: + if policy_options.concurrent_gets: feeder_q = Queue() pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req, partition, policy, buckets, feeder_q, diff --git a/test/unit/__init__.py b/test/unit/__init__.py index a412ba8202..a9df23645b 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -838,6 +838,11 @@ class FakeStatus(object): self.expect_sleep_list.append(None) self.response_sleep = response_sleep + def __repr__(self): + return '%s(%s, expect_status=%r, response_sleep=%s)' % ( + self.__class__.__name__, self.status, + self.expect_status, self.response_sleep) + def get_response_status(self): if self.response_sleep is not None: eventlet.sleep(self.response_sleep) @@ -1078,7 +1083,7 @@ def fake_http_connect(*code_iter, **kwargs): # the code under test may swallow the StopIteration, so by logging # unexpected requests here we allow the test framework to check for # them after the connect function has been used. - unexpected_requests.append((args, kwargs)) + unexpected_requests.append((args, ckwargs)) raise if 'give_connect' in kwargs: @@ -1142,8 +1147,8 @@ def mocked_http_conn(*args, **kwargs): if left_over_status: raise AssertionError('left over status %r' % left_over_status) if fake_conn.unexpected_requests: - raise AssertionError('unexpected requests %r' % - fake_conn.unexpected_requests) + raise AssertionError('unexpected requests:\n%s' % '\n '.join( + '%r' % (req,) for req in fake_conn.unexpected_requests)) def make_timestamp_iter(offset=0): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 04315e1ee5..6a64345457 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2602,6 +2602,50 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(len(log.requests), self.policy.ec_n_unique_fragments) + def test_ec_concurrent_GET_with_slow_leaders(self): + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-289] + etag = md5(test_data).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + ts = self.ts() + headers = [] + for i, body in enumerate(ec_archive_bodies): + headers.append({ + 'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': len(body), + 'X-Object-Sysmeta-Ec-Frag-Index': + self.policy.get_backend_index(i), + 'X-Backend-Timestamp': ts.internal, + 'X-Timestamp': ts.normal, + 'X-Backend-Durable-Timestamp': ts.internal, + 'X-Backend-Data-Timestamp': ts.internal, + }) + + req = swift.common.swob.Request.blank('/v1/a/c/o') + + policy_opts = self.app.get_policy_options(self.policy) + policy_opts.concurrent_gets = True + policy_opts.concurrency_timeout = 0.0 + + slow_count = 4 + status_codes = ([ + FakeStatus(200, response_sleep=0.2), + ] * slow_count) + ([ + FakeStatus(200, response_sleep=0.1), + ] * (self.policy.ec_n_unique_fragments - slow_count)) + for i in range(slow_count): + # poison the super slow requests + ec_archive_bodies[i] = '' + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(resp.body, test_data, '%r != %r' % ( + resp.body if len(resp.body) < 60 else '%s...' % resp.body[:60], + test_data if len(test_data) < 60 else '%s...' % test_data[:60], + )) + self.assertEqual(len(log.requests), self.policy.ec_n_unique_fragments) + def test_GET_with_slow_nodes_and_failures(self): segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-289] @@ -2722,6 +2766,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): policy_opts = self.app.get_policy_options(self.policy) policy_opts.concurrent_ec_extra_requests = self.policy.ec_nparity - 1 req = swift.common.swob.Request.blank('/v1/a/c/o') + # w/o concurrent_gets ec_extra_requests has no effect + status_codes = [200] * self.policy.ec_ndata + with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, + headers=headers) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + self.assertEqual(len(log.requests), self.policy.ec_ndata) + self.assertEqual(resp.body, test_data) + + policy_opts.concurrent_gets = True status_codes = [200] * (self.policy.object_ring.replicas - 1) with mocked_http_conn(*status_codes, body_iter=ec_archive_bodies, headers=headers) as log: