Proxy: restructure cached updating shard ranges

Restructure the shard ranges that are stored in memcache for
object updating to only persist the essential attributes of
shard ranges in memcache (lower bounds and names), so the
aggregate of memcache values is much smaller and retrieval
will be much faster too.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>

UpgradeImpact
=============
The cache key for updating shard ranges in memcached is renamed
from 'shard-updating/<account>/<container>' to
'shard-updating-v2/<account>/<container>', and cache data is
changed to be a list of [lower bound, name]. As a result, this
will invalid all existing updating shard ranges stored in the
memcache cluster.

Change-Id: If98af569f99aa1ac79b9485ce9028fdd8d22576b
This commit is contained in:
Jianjian Huo 2023-01-17 12:17:40 -08:00
parent a2952962d2
commit 6ff90ea73e
6 changed files with 517 additions and 271 deletions

View File

@ -96,7 +96,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.linkat import linkat
# For backwards compatability with 3rd party middlewares
from swift.common.registry import register_swift_info, get_swift_info # noqa
from swift.common.registry import register_swift_info, get_swift_info # noqa
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
@ -585,6 +585,7 @@ class _UTC(datetime.tzinfo):
"""
A tzinfo class for datetime objects that returns a 0 timedelta (UTC time)
"""
def dst(self, dt):
return datetime.timedelta(0)
utcoffset = dst
@ -934,6 +935,7 @@ class _LibcWrapper(object):
has the function of that name. If false, then calls will fail with a
NotImplementedError.
"""
def __init__(self, func_name):
self._func_name = func_name
self._func_handle = None
@ -1715,6 +1717,7 @@ class RateLimitedIterator(object):
this many elements; default is 0 (rate limit
immediately)
"""
def __init__(self, iterable, elements_per_second, limit_after=0,
ratelimit_if=lambda _junk: True):
self.iterator = iter(iterable)
@ -1749,6 +1752,7 @@ class GreenthreadSafeIterator(object):
an error like "ValueError: generator already executing". By wrapping calls
to next() with a mutex, we avoid that error.
"""
def __init__(self, unsafe_iterable):
self.unsafe_iter = iter(unsafe_iterable)
self.semaphore = eventlet.semaphore.Semaphore(value=1)
@ -2068,6 +2072,7 @@ class SwiftLoggerAdapter(logging.LoggerAdapter):
Like logging.LoggerAdapter, you have to subclass this and override the
process() method to accomplish anything useful.
"""
def get_metric_name(self, metric):
# subclasses may override this method to annotate the metric name
return metric
@ -2110,6 +2115,7 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
Adds an optional prefix to all its log messages. When the prefix has not
been set, messages are unchanged.
"""
def set_prefix(self, prefix):
self.extra['prefix'] = prefix
@ -2129,6 +2135,7 @@ class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
"""
Adds a prefix to all Statsd metrics' names.
"""
def __init__(self, logger, extra, metric_prefix):
"""
:param logger: an instance of logging.Logger
@ -2382,6 +2389,7 @@ class LogLevelFilter(object):
(DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL)
Default: DEBUG
"""
def __init__(self, level=logging.DEBUG):
self.level = level
@ -3682,6 +3690,7 @@ class GreenAsyncPile(object):
Correlating results with jobs (if necessary) is left to the caller.
"""
def __init__(self, size_or_pool):
"""
:param size_or_pool: thread pool size or a pool to use
@ -3775,6 +3784,7 @@ class StreamingPile(GreenAsyncPile):
When used as a context manager, has the same worker-killing properties as
:class:`ContextPool`.
"""
def __init__(self, size):
""":param size: number of worker threads to use"""
self.pool = ContextPool(size)
@ -4266,6 +4276,7 @@ class Everything(object):
A container that contains everything. If "e" is an instance of
Everything, then "x in e" is true for all x.
"""
def __contains__(self, element):
return True
@ -4297,6 +4308,7 @@ class CloseableChain(object):
Like itertools.chain, but with a close method that will attempt to invoke
its sub-iterators' close methods, if any.
"""
def __init__(self, *iterables):
self.iterables = iterables
self.chained_iter = itertools.chain(*self.iterables)
@ -4340,6 +4352,7 @@ class InputProxy(object):
File-like object that counts bytes read.
To be swapped in for wsgi.input for accounting purposes.
"""
def __init__(self, wsgi_input):
"""
:param wsgi_input: file-like object to wrap the functionality of
@ -4481,6 +4494,7 @@ class Spliterator(object):
"l" # shorter than requested; this can happen with the last iterator
"""
def __init__(self, source_iterable):
self.input_iterator = iter(source_iterable)
self.leftovers = None
@ -5238,6 +5252,7 @@ class ShardName(object):
root container's own shard range will have a name format of
<account>/<root_container> which will raise ValueError if passed to parse.
"""
def __init__(self, account, root_container,
parent_container_hash,
timestamp,
@ -5329,7 +5344,277 @@ class ShardName(object):
raise ValueError('invalid name: %s' % name)
class ShardRange(object):
@functools.total_ordering
class Namespace(object):
__slots__ = ('_lower', '_upper', 'name')
@functools.total_ordering
class MaxBound(ShardRangeOuterBound):
# singleton for maximum bound
def __ge__(self, other):
return True
@functools.total_ordering
class MinBound(ShardRangeOuterBound):
# singleton for minimum bound
def __le__(self, other):
return True
MIN = MinBound()
MAX = MaxBound()
def __init__(self, name, lower, upper):
self._lower = Namespace.MIN
self._upper = Namespace.MAX
self.lower = lower
self.upper = upper
self.name = name
def __iter__(self):
yield 'name', str(self.name)
yield 'lower', self.lower_str
yield 'upper', self.upper_str
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join(
'%s=%r' % prop for prop in self))
def __lt__(self, other):
# a Namespace is less than other if its entire namespace is less than
# other; if other is another Namespace that implies that this
# Namespace's upper must be less than or equal to the other
# Namespace's lower
if self.upper == Namespace.MAX:
return False
if isinstance(other, Namespace):
return self.upper <= other.lower
elif other is None:
return True
else:
return self.upper < self._encode(other)
def __gt__(self, other):
# a Namespace is greater than other if its entire namespace is greater
# than other; if other is another Namespace that implies that this
# Namespace's lower must be less greater than or equal to the other
# Namespace's upper
if self.lower == Namespace.MIN:
return False
if isinstance(other, Namespace):
return self.lower >= other.upper
elif other is None:
return False
else:
return self.lower >= self._encode(other)
def __eq__(self, other):
# test for equality of range bounds only
if not isinstance(other, Namespace):
return False
return self.lower == other.lower and self.upper == other.upper
def __ne__(self, other):
return not (self == other)
def __contains__(self, item):
# test if the given item is within the namespace
if item == '':
return False
item = self._encode_bound(item)
return self.lower < item <= self.upper
@classmethod
def _encode(cls, value):
if six.PY2 and isinstance(value, six.text_type):
return value.encode('utf-8')
if six.PY3 and isinstance(value, six.binary_type):
# This should never fail -- the value should always be coming from
# valid swift paths, which means UTF-8
return value.decode('utf-8')
return value
def _encode_bound(self, bound):
if isinstance(bound, ShardRangeOuterBound):
return bound
if not (isinstance(bound, six.text_type) or
isinstance(bound, six.binary_type)):
raise TypeError('must be a string type')
return self._encode(bound)
@property
def lower(self):
return self._lower
@property
def lower_str(self):
return str(self.lower)
@lower.setter
def lower(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = Namespace.MIN
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('lower %s' % err)
if value > self._upper:
raise ValueError(
'lower (%r) must be less than or equal to upper (%r)' %
(value, self.upper))
self._lower = value
@property
def upper(self):
return self._upper
@property
def upper_str(self):
return str(self.upper)
@upper.setter
def upper(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = Namespace.MAX
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('upper %s' % err)
if value < self._lower:
raise ValueError(
'upper (%r) must be greater than or equal to lower (%r)' %
(value, self.lower))
self._upper = value
def entire_namespace(self):
"""
Returns True if this namespace includes the entire namespace, False
otherwise.
"""
return (self.lower == Namespace.MIN and
self.upper == Namespace.MAX)
def overlaps(self, other):
"""
Returns True if this namespace overlaps with the other namespace.
:param other: an instance of :class:`~swift.common.utils.Namespace`
"""
if not isinstance(other, Namespace):
return False
return max(self.lower, other.lower) < min(self.upper, other.upper)
def includes(self, other):
"""
Returns True if this namespace includes the whole of the other
namespace, False otherwise.
:param other: an instance of :class:`~swift.common.utils.Namespace`
"""
return (self.lower <= other.lower) and (other.upper <= self.upper)
def expand(self, donors):
"""
Expands the bounds as necessary to match the minimum and maximum bounds
of the given donors.
:param donors: A list of :class:`~swift.common.utils.Namespace`
:return: True if the bounds have been modified, False otherwise.
"""
modified = False
new_lower = self.lower
new_upper = self.upper
for donor in donors:
new_lower = min(new_lower, donor.lower)
new_upper = max(new_upper, donor.upper)
if self.lower > new_lower or self.upper < new_upper:
self.lower = new_lower
self.upper = new_upper
modified = True
return modified
class NamespaceBoundList(object):
def __init__(self, bounds):
"""
Encapsulate a compact representation of namespaces. Each item in the
list is a list [lower bound, name].
:param bounds: a list of lists ``[lower bound, name]``. The list
should be ordered by ``lower bound``.
"""
self.bounds = [] if bounds is None else bounds
@classmethod
def parse(cls, namespaces):
"""
Create a NamespaceBoundList object by parsing a list of Namespaces or
shard ranges and only storing the compact bounds list.
Each Namespace in the given list of ``namespaces`` provides the next
[lower bound, name] list to append to the NamespaceBoundList. The
given ``namespaces`` should be contiguous because the
NamespaceBoundList only stores lower bounds; if ``namespaces`` has
overlaps then at least one of the overlapping namespaces may be
ignored; similarly, gaps between namespaces are not represented in the
NamespaceBoundList.
:param namespaces: A list of Namespace instances. The list should be
ordered by namespace bounds.
:return: a NamespaceBoundList.
"""
if not namespaces:
return None
bounds = []
upper = namespaces[0].lower
for ns in namespaces:
if ns.lower < upper:
# Discard overlapping namespace.
# Overlapping namespaces are expected in lists of shard ranges
# fetched from the backend. For example, while a parent
# container is in the process of sharding, the parent shard
# range and its children shard ranges may be returned in the
# list of shard ranges. However, the backend sorts the list by
# (upper, state, lower, name) such that the children precede
# the parent, and it is the children that we prefer to retain
# in the NamespaceBoundList. For example, these namespaces:
# (a-b, "child1"), (b-c, "child2"), (a-c, "parent")
# would result in a NamespaceBoundList:
# (a, "child1"), (b, "child2")
# Unexpected overlaps or gaps may result in namespaces being
# 'extended' because only lower bounds are stored. For example,
# these namespaces:
# (a-b, "ns1"), (d-e, "ns2")
# would result in a NamespaceBoundList:
# (a, "ns1"), (d, "ns2")
# When used to find a target namespace for an object update
# that lies in a gap, the NamespaceBoundList will map the
# object name to the preceding namespace. In the example, an
# object named "c" would be mapped to "ns1". (In previous
# versions, an object update lying in a gap would have been
# mapped to the root container.)
continue
bounds.append([ns.lower_str, str(ns.name)])
upper = ns.upper
return cls(bounds)
def get_namespace(self, item):
"""
Get a Namespace instance that contains ``item``.
:param item: The item for a which a Namespace is to be found.
:return: the Namespace that contains ``item``.
"""
pos = bisect.bisect(self.bounds, [item]) - 1
lower, name = self.bounds[pos]
upper = ('' if pos + 1 == len(self.bounds)
else self.bounds[pos + 1][0])
return Namespace(name, lower, upper)
class ShardRange(Namespace):
"""
A ShardRange encapsulates sharding state related to a container including
lower and upper bounds that define the object namespace for which the
@ -5398,41 +5683,25 @@ class ShardRange(object):
SHARDING_STATES = (SHARDING, SHARDED)
CLEAVING_STATES = SHRINKING_STATES + SHARDING_STATES
@functools.total_ordering
class MaxBound(ShardRangeOuterBound):
# singleton for maximum bound
def __ge__(self, other):
return True
@functools.total_ordering
class MinBound(ShardRangeOuterBound):
# singleton for minimum bound
def __le__(self, other):
return True
MIN = MinBound()
MAX = MaxBound()
__slots__ = (
'account', 'container',
'_timestamp', '_meta_timestamp', '_state_timestamp', '_epoch',
'_lower', '_upper', '_deleted', '_state', '_count', '_bytes',
'_deleted', '_state', '_count', '_bytes',
'_tombstones', '_reported')
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
def __init__(self, name, timestamp,
lower=Namespace.MIN, upper=Namespace.MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None,
reported=False, tombstones=-1):
super(ShardRange, self).__init__(name=name, lower=lower, upper=upper)
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
self._upper = ShardRange.MAX
self._deleted = False
self._state = None
self.name = name
self.timestamp = timestamp
self.lower = lower
self.upper = upper
self.deleted = deleted
self.object_count = object_count
self.bytes_used = bytes_used
@ -5450,24 +5719,6 @@ class ShardRange(object):
# a key assumption for bisect, which is used by utils.find_shard_range
return sr.upper, sr.state, sr.lower, sr.name
@classmethod
def _encode(cls, value):
if six.PY2 and isinstance(value, six.text_type):
return value.encode('utf-8')
if six.PY3 and isinstance(value, six.binary_type):
# This should never fail -- the value should always be coming from
# valid swift paths, which means UTF-8
return value.decode('utf-8')
return value
def _encode_bound(self, bound):
if isinstance(bound, ShardRangeOuterBound):
return bound
if not (isinstance(bound, six.text_type) or
isinstance(bound, six.binary_type)):
raise TypeError('must be a string type')
return self._encode(bound)
def is_child_of(self, parent):
"""
Test if this shard range is a child of another shard range. The
@ -5638,56 +5889,10 @@ class ShardRange(object):
def meta_timestamp(self, ts):
self._meta_timestamp = self._to_timestamp(ts)
@property
def lower(self):
return self._lower
@property
def lower_str(self):
return str(self.lower)
@lower.setter
def lower(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = ShardRange.MIN
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('lower %s' % err)
if value > self._upper:
raise ValueError(
'lower (%r) must be less than or equal to upper (%r)' %
(value, self.upper))
self._lower = value
@property
def end_marker(self):
return self.upper_str + '\x00' if self.upper else ''
@property
def upper(self):
return self._upper
@property
def upper_str(self):
return str(self.upper)
@upper.setter
def upper(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = ShardRange.MAX
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('upper %s' % err)
if value < self._lower:
raise ValueError(
'upper (%r) must be greater than or equal to lower (%r)' %
(value, self.lower))
self._upper = value
@property
def object_count(self):
return self._count
@ -5895,56 +6100,12 @@ class ShardRange(object):
self.timestamp = timestamp or Timestamp.now()
return True
def __contains__(self, item):
# test if the given item is within the namespace
if item == '':
return False
item = self._encode_bound(item)
return self.lower < item <= self.upper
def __lt__(self, other):
# a ShardRange is less than other if its entire namespace is less than
# other; if other is another ShardRange that implies that this
# ShardRange's upper must be less than or equal to the other
# ShardRange's lower
if self.upper == ShardRange.MAX:
return False
if isinstance(other, ShardRange):
return self.upper <= other.lower
elif other is None:
return True
else:
return self.upper < self._encode(other)
def __gt__(self, other):
# a ShardRange is greater than other if its entire namespace is greater
# than other; if other is another ShardRange that implies that this
# ShardRange's lower must be less greater than or equal to the other
# ShardRange's upper
if self.lower == ShardRange.MIN:
return False
if isinstance(other, ShardRange):
return self.lower >= other.upper
elif other is None:
return False
else:
return self.lower >= self._encode(other)
def __eq__(self, other):
# test for equality of range bounds only
if not isinstance(other, ShardRange):
return False
return self.lower == other.lower and self.upper == other.upper
# A by-the-book implementation should probably hash the value, which
# in our case would be account+container+lower+upper (+timestamp ?).
# But we seem to be okay with just the identity.
def __hash__(self):
return id(self)
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return '%s<%r to %r as of %s, (%d, %d) as of %s, %s as of %s>' % (
self.__class__.__name__, self.lower, self.upper,
@ -5952,34 +6113,6 @@ class ShardRange(object):
self.meta_timestamp.internal, self.state_text,
self.state_timestamp.internal)
def entire_namespace(self):
"""
Returns True if the ShardRange includes the entire namespace, False
otherwise.
"""
return (self.lower == ShardRange.MIN and
self.upper == ShardRange.MAX)
def overlaps(self, other):
"""
Returns True if the ShardRange namespace overlaps with the other
ShardRange's namespace.
:param other: an instance of :class:`~swift.common.utils.ShardRange`
"""
if not isinstance(other, ShardRange):
return False
return max(self.lower, other.lower) < min(self.upper, other.upper)
def includes(self, other):
"""
Returns True if this namespace includes the whole of the other
namespace, False otherwise.
:param other: an instance of :class:`~swift.common.utils.ShardRange`
"""
return (self.lower <= other.lower) and (other.upper <= self.upper)
def __iter__(self):
yield 'name', self.name
yield 'timestamp', self.timestamp.internal
@ -6028,26 +6161,6 @@ class ShardRange(object):
params['state_timestamp'], params['epoch'],
params.get('reported', 0), params.get('tombstones', -1))
def expand(self, donors):
"""
Expands the bounds as necessary to match the minimum and maximum bounds
of the given donors.
:param donors: A list of :class:`~swift.common.utils.ShardRange`
:return: True if the bounds have been modified, False otherwise.
"""
modified = False
new_lower = self.lower
new_upper = self.upper
for donor in donors:
new_lower = min(new_lower, donor.lower)
new_upper = max(new_upper, donor.upper)
if self.lower > new_lower or self.upper < new_upper:
self.lower = new_lower
self.upper = new_upper
modified = True
return modified
class ShardRangeList(UserList):
"""
@ -6057,6 +6170,7 @@ class ShardRangeList(UserList):
This class does not enforce ordering or continuity of the list items:
callers should ensure that items are added in order as appropriate.
"""
def __getitem__(self, index):
# workaround for py3 - not needed for py2.7,py3.8
result = self.data[index]
@ -6069,27 +6183,27 @@ class ShardRangeList(UserList):
only be equal to the lowest bound of all items in the list if the list
contents has been sorted.
:return: lower bound of first item in the list, or ShardRange.MIN
:return: lower bound of first item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
return ShardRange.MIN
return Namespace.MIN
return self[0].lower
@property
def upper(self):
"""
Returns the upper bound of the first item in the list. Note: this will
Returns the upper bound of the last item in the list. Note: this will
only be equal to the uppermost bound of all items in the list if the
list has previously been sorted.
:return: upper bound of first item in the list, or ShardRange.MIN
:return: upper bound of last item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
return ShardRange.MIN
return Namespace.MIN
return self[-1].upper
@property
@ -6231,7 +6345,7 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
if marker or end_marker:
return list(filter(shard_range_filter, shard_ranges))
if marker == ShardRange.MAX or end_marker == ShardRange.MIN:
if marker == Namespace.MAX or end_marker == Namespace.MIN:
# MIN and MAX are both Falsy so not handled by shard_range_filter
return []
@ -6590,6 +6704,7 @@ class NoopMutex(object):
of which have the message-interleaving trouble you'd expect from TCP or
file handlers.
"""
def __init__(self):
# Usually, it's an error to have multiple greenthreads all waiting
# to write to the same file descriptor. It's often a sign of inadequate
@ -6857,6 +6972,7 @@ class Watchdog(object):
=> the exception is raised, then the greenlet watchdog sleep(3) to
wake up for the 1st timeout expiration
"""
def __init__(self):
# key => (timeout, timeout_at, caller_greenthread, exception)
self._timeouts = dict()
@ -6946,6 +7062,7 @@ class WatchdogTimeout(object):
"""
Context manager to schedule a timeout in a Watchdog instance
"""
def __init__(self, watchdog, timeout, exc, timeout_at=None):
"""
Schedule a timeout in a Watchdog instance

View File

@ -615,7 +615,10 @@ def get_cache_key(account, container=None, obj=None, shard=None):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
if shard == 'updating':
cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
else:
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
elif obj:
if not (account and container):
raise ValueError('Object cache key requires account and container')

View File

@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
ShardRange, find_shard_range, cache_from_env)
ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@ -278,37 +278,67 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _get_cached_updating_shard_ranges(
def _get_cached_updating_namespaces(
self, infocache, memcache, cache_key):
"""
Fetch cached shard ranges from infocache and memcache.
Fetch cached updating namespaces of updating shard ranges from
infocache and memcache.
:param infocache: the infocache instance.
:param memcache: an instance of a memcache client,
:class:`swift.common.memcached.MemcacheRing`.
:param cache_key: the cache key for both infocache and memcache.
:return: a tuple of (list of shard ranges in dict format, cache state)
:return: a tuple of (an instance of NamespaceBoundList, cache state)
"""
cached_ranges = infocache.get(cache_key)
if cached_ranges:
cache_state = 'infocache_hit'
# try get namespaces from infocache first
namespace_list = infocache.get(cache_key)
if namespace_list:
return namespace_list, 'infocache_hit'
# then try get them from memcache
if not memcache:
return None, 'disabled'
skip_chance = self.app.container_updating_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
return None, 'skip'
try:
namespaces = memcache.get(cache_key, raise_on_error=True)
cache_state = 'hit' if namespaces else 'miss'
except MemcacheConnectionError:
namespaces = None
cache_state = 'error'
if namespaces:
if six.PY2:
# json.loads() in memcache.get will convert json 'string' to
# 'unicode' with python2, here we cast 'unicode' back to 'str'
namespaces = [
[lower.encode('utf-8'), name.encode('utf-8')]
for lower, name in namespaces]
namespace_list = NamespaceBoundList(namespaces)
else:
if memcache:
skip_chance = \
self.app.container_updating_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
cache_state = 'skip'
else:
try:
cached_ranges = memcache.get(
cache_key, raise_on_error=True)
cache_state = 'hit' if cached_ranges else 'miss'
except MemcacheConnectionError:
cache_state = 'error'
else:
cache_state = 'disabled'
cached_ranges = cached_ranges or []
return cached_ranges, cache_state
namespace_list = None
return namespace_list, cache_state
def _get_update_shard_caching_disabled(self, req, account, container, obj):
"""
Fetch all updating shard ranges for the given root container when
all caching is disabled.
:param req: original Request instance.
:param account: account from which shard ranges should be fetched.
:param container: container from which shard ranges should be fetched.
:param obj: object getting updated.
:return: an instance of :class:`swift.common.utils.ShardRange`,
or None if the update should go back to the root
"""
# legacy behavior requests container server for includes=obj
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating', includes=obj)
record_cache_op_metrics(
self.logger, 'shard_updating', 'disabled', response)
# there will be only one shard range in the list if any
return shard_ranges[0] if shard_ranges else None
def _get_update_shard(self, req, account, container, obj):
"""
@ -327,39 +357,41 @@ class BaseObjectController(Controller):
"""
if not self.app.recheck_updating_shard_ranges:
# caching is disabled
cache_state = 'disabled'
# legacy behavior requests container server for includes=obj
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating', includes=obj)
else:
# try to get from cache
response = None
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
(cached_ranges, cache_state
) = self._get_cached_updating_shard_ranges(
infocache, memcache, cache_key)
if cached_ranges:
# found cached shard ranges in either infocache or memcache
infocache[cache_key] = tuple(cached_ranges)
shard_ranges = [ShardRange.from_dict(shard_range)
for shard_range in cached_ranges]
else:
# pull full set of updating shards from backend
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating')
if shard_ranges:
cached_ranges = [dict(sr) for sr in shard_ranges]
infocache[cache_key] = tuple(cached_ranges)
if memcache:
memcache.set(
cache_key, cached_ranges,
time=self.app.recheck_updating_shard_ranges)
return self._get_update_shard_caching_disabled(
req, account, container, obj)
# caching is enabled, try to get from caches
response = None
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cached_namespaces, cache_state = self._get_cached_updating_namespaces(
infocache, memcache, cache_key)
if cached_namespaces:
# found cached namespaces in either infocache or memcache
infocache[cache_key] = cached_namespaces
namespace = cached_namespaces.get_namespace(obj)
update_shard = ShardRange(
name=namespace.name, timestamp=0, lower=namespace.lower,
upper=namespace.upper)
else:
# pull full set of updating shard ranges from backend
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating')
if shard_ranges:
# only store the list of namespace lower bounds and names into
# infocache and memcache.
cached_namespaces = NamespaceBoundList.parse(
shard_ranges)
infocache[cache_key] = cached_namespaces
if memcache:
memcache.set(
cache_key, cached_namespaces.bounds,
time=self.app.recheck_updating_shard_ranges)
update_shard = find_shard_range(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
return find_shard_range(obj, shard_ranges or [])
return update_shard
def _get_update_target(self, req, container_info):
# find the sharded container to which we'll send the update

View File

@ -5910,6 +5910,7 @@ class UnsafeXrange(object):
"""
Like range(limit), but with extra context switching to screw things up.
"""
def __init__(self, upper_bound):
self.current = 0
self.concurrent_calls = 0
@ -8211,6 +8212,83 @@ class TestShardName(unittest.TestCase):
utils.ShardName.create('a', 'root', None, '1235678', 'bad')
class TestNamespace(unittest.TestCase):
def test_total_ordering(self):
a_start_ns = utils.Namespace('a/-a', '', 'a')
a_atob_ns = utils.Namespace('a/a-b', 'a', 'b')
a_atof_ns = utils.Namespace('a/a-f', 'a', 'f')
a_ftol_ns = utils.Namespace('a/f-l', 'f', 'l')
a_ltor_ns = utils.Namespace('a/l-r', 'l', 'r')
a_rtoz_ns = utils.Namespace('a/r-z', 'r', 'z')
a_end_ns = utils.Namespace('a/z-', 'z', '')
b_start_ns = utils.Namespace('b/-a', '', 'a')
self.assertEqual(a_start_ns, b_start_ns)
self.assertNotEqual(a_start_ns, a_atob_ns)
self.assertLess(a_start_ns, a_atob_ns)
self.assertLess(a_atof_ns, a_ftol_ns)
self.assertLess(a_ftol_ns, a_ltor_ns)
self.assertLess(a_ltor_ns, a_rtoz_ns)
self.assertLess(a_rtoz_ns, a_end_ns)
self.assertLessEqual(a_start_ns, a_atof_ns)
self.assertLessEqual(a_atof_ns, a_rtoz_ns)
self.assertGreater(a_end_ns, a_atof_ns)
self.assertGreater(a_rtoz_ns, a_ftol_ns)
self.assertGreater(a_end_ns, a_start_ns)
self.assertGreaterEqual(a_end_ns, a_atof_ns)
self.assertGreaterEqual(a_rtoz_ns, a_start_ns)
class TestNamespaceBoundList(unittest.TestCase):
def test_functions(self):
start = ['', 'a/-a']
start_ns = utils.Namespace('a/-a', '', 'a')
atof = ['a', 'a/a-f']
atof_ns = utils.Namespace('a/a-f', 'a', 'f')
ftol = ['f', 'a/f-l']
ftol_ns = utils.Namespace('a/f-l', 'f', 'l')
ltor = ['l', 'a/l-r']
ltor_ns = utils.Namespace('a/l-r', 'l', 'r')
rtoz = ['r', 'a/r-z']
rtoz_ns = utils.Namespace('a/r-z', 'r', 'z')
end = ['z', 'a/z-']
end_ns = utils.Namespace('a/z-', 'z', '')
lowerbounds = [start, atof, ftol, ltor, rtoz, end]
namespace_list = utils.NamespaceBoundList(lowerbounds)
# test 'get_namespace'
self.assertEqual(namespace_list.get_namespace('1'), start_ns)
self.assertEqual(namespace_list.get_namespace('a'), start_ns)
self.assertEqual(namespace_list.get_namespace('b'), atof_ns)
self.assertEqual(namespace_list.get_namespace('f'), atof_ns)
self.assertEqual(namespace_list.get_namespace('f\x00'), ftol_ns)
self.assertEqual(namespace_list.get_namespace('l'), ftol_ns)
self.assertEqual(namespace_list.get_namespace('x'), rtoz_ns)
self.assertEqual(namespace_list.get_namespace('r'), ltor_ns)
self.assertEqual(namespace_list.get_namespace('}'), end_ns)
# test 'parse'
namespaces_list = utils.NamespaceBoundList.parse(None)
self.assertEqual(namespaces_list, None)
namespaces = [start_ns, atof_ns, ftol_ns, ltor_ns, rtoz_ns, end_ns]
namespace_list = utils.NamespaceBoundList.parse(namespaces)
self.assertEqual(namespace_list.get_namespace('1'), start_ns)
self.assertEqual(namespace_list.get_namespace('l'), ftol_ns)
self.assertEqual(namespace_list.get_namespace('x'), rtoz_ns)
self.assertEqual(namespace_list.get_namespace('r'), ltor_ns)
self.assertEqual(namespace_list.get_namespace('}'), end_ns)
self.assertEqual(namespace_list.bounds, lowerbounds)
overlap_f_ns = utils.Namespace('a/-f', '', 'f')
overlapping_namespaces = [start_ns, atof_ns, overlap_f_ns,
ftol_ns, ltor_ns, rtoz_ns, end_ns]
namespace_list = utils.NamespaceBoundList.parse(overlapping_namespaces)
self.assertEqual(namespace_list.bounds, lowerbounds)
overlap_l_ns = utils.Namespace('a/a-l', 'a', 'l')
overlapping_namespaces = [start_ns, atof_ns, ftol_ns,
overlap_l_ns, ltor_ns, rtoz_ns, end_ns]
namespace_list = utils.NamespaceBoundList.parse(overlapping_namespaces)
self.assertEqual(namespace_list.bounds, lowerbounds)
class TestShardRange(unittest.TestCase):
def setUp(self):
self.ts_iter = make_timestamp_iter()

View File

@ -501,7 +501,7 @@ class TestFuncs(BaseTest):
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
'shard-listing/account/cont')
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
'shard-updating/account/cont')
'shard-updating-v2/account/cont')
self.assertRaises(ValueError,
get_cache_key, "account", shard="listing")
self.assertRaises(ValueError,

View File

@ -71,7 +71,7 @@ from swift.common import utils, constraints, registry
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, StatsdClient, \
iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
node_to_string
node_to_string, NamespaceBoundList
from swift.common.wsgi import loadapp, ConfigString
from swift.common.http_protocol import SwiftHttpProtocol
from swift.proxy.controllers import base as proxy_base
@ -4370,13 +4370,16 @@ class TestReplicatedObjectController(
params={'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in shard_ranges])
cached_namespaces = NamespaceBoundList.parse(shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4433,8 +4436,11 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
cache.set('shard-updating/a/c', tuple(
dict(shard_range) for shard_range in shard_ranges))
cache.set(
'shard-updating-v2/a/c',
tuple(
[shard_range.lower_str, str(shard_range.name)]
for shard_range in shard_ranges))
req = Request.blank('/v1/a/c/o', {'swift.cache': cache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@ -4467,10 +4473,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# infocache gets populated from memcache
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4527,8 +4534,8 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
infocache = {
'shard-updating/a/c':
tuple(dict(shard_range) for shard_range in shard_ranges)}
'shard-updating-v2/a/c':
NamespaceBoundList.parse(shard_ranges)}
req = Request.blank('/v1/a/c/o', {'swift.infocache': infocache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@ -4560,10 +4567,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# verify content in infocache.
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4621,8 +4629,10 @@ class TestReplicatedObjectController(
'.shards_a/c_no_way', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
cache.set('shard-updating/a/c', tuple(
dict(shard_range) for shard_range in cached_shard_ranges))
cache.set('shard-updating-v2/a/c',
tuple(
[sr.lower_str, str(sr.name)]
for sr in cached_shard_ranges))
# sanity check: we can get the old shard from cache
req = Request.blank(
@ -4636,7 +4646,7 @@ class TestReplicatedObjectController(
'x-backend-sharding-state': sharding_state,
'X-Backend-Record-Type': 'shard'}
with mock.patch('random.random', return_value=1), \
mocked_http_conn(*status_codes, headers=resp_headers):
mocked_http_conn(*status_codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -4646,13 +4656,16 @@ class TestReplicatedObjectController(
'object.shard_updating.cache.hit': 1}, stats)
# cached shard ranges are still there
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in cached_shard_ranges])
cached_namespaces = NamespaceBoundList.parse(cached_shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in cached_shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# ...but we have some chance to skip cache
req = Request.blank(
@ -4675,8 +4688,8 @@ class TestReplicatedObjectController(
dict(shard_range)
for shard_range in shard_ranges]).encode('ascii')
with mock.patch('random.random', return_value=0), \
mocked_http_conn(*status_codes, headers=resp_headers,
body=body) as fake_conn:
mocked_http_conn(*status_codes, headers=resp_headers,
body=body) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -4698,13 +4711,16 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# and skipping cache will refresh it
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in shard_ranges])
cached_namespaces = NamespaceBoundList.parse(shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4805,7 +4821,7 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# infocache does not get populated from memcache
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertNotIn(cache_key, req.environ.get('swift.infocache'))
# make sure backend requests included expected container headers