proxy: move _get_shard_ranges to ObjectController

...and rename to _get_updating_shard_ranges.

The method is only used by the ObjectController, and it is only used
to fetch shard ranges in the 'updating' states.

Also relocate the associated unit tests.

Change-Id: I083e0c6898bf93d8a0dc593acd9723827e55508e
This commit is contained in:
Alistair Coles 2023-11-17 15:48:47 +00:00
parent 72ac5b3be0
commit 6f890d2ba9
4 changed files with 207 additions and 233 deletions

View File

@ -2479,31 +2479,3 @@ class Controller(object):
"Failed to get shard ranges from %s: invalid data: %r",
req.path_qs, err)
return None
def _get_shard_ranges(
self, req, account, container, includes=None, states=None):
"""
Fetch shard ranges from given `account/container`. If `includes` is
given then the shard range for that object name is requested, otherwise
all shard ranges are requested.
:param req: original Request instance.
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param includes: (optional) restricts the list of fetched shard ranges
to those which include the given name.
:param states: (optional) the states of shard ranges to be fetched.
:return: a list of instances of :class:`swift.common.utils.ShardRange`,
or None if there was a problem fetching the shard ranges
"""
params = req.params.copy()
params.pop('limit', None)
params['format'] = 'json'
if includes:
params['includes'] = str_to_wsgi(includes)
if states:
params['states'] = states
headers = {'X-Backend-Record-Type': 'shard'}
listing, response = self._get_container_listing(
req, account, container, headers=headers, params=params)
return self._parse_shard_ranges(req, listing, response), response

View File

@ -75,7 +75,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
HTTPUnprocessableEntity, Response, HTTPException, \
HTTPRequestedRangeNotSatisfiable, Range, HTTPInternalServerError, \
normalize_etag
normalize_etag, str_to_wsgi
from swift.common.request_helpers import update_etag_is_at_header, \
resolve_etag_is_at_header, validate_internal_obj, get_ip_port
@ -281,6 +281,32 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _get_updating_shard_ranges(
self, req, account, container, includes=None):
"""
Fetch shard ranges in 'updating' states from given `account/container`.
If `includes` is given then the shard range for that object name is
requested, otherwise all shard ranges are requested.
:param req: original Request instance.
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param includes: (optional) restricts the list of fetched shard ranges
to those which include the given name.
:return: a list of instances of :class:`swift.common.utils.ShardRange`,
or None if there was a problem fetching the shard ranges
"""
params = req.params.copy()
params.pop('limit', None)
params['format'] = 'json'
params['states'] = 'updating'
if includes:
params['includes'] = str_to_wsgi(includes)
headers = {'X-Backend-Record-Type': 'shard'}
listing, response = self._get_container_listing(
req, account, container, headers=headers, params=params)
return self._parse_shard_ranges(req, listing, response), response
def _get_update_shard_caching_disabled(self, req, account, container, obj):
"""
Fetch all updating shard ranges for the given root container when
@ -294,8 +320,8 @@ class BaseObjectController(Controller):
or None if the update should go back to the root
"""
# legacy behavior requests container server for includes=obj
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating', includes=obj)
shard_ranges, response = self._get_updating_shard_ranges(
req, account, container, includes=obj)
record_cache_op_metrics(
self.logger, self.server_type.lower(), 'shard_updating',
'disabled', response)
@ -336,8 +362,8 @@ class BaseObjectController(Controller):
upper=namespace.upper)
else:
# pull full set of updating shard ranges from backend
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating')
shard_ranges, response = self._get_updating_shard_ranges(
req, account, container)
if shard_ranges:
# only store the list of namespace lower bounds and names into
# infocache and memcache.

View File

