From a1af3811a79963e2e5d1db3e5588cbc9748f9d57 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Mon, 24 Jun 2019 12:25:33 -0700 Subject: [PATCH] sharding: Cache shard ranges for object writes Previously, we issued a GET to the root container for every object PUT, POST, and DELETE. This puts load on the container server, potentially leading to timeouts, error limiting, and erroneous 404s (!). Now, cache the complete set of 'updating' shards, and find the shard for this particular update in the proxy. Add a new config option, recheck_updating_shard_ranges, to control the cache time; it defaults to one hour. Set to 0 to fall back to previous behavior. Note that we should be able to tolerate stale shard data just fine; we already have to worry about async pendings that got written down with one shard but may not get processed until that shard has itself sharded or shrunk into another shard. Also note that memcache has a default value limit of 1MiB, which may be exceeded if a container has thousands of shards. In that case, set() will act like a delete(), causing increased memcache churn but otherwise preserving existing behavior. In the future, we may want to add support for gzipping the cached shard ranges as they should compress well. Change-Id: Ic7a732146ea19a47669114ad5dbee0bacbe66919 Closes-Bug: 1781291 --- etc/proxy-server.conf-sample | 7 + swift/proxy/controllers/base.py | 68 +++++++++- swift/proxy/controllers/obj.py | 11 +- swift/proxy/server.py | 6 +- test/probe/test_sharder.py | 23 +++- test/unit/proxy/test_server.py | 219 +++++++++++++++++++++++++++++++- 6 files changed, 319 insertions(+), 15 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index bffa2391a5..fe2d9b0939 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -129,6 +129,13 @@ use = egg:swift#proxy # log_handoffs = true # recheck_account_existence = 60 # recheck_container_existence = 60 +# +# How long the proxy should cache a set of shard ranges for a container. +# Note that stale shard range info should be fine; updates will still +# eventually make their way to the correct shard. As a result, you can +# usually set this much higher than the existence checks above. +# recheck_updating_shard_ranges = 3600 +# # object_chunk_size = 65536 # client_chunk_size = 65536 # diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 3ddc8393ec..a941cc219b 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -45,7 +45,7 @@ from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.utils import Timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ GreenAsyncPile, quorum_size, parse_content_type, \ - document_iters_to_http_response_body, ShardRange + document_iters_to_http_response_body, ShardRange, find_shard_range from swift.common.bufferedhttp import http_connect from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ @@ -67,6 +67,7 @@ from swift.common.storage_policy import POLICIES DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds +DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds def update_headers(response, headers): @@ -443,7 +444,7 @@ def get_account_info(env, app, swift_source=None): return info -def get_cache_key(account, container=None, obj=None): +def get_cache_key(account, container=None, obj=None, shard=None): """ Get the keys for both memcache and env['swift.infocache'] (cache_key) where info about accounts, containers, and objects is cached @@ -451,6 +452,9 @@ def get_cache_key(account, container=None, obj=None): :param account: The name of the account :param container: The name of the container (or None if account) :param obj: The name of the object (or None if account or container) + :param shard: Sharding state for the container query; typically 'updating' + or 'listing' (Requires account and container; cannot use + with obj) :returns: a (native) string cache_key """ if six.PY2: @@ -468,7 +472,13 @@ def get_cache_key(account, container=None, obj=None): container = to_native(container) obj = to_native(obj) - if obj: + if shard: + if not (account and container): + raise ValueError('Shard cache key requires account and container') + if obj: + raise ValueError('Shard cache key cannot have obj') + cache_key = 'shard-%s/%s/%s' % (shard, account, container) + elif obj: if not (account and container): raise ValueError('Object cache key requires account and container') cache_key = 'object/%s/%s/%s' % (account, container, obj) @@ -2191,3 +2201,55 @@ class Controller(object): "Failed to get shard ranges from %s: invalid data: %r", req.path_qs, err) return None + + def _get_update_shard(self, req, account, container, obj): + """ + Find the appropriate shard range for an object update. + + Note that this fetches and caches (in both the per-request infocache + and memcache, if available) all shard ranges for the given root + container so we won't have to contact the container DB for every write. + + :param req: original Request instance. + :param account: account from which shard ranges should be fetched. + :param container: container from which shard ranges should be fetched. + :param obj: object getting updated. + :return: an instance of :class:`swift.common.utils.ShardRange`, + or None if the update should go back to the root + """ + if not self.app.recheck_updating_shard_ranges: + # caching is disabled; fall back to old behavior + shard_ranges = self._get_shard_ranges( + req, account, container, states='updating', includes=obj) + if not shard_ranges: + return None + return shard_ranges[0] + + cache_key = get_cache_key(account, container, shard='updating') + infocache = req.environ.setdefault('swift.infocache', {}) + memcache = getattr(self.app, 'memcache', None) or req.environ.get( + 'swift.cache') + + cached_ranges = infocache.get(cache_key) + if cached_ranges is None and memcache: + cached_ranges = memcache.get(cache_key) + + if cached_ranges: + shard_ranges = [ + ShardRange.from_dict(shard_range) + for shard_range in cached_ranges] + else: + shard_ranges = self._get_shard_ranges( + req, account, container, states='updating') + if shard_ranges: + cached_ranges = [dict(sr) for sr in shard_ranges] + # went to disk; cache it + if memcache: + memcache.set(cache_key, cached_ranges, + time=self.app.recheck_updating_shard_ranges) + + if not shard_ranges: + return None + + infocache[cache_key] = tuple(cached_ranges) + return find_shard_range(obj, shard_ranges) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index ffc543a666..8e6c475463 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -271,13 +271,12 @@ class BaseObjectController(Controller): # find the sharded container to which we'll send the update db_state = container_info.get('sharding_state', 'unsharded') if db_state in ('sharded', 'sharding'): - shard_ranges = self._get_shard_ranges( - req, self.account_name, self.container_name, - includes=self.object_name, states='updating') - if shard_ranges: + shard_range = self._get_update_shard( + req, self.account_name, self.container_name, self.object_name) + if shard_range: partition, nodes = self.app.container_ring.get_nodes( - shard_ranges[0].account, shard_ranges[0].container) - return partition, nodes, shard_ranges[0].name + shard_range.account, shard_range.container) + return partition, nodes, shard_range.name return container_info['partition'], container_info['nodes'], None diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 4415a82571..ed2320e28b 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -40,7 +40,8 @@ from swift.common.constraints import check_utf8, valid_api_version from swift.proxy.controllers import AccountController, ContainerController, \ ObjectControllerRouter, InfoController from swift.proxy.controllers.base import get_container_info, NodeIter, \ - DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE + DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \ + DEFAULT_RECHECK_UPDATING_SHARD_RANGES from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \ @@ -202,6 +203,9 @@ class Application(object): self.recheck_container_existence = \ int(conf.get('recheck_container_existence', DEFAULT_RECHECK_CONTAINER_EXISTENCE)) + self.recheck_updating_shard_ranges = \ + int(conf.get('recheck_updating_shard_ranges', + DEFAULT_RECHECK_UPDATING_SHARD_RANGES)) self.recheck_account_existence = \ int(conf.get('recheck_account_existence', DEFAULT_RECHECK_ACCOUNT_EXISTENCE)) diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 0befd604f0..6724e05e2e 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -20,15 +20,16 @@ import uuid from nose import SkipTest -from swift.common import direct_client +from swift.common import direct_client, utils +from swift.common.manager import Manager +from swift.common.memcached import MemcacheRing from swift.common.direct_client import DirectClientException from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ quorum_size, config_true_value, Timestamp from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING -from swift.common import utils -from swift.common.manager import Manager from swiftclient import client, get_auth, ClientException +from swift.proxy.controllers.base import get_cache_key from swift.proxy.controllers.obj import num_container_updates from test import annotate_failure from test.probe.brain import BrainSplitter @@ -116,6 +117,7 @@ class BaseTestContainerSharding(ReplProbeTest): self.brain.put_container(policy_index=int(self.policy)) self.sharders = Manager(['container-sharder']) self.internal_client = self.make_internal_client() + self.memcache = MemcacheRing(['127.0.0.1:11211']) def stop_container_servers(self, node_numbers=None): if node_numbers: @@ -835,6 +837,9 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing(more_obj_names + obj_names) self.assert_container_object_count(len(more_obj_names + obj_names)) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) # add another object that lands in the first of the new sub-shards self.put_objects(['alpha']) @@ -1217,6 +1222,10 @@ class TestContainerSharding(BaseTestContainerSharding): # now look up the shard target for subsequent updates self.assert_container_listing(obj_names) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) + # delete objects from first shard range first_shard_objects = [obj_name for obj_name in obj_names if obj_name <= orig_shard_ranges[0].upper] @@ -1243,6 +1252,11 @@ class TestContainerSharding(BaseTestContainerSharding): # to a GET for a redirect target, the object update will default to # being targeted at the root container self.stop_container_servers() + + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) + self.put_objects([beta]) self.brain.servers.start() async_pendings = self.gather_async_pendings( @@ -1746,6 +1760,9 @@ class TestContainerSharding(BaseTestContainerSharding): shard_part, shard_nodes = self.get_part_and_node_numbers( shard_ranges[1]) self.brain.servers.stop(number=shard_nodes[2]) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) self.delete_objects(['alpha']) self.put_objects(['beta']) self.assert_container_listing(['beta']) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index f892fa15f1..ea5a5dcfd3 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -3631,12 +3631,13 @@ class TestReplicatedObjectController( StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), StoragePolicy(1, 'one', object_ring=FakeRing()), ]) - def test_backend_headers_update_shard_container(self): + def test_backend_headers_update_shard_container_no_cache(self): # verify that when container is sharded the backend container update is # directed to the shard container # reset the router post patch_policies self.app.obj_controller_router = proxy_server.ObjectControllerRouter() self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 0 def do_test(method, sharding_state): self.app.memcache.store = {} @@ -3686,7 +3687,8 @@ class TestReplicatedObjectController( container_request_shard = backend_requests[2] check_request( container_request_shard, method='GET', path='/sda/0/a/c', - params={'includes': 'o'}) + params={'includes': 'o', 'states': 'updating'}, + headers={'X-Backend-Record-Type': 'shard'}) # make sure backend requests included expected container headers container_headers = {} @@ -3720,6 +3722,219 @@ class TestReplicatedObjectController( do_test('PUT', 'sharding') do_test('PUT', 'sharded') + @patch_policies([ + StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), + StoragePolicy(1, 'one', object_ring=FakeRing()), + ]) + def test_backend_headers_update_shard_container_with_empty_cache(self): + # verify that when container is sharded the backend container update is + # directed to the shard container + # reset the router post patch_policies + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + + def do_test(method, sharding_state): + self.app.memcache.store = {} + req = Request.blank('/v1/a/c/o', {}, method=method, body='', + headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, cont shard GET, obj POSTs + status_codes = (200, 200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + with mocked_http_conn(*status_codes, headers=resp_headers, + body=body) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + backend_requests = fake_conn.requests + + def check_request(req, method, path, headers=None, params=None): + self.assertEqual(method, req['method']) + # caller can ignore leading path parts + self.assertTrue(req['path'].endswith(path), + 'expected path to end with %s, it was %s' % ( + path, req['path'])) + headers = headers or {} + # caller can ignore some headers + for k, v in headers.items(): + self.assertEqual(req['headers'][k], v, + 'Expected %s but got %s for key %s' % + (v, req['headers'][k], k)) + params = params or {} + req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {} + for k, v in params.items(): + self.assertEqual(req_params[k], v, + 'Expected %s but got %s for key %s' % + (v, req_params[k], k)) + + account_request = backend_requests[0] + check_request(account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + check_request(container_request, method='HEAD', path='/sda/0/a/c') + container_request_shard = backend_requests[2] + check_request( + container_request_shard, method='GET', path='/sda/0/a/c', + params={'states': 'updating'}, + headers={'X-Backend-Record-Type': 'shard'}) + + cache_key = 'shard-updating/a/c' + self.assertIn(cache_key, self.app.memcache.store) + self.assertEqual(self.app.memcache.store[cache_key], + [dict(sr) for sr in shard_ranges]) + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual(req.environ['swift.infocache'][cache_key], + tuple(dict(sr) for sr in shard_ranges)) + + # make sure backend requests included expected container headers + container_headers = {} + + for request in backend_requests[3:]: + req_headers = request['headers'] + device = req_headers['x-container-device'] + container_headers[device] = req_headers['x-container-host'] + expectations = { + 'method': method, + 'path': '/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Container-Path': shard_ranges[1].name + }, + } + check_request(request, **expectations) + + expected = {} + for i, device in enumerate(['sda', 'sdb', 'sdc']): + expected[device] = '10.0.0.%d:100%d' % (i, i) + self.assertEqual(container_headers, expected) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + @patch_policies([ + StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), + StoragePolicy(1, 'one', object_ring=FakeRing()), + ]) + def test_backend_headers_update_shard_container_with_live_cache(self): + # verify that when container is sharded the backend container update is + # directed to the shard container + # reset the router post patch_policies + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + + def do_test(method, sharding_state): + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + self.app.memcache.store = {'shard-updating/a/c': tuple( + dict(shard_range) for shard_range in shard_ranges)} + req = Request.blank('/v1/a/c/o', {}, method=method, body='', + headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, obj POSTs + status_codes = (200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + with mocked_http_conn(*status_codes, + headers=resp_headers) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + backend_requests = fake_conn.requests + + def check_request(req, method, path, headers=None, params=None): + self.assertEqual(method, req['method']) + # caller can ignore leading path parts + self.assertTrue(req['path'].endswith(path), + 'expected path to end with %s, it was %s' % ( + path, req['path'])) + headers = headers or {} + # caller can ignore some headers + for k, v in headers.items(): + self.assertEqual(req['headers'][k], v, + 'Expected %s but got %s for key %s' % + (v, req['headers'][k], k)) + params = params or {} + req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {} + for k, v in params.items(): + self.assertEqual(req_params[k], v, + 'Expected %s but got %s for key %s' % + (v, req_params[k], k)) + + account_request = backend_requests[0] + check_request(account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + check_request(container_request, method='HEAD', path='/sda/0/a/c') + + # infocache gets populated from memcache + cache_key = 'shard-updating/a/c' + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual(req.environ['swift.infocache'][cache_key], + tuple(dict(sr) for sr in shard_ranges)) + + # make sure backend requests included expected container headers + container_headers = {} + + for request in backend_requests[2:]: + req_headers = request['headers'] + device = req_headers['x-container-device'] + container_headers[device] = req_headers['x-container-host'] + expectations = { + 'method': method, + 'path': '/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Container-Path': shard_ranges[1].name + }, + } + check_request(request, **expectations) + + expected = {} + for i, device in enumerate(['sda', 'sdb', 'sdc']): + expected[device] = '10.0.0.%d:100%d' % (i, i) + self.assertEqual(container_headers, expected) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + def test_DELETE(self): with save_globals(): def test_status_map(statuses, expected):