proxy: refactor to share namespace cache helpers

Create new helper functions to set and get namespaces in cache. Use
these in both the object and container controllers when caching
namespaces for updating and listing state shard ranges respectively.

Add unit tests for the new helper functions.

No intentional behavioural changes.

Change-Id: I6833ec64540fa19f658f0ee78952ecb43b49f169
This commit is contained in:
Alistair Coles 2023-11-17 12:12:51 +00:00
parent a52e18e005
commit 72ac5b3be0
6 changed files with 238 additions and 142 deletions

View File

@ -39,12 +39,13 @@ from sys import exc_info
from eventlet.timeout import Timeout
import six
from swift.common.memcached import MemcacheConnectionError
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, cache_from_env, \
CooperativeIterator
CooperativeIterator, NamespaceBoundList
from swift.common.bufferedhttp import http_connect
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
@ -889,6 +890,75 @@ def _get_info_from_caches(app, env, account, container=None):
return info, cache_state
def get_namespaces_from_cache(req, cache_key, skip_chance):
    """
    Look up cached namespaces, consulting infocache before memcache.

    A successful memcache look-up is also copied into the request's
    infocache under the same key.

    :param req: a :class:`swift.common.swob.Request` object.
    :param cache_key: the cache key for both infocache and memcache.
    :param skip_chance: the probability of skipping the memcache look-up.
    :return: a tuple of
        (:class:`swift.common.utils.NamespaceBoundList`, cache state); the
        first item is ``None`` unless the cache state is ``'infocache_hit'``
        or ``'hit'``.
    """
    infocache = req.environ.setdefault('swift.infocache', {})
    cached = infocache.get(cache_key)
    if cached:
        # the per-request cache wins; memcache is not consulted at all
        return cached, 'infocache_hit'

    memcache = cache_from_env(req.environ, True)
    if not memcache:
        return None, 'disabled'

    # a non-zero skip_chance sends a fraction of look-ups past memcache
    if skip_chance and random.random() < skip_chance:
        return None, 'skip'

    try:
        bounds = memcache.get(cache_key, raise_on_error=True)
    except MemcacheConnectionError:
        return None, 'error'

    if not bounds:
        return None, 'miss'

    if six.PY2:
        # json.loads() in memcache.get will convert json 'string' to
        # 'unicode' with python2, here we cast 'unicode' back to 'str'
        bounds = [[lower.encode('utf-8'), name.encode('utf-8')]
                  for lower, name in bounds]
    ns_bound_list = NamespaceBoundList(bounds)
    infocache[cache_key] = ns_bound_list
    return ns_bound_list, 'hit'
def set_namespaces_in_cache(req, cache_key, ns_bound_list, time):
    """
    Store namespace bounds in infocache and, when possible, in memcache.

    The infocache entry is always written. Memcache is only written when a
    memcache client is available and ``ns_bound_list`` is truthy.

    :param req: a :class:`swift.common.swob.Request` object.
    :param cache_key: the cache key for both infocache and memcache.
    :param ns_bound_list: a :class:`swift.common.utils.NamespaceBoundList`.
    :param time: how long the namespaces should remain in memcache.
    :return: the cache_state: ``'set'`` if memcache was written,
        ``'set_error'`` if the memcache write raised
        ``MemcacheConnectionError``, otherwise ``'disabled'``.
    """
    req.environ.setdefault('swift.infocache', {})[cache_key] = ns_bound_list
    memcache = cache_from_env(req.environ, True)
    if not (memcache and ns_bound_list):
        # nothing is sent to memcache; only infocache was updated
        return 'disabled'
    try:
        memcache.set(cache_key, ns_bound_list.bounds, time=time,
                     raise_on_error=True)
    except MemcacheConnectionError:
        return 'set_error'
    return 'set'
