Merge "sharding: Cache shard ranges for object writes"

This commit is contained in:
Zuul 2019-07-13 00:34:06 +00:00 committed by Gerrit Code Review
commit cf18e1f47b
6 changed files with 319 additions and 15 deletions

View File

@ -129,6 +129,13 @@ use = egg:swift#proxy
# log_handoffs = true # log_handoffs = true
# recheck_account_existence = 60 # recheck_account_existence = 60
# recheck_container_existence = 60 # recheck_container_existence = 60
#
# How long (in seconds) the proxy should cache a set of shard ranges for a
# container; set to 0 to disable this caching.
# Note that stale shard range info should be fine; updates will still
# eventually make their way to the correct shard. As a result, you can
# usually set this much higher than the existence checks above.
# recheck_updating_shard_ranges = 3600
#
# object_chunk_size = 65536 # object_chunk_size = 65536
# client_chunk_size = 65536 # client_chunk_size = 65536
# #

View File

@ -45,7 +45,7 @@ from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, config_true_value, \ from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, \ GreenAsyncPile, quorum_size, parse_content_type, \
document_iters_to_http_response_body, ShardRange document_iters_to_http_response_body, ShardRange, find_shard_range
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common import constraints from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
@ -67,6 +67,7 @@ from swift.common.storage_policy import POLICIES
DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds
DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds
def update_headers(response, headers): def update_headers(response, headers):
@ -443,7 +444,7 @@ def get_account_info(env, app, swift_source=None):
return info return info
def get_cache_key(account, container=None, obj=None): def get_cache_key(account, container=None, obj=None, shard=None):
""" """
Get the keys for both memcache and env['swift.infocache'] (cache_key) Get the keys for both memcache and env['swift.infocache'] (cache_key)
where info about accounts, containers, and objects is cached where info about accounts, containers, and objects is cached
@ -451,6 +452,9 @@ def get_cache_key(account, container=None, obj=None):
:param account: The name of the account :param account: The name of the account
:param container: The name of the container (or None if account) :param container: The name of the container (or None if account)
:param obj: The name of the object (or None if account or container) :param obj: The name of the object (or None if account or container)
:param shard: Sharding state for the container query; typically 'updating'
or 'listing' (Requires account and container; cannot use
with obj)
:returns: a (native) string cache_key :returns: a (native) string cache_key
""" """
if six.PY2: if six.PY2:
@ -468,7 +472,13 @@ def get_cache_key(account, container=None, obj=None):
container = to_native(container) container = to_native(container)
obj = to_native(obj) obj = to_native(obj)
if obj: if shard:
if not (account and container):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
elif obj:
if not (account and container): if not (account and container):
raise ValueError('Object cache key requires account and container') raise ValueError('Object cache key requires account and container')
cache_key = 'object/%s/%s/%s' % (account, container, obj) cache_key = 'object/%s/%s/%s' % (account, container, obj)
@ -2191,3 +2201,55 @@ class Controller(object):
"Failed to get shard ranges from %s: invalid data: %r", "Failed to get shard ranges from %s: invalid data: %r",
req.path_qs, err) req.path_qs, err)
return None return None
def _get_update_shard(self, req, account, container, obj):
    """
    Pick the shard range that an object's container update should target.

    When ``recheck_updating_shard_ranges`` is falsy, a single lookup is
    made with ``includes=obj`` and the first range returned is used.
    Otherwise the full set of 'updating' shard ranges for the root
    container is looked up in the per-request infocache, then in memcache
    (if one is available), and only then fetched from the container
    server. Whatever is found is stored back in the infocache, and newly
    fetched ranges are also written to memcache, so later writes need not
    contact the container DB again.

    :param req: original Request instance.
    :param account: account from which shard ranges should be fetched.
    :param container: container from which shard ranges should be fetched.
    :param obj: object getting updated.
    :return: an instance of :class:`swift.common.utils.ShardRange`,
        or None if the update should go back to the root
    """
    cache_ttl = self.app.recheck_updating_shard_ranges
    if not cache_ttl:
        # caching is switched off, so only request the range for this
        # one object rather than the whole set
        found = self._get_shard_ranges(
            req, account, container, states='updating', includes=obj)
        return found[0] if found else None

    key = get_cache_key(account, container, shard='updating')
    infocache = req.environ.setdefault('swift.infocache', {})
    memcache = getattr(self.app, 'memcache', None) or req.environ.get(
        'swift.cache')

    # cheapest source first: per-request cache, then memcache
    range_dicts = infocache.get(key)
    if range_dicts is None and memcache:
        range_dicts = memcache.get(key)

    if range_dicts:
        # the caches hold plain dicts; rebuild ShardRange objects
        found = [ShardRange.from_dict(item) for item in range_dicts]
    else:
        found = self._get_shard_ranges(
            req, account, container, states='updating')
        if found:
            range_dicts = [dict(item) for item in found]
            # had to ask the backend, so remember the answer
            if memcache:
                memcache.set(key, range_dicts, time=cache_ttl)

    if not found:
        return None

    infocache[key] = tuple(range_dicts)
    return find_shard_range(obj, found)

