Merge "sharder: make misplaced objects lookup faster"
This commit is contained in:
commit
c0483c5b94
@ -1525,106 +1525,119 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
return self._audit_root_container(broker)
|
||||
return self._audit_shard_container(broker)
|
||||
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None):
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None,
|
||||
batch_size=None):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both
|
||||
deleted and undeleted objects are included.
|
||||
Iterates through all object rows in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to ``batch_size`` in length. All batches
|
||||
of rows that are not marked deleted are yielded before all batches of
|
||||
rows that are marked deleted.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
describing the source range.
|
||||
:param since_row: include only items whose ROWID is greater than
|
||||
the given row id; by default all rows are included.
|
||||
:return: a generator of tuples of (list of objects, broker info dict)
|
||||
:param since_row: include only object rows whose ROWID is greater than
|
||||
the given row id; by default all object rows are included.
|
||||
:param batch_size: The maximum number of object rows to include in each
|
||||
yielded batch; defaults to cleave_row_batch_size.
|
||||
:return: a generator of tuples of (list of rows, broker info dict)
|
||||
"""
|
||||
if (src_shard_range.lower == ShardRange.MAX or
|
||||
src_shard_range.upper == ShardRange.MIN):
|
||||
# this is an unexpected condition but handled with an early return
|
||||
# just in case, because:
|
||||
# lower == ShardRange.MAX -> marker == ''
|
||||
# which could result in rows being erroneously yielded.
|
||||
return
|
||||
|
||||
batch_size = batch_size or self.cleave_row_batch_size
|
||||
for include_deleted in (False, True):
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
limit=batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=None, # give me everything
|
||||
include_deleted=include_deleted,
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
self.logger.debug(
|
||||
'got %s rows (deleted=%s) from %s in %ss',
|
||||
len(objects),
|
||||
include_deleted,
|
||||
broker.db_file,
|
||||
time.time() - start)
|
||||
if objects:
|
||||
yield objects, info
|
||||
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
if len(objects) < batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
|
||||
def yield_objects_to_shard_range(self, broker, src_shard_range,
|
||||
dest_shard_ranges):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` to place them in
|
||||
destination shard ranges provided by the ``next_shard_range`` function.
|
||||
Yields tuples of (object list, destination shard range in which those
|
||||
objects belong). Note that the same destination shard range may be
|
||||
referenced in more than one yielded tuple.
|
||||
Iterates through all object rows in ``src_shard_range`` to place them
|
||||
in destination shard ranges provided by the ``dest_shard_ranges``
|
||||
function. Yields tuples of ``(batch of object rows, destination shard
|
||||
range in which those object rows belong, broker info)``.
|
||||
|
||||
If no destination shard range exists for a batch of object rows then
|
||||
tuples are yielded of ``(batch of object rows, None, broker info)``.
|
||||
This indicates to the caller that there are a non-zero number of object
|
||||
rows for which no destination shard range was found.
|
||||
|
||||
Note that the same destination shard range may be referenced in more
|
||||
than one yielded tuple.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
describing the source range.
|
||||
:param dest_shard_ranges: A function which should return a list of
|
||||
destination shard ranges in name order.
|
||||
:return: a generator of tuples of
|
||||
(object list, shard range, broker info dict)
|
||||
destination shard ranges sorted in the order defined by
|
||||
:meth:`~swift.common.utils.ShardRange.sort_key`.
|
||||
:return: a generator of tuples of ``(object row list, shard range,
|
||||
broker info dict)`` where ``shard_range`` may be ``None``.
|
||||
"""
|
||||
dest_shard_range_iter = dest_shard_range = None
|
||||
for objs, info in self.yield_objects(broker, src_shard_range):
|
||||
if not objs:
|
||||
# calling dest_shard_ranges() may result in a request to fetch shard
|
||||
# ranges, so first check that the broker actually has misplaced object
|
||||
# rows in the source namespace
|
||||
for _ in self.yield_objects(broker, src_shard_range, batch_size=1):
|
||||
break
|
||||
else:
|
||||
return
|
||||
|
||||
def next_or_none(it):
|
||||
try:
|
||||
return next(it)
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
if dest_shard_range_iter is None:
|
||||
dest_shard_range_iter = iter(dest_shard_ranges())
|
||||
dest_shard_range = next_or_none(dest_shard_range_iter)
|
||||
src_shard_range_marker = src_shard_range.lower
|
||||
for dest_shard_range in dest_shard_range_iter:
|
||||
if dest_shard_range.upper <= src_shard_range.lower:
|
||||
continue
|
||||
|
||||
unplaced = False
|
||||
last_index = next_index = 0
|
||||
for obj in objs:
|
||||
if dest_shard_range is None:
|
||||
# no more destinations: yield remainder of batch and bail
|
||||
# NB there may be more batches of objects but none of them
|
||||
# will be placed so no point fetching them
|
||||
yield objs[last_index:], None, info
|
||||
return
|
||||
if obj['name'] <= dest_shard_range.lower:
|
||||
unplaced = True
|
||||
elif unplaced:
|
||||
# end of run of unplaced objects, yield them
|
||||
yield objs[last_index:next_index], None, info
|
||||
last_index = next_index
|
||||
unplaced = False
|
||||
while (dest_shard_range and
|
||||
obj['name'] > dest_shard_range.upper):
|
||||
if next_index != last_index:
|
||||
# yield the objects in current dest_shard_range
|
||||
yield (objs[last_index:next_index],
|
||||
dest_shard_range,
|
||||
info)
|
||||
last_index = next_index
|
||||
dest_shard_range = next_or_none(dest_shard_range_iter)
|
||||
next_index += 1
|
||||
if dest_shard_range.lower > src_shard_range_marker:
|
||||
# no destination for a sub-namespace of the source namespace
|
||||
sub_src_range = src_shard_range.copy(
|
||||
lower=src_shard_range_marker, upper=dest_shard_range.lower)
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, None, info
|
||||
|
||||
if next_index != last_index:
|
||||
# yield tail of current batch of objects
|
||||
# NB there may be more objects for the current
|
||||
# dest_shard_range in the next batch from yield_objects
|
||||
yield (objs[last_index:next_index],
|
||||
None if unplaced else dest_shard_range,
|
||||
info)
|
||||
sub_src_range = src_shard_range.copy(
|
||||
lower=max(dest_shard_range.lower, src_shard_range.lower),
|
||||
upper=min(dest_shard_range.upper, src_shard_range.upper))
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, dest_shard_range, info
|
||||
|
||||
src_shard_range_marker = dest_shard_range.upper
|
||||
if dest_shard_range.upper >= src_shard_range.upper:
|
||||
# the entire source namespace has been traversed
|
||||
break
|
||||
else:
|
||||
# dest_shard_ranges_iter was exhausted before reaching the end of
|
||||
# the source namespace
|
||||
sub_src_range = src_shard_range.copy(lower=src_shard_range_marker)
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, None, info
|
||||
|
||||
def _post_replicate_hook(self, broker, info, responses):
|
||||
# override superclass behaviour
|
||||
|
@ -427,13 +427,13 @@ class TestSharder(BaseTestSharder):
|
||||
'container-sharder-6021-ic')
|
||||
|
||||
def _assert_stats(self, expected, sharder, category):
|
||||
# assertEqual doesn't work with a defaultdict
|
||||
# assertEqual doesn't work with a stats defaultdict so copy to a dict
|
||||
# before comparing
|
||||
stats = sharder.stats['sharding'][category]
|
||||
actual = {}
|
||||
for k, v in expected.items():
|
||||
actual = stats[k]
|
||||
self.assertEqual(
|
||||
v, actual, 'Expected %s but got %s for %s in %s' %
|
||||
(v, actual, k, stats))
|
||||
actual[k] = stats[k]
|
||||
self.assertEqual(expected, actual)
|
||||
return stats
|
||||
|
||||
def _assert_recon_stats(self, expected, sharder, category):
|
||||
@ -1181,6 +1181,204 @@ class TestSharder(BaseTestSharder):
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
|
||||
def test_yield_objects(self):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
|
||||
i % 2, 0) for i in range(30)]
|
||||
for obj in objects:
|
||||
broker.put_object(*obj)
|
||||
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([15, 15], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 15, [1] * 15],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# custom batch size
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range, batch_size=10)]
|
||||
self.assertEqual([10, 5, 10, 5], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 10, [0] * 5, [1] * 10, [1] * 5],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# restricted source range
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o10', upper='o20')
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([5, 5], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 5, [1] * 5],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# null source range
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower=ShardRange.MAX)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([], batches)
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
upper=ShardRange.MIN)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([], batches)
|
||||
|
||||
def test_yield_objects_to_shard_range_no_objects(self):
|
||||
# verify that dest_shard_ranges func is not called if the source
|
||||
# broker has no objects
|
||||
broker = self._make_broker()
|
||||
dest_shard_ranges = mock.MagicMock()
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([], batches)
|
||||
dest_shard_ranges.assert_not_called()
|
||||
|
||||
def test_yield_objects_to_shard_range(self):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
|
||||
i % 2, 0) for i in range(30)]
|
||||
for obj in objects:
|
||||
broker.put_object(*obj)
|
||||
orig_info = broker.get_info()
|
||||
# yield_objects annotates the info dict...
|
||||
orig_info['max_row'] = 30
|
||||
dest_ranges = [
|
||||
ShardRange('shard/0', Timestamp.now(), upper='o09'),
|
||||
ShardRange('shard/1', Timestamp.now(), lower='o09', upper='o19'),
|
||||
ShardRange('shard/2', Timestamp.now(), lower='o19'),
|
||||
]
|
||||
|
||||
# complete overlap of src and dest, multiple batches per dest shard
|
||||
# range per deleted/not deleted
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={'cleave_row_batch_size': 4}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[0], dest_ranges[0],
|
||||
dest_ranges[0], dest_ranges[0],
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[0:8:2]],
|
||||
[o[0] for o in objects[8:10:2]],
|
||||
[o[0] for o in objects[1:8:2]],
|
||||
[o[0] for o in objects[9:10:2]],
|
||||
[o[0] for o in objects[10:18:2]],
|
||||
[o[0] for o in objects[18:20:2]],
|
||||
[o[0] for o in objects[11:18:2]],
|
||||
[o[0] for o in objects[19:20:2]],
|
||||
[o[0] for o in objects[20:28:2]],
|
||||
[o[0] for o in objects[28:30:2]],
|
||||
[o[0] for o in objects[21:28:2]],
|
||||
[o[0] for o in objects[29:30:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 12, [info for _, _, info in yielded])
|
||||
|
||||
# src narrower than dest
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o15', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[16:20:2]],
|
||||
[o[0] for o in objects[17:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
|
||||
|
||||
# src much narrower than dest
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o15', upper='o18')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[1], dest_ranges[1]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[16:19:2]],
|
||||
[o[0] for o in objects[17:19:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 2, [info for _, _, info in yielded])
|
||||
|
||||
# dest narrower than src
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o05', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:])
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None, None,
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[6:10:2]],
|
||||
[o[0] for o in objects[7:10:2]],
|
||||
[o[0] for o in objects[10:20:2]],
|
||||
[o[0] for o in objects[11:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
|
||||
|
||||
# dest much narrower than src
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o05', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:2])
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None, None,
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
None, None],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[6:10:2]],
|
||||
[o[0] for o in objects[7:10:2]],
|
||||
[o[0] for o in objects[10:20:2]],
|
||||
[o[0] for o in objects[11:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
|
||||
|
||||
# no dest, source is entire namespace, multiple batches
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
dest_shard_ranges = mock.MagicMock(return_value=[])
|
||||
with self._mock_sharder(conf={'cleave_row_batch_size': 10}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None] * 4,
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[:20:2]],
|
||||
[o[0] for o in objects[20::2]],
|
||||
[o[0] for o in objects[1:20:2]],
|
||||
[o[0] for o in objects[21::2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
|
||||
|
||||
def _check_cleave_root(self, conf=None):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
|
Loading…
Reference in New Issue
Block a user