def _prepare_pre_auth_info_request(env, path, swift_source):
"""
Prepares a pre authed request to obtain info using a HEAD.

View File

@ -14,12 +14,10 @@
# limitations under the License.
import json
import random
import six
from six.moves.urllib.parse import unquote
from swift.common.memcached import MemcacheConnectionError
from swift.common.utils import public, private, csv_append, Timestamp, \
config_true_value, ShardRange, cache_from_env, filter_namespaces, \
NamespaceBoundList
@ -30,7 +28,7 @@ from swift.common.request_helpers import get_sys_meta_prefix, get_param, \
from swift.proxy.controllers.base import Controller, delay_denial, NodeIter, \
cors_validation, set_info_cache, clear_info_cache, get_container_info, \
record_cache_op_metrics, get_cache_key, headers_from_container_info, \
update_headers
update_headers, set_namespaces_in_cache, get_namespaces_from_cache
from swift.common.storage_policy import POLICIES
from swift.common.swob import HTTPBadRequest, HTTPForbidden, HTTPNotFound, \
HTTPServiceUnavailable, str_to_wsgi, wsgi_to_str, Response
@ -147,48 +145,14 @@ class ContainerController(Controller):
:return: a tuple comprising (an instance of ``swob.Response``or
``None`` if no namespaces were found in cache, the cache state).
"""
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cache_key = get_cache_key(self.account_name,
self.container_name,
cache_key = get_cache_key(self.account_name, self.container_name,
shard='listing')
resp_body = None
ns_bound_list = infocache.get(cache_key)
skip_chance = self.app.container_listing_shard_ranges_skip_cache
ns_bound_list, cache_state = get_namespaces_from_cache(
req, cache_key, skip_chance)
if ns_bound_list:
cache_state = 'infocache_hit'
resp_body = self._make_namespaces_response_body(req, ns_bound_list)
elif memcache:
skip_chance = \
self.app.container_listing_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
cache_state = 'skip'
else:
try:
cached_namespaces = memcache.get(
cache_key, raise_on_error=True)
if cached_namespaces:
cache_state = 'hit'
if six.PY2:
# json.loads() in memcache.get will convert json
# 'string' to 'unicode' with python2, here we cast
# 'unicode' back to 'str'
cached_namespaces = [
[lower.encode('utf-8'), name.encode('utf-8')]
for lower, name in cached_namespaces]
ns_bound_list = NamespaceBoundList(cached_namespaces)
resp_body = self._make_namespaces_response_body(
req, ns_bound_list)
else:
cache_state = 'miss'
except MemcacheConnectionError:
cache_state = 'error'
if resp_body is None:
resp = None
else:
# shard ranges can be returned from cache
infocache[cache_key] = ns_bound_list
resp_body = self._make_namespaces_response_body(req, ns_bound_list)
self.logger.debug('Found %d shards in cache for %s',
len(ns_bound_list.bounds), req.path_qs)
headers.update({'x-backend-record-type': 'shard',
@ -202,6 +166,8 @@ class ContainerController(Controller):
resp.environ['swift_x_timestamp'] = headers.get('x-timestamp')
resp.accept_ranges = 'bytes'
resp.content_type = 'application/json'
else:
resp = None
return resp, cache_state
@ -233,17 +199,15 @@ class ContainerController(Controller):
if resp.headers.get('x-backend-sharding-state') == 'sharded':
# cache in infocache even if no shard ranges returned; this
# is unexpected but use that result for this request
infocache = req.environ.setdefault('swift.infocache', {})
cache_key = get_cache_key(
self.account_name, self.container_name, shard='listing')
infocache[cache_key] = ns_bound_list
memcache = cache_from_env(req.environ, True)
if memcache and ns_bound_list:
# cache in memcache only if shard ranges as expected
self.logger.info('Caching listing shards for %s (%d shards)',
set_cache_state = set_namespaces_in_cache(
req, cache_key, ns_bound_list,
self.app.recheck_listing_shard_ranges)
if set_cache_state == 'set':
self.logger.info(
'Caching listing namespaces for %s (%d namespaces)',
cache_key, len(ns_bound_list.bounds))
memcache.set(cache_key, ns_bound_list.bounds,
time=self.app.recheck_listing_shard_ranges)
return ns_bound_list
def _get_shard_ranges_from_backend(self, req):

View File

@ -48,8 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
ShardRange, find_namespace, cache_from_env, NamespaceBoundList,
CooperativeIterator)
find_namespace, NamespaceBoundList, CooperativeIterator, ShardRange)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@ -64,13 +63,13 @@ from swift.common.http import (
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY,
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND)
from swift.common.memcached import MemcacheConnectionError
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
ECDriverError, PolicyError)
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, update_headers, bytes_to_skip, ByteCountEnforcer, \
record_cache_op_metrics, get_cache_key, GetterBase, GetterSource, \
is_good_source, NodeIter
is_good_source, NodeIter, get_namespaces_from_cache, \
set_namespaces_in_cache
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
@ -282,48 +281,6 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _get_cached_updating_namespaces(
self, infocache, memcache, cache_key):
"""
Fetch cached updating namespaces of updating shard ranges from
infocache and memcache.
:param infocache: the infocache instance.
:param memcache: an instance of a memcache client,
:class:`swift.common.memcached.MemcacheRing`.
:param cache_key: the cache key for both infocache and memcache.
:return: a tuple of (an instance of NamespaceBoundList, cache state)
"""
# try get namespaces from infocache first
namespace_list = infocache.get(cache_key)
if namespace_list:
return namespace_list, 'infocache_hit'
# then try get them from memcache
if not memcache:
return None, 'disabled'
skip_chance = self.app.container_updating_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
return None, 'skip'
try:
namespaces = memcache.get(cache_key, raise_on_error=True)
cache_state = 'hit' if namespaces else 'miss'
except MemcacheConnectionError:
namespaces = None
cache_state = 'error'
if namespaces:
if six.PY2:
# json.loads() in memcache.get will convert json 'string' to
# 'unicode' with python2, here we cast 'unicode' back to 'str'
namespaces = [
[lower.encode('utf-8'), name.encode('utf-8')]
for lower, name in namespaces]
namespace_list = NamespaceBoundList(namespaces)
else:
namespace_list = None
return namespace_list, cache_state
def _get_update_shard_caching_disabled(self, req, account, container, obj):
"""
Fetch all updating shard ranges for the given root container when
@ -345,25 +302,6 @@ class BaseObjectController(Controller):
# there will be only one shard range in the list if any
return shard_ranges[0] if shard_ranges else None
def _cache_update_namespaces(self, memcache, cache_key, namespaces):
if not memcache:
return
self.logger.info(
'Caching updating shards for %s (%d shards)',
cache_key, len(namespaces.bounds))
try:
memcache.set(
cache_key, namespaces.bounds,
time=self.app.recheck_updating_shard_ranges,
raise_on_error=True)
cache_state = 'set'
except MemcacheConnectionError:
cache_state = 'set_error'
finally:
record_cache_op_metrics(self.logger, self.server_type.lower(),
'shard_updating', cache_state, None)
def _get_update_shard(self, req, account, container, obj):
"""
Find the appropriate shard range for an object update.
@ -387,14 +325,12 @@ class BaseObjectController(Controller):
# caching is enabled, try to get from caches
response = None
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cached_namespaces, cache_state = self._get_cached_updating_namespaces(
infocache, memcache, cache_key)
if cached_namespaces:
skip_chance = self.app.container_updating_shard_ranges_skip_cache
ns_bound_list, get_cache_state = get_namespaces_from_cache(
req, cache_key, skip_chance)
if ns_bound_list:
# found cached namespaces in either infocache or memcache
infocache[cache_key] = cached_namespaces
namespace = cached_namespaces.get_namespace(obj)
namespace = ns_bound_list.get_namespace(obj)
update_shard = ShardRange(
name=namespace.name, timestamp=0, lower=namespace.lower,
upper=namespace.upper)
@ -405,13 +341,21 @@ class BaseObjectController(Controller):
if shard_ranges:
# only store the list of namespace lower bounds and names into
# infocache and memcache.
namespaces = NamespaceBoundList.parse(shard_ranges)
infocache[cache_key] = namespaces
self._cache_update_namespaces(memcache, cache_key, namespaces)
ns_bound_list = NamespaceBoundList.parse(shard_ranges)
set_cache_state = set_namespaces_in_cache(
req, cache_key, ns_bound_list,
self.app.recheck_updating_shard_ranges)
record_cache_op_metrics(
self.logger, self.server_type.lower(), 'shard_updating',
set_cache_state, None)
if set_cache_state == 'set':
self.logger.info(
'Caching updating shards for %s (%d shards)',
cache_key, len(shard_ranges))
update_shard = find_namespace(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, self.server_type.lower(), 'shard_updating',
cache_state, response)
get_cache_state, response)
return update_shard
def _get_update_target(self, req, container_info):