View File

@ -271,13 +271,12 @@ class BaseObjectController(Controller):
# find the sharded container to which we'll send the update # find the sharded container to which we'll send the update
db_state = container_info.get('sharding_state', 'unsharded') db_state = container_info.get('sharding_state', 'unsharded')
if db_state in ('sharded', 'sharding'): if db_state in ('sharded', 'sharding'):
shard_ranges = self._get_shard_ranges( shard_range = self._get_update_shard(
req, self.account_name, self.container_name, req, self.account_name, self.container_name, self.object_name)
includes=self.object_name, states='updating') if shard_range:
if shard_ranges:
partition, nodes = self.app.container_ring.get_nodes( partition, nodes = self.app.container_ring.get_nodes(
shard_ranges[0].account, shard_ranges[0].container) shard_range.account, shard_range.container)
return partition, nodes, shard_ranges[0].name return partition, nodes, shard_range.name
return container_info['partition'], container_info['nodes'], None return container_info['partition'], container_info['nodes'], None

View File

@ -40,7 +40,8 @@ from swift.common.constraints import check_utf8, valid_api_version
from swift.proxy.controllers import AccountController, ContainerController, \ from swift.proxy.controllers import AccountController, ContainerController, \
ObjectControllerRouter, InfoController ObjectControllerRouter, InfoController
from swift.proxy.controllers.base import get_container_info, NodeIter, \ from swift.proxy.controllers.base import get_container_info, NodeIter, \
DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \
DEFAULT_RECHECK_UPDATING_SHARD_RANGES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ from swift.common.swob import HTTPBadRequest, HTTPForbidden, \
HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \
HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \
@ -202,6 +203,9 @@ class Application(object):
self.recheck_container_existence = \ self.recheck_container_existence = \
int(conf.get('recheck_container_existence', int(conf.get('recheck_container_existence',
DEFAULT_RECHECK_CONTAINER_EXISTENCE)) DEFAULT_RECHECK_CONTAINER_EXISTENCE))
self.recheck_updating_shard_ranges = \
int(conf.get('recheck_updating_shard_ranges',
DEFAULT_RECHECK_UPDATING_SHARD_RANGES))
self.recheck_account_existence = \ self.recheck_account_existence = \
int(conf.get('recheck_account_existence', int(conf.get('recheck_account_existence',
DEFAULT_RECHECK_ACCOUNT_EXISTENCE)) DEFAULT_RECHECK_ACCOUNT_EXISTENCE))

View File

