sharder: make misplaced objects lookup faster
The related change fixed a bug in the ContainerSharder yield_objects_to_shard_range method by replacing two DB lookups for misplaced objects, one for undeleted rows and one for deleted rows, with a single lookup for both deleted and undeleted rows. This significantly increased the time to make the lookup because it could no longer take advantage of the object table 'deleted' field index. This patch reinstates the separate lookups for undeleted and deleted rows in yield_objects. In isolation that change would re-introduce the bug that was fixed by the Related-Change. The bug is therefore now addressed by changing the yield_objects_to_shard_range implementation so that both undeleted and deleted objects are consumed from the yield_objects generator. Change-Id: I337f4e54d1bcd4c5484fe56cfc886b16077982f5 Related-Change: Ie8404f0c7e84d3916f0e0fa62afc54f1f43a4d06
This commit is contained in:
parent
5de745c2bc
commit
983879421f
@ -1525,106 +1525,119 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
return self._audit_root_container(broker)
|
||||
return self._audit_shard_container(broker)
|
||||
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None):
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None,
|
||||
batch_size=None):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both
|
||||
deleted and undeleted objects are included.
|
||||
Iterates through all object rows in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to ``batch_size`` in length. All batches
|
||||
of rows that are not marked deleted are yielded before all batches of
|
||||
rows that are marked deleted.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
describing the source range.
|
||||
:param since_row: include only items whose ROWID is greater than
|
||||
the given row id; by default all rows are included.
|
||||
:return: a generator of tuples of (list of objects, broker info dict)
|
||||
:param since_row: include only object rows whose ROWID is greater than
|
||||
the given row id; by default all object rows are included.
|
||||
:param batch_size: The maximum number of object rows to include in each
|
||||
yielded batch; defaults to cleave_row_batch_size.
|
||||
:return: a generator of tuples of (list of rows, broker info dict)
|
||||
"""
|
||||
if (src_shard_range.lower == ShardRange.MAX or
|
||||
src_shard_range.upper == ShardRange.MIN):
|
||||
# this is an unexpected condition but handled with an early return
|
||||
# just in case, because:
|
||||
# lower == ShardRange.MAX -> marker == ''
|
||||
# which could result in rows being erroneously yielded.
|
||||
return
|
||||
|
||||
batch_size = batch_size or self.cleave_row_batch_size
|
||||
for include_deleted in (False, True):
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
limit=batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=None, # give me everything
|
||||
include_deleted=include_deleted,
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
self.logger.debug(
|
||||
'got %s rows (deleted=%s) from %s in %ss',
|
||||
len(objects),
|
||||
include_deleted,
|
||||
broker.db_file,
|
||||
time.time() - start)
|
||||
if objects:
|
||||
yield objects, info
|
||||
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
if len(objects) < batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
|
||||
def yield_objects_to_shard_range(self, broker, src_shard_range,
|
||||
dest_shard_ranges):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` to place them in
|
||||
destination shard ranges provided by the ``next_shard_range`` function.
|
||||
Yields tuples of (object list, destination shard range in which those
|
||||
objects belong). Note that the same destination shard range may be
|
||||
referenced in more than one yielded tuple.
|
||||
Iterates through all object rows in ``src_shard_range`` to place them
|
||||
in destination shard ranges provided by the ``dest_shard_ranges``
|
||||
function. Yields tuples of ``(batch of object rows, destination shard
|
||||
range in which those object rows belong, broker info)``.
|
||||
|
||||
If no destination shard range exists for a batch of object rows then
|
||||
tuples are yielded of ``(batch of object rows, None, broker info)``.
|
||||
This indicates to the caller that there are a non-zero number of object
|
||||
rows for which no destination shard range was found.
|
||||
|
||||
Note that the same destination shard range may be referenced in more
|
||||
than one yielded tuple.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
describing the source range.
|
||||
:param dest_shard_ranges: A function which should return a list of
|
||||
destination shard ranges in name order.
|
||||
:return: a generator of tuples of
|
||||
(object list, shard range, broker info dict)
|
||||
destination shard ranges sorted in the order defined by
|
||||
:meth:`~swift.common.utils.ShardRange.sort_key`.
|
||||
:return: a generator of tuples of ``(object row list, shard range,
|
||||
broker info dict)`` where ``shard_range`` may be ``None``.
|
||||
"""
|
||||
dest_shard_range_iter = dest_shard_range = None
|
||||
for objs, info in self.yield_objects(broker, src_shard_range):
|
||||
if not objs:
|
||||
# calling dest_shard_ranges() may result in a request to fetch shard
|
||||
# ranges, so first check that the broker actually has misplaced object
|
||||
# rows in the source namespace
|
||||
for _ in self.yield_objects(broker, src_shard_range, batch_size=1):
|
||||
break
|
||||
else:
|
||||
return
|
||||
|
||||
def next_or_none(it):
|
||||
try:
|
||||
return next(it)
|
||||
except StopIteration:
|
||||
return None
|
||||
|
||||
if dest_shard_range_iter is None:
|
||||
dest_shard_range_iter = iter(dest_shard_ranges())
|
||||
dest_shard_range = next_or_none(dest_shard_range_iter)
|
||||
src_shard_range_marker = src_shard_range.lower
|
||||
for dest_shard_range in dest_shard_range_iter:
|
||||
if dest_shard_range.upper <= src_shard_range.lower:
|
||||
continue
|
||||
|
||||
unplaced = False
|
||||
last_index = next_index = 0
|
||||
for obj in objs:
|
||||
if dest_shard_range is None:
|
||||
# no more destinations: yield remainder of batch and bail
|
||||
# NB there may be more batches of objects but none of them
|
||||
# will be placed so no point fetching them
|
||||
yield objs[last_index:], None, info
|
||||
return
|
||||
if obj['name'] <= dest_shard_range.lower:
|
||||
unplaced = True
|
||||
elif unplaced:
|
||||
# end of run of unplaced objects, yield them
|
||||
yield objs[last_index:next_index], None, info
|
||||
last_index = next_index
|
||||
unplaced = False
|
||||
while (dest_shard_range and
|
||||
obj['name'] > dest_shard_range.upper):
|
||||
if next_index != last_index:
|
||||
# yield the objects in current dest_shard_range
|
||||
yield (objs[last_index:next_index],
|
||||
dest_shard_range,
|
||||
info)
|
||||
last_index = next_index
|
||||
dest_shard_range = next_or_none(dest_shard_range_iter)
|
||||
next_index += 1
|
||||
if dest_shard_range.lower > src_shard_range_marker:
|
||||
# no destination for a sub-namespace of the source namespace
|
||||
sub_src_range = src_shard_range.copy(
|
||||
lower=src_shard_range_marker, upper=dest_shard_range.lower)
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, None, info
|
||||
|
||||
if next_index != last_index:
|
||||
# yield tail of current batch of objects
|
||||
# NB there may be more objects for the current
|
||||
# dest_shard_range in the next batch from yield_objects
|
||||
yield (objs[last_index:next_index],
|
||||
None if unplaced else dest_shard_range,
|
||||
info)
|
||||
sub_src_range = src_shard_range.copy(
|
||||
lower=max(dest_shard_range.lower, src_shard_range.lower),
|
||||
upper=min(dest_shard_range.upper, src_shard_range.upper))
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, dest_shard_range, info
|
||||
|
||||
src_shard_range_marker = dest_shard_range.upper
|
||||
if dest_shard_range.upper >= src_shard_range.upper:
|
||||
# the entire source namespace has been traversed
|
||||
break
|
||||
else:
|
||||
# dest_shard_ranges_iter was exhausted before reaching the end of
|
||||
# the source namespace
|
||||
sub_src_range = src_shard_range.copy(lower=src_shard_range_marker)
|
||||
for objs, info in self.yield_objects(broker, sub_src_range):
|
||||
yield objs, None, info
|
||||
|
||||
def _post_replicate_hook(self, broker, info, responses):
|
||||
# override superclass behaviour
|
||||
|
@ -427,13 +427,13 @@ class TestSharder(BaseTestSharder):
|
||||
'container-sharder-6021-ic')
|
||||
|
||||
def _assert_stats(self, expected, sharder, category):
|
||||
# assertEqual doesn't work with a defaultdict
|
||||
# assertEqual doesn't work with a stats defaultdict so copy to a dict
|
||||
# before comparing
|
||||
stats = sharder.stats['sharding'][category]
|
||||
actual = {}
|
||||
for k, v in expected.items():
|
||||
actual = stats[k]
|
||||
self.assertEqual(
|
||||
v, actual, 'Expected %s but got %s for %s in %s' %
|
||||
(v, actual, k, stats))
|
||||
actual[k] = stats[k]
|
||||
self.assertEqual(expected, actual)
|
||||
return stats
|
||||
|
||||
def _assert_recon_stats(self, expected, sharder, category):
|
||||
@ -1181,6 +1181,204 @@ class TestSharder(BaseTestSharder):
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
|
||||
def test_yield_objects(self):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
|
||||
i % 2, 0) for i in range(30)]
|
||||
for obj in objects:
|
||||
broker.put_object(*obj)
|
||||
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([15, 15], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 15, [1] * 15],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# custom batch size
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range, batch_size=10)]
|
||||
self.assertEqual([10, 5, 10, 5], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 10, [0] * 5, [1] * 10, [1] * 5],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# restricted source range
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o10', upper='o20')
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([5, 5], [len(b) for b in batches])
|
||||
self.assertEqual([[0] * 5, [1] * 5],
|
||||
[[o['deleted'] for o in b] for b in batches])
|
||||
|
||||
# null source range
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower=ShardRange.MAX)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([], batches)
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
upper=ShardRange.MIN)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects(broker, src_range)]
|
||||
self.assertEqual([], batches)
|
||||
|
||||
def test_yield_objects_to_shard_range_no_objects(self):
|
||||
# verify that dest_shard_ranges func is not called if the source
|
||||
# broker has no objects
|
||||
broker = self._make_broker()
|
||||
dest_shard_ranges = mock.MagicMock()
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
batches = [b for b, _ in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([], batches)
|
||||
dest_shard_ranges.assert_not_called()
|
||||
|
||||
def test_yield_objects_to_shard_range(self):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
('o%02d' % i, self.ts_encoded(), 10, 'text/plain', 'etag_a',
|
||||
i % 2, 0) for i in range(30)]
|
||||
for obj in objects:
|
||||
broker.put_object(*obj)
|
||||
orig_info = broker.get_info()
|
||||
# yield_objects annotates the info dict...
|
||||
orig_info['max_row'] = 30
|
||||
dest_ranges = [
|
||||
ShardRange('shard/0', Timestamp.now(), upper='o09'),
|
||||
ShardRange('shard/1', Timestamp.now(), lower='o09', upper='o19'),
|
||||
ShardRange('shard/2', Timestamp.now(), lower='o19'),
|
||||
]
|
||||
|
||||
# complete overlap of src and dest, multiple batches per dest shard
|
||||
# range per deleted/not deleted
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={'cleave_row_batch_size': 4}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[0], dest_ranges[0],
|
||||
dest_ranges[0], dest_ranges[0],
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[0:8:2]],
|
||||
[o[0] for o in objects[8:10:2]],
|
||||
[o[0] for o in objects[1:8:2]],
|
||||
[o[0] for o in objects[9:10:2]],
|
||||
[o[0] for o in objects[10:18:2]],
|
||||
[o[0] for o in objects[18:20:2]],
|
||||
[o[0] for o in objects[11:18:2]],
|
||||
[o[0] for o in objects[19:20:2]],
|
||||
[o[0] for o in objects[20:28:2]],
|
||||
[o[0] for o in objects[28:30:2]],
|
||||
[o[0] for o in objects[21:28:2]],
|
||||
[o[0] for o in objects[29:30:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 12, [info for _, _, info in yielded])
|
||||
|
||||
# src narrower than dest
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o15', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[16:20:2]],
|
||||
[o[0] for o in objects[17:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
|
||||
|
||||
# src much narrower than dest
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o15', upper='o18')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges)
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([dest_ranges[1], dest_ranges[1]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[16:19:2]],
|
||||
[o[0] for o in objects[17:19:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 2, [info for _, _, info in yielded])
|
||||
|
||||
# dest narrower than src
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o05', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:])
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None, None,
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
dest_ranges[2], dest_ranges[2]],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[6:10:2]],
|
||||
[o[0] for o in objects[7:10:2]],
|
||||
[o[0] for o in objects[10:20:2]],
|
||||
[o[0] for o in objects[11:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
|
||||
|
||||
# dest much narrower than src
|
||||
src_range = ShardRange('dont/care', Timestamp.now(),
|
||||
lower='o05', upper='o25')
|
||||
dest_shard_ranges = mock.MagicMock(return_value=dest_ranges[1:2])
|
||||
with self._mock_sharder(conf={}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None, None,
|
||||
dest_ranges[1], dest_ranges[1],
|
||||
None, None],
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[6:10:2]],
|
||||
[o[0] for o in objects[7:10:2]],
|
||||
[o[0] for o in objects[10:20:2]],
|
||||
[o[0] for o in objects[11:20:2]],
|
||||
[o[0] for o in objects[20:26:2]],
|
||||
[o[0] for o in objects[21:26:2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 6, [info for _, _, info in yielded])
|
||||
|
||||
# no dest, source is entire namespace, multiple batches
|
||||
src_range = ShardRange('dont/care', Timestamp.now())
|
||||
dest_shard_ranges = mock.MagicMock(return_value=[])
|
||||
with self._mock_sharder(conf={'cleave_row_batch_size': 10}) as sharder:
|
||||
yielded = [y for y in
|
||||
sharder.yield_objects_to_shard_range(
|
||||
broker, src_range, dest_shard_ranges)]
|
||||
self.assertEqual([None] * 4,
|
||||
[dest for _, dest, _ in yielded])
|
||||
self.assertEqual([[o[0] for o in objects[:20:2]],
|
||||
[o[0] for o in objects[20::2]],
|
||||
[o[0] for o in objects[1:20:2]],
|
||||
[o[0] for o in objects[21::2]]],
|
||||
[[o['name'] for o in objs] for objs, _, _ in yielded])
|
||||
self.assertEqual([orig_info] * 4, [info for _, _, info in yielded])
|
||||
|
||||
def _check_cleave_root(self, conf=None):
|
||||
broker = self._make_broker()
|
||||
objects = [
|
||||
|
Loading…
Reference in New Issue
Block a user