View File

@ -410,6 +410,7 @@ class FakeMemcache(object):
def __init__(self, error_on_set=None, error_on_get=None):
self.store = {}
self.times = {}
self.calls = []
self.error_on_incr = False
self.error_on_get = error_on_get or []
@ -440,6 +441,7 @@ class FakeMemcache(object):
else:
assert isinstance(value, (str, bytes))
self.store[key] = value
self.times[key] = time
return True
@track
@ -463,12 +465,14 @@ class FakeMemcache(object):
def delete(self, key):
try:
del self.store[key]
del self.times[key]
except Exception:
pass
return True
def delete_all(self):
self.store.clear()
self.times.clear()
# This decorator only makes sense in the context of FakeMemcache;

View File

@ -29,12 +29,13 @@ from swift.proxy.controllers.base import headers_to_container_info, \
get_cache_key, get_account_info, get_info, get_object_info, \
Controller, GetOrHeadHandler, bytes_to_skip, clear_info_cache, \
set_info_cache, NodeIter, headers_from_container_info, \
record_cache_op_metrics, GetterSource
record_cache_op_metrics, GetterSource, get_namespaces_from_cache, \
set_namespaces_in_cache
from swift.common.swob import Request, HTTPException, RESPONSE_REASONS, \
bytes_to_wsgi, wsgi_to_str
from swift.common import exceptions
from swift.common.utils import split_path, ShardRange, Timestamp, \
GreenthreadSafeIterator, GreenAsyncPile
GreenthreadSafeIterator, GreenAsyncPile, NamespaceBoundList
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy, StoragePolicyCollection
@ -181,8 +182,8 @@ class FakeCache(FakeMemcache):
# Fake a json roundtrip
self.stub = json.loads(json.dumps(stub))
def get(self, key):
return self.stub or self.store.get(key)
def get(self, key, raise_on_error=False):
return self.stub or super(FakeCache, self).get(key, raise_on_error)
class BaseTest(unittest.TestCase):
@ -202,6 +203,120 @@ class BaseTest(unittest.TestCase):
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
class TestFuncs(BaseTest):
def test_get_namespaces_from_cache_disabled(self):
    # no 'swift.cache' is set on the request for this look-up
    request = Request.blank('a/c')
    result = get_namespaces_from_cache(
        request, 'shard-updating-v2/a/c/', 0)
    self.assertEqual((None, 'disabled'), result)