@ -32,18 +32,17 @@ from swift.proxy.controllers.base import headers_to_container_info, \
record_cache_op_metrics, GetterSource, get_namespaces_from_cache, \
set_namespaces_in_cache
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
bytes_to_wsgi, wsgi_to_str
bytes_to_wsgi
from swift.common import exceptions
from swift.common.utils import split_path, ShardRange, Timestamp, \
from swift.common.utils import split_path, Timestamp, \
GreenthreadSafeIterator, GreenAsyncPile, NamespaceBoundList
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
from test.debug_logger import debug_logger
from test.unit import (
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies,
make_timestamp_iter, mocked_http_conn, patch_policies, FakeSource,
StubResponse)
fake_http_connect, FakeRing, FakeMemcache, PatchPolicies, patch_policies,
FakeSource, StubResponse)
from swift.common.request_helpers import (
get_sys_meta_prefix, get_object_transient_sysmeta
)
@ -1583,198 +1582,6 @@ class TestFuncs(BaseTest):
self.assertEqual(bytes_to_skip(11, 7), 4)
self.assertEqual(bytes_to_skip(97, 7873823), 55)
def test_get_shard_ranges_for_container_get(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
base = Controller(self.app)
req = Request.blank('/v1/a/c', method='GET')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'', json.dumps(shard_ranges).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = base._get_shard_ranges(req, 'a', 'c')
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
self.assertEqual('format=json', captured[1]['qs'])
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges, [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_shard_ranges_for_object_put(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'',
json.dumps(shard_ranges[1:2]).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=1_test'], params)
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_shard_ranges_for_utf8_object_put(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), u'\u1234%d_lower' % i,
u'\u1234%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'',
json.dumps(shard_ranges[1:2]).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = base._get_shard_ranges(
req, 'a', 'c', wsgi_to_str('\xe1\x88\xb41_test'))
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=%E1%88%B41_test'], params)
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def _check_get_shard_ranges_bad_data(self, body):
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
# empty response
headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(200, 200, body_iter=iter([b'', body]),
headers=headers):
actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
lines = self.app.logger.get_lines_for_level('error')
return lines
def test_get_shard_ranges_empty_body(self):
error_lines = self._check_get_shard_ranges_bad_data(b'')
self.assertIn('Problem with listing response', error_lines[0])
if six.PY2:
self.assertIn('No JSON', error_lines[0])
else:
self.assertIn('JSONDecodeError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_not_a_list(self):
body = json.dumps({}).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Problem with listing response', error_lines[0])
self.assertIn('not a list', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_key_missing(self):
body = json.dumps([{}]).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('KeyError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_invalid_shard_range(self):
sr = ShardRange('a/c', Timestamp.now())
bad_sr_data = dict(sr, name='bad_name')
body = json.dumps([bad_sr_data]).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('ValueError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_missing_record_type(self):
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
body = json.dumps([dict(sr)]).encode('ascii')
with mocked_http_conn(
200, 200, body_iter=iter([b'', body])):
actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('unexpected record type', error_lines[0])
self.assertIn('/a/c', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_wrong_record_type(self):
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
body = json.dumps([dict(sr)]).encode('ascii')
headers = {'X-Backend-Record-Type': 'object'}
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=headers):
actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('unexpected record type', error_lines[0])
self.assertIn('/a/c', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_request_failed(self):
base = Controller(self.app)
req = Request.blank('/v1/a/c/o', method='PUT')
with mocked_http_conn(200, 404, 404, 404):
actual, resp = base._get_shard_ranges(req, 'a', 'c', '1_test')
self.assertEqual(404, resp.status_int)
self.assertIsNone(actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
warning_lines = self.app.logger.get_lines_for_level('warning')
self.assertIn('Failed to get container listing', warning_lines[0])
self.assertIn('/a/c', warning_lines[0])
self.assertFalse(warning_lines[1:])
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestNodeIter(BaseTest):

View File

@ -41,7 +41,8 @@ import swift
from swift.common import utils, swob, exceptions
from swift.common.exceptions import ChunkWriteTimeout, ShortReadError, \
ChunkReadTimeout
from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter
from swift.common.utils import Timestamp, list_from_csv, md5, FileLikeIter, \
ShardRange
from swift.proxy import server as proxy_server
from swift.proxy.controllers import obj
from swift.proxy.controllers.base import \
@ -49,13 +50,13 @@ from swift.proxy.controllers.base import \
NodeIter
from swift.common.storage_policy import POLICIES, ECDriverError, \
StoragePolicy, ECStoragePolicy
from swift.common.swob import Request
from swift.common.swob import Request, wsgi_to_str
from test.debug_logger import debug_logger
from test.unit import (
FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus,
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, make_ec_object_stub,
fake_ec_node_response, StubResponse, mocked_http_conn,
quiet_eventlet_exceptions, FakeSource)
quiet_eventlet_exceptions, FakeSource, make_timestamp_iter)
from test.unit.proxy.test_server import node_error_count
@ -7178,5 +7179,173 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase):
self.assertEqual(range_headers, [None, 'bytes=8-'])
@patch_policies()
class TestGetUpdatingShardRanges(BaseObjectControllerMixin, unittest.TestCase):
def setUp(self):
super(TestGetUpdatingShardRanges, self).setUp()
self.ctrl = obj.BaseObjectController(self.app, 'a', 'c', 'o')
def test_get_shard_ranges_for_object_put(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), '%d_lower' % i,
'%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
req = Request.blank('/v1/a/c/o', method='PUT')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'',
json.dumps(shard_ranges[1:2]).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=1_test', 'states=updating'], params)
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def test_get_shard_ranges_for_utf8_object_put(self):
ts_iter = make_timestamp_iter()
shard_ranges = [dict(ShardRange(
'.sharded_a/sr%d' % i, next(ts_iter), u'\u1234%d_lower' % i,
u'\u1234%d_upper' % i, object_count=i, bytes_used=1024 * i,
meta_timestamp=next(ts_iter)))
for i in range(3)]
req = Request.blank('/v1/a/c/o', method='PUT')
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(
200, 200,
body_iter=iter([b'',
json.dumps(shard_ranges[1:2]).encode('ascii')]),
headers=resp_headers
) as fake_conn:
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', wsgi_to_str('\xe1\x88\xb41_test'))
self.assertEqual(200, resp.status_int)
# account info
captured = fake_conn.requests
self.assertEqual('HEAD', captured[0]['method'])
self.assertEqual('a', captured[0]['path'][7:])
# container GET
self.assertEqual('GET', captured[1]['method'])
self.assertEqual('a/c', captured[1]['path'][7:])
params = sorted(captured[1]['qs'].split('&'))
self.assertEqual(
['format=json', 'includes=%E1%88%B41_test', 'states=updating'],
params)
self.assertEqual(
'shard', captured[1]['headers'].get('X-Backend-Record-Type'))
self.assertEqual(shard_ranges[1:2], [dict(pr) for pr in actual])
self.assertFalse(self.app.logger.get_lines_for_level('error'))
def _check_get_shard_ranges_bad_data(self, body):
req = Request.blank('/v1/a/c/o', method='PUT')
# empty response
resp_headers = {'X-Backend-Record-Type': 'shard'}
with mocked_http_conn(200, 200, body_iter=iter([b'', body]),
headers=resp_headers):
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
lines = self.app.logger.get_lines_for_level('error')
return lines
def test_get_shard_ranges_empty_body(self):
error_lines = self._check_get_shard_ranges_bad_data(b'')
self.assertIn('Problem with listing response', error_lines[0])
if six.PY2:
self.assertIn('No JSON', error_lines[0])
else:
self.assertIn('JSONDecodeError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_not_a_list(self):
body = json.dumps({}).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Problem with listing response', error_lines[0])
self.assertIn('not a list', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_key_missing(self):
body = json.dumps([{}]).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('KeyError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_invalid_shard_range(self):
sr = ShardRange('a/c', Timestamp.now())
bad_sr_data = dict(sr, name='bad_name')
body = json.dumps([bad_sr_data]).encode('ascii')
error_lines = self._check_get_shard_ranges_bad_data(body)
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('ValueError', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_missing_record_type(self):
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
body = json.dumps([dict(sr)]).encode('ascii')
with mocked_http_conn(
200, 200, body_iter=iter([b'', body])):
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('unexpected record type', error_lines[0])
self.assertIn('/a/c', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_wrong_record_type(self):
req = Request.blank('/v1/a/c/o', method='PUT')
sr = ShardRange('a/c', Timestamp.now())
body = json.dumps([dict(sr)]).encode('ascii')
headers = {'X-Backend-Record-Type': 'object'}
with mocked_http_conn(
200, 200, body_iter=iter([b'', body]),
headers=headers):
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(200, resp.status_int)
self.assertIsNone(actual)
error_lines = self.app.logger.get_lines_for_level('error')
self.assertIn('Failed to get shard ranges', error_lines[0])
self.assertIn('unexpected record type', error_lines[0])
self.assertIn('/a/c', error_lines[0])
self.assertFalse(error_lines[1:])
def test_get_shard_ranges_request_failed(self):
req = Request.blank('/v1/a/c/o', method='PUT')
with mocked_http_conn(200, 404, 404, 404):
actual, resp = self.ctrl._get_updating_shard_ranges(
req, 'a', 'c', '1_test')
self.assertEqual(404, resp.status_int)
self.assertIsNone(actual)
self.assertFalse(self.app.logger.get_lines_for_level('error'))
warning_lines = self.app.logger.get_lines_for_level('warning')
self.assertIn('Failed to get container listing', warning_lines[0])
self.assertIn('/a/c', warning_lines[0])
self.assertFalse(warning_lines[1:])
if __name__ == '__main__':
unittest.main()