diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index eacfe68e9d..cd924b0c61 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2346,7 +2346,7 @@ def is_good_source(status): class ECFragGetter(object): def __init__(self, app, req, node_iter, partition, policy, path, - backend_headers, header_provider=None): + backend_headers, header_provider, logger_thread_locals): self.app = app self.req = req self.node_iter = node_iter @@ -2359,6 +2359,7 @@ class ECFragGetter(object): self.skip_bytes = 0 self.bytes_used_from_backend = 0 self.source = None + self.logger_thread_locals = logger_thread_locals def fast_forward(self, num_bytes): """ @@ -2703,8 +2704,8 @@ class ECFragGetter(object): else: return HeaderKeyDict() - def _make_node_request(self, node, node_timeout, logger_thread_locals): - self.app.logger.thread_locals = logger_thread_locals + def _make_node_request(self, node, node_timeout): + self.app.logger.thread_locals = self.logger_thread_locals req_headers = dict(self.backend_headers) ip, port = get_ip_port(node, req_headers) req_headers.update(self.header_provider()) @@ -2772,8 +2773,7 @@ class ECFragGetter(object): self.status = self.reason = self.body = self.source_headers = None for node in self.node_iter: source = self._make_node_request( - node, self.app.recoverable_node_timeout, - self.app.logger.thread_locals) + node, self.app.recoverable_node_timeout) if source: self.node = node @@ -2794,17 +2794,19 @@ class ECFragGetter(object): @ObjectControllerRouter.register(EC_POLICY) class ECObjectController(BaseObjectController): - def _fragment_GET_request(self, req, node_iter, partition, policy, - header_provider=None): + def _fragment_GET_request( + self, req, node_iter, partition, policy, + header_provider, logger_thread_locals): """ Makes a GET request for a fragment. """ + self.app.logger.thread_locals = logger_thread_locals backend_headers = self.generate_request_headers( req, additional=req.headers) getter = ECFragGetter(self.app, req, node_iter, partition, policy, req.swift_entity_path, backend_headers, - header_provider=header_provider) + header_provider, logger_thread_locals) return (getter, getter.response_parts_iter(req)) def _convert_range(self, req, policy): @@ -2860,7 +2862,7 @@ class ECObjectController(BaseObjectController): return range_specs def feed_remaining_primaries(self, safe_iter, pile, req, partition, policy, - buckets, feeder_q): + buckets, feeder_q, logger_thread_locals): timeout = self.app.get_policy_options(policy).concurrency_timeout while True: try: @@ -2871,7 +2873,8 @@ class ECObjectController(BaseObjectController): # primary we won't find out until the next pass pile.spawn(self._fragment_GET_request, req, safe_iter, partition, - policy, buckets.get_extra_headers) + policy, buckets.get_extra_headers, + logger_thread_locals) else: # ran out of primaries break @@ -2914,13 +2917,15 @@ class ECObjectController(BaseObjectController): for node_count in range(ec_request_count): pile.spawn(self._fragment_GET_request, req, safe_iter, partition, - policy, buckets.get_extra_headers) + policy, buckets.get_extra_headers, + self.app.logger.thread_locals) feeder_q = None if self.app.get_policy_options(policy).concurrent_gets: feeder_q = Queue() pool.spawn(self.feed_remaining_primaries, safe_iter, pile, req, - partition, policy, buckets, feeder_q) + partition, policy, buckets, feeder_q, + self.app.logger.thread_locals) extra_requests = 0 # max_extra_requests is an arbitrary hard limit for spawning extra @@ -2947,9 +2952,9 @@ class ECObjectController(BaseObjectController): if requests_available and ( buckets.shortfall > pile._pending or bad_resp): extra_requests += 1 - pile.spawn(self._fragment_GET_request, - req, safe_iter, partition, - policy, buckets.get_extra_headers) + pile.spawn(self._fragment_GET_request, req, safe_iter, + partition, policy, buckets.get_extra_headers, + self.app.logger.thread_locals) if feeder_q: feeder_q.put('stop') diff --git a/test/unit/__init__.py b/test/unit/__init__.py index b80df0c19c..a412ba8202 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -674,10 +674,13 @@ class DebugLogger(FakeLogger): FakeLogger.__init__(self, *args, **kwargs) self.formatter = DebugSwiftLogFormatter( "%(server)s %(levelname)s: %(message)s") + self.records = defaultdict(list) def handle(self, record): self._handle(record) - print(self.formatter.format(record)) + formatted = self.formatter.format(record) + print(formatted) + self.records[record.levelname].append(formatted) class DebugLogAdapter(utils.LogAdapter): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index e785afa884..3f73dd8d5a 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2494,7 +2494,7 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): feeder_q.get.side_effect = feeder_timeout controller.feed_remaining_primaries( safe_iter, pile, req, 0, self.policy, - mock.MagicMock(), feeder_q) + mock.MagicMock(), feeder_q, mock.MagicMock()) expected_timeout = self.app.get_policy_options( self.policy).concurrency_timeout expected_call = mock.call(timeout=expected_timeout) @@ -2505,12 +2505,21 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): def test_GET_timeout(self): req = swift.common.swob.Request.blank('/v1/a/c/o') self.app.recoverable_node_timeout = 0.01 - codes = [FakeStatus(404, response_sleep=1.0)] + \ + codes = [FakeStatus(404, response_sleep=1.0)] * 2 + \ [200] * (self.policy.ec_ndata) with mocked_http_conn(*codes) as log: resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) - self.assertEqual(self.policy.ec_ndata + 1, len(log.requests)) + self.assertEqual(self.policy.ec_ndata + 2, len(log.requests)) + self.assertEqual( + len(self.logger.logger.records['ERROR']), 2, + 'Expected 2 ERROR lines, got %r' % ( + self.logger.logger.records['ERROR'], )) + for retry_line in self.logger.logger.records['ERROR']: + self.assertIn('ERROR with Object server', retry_line) + self.assertIn('Trying to GET', retry_line) + self.assertIn('Timeout (0.01s)', retry_line) + self.assertIn(req.headers['x-trans-id'], retry_line) def test_GET_with_slow_primaries(self): segment_size = self.policy.ec_segment_size @@ -4194,6 +4203,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): error_lines = self.logger.get_lines_for_level('error') self.assertEqual(1, len(error_lines)) self.assertIn('retrying', error_lines[0]) + retry_line = self.logger.logger.records['ERROR'][0] + self.assertIn(req.headers['x-trans-id'], retry_line) def test_GET_read_timeout_resume_mixed_etag(self): segment_size = self.policy.ec_segment_size