def test_get_namespaces_from_cache_miss(self):
    # memcache is attached to the request, but this test stores nothing
    # under the key before looking it up
    key = 'shard-updating-v2/a/c/'
    request = Request.blank('a/c')
    request.environ['swift.cache'] = self.cache
    self.assertEqual((None, 'miss'),
                     get_namespaces_from_cache(request, key, 0))
def test_get_namespaces_from_cache_infocache_hit(self):
    key = 'shard-updating-v2/a/c/'
    in_infocache = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    in_memcache = NamespaceBoundList([['', 'sr3'], ['t', 'sr4']])
    # memcache ignored if infocache hits
    self.cache.set(key, in_memcache.bounds)
    request = Request.blank('a/c')
    request.environ.update({
        'swift.cache': self.cache,
        'swift.infocache': {key: in_infocache},
    })
    self.assertEqual((in_infocache, 'infocache_hit'),
                     get_namespaces_from_cache(request, key, 0))
def test_get_namespaces_from_cache_hit(self):
    key = 'shard-updating-v2/a/c/'
    expected = NamespaceBoundList([['', 'sr3'], ['t', 'sr4']])
    self.cache.set(key, expected.bounds)
    request = Request.blank('a/c')
    request.environ['swift.infocache'] = {}
    request.environ['swift.cache'] = self.cache
    result = get_namespaces_from_cache(request, key, 0)
    self.assertEqual((expected, 'hit'), result)
    # a memcache hit is copied into the request's infocache
    self.assertEqual({key: expected}, request.environ['swift.infocache'])
def test_get_namespaces_from_cache_skips(self):
    key = 'shard-updating-v2/a/c/'
    cached = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    self.cache.set(key, cached.bounds)
    random_path = 'swift.proxy.controllers.base.random.random'

    def lookup(random_value):
        # each look-up uses a fresh request so infocache starts empty
        request = Request.blank('a/c')
        request.environ['swift.cache'] = self.cache
        with mock.patch(random_path, return_value=random_value):
            return get_namespaces_from_cache(request, key, 0.1)

    # a random draw just below skip_chance skips the memcache look-up
    self.assertEqual((None, 'skip'), lookup(0.099))
    # a draw equal to skip_chance does not skip
    self.assertEqual((cached, 'hit'), lookup(0.1))
def test_get_namespaces_from_cache_error(self):
    key = 'shard-updating-v2/a/c/'
    cached = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    self.cache.set(key, cached.bounds)

    def lookup():
        # each look-up uses a fresh request so infocache starts empty
        request = Request.blank('a/c')
        request.environ['swift.cache'] = self.cache
        return get_namespaces_from_cache(request, key, 0.0)

    # sanity check
    self.assertEqual((cached, 'hit'), lookup())
    # with a get error queued on the fake memcache, the same look-up
    # reports 'error' and returns no namespaces
    self.cache.error_on_get = [True]
    self.assertEqual((None, 'error'), lookup())
