From 6f890d2ba92a2b2a83db791e7af3e0aad661b01a Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Fri, 17 Nov 2023 15:48:47 +0000 Subject: [PATCH] proxy: move _get_shard_ranges to ObjectController ...and rename to _get_updating_shard_ranges. The method is only used by the ObjectController, and it is only used to fetch shard ranges in the 'updating' states. Also relocate the associated unit tests. Change-Id: I083e0c6898bf93d8a0dc593acd9723827e55508e --- swift/proxy/controllers/base.py | 28 ---- swift/proxy/controllers/obj.py | 36 +++- test/unit/proxy/controllers/test_base.py | 201 +---------------------- test/unit/proxy/controllers/test_obj.py | 175 +++++++++++++++++++- 4 files changed, 207 insertions(+), 233 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 57ab7722d8..599394d04d 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -2479,31 +2479,3 @@ class Controller(object): "Failed to get shard ranges from %s: invalid data: %r", req.path_qs, err) return None - - def _get_shard_ranges( - self, req, account, container, includes=None, states=None): - """ - Fetch shard ranges from given `account/container`. If `includes` is - given then the shard range for that object name is requested, otherwise - all shard ranges are requested. - - :param req: original Request instance. - :param account: account from which shard ranges should be fetched. - :param container: container from which shard ranges should be fetched. - :param includes: (optional) restricts the list of fetched shard ranges - to those which include the given name. - :param states: (optional) the states of shard ranges to be fetched. - :return: a list of instances of :class:`swift.common.utils.ShardRange`, - or None if there was a problem fetching the shard ranges - """ - params = req.params.copy() - params.pop('limit', None) - params['format'] = 'json' - if includes: - params['includes'] = str_to_wsgi(includes) - if states: - params['states'] = states - headers = {'X-Backend-Record-Type': 'shard'} - listing, response = self._get_container_listing( - req, account, container, headers=headers, params=params) - return self._parse_shard_ranges(req, listing, response), response diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 75a42f138b..5b4c6c1f4a 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -75,7 +75,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ HTTPUnprocessableEntity, Response, HTTPException, \ HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \ - normalize_etag + normalize_etag, str_to_wsgi from swift.common.request_helpers import update_etag_is_at_header, \ resolve_etag_is_at_header, validate_internal_obj, get_ip_port @@ -281,6 +281,32 @@ class BaseObjectController(Controller): """Handler for HTTP HEAD requests.""" return self.GETorHEAD(req) + def _get_updating_shard_ranges( + self, req, account, container, includes=None): + """ + Fetch shard ranges in 'updating' states from given `account/container`. + If `includes` is given then the shard range for that object name is + requested, otherwise all shard ranges are requested. + + :param req: original Request instance. + :param account: account from which shard ranges should be fetched. + :param container: container from which shard ranges should be fetched. + :param includes: (optional) restricts the list of fetched shard ranges + to those which include the given name. + :return: a list of instances of :class:`swift.common.utils.ShardRange`, + or None if there was a problem fetching the shard ranges + """ + params = req.params.copy() + params.pop('limit', None) + params['format'] = 'json' + params['states'] = 'updating' + if includes: + params['includes'] = str_to_wsgi(includes) + headers = {'X-Backend-Record-Type': 'shard'} + listing, response = self._get_container_listing( + req, account, container, headers=headers, params=params) + return self._parse_shard_ranges(req, listing, response), response + def _get_update_shard_caching_disabled(self, req, account, container, obj): """ Fetch all updating shard ranges for the given root container when @@ -294,8 +320,8 @@ class BaseObjectController(Controller): or None if the update should go back to the root """ # legacy behavior requests container server for includes=obj - shard_ranges, response = self._get_shard_ranges( - req, account, container, states='updating', includes=obj) + shard_ranges, response = self._get_updating_shard_ranges( + req, account, container, includes=obj) record_cache_op_metrics( self.logger, self.server_type.lower(), 'shard_updating', 'disabled', response) @@ -336,8 +362,8 @@ class BaseObjectController(Controller): upper=namespace.upper) else: # pull full set of updating shard ranges from backend - shard_ranges, response = self._get_shard_ranges( - req, account, container, states='updating') + shard_ranges, response = self._get_updating_shard_ranges( + req, account, container) if shard_ranges: # only store the list of namespace lower bounds and names into # infocache and memcache. diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 47fe6e2c34..be0704a4e7 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -32,18 +32,17 @@ from swift.proxy.controllers.base import headers_to_container_info, \ record_cache_op_metrics, GetterSource, get_namespaces_from_cache, \ set_namespaces_in_cache from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \ - bytes_to_wsgi, wsgi_to_str + bytes_to_wsgi from swift.common import exceptions -from swift.common.utils import split_path, ShardRange, Timestamp, \ +from swift.common.utils import split_path, Timestamp, \ GreenthreadSafeIterator, GreenAsyncPile, NamespaceBoundList from swift.common.header_key_dict import HeaderKeyDict from swift.common.http import is_success from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection from test.debug_logger import debug_logger from test.unit import ( - fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, - make_timestamp_iter, mocked_http_conn, patch_policies, FakeSource, - StubResponse) + fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, patch_policies, + FakeSource, StubResponse) from swift.common.request_helpers import ( get_sys_meta_prefix, get_object_transient_sysmeta ) @@ -1583,198 +1582,6 @@ class TestFuncs(BaseTest): self.assertEqual(bytes_to_skip(11, 7), 4) self.assertEqual(bytes_to_skip(97, 7873823), 55) - def test_get_shard_ranges_for_container_get(self): - ts_iter = make_timestamp_iter() - shard_ranges = [dict(ShardRange( - '.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i, - '%d_upper' % i, object_count=i, bytes_used=1024 * i, - meta_timestamp=next(ts_iter))) - for i in range(3)] - base = Controller(self.app) - req = Request.blank('/v1/a/c', method='GET') - resp_headers = {'X-Backend-Record-Type': 'shard'} - with mocked_http_conn( - 200, 200, - body_iter=iter([b'', json.dumps(shard_ranges).encode('ascii')]), - headers=resp_headers - ) as fake_conn: - actual, resp = base._get_shard_ranges(req, 'a', 'c') - self.assertEqual(200, resp.status_int) - - # account info - captured = fake_conn.requests - self.assertEqual('HEAD', captured[0]['method']) - self.assertEqual('a', captured[0]['path'][7:]) - # container GET - self.assertEqual('GET', captured[1]['method']) - self.assertEqual('a/c', captured[1]['path'][7:]) - self.assertEqual('format=json', captured[1]['qs']) - self.assertEqual( - 'shard', captured[1]['headers'].get('X-Backend-Record-Type')) - self.assertEqual(shard_ranges, [dict(pr) for pr in actual]) - self.assertFalse(self.app.logger.get_lines_for_level('error')) - - def test_get_shard_ranges_for_object_put(self): - ts_iter = make_timestamp_iter() - shard_ranges = [dict(ShardRange( - '.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i, - '%d_upper' % i, object_count=i, bytes_used=1024 * i, - meta_timestamp=next(ts_iter))) - for i in range(3)] - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - resp_headers = {'X-Backend-Record-Type': 'shard'} - with mocked_http_conn( - 200, 200, - body_iter=iter([b'', - json.dumps(shard_ranges[1:2]).encode('ascii')]), - headers=resp_headers - ) as fake_conn: - actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test') - self.assertEqual(200, resp.status_int) - - # account info - captured = fake_conn.requests - self.assertEqual('HEAD', captured[0]['method']) - self.assertEqual('a', captured[0]['path'][7:]) - # container GET - self.assertEqual('GET', captured[1]['method']) - self.assertEqual('a/c', captured[1]['path'][7:]) - params = sorted(captured[1]['qs'].split('&')) - self.assertEqual( - ['format=json', 'includes=1_test'], params) - self.assertEqual( - 'shard', captured[1]['headers'].get('X-Backend-Record-Type')) - self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual]) - self.assertFalse(self.app.logger.get_lines_for_level('error')) - - def test_get_shard_ranges_for_utf8_object_put(self): - ts_iter = make_timestamp_iter() - shard_ranges = [dict(ShardRange( - '.sharded_a/sr%d' % i, next(ts_iter), u'\u1234%d_lower' % i, - u'\u1234%d_upper' % i, object_count=i, bytes_used=1024 * i, - meta_timestamp=next(ts_iter))) - for i in range(3)] - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - resp_headers = {'X-Backend-Record-Type': 'shard'} - with mocked_http_conn( - 200, 200, - body_iter=iter([b'', - json.dumps(shard_ranges[1:2]).encode('ascii')]), - headers=resp_headers - ) as fake_conn: - actual, resp = base._get_shard_ranges( - req, 'a', 'c', wsgi_to_str('\xe1\x88\xb41_test')) - self.assertEqual(200, resp.status_int) - - # account info - captured = fake_conn.requests - self.assertEqual('HEAD', captured[0]['method']) - self.assertEqual('a', captured[0]['path'][7:]) - # container GET - self.assertEqual('GET', captured[1]['method']) - self.assertEqual('a/c', captured[1]['path'][7:]) - params = sorted(captured[1]['qs'].split('&')) - self.assertEqual( - ['format=json', 'includes=%E1%88%B41_test'], params) - self.assertEqual( - 'shard', captured[1]['headers'].get('X-Backend-Record-Type')) - self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual]) - self.assertFalse(self.app.logger.get_lines_for_level('error')) - - def _check_get_shard_ranges_bad_data(self, body): - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - # empty response - headers = {'X-Backend-Record-Type': 'shard'} - with mocked_http_conn(200, 200, body_iter=iter([b'', body]), - headers=headers): - actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test') - self.assertEqual(200, resp.status_int) - self.assertIsNone(actual) - lines = self.app.logger.get_lines_for_level('error') - return lines - - def test_get_shard_ranges_empty_body(self): - error_lines = self._check_get_shard_ranges_bad_data(b'') - self.assertIn('Problem with listing response', error_lines[0]) - if six.PY2: - self.assertIn('No JSON', error_lines[0]) - else: - self.assertIn('JSONDecodeError', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_not_a_list(self): - body = json.dumps({}).encode('ascii') - error_lines = self._check_get_shard_ranges_bad_data(body) - self.assertIn('Problem with listing response', error_lines[0]) - self.assertIn('not a list', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_key_missing(self): - body = json.dumps([{}]).encode('ascii') - error_lines = self._check_get_shard_ranges_bad_data(body) - self.assertIn('Failed to get shard ranges', error_lines[0]) - self.assertIn('KeyError', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_invalid_shard_range(self): - sr = ShardRange('a/c', Timestamp.now()) - bad_sr_data = dict(sr, name='bad_name') - body = json.dumps([bad_sr_data]).encode('ascii') - error_lines = self._check_get_shard_ranges_bad_data(body) - self.assertIn('Failed to get shard ranges', error_lines[0]) - self.assertIn('ValueError', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_missing_record_type(self): - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - sr = ShardRange('a/c', Timestamp.now()) - body = json.dumps([dict(sr)]).encode('ascii') - with mocked_http_conn( - 200, 200, body_iter=iter([b'', body])): - actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test') - self.assertEqual(200, resp.status_int) - self.assertIsNone(actual) - error_lines = self.app.logger.get_lines_for_level('error') - self.assertIn('Failed to get shard ranges', error_lines[0]) - self.assertIn('unexpected record type', error_lines[0]) - self.assertIn('/a/c', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_wrong_record_type(self): - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - sr = ShardRange('a/c', Timestamp.now()) - body = json.dumps([dict(sr)]).encode('ascii') - headers = {'X-Backend-Record-Type': 'object'} - with mocked_http_conn( - 200, 200, body_iter=iter([b'', body]), - headers=headers): - actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test') - self.assertEqual(200, resp.status_int) - self.assertIsNone(actual) - error_lines = self.app.logger.get_lines_for_level('error') - self.assertIn('Failed to get shard ranges', error_lines[0]) - self.assertIn('unexpected record type', error_lines[0]) - self.assertIn('/a/c', error_lines[0]) - self.assertFalse(error_lines[1:]) - - def test_get_shard_ranges_request_failed(self): - base = Controller(self.app) - req = Request.blank('/v1/a/c/o', method='PUT') - with mocked_http_conn(200, 404, 404, 404): - actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test') - self.assertEqual(404, resp.status_int) - self.assertIsNone(actual) - self.assertFalse(self.app.logger.get_lines_for_level('error')) - warning_lines = self.app.logger.get_lines_for_level('warning') - self.assertIn('Failed to get container listing', warning_lines[0]) - self.assertIn('/a/c', warning_lines[0]) - self.assertFalse(warning_lines[1:]) - @patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestNodeIter(BaseTest): diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index edf0323db9..ddb144474b 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -41,7 +41,8 @@ import swift from swift.common import utils, swob, exceptions from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \ ChunkReadTimeout -from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter +from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter, \ + ShardRange from swift.proxy import server as proxy_server from swift.proxy.controllers import obj from swift.proxy.controllers.base import \ @@ -49,13 +50,13 @@ from swift.proxy.controllers.base import \ NodeIter from swift.common.storage_policy import POLICIES, ECDriverError, \ StoragePolicy, ECStoragePolicy -from swift.common.swob import Request +from swift.common.swob import Request, wsgi_to_str from test.debug_logger import debug_logger from test.unit import ( FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus, DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub, fake_ec_node_response, StubResponse, mocked_http_conn, - quiet_eventlet_exceptions, FakeSource) + quiet_eventlet_exceptions, FakeSource, make_timestamp_iter) from test.unit.proxy.test_server import node_error_count @@ -7178,5 +7179,173 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual(range_headers, [None, 'bytes=8-']) +@patch_policies() +class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase): + def setUp(self): + super(TestGetUpdatingShardRanges, self).setUp() + self.ctrl = obj.BaseObjectController(self.app, 'a', 'c', 'o') + + def test_get_shard_ranges_for_object_put(self): + ts_iter = make_timestamp_iter() + shard_ranges = [dict(ShardRange( + '.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i, + '%d_upper' % i, object_count=i, bytes_used=1024 * i, + meta_timestamp=next(ts_iter))) + for i in range(3)] + req = Request.blank('/v1/a/c/o', method='PUT') + resp_headers = {'X-Backend-Record-Type': 'shard'} + with mocked_http_conn( + 200, 200, + body_iter=iter([b'', + json.dumps(shard_ranges[1:2]).encode('ascii')]), + headers=resp_headers + ) as fake_conn: + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', '1_test') + self.assertEqual(200, resp.status_int) + + # account info + captured = fake_conn.requests + self.assertEqual('HEAD', captured[0]['method']) + self.assertEqual('a', captured[0]['path'][7:]) + # container GET + self.assertEqual('GET', captured[1]['method']) + self.assertEqual('a/c', captured[1]['path'][7:]) + params = sorted(captured[1]['qs'].split('&')) + self.assertEqual( + ['format=json', 'includes=1_test', 'states=updating'], params) + self.assertEqual( + 'shard', captured[1]['headers'].get('X-Backend-Record-Type')) + self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual]) + self.assertFalse(self.app.logger.get_lines_for_level('error')) + + def test_get_shard_ranges_for_utf8_object_put(self): + ts_iter = make_timestamp_iter() + shard_ranges = [dict(ShardRange( + '.sharded_a/sr%d' % i, next(ts_iter), u'\u1234%d_lower' % i, + u'\u1234%d_upper' % i, object_count=i, bytes_used=1024 * i, + meta_timestamp=next(ts_iter))) + for i in range(3)] + req = Request.blank('/v1/a/c/o', method='PUT') + resp_headers = {'X-Backend-Record-Type': 'shard'} + with mocked_http_conn( + 200, 200, + body_iter=iter([b'', + json.dumps(shard_ranges[1:2]).encode('ascii')]), + headers=resp_headers + ) as fake_conn: + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', wsgi_to_str('\xe1\x88\xb41_test')) + self.assertEqual(200, resp.status_int) + + # account info + captured = fake_conn.requests + self.assertEqual('HEAD', captured[0]['method']) + self.assertEqual('a', captured[0]['path'][7:]) + # container GET + self.assertEqual('GET', captured[1]['method']) + self.assertEqual('a/c', captured[1]['path'][7:]) + params = sorted(captured[1]['qs'].split('&')) + self.assertEqual( + ['format=json', 'includes=%E1%88%B41_test', 'states=updating'], + params) + self.assertEqual( + 'shard', captured[1]['headers'].get('X-Backend-Record-Type')) + self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual]) + self.assertFalse(self.app.logger.get_lines_for_level('error')) + + def _check_get_shard_ranges_bad_data(self, body): + req = Request.blank('/v1/a/c/o', method='PUT') + # empty response + resp_headers = {'X-Backend-Record-Type': 'shard'} + with mocked_http_conn(200, 200, body_iter=iter([b'', body]), + headers=resp_headers): + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', '1_test') + self.assertEqual(200, resp.status_int) + self.assertIsNone(actual) + lines = self.app.logger.get_lines_for_level('error') + return lines + + def test_get_shard_ranges_empty_body(self): + error_lines = self._check_get_shard_ranges_bad_data(b'') + self.assertIn('Problem with listing response', error_lines[0]) + if six.PY2: + self.assertIn('No JSON', error_lines[0]) + else: + self.assertIn('JSONDecodeError', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_not_a_list(self): + body = json.dumps({}).encode('ascii') + error_lines = self._check_get_shard_ranges_bad_data(body) + self.assertIn('Problem with listing response', error_lines[0]) + self.assertIn('not a list', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_key_missing(self): + body = json.dumps([{}]).encode('ascii') + error_lines = self._check_get_shard_ranges_bad_data(body) + self.assertIn('Failed to get shard ranges', error_lines[0]) + self.assertIn('KeyError', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_invalid_shard_range(self): + sr = ShardRange('a/c', Timestamp.now()) + bad_sr_data = dict(sr, name='bad_name') + body = json.dumps([bad_sr_data]).encode('ascii') + error_lines = self._check_get_shard_ranges_bad_data(body) + self.assertIn('Failed to get shard ranges', error_lines[0]) + self.assertIn('ValueError', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_missing_record_type(self): + req = Request.blank('/v1/a/c/o', method='PUT') + sr = ShardRange('a/c', Timestamp.now()) + body = json.dumps([dict(sr)]).encode('ascii') + with mocked_http_conn( + 200, 200, body_iter=iter([b'', body])): + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', '1_test') + self.assertEqual(200, resp.status_int) + self.assertIsNone(actual) + error_lines = self.app.logger.get_lines_for_level('error') + self.assertIn('Failed to get shard ranges', error_lines[0]) + self.assertIn('unexpected record type', error_lines[0]) + self.assertIn('/a/c', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_wrong_record_type(self): + req = Request.blank('/v1/a/c/o', method='PUT') + sr = ShardRange('a/c', Timestamp.now()) + body = json.dumps([dict(sr)]).encode('ascii') + headers = {'X-Backend-Record-Type': 'object'} + with mocked_http_conn( + 200, 200, body_iter=iter([b'', body]), + headers=headers): + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', '1_test') + self.assertEqual(200, resp.status_int) + self.assertIsNone(actual) + error_lines = self.app.logger.get_lines_for_level('error') + self.assertIn('Failed to get shard ranges', error_lines[0]) + self.assertIn('unexpected record type', error_lines[0]) + self.assertIn('/a/c', error_lines[0]) + self.assertFalse(error_lines[1:]) + + def test_get_shard_ranges_request_failed(self): + req = Request.blank('/v1/a/c/o', method='PUT') + with mocked_http_conn(200, 404, 404, 404): + actual, resp = self.ctrl._get_updating_shard_ranges( + req, 'a', 'c', '1_test') + self.assertEqual(404, resp.status_int) + self.assertIsNone(actual) + self.assertFalse(self.app.logger.get_lines_for_level('error')) + warning_lines = self.app.logger.get_lines_for_level('warning') + self.assertIn('Failed to get container listing', warning_lines[0]) + self.assertIn('/a/c', warning_lines[0]) + self.assertFalse(warning_lines[1:]) + + if __name__ == '__main__': unittest.main()