diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 31788be1a8..027182d83a 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -129,6 +129,13 @@ use = egg:swift#proxy # log_handoffs = true # recheck_account_existence = 60 # recheck_container_existence = 60 +# +# How long the proxy should cache a set of shard ranges for a container. +# Note that stale shard range info should be fine; updates will still +# eventually make their way to the correct shard. As a result, you can +# usually set this much higher than the existence checks above. +# recheck_updating_shard_ranges = 3600 +# # object_chunk_size = 65536 # client_chunk_size = 65536 # diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 3ddc8393ec..a941cc219b 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -45,7 +45,7 @@ from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.utils import Timestamp, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ GreenAsyncPile, quorum_size, parse_content_type, \ - document_iters_to_http_response_body, ShardRange + document_iters_to_http_response_body, ShardRange, find_shard_range from swift.common.bufferedhttp import http_connect from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ @@ -67,6 +67,7 @@ from swift.common.storage_policy import POLICIES DEFAULT_RECHECK_ACCOUNT_EXISTENCE = 60 # seconds DEFAULT_RECHECK_CONTAINER_EXISTENCE = 60 # seconds +DEFAULT_RECHECK_UPDATING_SHARD_RANGES = 3600 # seconds def update_headers(response, headers): @@ -443,7 +444,7 @@ def get_account_info(env, app, swift_source=None): return info -def get_cache_key(account, container=None, obj=None): +def get_cache_key(account, container=None, obj=None, shard=None): """ Get the keys for both memcache and env['swift.infocache'] (cache_key) where info about accounts, containers, and objects is cached @@ -451,6 +452,9 @@ def get_cache_key(account, container=None, obj=None): :param account: The name of the account :param container: The name of the container (or None if account) :param obj: The name of the object (or None if account or container) + :param shard: Sharding state for the container query; typically 'updating' + or 'listing' (Requires account and container; cannot use + with obj) :returns: a (native) string cache_key """ if six.PY2: @@ -468,7 +472,13 @@ def get_cache_key(account, container=None, obj=None): container = to_native(container) obj = to_native(obj) - if obj: + if shard: + if not (account and container): + raise ValueError('Shard cache key requires account and container') + if obj: + raise ValueError('Shard cache key cannot have obj') + cache_key = 'shard-%s/%s/%s' % (shard, account, container) + elif obj: if not (account and container): raise ValueError('Object cache key requires account and container') cache_key = 'object/%s/%s/%s' % (account, container, obj) @@ -2191,3 +2201,55 @@ class Controller(object): "Failed to get shard ranges from %s: invalid data: %r", req.path_qs, err) return None + + def _get_update_shard(self, req, account, container, obj): + """ + Find the appropriate shard range for an object update. + + Note that this fetches and caches (in both the per-request infocache + and memcache, if available) all shard ranges for the given root + container so we won't have to contact the container DB for every write. + + :param req: original Request instance. + :param account: account from which shard ranges should be fetched. + :param container: container from which shard ranges should be fetched. + :param obj: object getting updated. + :return: an instance of :class:`swift.common.utils.ShardRange`, + or None if the update should go back to the root + """ + if not self.app.recheck_updating_shard_ranges: + # caching is disabled; fall back to old behavior + shard_ranges = self._get_shard_ranges( + req, account, container, states='updating', includes=obj) + if not shard_ranges: + return None + return shard_ranges[0] + + cache_key = get_cache_key(account, container, shard='updating') + infocache = req.environ.setdefault('swift.infocache', {}) + memcache = getattr(self.app, 'memcache', None) or req.environ.get( + 'swift.cache') + + cached_ranges = infocache.get(cache_key) + if cached_ranges is None and memcache: + cached_ranges = memcache.get(cache_key) + + if cached_ranges: + shard_ranges = [ + ShardRange.from_dict(shard_range) + for shard_range in cached_ranges] + else: + shard_ranges = self._get_shard_ranges( + req, account, container, states='updating') + if shard_ranges: + cached_ranges = [dict(sr) for sr in shard_ranges] + # went to disk; cache it + if memcache: + memcache.set(cache_key, cached_ranges, + time=self.app.recheck_updating_shard_ranges) + + if not shard_ranges: + return None + + infocache[cache_key] = tuple(cached_ranges) + return find_shard_range(obj, shard_ranges) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index ffc543a666..8e6c475463 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -271,13 +271,12 @@ class BaseObjectController(Controller): # find the sharded container to which we'll send the update db_state = container_info.get('sharding_state', 'unsharded') if db_state in ('sharded', 'sharding'): - shard_ranges = self._get_shard_ranges( - req, self.account_name, self.container_name, - includes=self.object_name, states='updating') - if shard_ranges: + shard_range = self._get_update_shard( + req, self.account_name, self.container_name, self.object_name) + if shard_range: partition, nodes = self.app.container_ring.get_nodes( - shard_ranges[0].account, shard_ranges[0].container) - return partition, nodes, shard_ranges[0].name + shard_range.account, shard_range.container) + return partition, nodes, shard_range.name return container_info['partition'], container_info['nodes'], None diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 4415a82571..ed2320e28b 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -40,7 +40,8 @@ from swift.common.constraints import check_utf8, valid_api_version from swift.proxy.controllers import AccountController, ContainerController, \ ObjectControllerRouter, InfoController from swift.proxy.controllers.base import get_container_info, NodeIter, \ - DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE + DEFAULT_RECHECK_CONTAINER_EXISTENCE, DEFAULT_RECHECK_ACCOUNT_EXISTENCE, \ + DEFAULT_RECHECK_UPDATING_SHARD_RANGES from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPMethodNotAllowed, HTTPNotFound, HTTPPreconditionFailed, \ HTTPServerError, HTTPException, Request, HTTPServiceUnavailable, \ @@ -202,6 +203,9 @@ class Application(object): self.recheck_container_existence = \ int(conf.get('recheck_container_existence', DEFAULT_RECHECK_CONTAINER_EXISTENCE)) + self.recheck_updating_shard_ranges = \ + int(conf.get('recheck_updating_shard_ranges', + DEFAULT_RECHECK_UPDATING_SHARD_RANGES)) self.recheck_account_existence = \ int(conf.get('recheck_account_existence', DEFAULT_RECHECK_ACCOUNT_EXISTENCE)) diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 0befd604f0..6724e05e2e 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -20,15 +20,16 @@ import uuid from nose import SkipTest -from swift.common import direct_client +from swift.common import direct_client, utils +from swift.common.manager import Manager +from swift.common.memcached import MemcacheRing from swift.common.direct_client import DirectClientException from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ quorum_size, config_true_value, Timestamp from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING -from swift.common import utils -from swift.common.manager import Manager from swiftclient import client, get_auth, ClientException +from swift.proxy.controllers.base import get_cache_key from swift.proxy.controllers.obj import num_container_updates from test import annotate_failure from test.probe.brain import BrainSplitter @@ -116,6 +117,7 @@ class BaseTestContainerSharding(ReplProbeTest): self.brain.put_container(policy_index=int(self.policy)) self.sharders = Manager(['container-sharder']) self.internal_client = self.make_internal_client() + self.memcache = MemcacheRing(['127.0.0.1:11211']) def stop_container_servers(self, node_numbers=None): if node_numbers: @@ -835,6 +837,9 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing(more_obj_names + obj_names) self.assert_container_object_count(len(more_obj_names + obj_names)) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) # add another object that lands in the first of the new sub-shards self.put_objects(['alpha']) @@ -1217,6 +1222,10 @@ class TestContainerSharding(BaseTestContainerSharding): # now look up the shard target for subsequent updates self.assert_container_listing(obj_names) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) + # delete objects from first shard range first_shard_objects = [obj_name for obj_name in obj_names if obj_name <= orig_shard_ranges[0].upper] @@ -1243,6 +1252,11 @@ class TestContainerSharding(BaseTestContainerSharding): # to a GET for a redirect target, the object update will default to # being targeted at the root container self.stop_container_servers() + + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) + self.put_objects([beta]) self.brain.servers.start() async_pendings = self.gather_async_pendings( @@ -1746,6 +1760,9 @@ class TestContainerSharding(BaseTestContainerSharding): shard_part, shard_nodes = self.get_part_and_node_numbers( shard_ranges[1]) self.brain.servers.stop(number=shard_nodes[2]) + # Before writing, kill the cache + self.memcache.delete(get_cache_key( + self.account, self.container_name, shard='updating')) self.delete_objects(['alpha']) self.put_objects(['beta']) self.assert_container_listing(['beta']) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index f892fa15f1..ea5a5dcfd3 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -3631,12 +3631,13 @@ class TestReplicatedObjectController( StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), StoragePolicy(1, 'one', object_ring=FakeRing()), ]) - def test_backend_headers_update_shard_container(self): + def test_backend_headers_update_shard_container_no_cache(self): # verify that when container is sharded the backend container update is # directed to the shard container # reset the router post patch_policies self.app.obj_controller_router = proxy_server.ObjectControllerRouter() self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 0 def do_test(method, sharding_state): self.app.memcache.store = {} @@ -3686,7 +3687,8 @@ class TestReplicatedObjectController( container_request_shard = backend_requests[2] check_request( container_request_shard, method='GET', path='/sda/0/a/c', - params={'includes': 'o'}) + params={'includes': 'o', 'states': 'updating'}, + headers={'X-Backend-Record-Type': 'shard'}) # make sure backend requests included expected container headers container_headers = {} @@ -3720,6 +3722,219 @@ class TestReplicatedObjectController( do_test('PUT', 'sharding') do_test('PUT', 'sharded') + @patch_policies([ + StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), + StoragePolicy(1, 'one', object_ring=FakeRing()), + ]) + def test_backend_headers_update_shard_container_with_empty_cache(self): + # verify that when container is sharded the backend container update is + # directed to the shard container + # reset the router post patch_policies + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + + def do_test(method, sharding_state): + self.app.memcache.store = {} + req = Request.blank('/v1/a/c/o', {}, method=method, body='', + headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, cont shard GET, obj POSTs + status_codes = (200, 200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + with mocked_http_conn(*status_codes, headers=resp_headers, + body=body) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + backend_requests = fake_conn.requests + + def check_request(req, method, path, headers=None, params=None): + self.assertEqual(method, req['method']) + # caller can ignore leading path parts + self.assertTrue(req['path'].endswith(path), + 'expected path to end with %s, it was %s' % ( + path, req['path'])) + headers = headers or {} + # caller can ignore some headers + for k, v in headers.items(): + self.assertEqual(req['headers'][k], v, + 'Expected %s but got %s for key %s' % + (v, req['headers'][k], k)) + params = params or {} + req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {} + for k, v in params.items(): + self.assertEqual(req_params[k], v, + 'Expected %s but got %s for key %s' % + (v, req_params[k], k)) + + account_request = backend_requests[0] + check_request(account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + check_request(container_request, method='HEAD', path='/sda/0/a/c') + container_request_shard = backend_requests[2] + check_request( + container_request_shard, method='GET', path='/sda/0/a/c', + params={'states': 'updating'}, + headers={'X-Backend-Record-Type': 'shard'}) + + cache_key = 'shard-updating/a/c' + self.assertIn(cache_key, self.app.memcache.store) + self.assertEqual(self.app.memcache.store[cache_key], + [dict(sr) for sr in shard_ranges]) + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual(req.environ['swift.infocache'][cache_key], + tuple(dict(sr) for sr in shard_ranges)) + + # make sure backend requests included expected container headers + container_headers = {} + + for request in backend_requests[3:]: + req_headers = request['headers'] + device = req_headers['x-container-device'] + container_headers[device] = req_headers['x-container-host'] + expectations = { + 'method': method, + 'path': '/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Container-Path': shard_ranges[1].name + }, + } + check_request(request, **expectations) + + expected = {} + for i, device in enumerate(['sda', 'sdb', 'sdc']): + expected[device] = '10.0.0.%d:100%d' % (i, i) + self.assertEqual(container_headers, expected) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + @patch_policies([ + StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), + StoragePolicy(1, 'one', object_ring=FakeRing()), + ]) + def test_backend_headers_update_shard_container_with_live_cache(self): + # verify that when container is sharded the backend container update is + # directed to the shard container + # reset the router post patch_policies + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + + def do_test(method, sharding_state): + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + self.app.memcache.store = {'shard-updating/a/c': tuple( + dict(shard_range) for shard_range in shard_ranges)} + req = Request.blank('/v1/a/c/o', {}, method=method, body='', + headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, obj POSTs + status_codes = (200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + with mocked_http_conn(*status_codes, + headers=resp_headers) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + backend_requests = fake_conn.requests + + def check_request(req, method, path, headers=None, params=None): + self.assertEqual(method, req['method']) + # caller can ignore leading path parts + self.assertTrue(req['path'].endswith(path), + 'expected path to end with %s, it was %s' % ( + path, req['path'])) + headers = headers or {} + # caller can ignore some headers + for k, v in headers.items(): + self.assertEqual(req['headers'][k], v, + 'Expected %s but got %s for key %s' % + (v, req['headers'][k], k)) + params = params or {} + req_params = dict(parse_qsl(req['qs'])) if req['qs'] else {} + for k, v in params.items(): + self.assertEqual(req_params[k], v, + 'Expected %s but got %s for key %s' % + (v, req_params[k], k)) + + account_request = backend_requests[0] + check_request(account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + check_request(container_request, method='HEAD', path='/sda/0/a/c') + + # infocache gets populated from memcache + cache_key = 'shard-updating/a/c' + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual(req.environ['swift.infocache'][cache_key], + tuple(dict(sr) for sr in shard_ranges)) + + # make sure backend requests included expected container headers + container_headers = {} + + for request in backend_requests[2:]: + req_headers = request['headers'] + device = req_headers['x-container-device'] + container_headers[device] = req_headers['x-container-host'] + expectations = { + 'method': method, + 'path': '/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Container-Path': shard_ranges[1].name + }, + } + check_request(request, **expectations) + + expected = {} + for i, device in enumerate(['sda', 'sdb', 'sdc']): + expected[device] = '10.0.0.%d:100%d' % (i, i) + self.assertEqual(container_headers, expected) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + def test_DELETE(self): with save_globals(): def test_status_map(statuses, expected):