def test_set_namespaces_in_cache_disabled(self):
    key = 'shard-updating-v2/a/c/'
    bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    # no 'swift.cache' is set on the request
    request = Request.blank('a/c')
    state = set_namespaces_in_cache(request, key, bound_list, 123)
    self.assertEqual('disabled', state)
    # infocache is written even though the state is 'disabled'
    infocache = request.environ['swift.infocache']
    self.assertEqual({key: bound_list}, infocache)
def test_set_namespaces_in_cache_ok(self):
    key = 'shard-updating-v2/a/c/'
    bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    request = Request.blank('a/c')
    request.environ['swift.cache'] = self.cache
    self.assertEqual(
        'set', set_namespaces_in_cache(request, key, bound_list, 123))
    self.assertEqual({key: bound_list}, request.environ['swift.infocache'])
    # both the bounds and the expiry time reach the fake memcache
    self.assertEqual(bound_list.bounds, self.cache.store.get(key))
    self.assertEqual(123, self.cache.times.get(key))
def test_set_namespaces_in_cache_infocache_exists(self):
    key = 'shard-updating-v2/a/c/'
    bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    request = Request.blank('a/c')
    request.environ['swift.infocache'] = {'already': 'exists'}
    state = set_namespaces_in_cache(request, key, bound_list, 123)
    self.assertEqual('disabled', state)
    # the pre-existing infocache entry is kept alongside the new one
    expected = {'already': 'exists', key: bound_list}
    self.assertEqual(expected, request.environ['swift.infocache'])
def test_set_namespaces_in_cache_error(self):
    key = 'shard-updating-v2/a/c/'
    bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']])
    # queue a set error on the fake memcache before the call
    self.cache.error_on_set = [True]
    request = Request.blank('a/c')
    request.environ['swift.cache'] = self.cache
    state = set_namespaces_in_cache(request, key, bound_list, 123)
    self.assertEqual('set_error', state)
    # infocache is still populated when the memcache set fails
    self.assertEqual(bound_list, request.environ['swift.infocache'].get(key))
def test_get_info_zero_recheck(self):
mock_cache = mock.Mock()
mock_cache.get.return_value = None

View File

@ -2758,7 +2758,7 @@ class TestContainerController(TestRingBase):
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
time=exp_recheck_listing, raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertEqual(sharding_state,
@ -2797,7 +2797,7 @@ class TestContainerController(TestRingBase):
[mock.call.get('container/a/c'),
mock.call.get(cache_key, raise_on_error=True),
mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
time=exp_recheck_listing, raise_on_error=True),
# Since there was a backend request, we go ahead and cache
# container info, too
mock.call.set('container/a/c', mock.ANY, time=60)],
@ -2860,7 +2860,7 @@ class TestContainerController(TestRingBase):
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.set(cache_key, self.ns_bound_list.bounds,
time=exp_recheck_listing),
time=exp_recheck_listing, raise_on_error=True),
# Since there was a backend request, we go ahead and cache
# container info, too
mock.call.set('container/a/c', mock.ANY, time=60)],
@ -3214,14 +3214,13 @@ class TestContainerController(TestRingBase):
expected_hdrs.update(resp_hdrs)
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.set(
'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600),
mock.call.set('shard-listing-v2/a/c', self.ns_bound_list.bounds,
time=600, raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
info_lines = self.logger.get_lines_for_level('info')
self.assertIn(
'Caching listing shards for shard-listing-v2/a/c (3 shards)',
info_lines)
self.assertIn('Caching listing namespaces for shard-listing-v2/a/c '
'(3 namespaces)', info_lines)
# shards were cached
self.assertEqual('sharded',
self.memcache.calls[2][1][1]['sharding_state'])
@ -3314,8 +3313,8 @@ class TestContainerController(TestRingBase):
self._check_response(resp, self.ns_dicts, expected_hdrs)
self.assertEqual(
[mock.call.get('container/a/c'),
mock.call.set(
'shard-listing-v2/a/c', self.ns_bound_list.bounds, time=600),
mock.call.set('shard-listing-v2/a/c', self.ns_bound_list.bounds,
time=600, raise_on_error=True),
mock.call.set('container/a/c', mock.ANY, time=60)],
self.memcache.calls)
self.assertEqual('sharded',