@ -20,15 +20,16 @@ import uuid
from nose import SkipTest from nose import SkipTest
from swift.common import direct_client from swift.common import direct_client, utils
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
from swift.common.direct_client import DirectClientException from swift.common.direct_client import DirectClientException
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
quorum_size, config_true_value, Timestamp quorum_size, config_true_value, Timestamp
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
from swift.common import utils
from swift.common.manager import Manager
from swiftclient import client, get_auth, ClientException from swiftclient import client, get_auth, ClientException
from swift.proxy.controllers.base import get_cache_key
from swift.proxy.controllers.obj import num_container_updates from swift.proxy.controllers.obj import num_container_updates
from test import annotate_failure from test import annotate_failure
from test.probe.brain import BrainSplitter from test.probe.brain import BrainSplitter
@ -116,6 +117,7 @@ class BaseTestContainerSharding(ReplProbeTest):
self.brain.put_container(policy_index=int(self.policy)) self.brain.put_container(policy_index=int(self.policy))
self.sharders = Manager(['container-sharder']) self.sharders = Manager(['container-sharder'])
self.internal_client = self.make_internal_client() self.internal_client = self.make_internal_client()
self.memcache = MemcacheRing(['127.0.0.1:11211'])
def stop_container_servers(self, node_numbers=None): def stop_container_servers(self, node_numbers=None):
if node_numbers: if node_numbers:
@ -835,6 +837,9 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing(more_obj_names + obj_names) self.assert_container_listing(more_obj_names + obj_names)
self.assert_container_object_count(len(more_obj_names + obj_names)) self.assert_container_object_count(len(more_obj_names + obj_names))
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
# add another object that lands in the first of the new sub-shards # add another object that lands in the first of the new sub-shards
self.put_objects(['alpha']) self.put_objects(['alpha'])
@ -1217,6 +1222,10 @@ class TestContainerSharding(BaseTestContainerSharding):
# now look up the shard target for subsequent updates # now look up the shard target for subsequent updates
self.assert_container_listing(obj_names) self.assert_container_listing(obj_names)
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
# delete objects from first shard range # delete objects from first shard range
first_shard_objects = [obj_name for obj_name in obj_names first_shard_objects = [obj_name for obj_name in obj_names
if obj_name <= orig_shard_ranges[0].upper] if obj_name <= orig_shard_ranges[0].upper]
@ -1243,6 +1252,11 @@ class TestContainerSharding(BaseTestContainerSharding):
# to a GET for a redirect target, the object update will default to # to a GET for a redirect target, the object update will default to
# being targeted at the root container # being targeted at the root container
self.stop_container_servers() self.stop_container_servers()
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
self.put_objects([beta]) self.put_objects([beta])
self.brain.servers.start() self.brain.servers.start()
async_pendings = self.gather_async_pendings( async_pendings = self.gather_async_pendings(
@ -1746,6 +1760,9 @@ class TestContainerSharding(BaseTestContainerSharding):
shard_part, shard_nodes = self.get_part_and_node_numbers( shard_part, shard_nodes = self.get_part_and_node_numbers(
shard_ranges[1]) shard_ranges[1])
self.brain.servers.stop(number=shard_nodes[2]) self.brain.servers.stop(number=shard_nodes[2])
# Before writing, kill the cache
self.memcache.delete(get_cache_key(
self.account, self.container_name, shard='updating'))
self.delete_objects(['alpha']) self.delete_objects(['alpha'])
self.put_objects(['beta']) self.put_objects(['beta'])
self.assert_container_listing(['beta']) self.assert_container_listing(['beta'])

View File

@ -3631,12 +3631,13 @@ class TestReplicatedObjectController(
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
StoragePolicy(1, 'one', object_ring=FakeRing()), StoragePolicy(1, 'one', object_ring=FakeRing()),
]) ])
def test_backend_headers_update_shard_container(self): def test_backend_headers_update_shard_container_no_cache(self):
# verify that when container is sharded the backend container update is # verify that when container is sharded the backend container update is
# directed to the shard container # directed to the shard container
# reset the router post patch_policies # reset the router post patch_policies
self.app.obj_controller_router = proxy_server.ObjectControllerRouter() self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
self.app.recheck_updating_shard_ranges = 0
def do_test(method, sharding_state): def do_test(method, sharding_state):
self.app.memcache.store = {} self.app.memcache.store = {}
@ -3686,7 +3687,8 @@ class TestReplicatedObjectController(
container_request_shard = backend_requests[2] container_request_shard = backend_requests[2]
check_request( check_request(
container_request_shard, method='GET', path='/sda/0/a/c', container_request_shard, method='GET', path='/sda/0/a/c',
params={'includes': 'o'}) params={'includes': 'o', 'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
# make sure backend requests included expected container headers # make sure backend requests included expected container headers
container_headers = {} container_headers = {}
@ -3720,6 +3722,219 @@ class TestReplicatedObjectController(
do_test('PUT', 'sharding') do_test('PUT', 'sharding')
do_test('PUT', 'sharded') do_test('PUT', 'sharded')
@patch_policies([
    StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
    StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_backend_headers_update_shard_container_with_empty_cache(self):
    # With shard range caching enabled but nothing cached yet, the proxy
    # should GET the full set of updating shard ranges, store them in both
    # memcache and the infocache, and direct the backend container update
    # at the matching shard container.

    # reset the router post patch_policies
    self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
    self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
    self.app.recheck_updating_shard_ranges = 3600

    def do_test(method, sharding_state):
        # every run starts from a cold memcache
        self.app.memcache.store = {}
        req = Request.blank('/v1/a/c/o', {}, method=method, body='',
                            headers={'Content-Type': 'text/plain'})

        # container_info must report policy index 1 and the sharding
        # state under test
        resp_headers = {'X-Backend-Storage-Policy-Index': 1,
                        'x-backend-sharding-state': sharding_state,
                        'X-Backend-Record-Type': 'shard'}
        # account HEAD, container HEAD, container shard GET, then the
        # three object requests
        status_codes = (200, 200, 200, 202, 202, 202)

        now = utils.Timestamp.now
        shard_ranges = [
            utils.ShardRange('.shards_a/c_not_used', now(), '', 'l'),
            utils.ShardRange('.shards_a/c_shard', now(), 'l', 'u'),
            utils.ShardRange('.shards_a/c_nope', now(), 'u', ''),
        ]
        expected_dicts = [dict(sr) for sr in shard_ranges]
        body = json.dumps(expected_dicts).encode('ascii')

        with mocked_http_conn(*status_codes, headers=resp_headers,
                              body=body) as fake_conn:
            resp = req.get_response(self.app)

        self.assertEqual(resp.status_int, 202)
        backend_requests = fake_conn.requests

        def check_request(req, method, path, headers=None, params=None):
            self.assertEqual(method, req['method'])
            # only the tail of the path is compared, so the caller may
            # leave out leading path parts
            self.assertTrue(
                req['path'].endswith(path),
                'expected path to end with %s, it was %s' % (
                    path, req['path']))
            # only the headers the caller names are compared
            for name, want in (headers or {}).items():
                self.assertEqual(
                    req['headers'][name], want,
                    'Expected %s but got %s for key %s' %
                    (want, req['headers'][name], name))
            query = dict(parse_qsl(req['qs'])) if req['qs'] else {}
            for name, want in (params or {}).items():
                self.assertEqual(
                    query[name], want,
                    'Expected %s but got %s for key %s' %
                    (want, query[name], name))

        check_request(backend_requests[0], method='HEAD', path='/sda/0/a')
        check_request(backend_requests[1], method='HEAD',
                      path='/sda/0/a/c')
        # no 'includes' param is expected here, only the state filter
        check_request(
            backend_requests[2], method='GET', path='/sda/0/a/c',
            params={'states': 'updating'},
            headers={'X-Backend-Record-Type': 'shard'})

        # the fetched ranges must land in memcache (as a list) and in
        # the per-request infocache (as a tuple)
        cache_key = 'shard-updating/a/c'
        self.assertIn(cache_key, self.app.memcache.store)
        self.assertEqual(self.app.memcache.store[cache_key],
                         expected_dicts)
        self.assertIn(cache_key, req.environ.get('swift.infocache'))
        self.assertEqual(req.environ['swift.infocache'][cache_key],
                         tuple(expected_dicts))

        # make sure backend requests included expected container headers
        expectations = {
            'method': method,
            'path': '/0/a/c/o',
            'headers': {
                'X-Container-Partition': '0',
                'Host': 'localhost:80',
                'Referer': '%s http://localhost/v1/a/c/o' % method,
                'X-Backend-Storage-Policy-Index': '1',
                'X-Backend-Container-Path': shard_ranges[1].name
            },
        }
        container_headers = {}
        for request in backend_requests[3:]:
            req_headers = request['headers']
            device = req_headers['x-container-device']
            container_headers[device] = req_headers['x-container-host']
            check_request(request, **expectations)

        expected = {
            device: '10.0.0.%d:100%d' % (i, i)
            for i, device in enumerate(['sda', 'sdb', 'sdc'])}
        self.assertEqual(container_headers, expected)

    for method in ('POST', 'DELETE', 'PUT'):
        for sharding_state in ('sharding', 'sharded'):
            do_test(method, sharding_state)
@patch_policies([
    StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
    StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_backend_headers_update_shard_container_with_live_cache(self):
    # With the updating shard ranges already sitting in memcache, the proxy
    # should make no shard-range GET to the container server, should copy
    # the ranges into the infocache, and should still direct the backend
    # container update at the matching shard container.

    # reset the router post patch_policies
    self.app.obj_controller_router = proxy_server.ObjectControllerRouter()
    self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes
    self.app.recheck_updating_shard_ranges = 3600

    def do_test(method, sharding_state):
        cache_key = 'shard-updating/a/c'
        now = utils.Timestamp.now
        shard_ranges = [
            utils.ShardRange('.shards_a/c_not_used', now(), '', 'l'),
            utils.ShardRange('.shards_a/c_shard', now(), 'l', 'u'),
            utils.ShardRange('.shards_a/c_nope', now(), 'u', ''),
        ]
        # pre-populate memcache so no backend shard listing is needed
        self.app.memcache.store = {
            cache_key: tuple(dict(sr) for sr in shard_ranges)}
        req = Request.blank('/v1/a/c/o', {}, method=method, body='',
                            headers={'Content-Type': 'text/plain'})

        # container_info must report policy index 1 and the sharding
        # state under test
        resp_headers = {'X-Backend-Storage-Policy-Index': 1,
                        'x-backend-sharding-state': sharding_state,
                        'X-Backend-Record-Type': 'shard'}
        # account HEAD, container HEAD, then the three object requests
        status_codes = (200, 200, 202, 202, 202)

        with mocked_http_conn(*status_codes,
                              headers=resp_headers) as fake_conn:
            resp = req.get_response(self.app)

        self.assertEqual(resp.status_int, 202)
        backend_requests = fake_conn.requests

        def check_request(req, method, path, headers=None, params=None):
            self.assertEqual(method, req['method'])
            # only the tail of the path is compared, so the caller may
            # leave out leading path parts
            self.assertTrue(
                req['path'].endswith(path),
                'expected path to end with %s, it was %s' % (
                    path, req['path']))
            # only the headers the caller names are compared
            for name, want in (headers or {}).items():
                self.assertEqual(
                    req['headers'][name], want,
                    'Expected %s but got %s for key %s' %
                    (want, req['headers'][name], name))
            query = dict(parse_qsl(req['qs'])) if req['qs'] else {}
            for name, want in (params or {}).items():
                self.assertEqual(
                    query[name], want,
                    'Expected %s but got %s for key %s' %
                    (want, query[name], name))

        check_request(backend_requests[0], method='HEAD', path='/sda/0/a')
        check_request(backend_requests[1], method='HEAD',
                      path='/sda/0/a/c')

        # infocache gets populated from memcache; the expected value is
        # rebuilt from the ShardRange objects rather than reusing the
        # tuple that was handed to memcache
        self.assertIn(cache_key, req.environ.get('swift.infocache'))
        self.assertEqual(req.environ['swift.infocache'][cache_key],
                         tuple(dict(sr) for sr in shard_ranges))

        # make sure backend requests included expected container headers
        expectations = {
            'method': method,
            'path': '/0/a/c/o',
            'headers': {
                'X-Container-Partition': '0',
                'Host': 'localhost:80',
                'Referer': '%s http://localhost/v1/a/c/o' % method,
                'X-Backend-Storage-Policy-Index': '1',
                'X-Backend-Container-Path': shard_ranges[1].name
            },
        }
        container_headers = {}
        for request in backend_requests[2:]:
            req_headers = request['headers']
            device = req_headers['x-container-device']
            container_headers[device] = req_headers['x-container-host']
            check_request(request, **expectations)

        expected = {
            device: '10.0.0.%d:100%d' % (i, i)
            for i, device in enumerate(['sda', 'sdb', 'sdc'])}
        self.assertEqual(container_headers, expected)

    for method in ('POST', 'DELETE', 'PUT'):
        for sharding_state in ('sharding', 'sharded'):
            do_test(method, sharding_state)
def test_DELETE(self): def test_DELETE(self):
with save_globals(): with save_globals():
def test_status_map(statuses, expected): def test_status_map(statuses, expected):