Merge "sharder: update own_sr stats explicitly"
This commit is contained in:
commit
9063ea0ac7
@ -173,7 +173,8 @@ from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
||||
CleavingContext, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, find_overlapping_ranges, \
|
||||
find_paths, rank_paths, finalize_shrinking, DEFAULT_SHARDER_CONF, \
|
||||
ContainerSharderConf, find_paths_with_gaps, combine_shard_ranges
|
||||
ContainerSharderConf, find_paths_with_gaps, combine_shard_ranges, \
|
||||
update_own_shard_range_stats
|
||||
|
||||
EXIT_SUCCESS = 0
|
||||
EXIT_ERROR = 1
|
||||
@ -380,6 +381,9 @@ def db_info(broker, args):
|
||||
if own_sr else None))
|
||||
db_state = broker.get_db_state()
|
||||
print('db_state = %s' % db_state)
|
||||
info = broker.get_info()
|
||||
print('object_count = %d' % info['object_count'])
|
||||
print('bytes_used = %d' % info['bytes_used'])
|
||||
if db_state == 'sharding':
|
||||
print('Retiring db id: %s' % broker.get_brokers()[0].get_info()['id'])
|
||||
print('Cleaving context: %s' %
|
||||
@ -511,6 +515,8 @@ def _enable_sharding(broker, own_shard_range, args):
|
||||
if own_shard_range.update_state(ShardRange.SHARDING):
|
||||
own_shard_range.epoch = Timestamp.now()
|
||||
own_shard_range.state_timestamp = own_shard_range.epoch
|
||||
# initialise own_shard_range with current broker object stats...
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
|
||||
with broker.updated_timeout(args.enable_timeout):
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
|
@ -442,7 +442,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
if self.db_epoch is None:
|
||||
# never been sharded
|
||||
return UNSHARDED
|
||||
if self.db_epoch != self._own_shard_range().epoch:
|
||||
if self.db_epoch != self.get_own_shard_range().epoch:
|
||||
return UNSHARDED
|
||||
if not self.get_shard_ranges():
|
||||
return COLLAPSED
|
||||
@ -1870,7 +1870,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
marker, end_marker)
|
||||
|
||||
if fill_gaps:
|
||||
own_shard_range = self._own_shard_range()
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
if shard_ranges:
|
||||
last_upper = shard_ranges[-1].upper
|
||||
else:
|
||||
@ -1879,7 +1879,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
required_upper = min(end_marker or own_shard_range.upper,
|
||||
own_shard_range.upper)
|
||||
if required_upper > last_upper:
|
||||
filler_sr = self.get_own_shard_range()
|
||||
filler_sr = own_shard_range
|
||||
filler_sr.lower = last_upper
|
||||
filler_sr.upper = required_upper
|
||||
shard_ranges.append(filler_sr)
|
||||
@ -1889,7 +1889,22 @@ class ContainerBroker(DatabaseBroker):
|
||||
|
||||
return shard_ranges
|
||||
|
||||
def _own_shard_range(self, no_default=False):
|
||||
def get_own_shard_range(self, no_default=False):
|
||||
"""
|
||||
Returns a shard range representing this broker's own shard range. If no
|
||||
such range has been persisted in the broker's shard ranges table then a
|
||||
default shard range representing the entire namespace will be returned.
|
||||
|
||||
The ``object_count`` and ``bytes_used`` of the returned shard range are
|
||||
not guaranteed to be up-to-date with the current object stats for this
|
||||
broker. Callers that require up-to-date stats should use the
|
||||
``get_info`` method.
|
||||
|
||||
:param no_default: if True and the broker's own shard range is not
|
||||
found in the shard ranges table then None is returned, otherwise a
|
||||
default shard range is returned.
|
||||
:return: an instance of :class:`~swift.common.utils.ShardRange`
|
||||
"""
|
||||
shard_ranges = self.get_shard_ranges(include_own=True,
|
||||
include_deleted=True,
|
||||
exclude_others=True)
|
||||
@ -1903,28 +1918,6 @@ class ContainerBroker(DatabaseBroker):
|
||||
state=ShardRange.ACTIVE)
|
||||
return own_shard_range
|
||||
|
||||
def get_own_shard_range(self, no_default=False):
|
||||
"""
|
||||
Returns a shard range representing this broker's own shard range. If no
|
||||
such range has been persisted in the broker's shard ranges table then a
|
||||
default shard range representing the entire namespace will be returned.
|
||||
|
||||
The returned shard range will be updated with the current object stats
|
||||
for this broker and a meta timestamp set to the current time. For these
|
||||
values to be persisted the caller must merge the shard range.
|
||||
|
||||
:param no_default: if True and the broker's own shard range is not
|
||||
found in the shard ranges table then None is returned, otherwise a
|
||||
default shard range is returned.
|
||||
:return: an instance of :class:`~swift.common.utils.ShardRange`
|
||||
"""
|
||||
own_shard_range = self._own_shard_range(no_default=no_default)
|
||||
if own_shard_range:
|
||||
info = self.get_info()
|
||||
own_shard_range.update_meta(
|
||||
info['object_count'], info['bytes_used'])
|
||||
return own_shard_range
|
||||
|
||||
def is_own_shard_range(self, shard_range):
|
||||
return shard_range.name == self.path
|
||||
|
||||
@ -1936,7 +1929,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
:param epoch: a :class:`~swift.utils.common.Timestamp`
|
||||
:return: the broker's updated own shard range.
|
||||
"""
|
||||
own_shard_range = self._own_shard_range()
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
own_shard_range.update_state(ShardRange.SHARDING, epoch)
|
||||
own_shard_range.epoch = epoch
|
||||
self.merge_shard_ranges(own_shard_range)
|
||||
@ -2232,9 +2225,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
|
||||
# Else, we're either a root or a legacy deleted shard whose sharding
|
||||
# sysmeta was deleted
|
||||
|
||||
# Use internal method so we don't try to update stats.
|
||||
own_shard_range = self._own_shard_range(no_default=True)
|
||||
own_shard_range = self.get_own_shard_range(no_default=True)
|
||||
if not own_shard_range:
|
||||
return True # Never been sharded
|
||||
|
||||
|
@ -559,6 +559,26 @@ def combine_shard_ranges(new_shard_ranges, existing_shard_ranges):
|
||||
key=ShardRange.sort_key)
|
||||
|
||||
|
||||
def update_own_shard_range_stats(broker, own_shard_range):
|
||||
"""
|
||||
Update the ``own_shard_range`` with the up-to-date object stats from
|
||||
the ``broker``.
|
||||
|
||||
Note: this method does not persist the updated ``own_shard_range``;
|
||||
callers should use ``broker.merge_shard_ranges`` if the updated stats
|
||||
need to be persisted.
|
||||
|
||||
:param broker: an instance of ``ContainerBroker``.
|
||||
:param own_shard_range: and instance of ``ShardRange``.
|
||||
:returns: ``own_shard_range`` with up-to-date ``object_count``
|
||||
and ``bytes_used``.
|
||||
"""
|
||||
info = broker.get_info()
|
||||
own_shard_range.update_meta(
|
||||
info['object_count'], info['bytes_used'])
|
||||
return own_shard_range
|
||||
|
||||
|
||||
class CleavingContext(object):
|
||||
"""
|
||||
Encapsulates metadata associated with the process of cleaving a retiring
|
||||
@ -943,6 +963,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
|
||||
def _identify_sharding_candidate(self, broker, node):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
if is_sharding_candidate(
|
||||
own_shard_range, self.shard_container_threshold):
|
||||
self.sharding_candidates.append(
|
||||
@ -957,6 +978,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
|
||||
if compactible_ranges:
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
shrink_candidate = self._make_stats_info(
|
||||
broker, node, own_shard_range)
|
||||
# The number of ranges/donors that can be shrunk if the
|
||||
@ -992,6 +1014,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
# broker to be recorded
|
||||
return
|
||||
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
info = self._make_stats_info(broker, node, own_shard_range)
|
||||
info['state'] = own_shard_range.state_text
|
||||
info['db_state'] = broker.get_db_state()
|
||||
@ -2244,11 +2267,12 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.logger.debug('tombstones in %s = %d',
|
||||
quote(broker.path), tombstones)
|
||||
own_shard_range.update_tombstones(tombstones)
|
||||
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
if own_shard_range.reported:
|
||||
# no change to the stats metadata
|
||||
return
|
||||
|
||||
# persist the reported shard metadata
|
||||
# stats metadata has been updated so persist it
|
||||
broker.merge_shard_ranges(own_shard_range)
|
||||
# now get a consistent list of own and other shard ranges
|
||||
shard_ranges = broker.get_shard_ranges(
|
||||
@ -2285,8 +2309,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
if state in (UNSHARDED, COLLAPSED):
|
||||
if is_leader and broker.is_root_container():
|
||||
# bootstrap sharding of root container
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
update_own_shard_range_stats(broker, own_shard_range)
|
||||
self._find_and_enable_sharding_candidates(
|
||||
broker, shard_ranges=[broker.get_own_shard_range()])
|
||||
broker, shard_ranges=[own_shard_range])
|
||||
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if own_shard_range.state in ShardRange.CLEAVING_STATES:
|
||||
|
@ -1864,7 +1864,7 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
||||
orig_shard_ranges[1].account, orig_shard_ranges[1].container)
|
||||
check_shard_nodes_data(
|
||||
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
||||
exp_obj_count=1)
|
||||
exp_obj_count=0)
|
||||
|
||||
# check root container
|
||||
root_nodes_data = self.direct_get_container_shard_ranges()
|
||||
@ -3183,7 +3183,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.maxDiff = None
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
|
||||
# Sadly, the first replica to start sharding is still reporting its db
|
||||
# state to be 'unsharded' because, although it has sharded, its shard
|
||||
@ -3219,7 +3219,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.assertLengthEqual(shard_ranges, len(exp_shard_ranges))
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
self.assertEqual(epoch_1, broker.db_epoch)
|
||||
self.assertIn(brokers[0].get_db_state(), (SHARDING, SHARDED))
|
||||
self.assertEqual(SHARDED, brokers[1].get_db_state())
|
||||
@ -3263,7 +3263,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.assertLengthEqual(shard_ranges, len(exp_shard_ranges))
|
||||
self.assertEqual(exp_shard_ranges, shard_ranges)
|
||||
self.assertEqual(ShardRange.SHARDED,
|
||||
broker._own_shard_range().state)
|
||||
broker.get_own_shard_range().state)
|
||||
self.assertEqual(epoch_1, broker.db_epoch)
|
||||
self.assertEqual(SHARDED, broker.get_db_state())
|
||||
|
||||
|
@ -590,6 +590,13 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
|
||||
def test_info(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
(True, Timestamp.now().internal)})
|
||||
out = StringIO()
|
||||
@ -600,6 +607,8 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
expected = ['Sharding enabled = True',
|
||||
'Own shard range: None',
|
||||
'db_state = unsharded',
|
||||
'object_count = 100',
|
||||
'bytes_used = 900',
|
||||
'Metadata:',
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected, out.getvalue().splitlines())
|
||||
@ -635,13 +644,15 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharding',
|
||||
'object_count = 100',
|
||||
'bytes_used = 900',
|
||||
'Retiring db id: %s' % retiring_db_id,
|
||||
'Cleaving context: {',
|
||||
' "cleave_to_row": null,',
|
||||
' "cleaving_done": false,',
|
||||
' "cursor": "",',
|
||||
' "last_cleave_to_row": null,',
|
||||
' "max_row": -1,',
|
||||
' "max_row": 100,',
|
||||
' "misplaced_done": false,',
|
||||
' "ranges_done": 0,',
|
||||
' "ranges_todo": 0,',
|
||||
@ -679,6 +690,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "upper": ""',
|
||||
'}',
|
||||
'db_state = sharded',
|
||||
# in sharded state the object stats are determined by the
|
||||
# shard ranges, and we haven't created any in the test...
|
||||
'object_count = 0',
|
||||
'bytes_used = 0',
|
||||
'Metadata:',
|
||||
' X-Container-Sysmeta-Sharding = True']
|
||||
self.assertEqual(expected,
|
||||
@ -1088,6 +1103,13 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
|
||||
def test_enable(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||
(True, Timestamp.now().internal)})
|
||||
# no shard ranges
|
||||
@ -1110,7 +1132,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
'.shards_a', 'c', 'c', Timestamp.now(), data['index'])
|
||||
shard_ranges.append(
|
||||
ShardRange(path, Timestamp.now(), data['lower'],
|
||||
data['upper'], data['object_count']))
|
||||
data['upper'], data['object_count'], bytes_used=9))
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
out = StringIO()
|
||||
err = StringIO()
|
||||
@ -1126,6 +1148,10 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
self.assertEqual(['Loaded db broker for a/c'],
|
||||
err.getvalue().splitlines())
|
||||
self._assert_enabled(broker, now)
|
||||
self.assertEqual(100, broker.get_info()['object_count'])
|
||||
self.assertEqual(100, broker.get_own_shard_range().object_count)
|
||||
self.assertEqual(900, broker.get_info()['bytes_used'])
|
||||
self.assertEqual(900, broker.get_own_shard_range().bytes_used)
|
||||
|
||||
# already enabled
|
||||
out = StringIO()
|
||||
|
@ -372,7 +372,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
@ -461,7 +461,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
@ -543,7 +543,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(1, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
self.assertFalse(broker.empty())
|
||||
broker.delete_object('o', next(self.ts).internal)
|
||||
@ -4117,24 +4117,16 @@ class TestContainerBroker(unittest.TestCase):
|
||||
# fill gaps
|
||||
filler = own_shard_range.copy()
|
||||
filler.lower = 'h'
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True)
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True)
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, marker='a')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, marker='a')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='z')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='z')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
with mock_timestamp_now() as now:
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='k')
|
||||
filler.meta_timestamp = now
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='k')
|
||||
filler.upper = 'k'
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
@ -4342,14 +4334,15 @@ class TestContainerBroker(unittest.TestCase):
|
||||
db_path, account='.shards_a', container='shard_c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
||||
# no row for own shard range - expect entire namespace default
|
||||
# no row for own shard range - expect a default own shard range
|
||||
# covering the entire namespace default
|
||||
now = Timestamp.now()
|
||||
expected = ShardRange(broker.path, now, '', '', 0, 0, now,
|
||||
state=ShardRange.ACTIVE)
|
||||
own_sr = ShardRange(broker.path, now, '', '', 0, 0, now,
|
||||
state=ShardRange.ACTIVE)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
actual = broker.get_own_shard_range(no_default=True)
|
||||
self.assertIsNone(actual)
|
||||
@ -4361,52 +4354,44 @@ class TestContainerBroker(unittest.TestCase):
|
||||
[own_sr,
|
||||
ShardRange('.a/c1', next(self.ts), 'b', 'c'),
|
||||
ShardRange('.a/c2', next(self.ts), 'c', 'd')])
|
||||
expected = ShardRange(broker.path, ts_1, 'l', 'u', 0, 0, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# check stats get updated
|
||||
# check stats are not automatically updated
|
||||
broker.put_object(
|
||||
'o1', next(self.ts).internal, 100, 'text/plain', 'etag1')
|
||||
broker.put_object(
|
||||
'o2', next(self.ts).internal, 99, 'text/plain', 'etag2')
|
||||
expected = ShardRange(
|
||||
broker.path, ts_1, 'l', 'u', 2, 199, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# check non-zero stats returned
|
||||
own_sr.update_meta(object_count=2, bytes_used=199,
|
||||
meta_timestamp=next(self.ts))
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# still returned when deleted
|
||||
own_sr.update_meta(object_count=0, bytes_used=0,
|
||||
meta_timestamp=next(self.ts))
|
||||
delete_ts = next(self.ts)
|
||||
own_sr.set_deleted(timestamp=delete_ts)
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
expected = ShardRange(
|
||||
broker.path, delete_ts, 'l', 'u', 2, 199, now, deleted=True)
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# still in table after reclaim_age
|
||||
broker.reclaim(next(self.ts).internal, next(self.ts).internal)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
# entire namespace
|
||||
ts_2 = next(self.ts)
|
||||
broker.merge_shard_ranges(
|
||||
[ShardRange(broker.path, ts_2, '', '')])
|
||||
expected = ShardRange(
|
||||
broker.path, ts_2, '', '', 2, 199, now)
|
||||
with mock.patch('swift.container.backend.Timestamp.now',
|
||||
return_value=now):
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(expected), dict(actual))
|
||||
own_sr = ShardRange(broker.path, ts_2, '', '')
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
actual = broker.get_own_shard_range()
|
||||
self.assertEqual(dict(own_sr), dict(actual))
|
||||
|
||||
@with_tempdir
|
||||
def test_enable_sharding(self, tempdir):
|
||||
|
@ -2778,7 +2778,7 @@ class TestContainerController(unittest.TestCase):
|
||||
|
||||
# listing shards don't cover entire namespace so expect an extra filler
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, shard_ranges[2].upper, ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, shard_ranges[2].upper, ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
expected = shard_ranges[:3] + [extra_shard_range]
|
||||
check_shard_GET(expected, 'a/c', params='&states=listing')
|
||||
@ -2792,7 +2792,7 @@ class TestContainerController(unittest.TestCase):
|
||||
params='&states=listing&reverse=true&end_marker=pickle')
|
||||
# updating shards don't cover entire namespace so expect a filler
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, shard_ranges[3].upper, ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, shard_ranges[3].upper, ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
expected = shard_ranges[1:4] + [extra_shard_range]
|
||||
check_shard_GET(expected, 'a/c', params='&states=updating')
|
||||
@ -2801,7 +2801,7 @@ class TestContainerController(unittest.TestCase):
|
||||
# when no listing shard ranges cover the requested namespace range then
|
||||
# filler is for entire requested namespace
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, 'treacle', ShardRange.MAX, 2, 1024,
|
||||
'a/c', ts_now, 'treacle', ShardRange.MAX, 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
check_shard_GET([extra_shard_range], 'a/c',
|
||||
params='&states=listing&marker=treacle')
|
||||
@ -2809,7 +2809,7 @@ class TestContainerController(unittest.TestCase):
|
||||
[extra_shard_range], 'a/c',
|
||||
params='&states=listing&reverse=true&end_marker=treacle')
|
||||
extra_shard_range = ShardRange(
|
||||
'a/c', ts_now, 'treacle', 'walnut', 2, 1024,
|
||||
'a/c', ts_now, 'treacle', 'walnut', 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
params = '&states=listing&marker=treacle&end_marker=walnut'
|
||||
check_shard_GET([extra_shard_range], 'a/c', params=params)
|
||||
@ -3073,7 +3073,6 @@ class TestContainerController(unittest.TestCase):
|
||||
def test_GET_shard_ranges_using_state_aliases(self):
|
||||
# make a shard container
|
||||
ts_iter = make_timestamp_iter()
|
||||
ts_now = Timestamp.now() # used when mocking Timestamp.now()
|
||||
shard_ranges = []
|
||||
lower = ''
|
||||
for state in sorted(ShardRange.STATES.keys()):
|
||||
@ -3090,8 +3089,7 @@ class TestContainerController(unittest.TestCase):
|
||||
sr for sr in shard_ranges if sr.state in expected_states]
|
||||
own_shard_range = ShardRange(path, next(ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE)
|
||||
expected.append(own_shard_range.copy(
|
||||
lower=expected[-1].upper, meta_timestamp=ts_now))
|
||||
expected.append(own_shard_range.copy(lower=expected[-1].upper))
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in expected]
|
||||
headers = {'X-Timestamp': next(ts_iter).normal}
|
||||
@ -3114,8 +3112,7 @@ class TestContainerController(unittest.TestCase):
|
||||
req = Request.blank('/sda1/p/%s?format=json%s' %
|
||||
(path, params), method='GET',
|
||||
headers={'X-Backend-Record-Type': 'shard'})
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
@ -43,7 +43,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
||||
find_shrinking_candidates, process_compactible_shard_sequences, \
|
||||
find_compactible_shard_sequences, is_shrinking_candidate, \
|
||||
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
|
||||
find_paths_with_gaps, combine_shard_ranges, find_overlapping_ranges
|
||||
find_paths_with_gaps, combine_shard_ranges, find_overlapping_ranges, \
|
||||
update_own_shard_range_stats
|
||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
|
||||
ShardName
|
||||
@ -5001,14 +5002,13 @@ class TestSharder(BaseTestSharder):
|
||||
with mock.patch.object(
|
||||
broker, 'set_sharding_state') as mock_set_sharding_state:
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
with mock.patch.object(sharder, '_audit_container'):
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(
|
||||
no_default=True)
|
||||
mock_set_sharding_state.assert_not_called()
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||
@ -5112,12 +5112,11 @@ class TestSharder(BaseTestSharder):
|
||||
own_sr.epoch = epoch
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(
|
||||
no_default=True)
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
if epoch:
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
@ -5148,15 +5147,14 @@ class TestSharder(BaseTestSharder):
|
||||
own_sr.epoch = epoch
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
with mock_timestamp_now():
|
||||
# we're not testing rest of the process here so prevent any
|
||||
# attempt to progress shard range states
|
||||
sharder._create_shard_containers = lambda *args: 0
|
||||
sharder._process_broker(broker, node, 99)
|
||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||
|
||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
||||
dict(own_shard_range))
|
||||
self.assertEqual(dict(own_sr), dict(own_shard_range))
|
||||
self.assertEqual(SHARDING, broker.get_db_state())
|
||||
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||
@ -5752,14 +5750,13 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
|
||||
state_timestamp=root_ts))
|
||||
shard_ranges[1].timestamp = root_ts
|
||||
with mock_timestamp_now() as ts_now:
|
||||
with mock_timestamp_now():
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
expected = shard_ranges[1].copy()
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
@ -5854,15 +5851,14 @@ class TestSharder(BaseTestSharder):
|
||||
root_ts = next(self.ts_iter)
|
||||
shard_ranges[1].update_state(ShardRange.SHARDING,
|
||||
state_timestamp=root_ts)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
with mock_timestamp_now():
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self.assert_no_audit_messages(sharder, mock_swift)
|
||||
self.assertFalse(broker.is_deleted())
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
expected = shard_ranges[1].copy()
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
@ -6783,8 +6779,7 @@ class TestSharder(BaseTestSharder):
|
||||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
@ -6801,8 +6796,7 @@ class TestSharder(BaseTestSharder):
|
||||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
@ -6822,8 +6816,7 @@ class TestSharder(BaseTestSharder):
|
||||
# children ranges from root are NOT merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertEqual(dict(parent_sr), dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
@ -6950,8 +6943,10 @@ class TestSharder(BaseTestSharder):
|
||||
with self._mock_sharder(
|
||||
conf={'shard_container_threshold': 2}) as sharder:
|
||||
with mock_timestamp_now() as now:
|
||||
own_sr = update_own_shard_range_stats(
|
||||
broker, broker.get_own_shard_range())
|
||||
sharder._find_and_enable_sharding_candidates(
|
||||
broker, [broker.get_own_shard_range()])
|
||||
broker, [own_sr])
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||
self.assertEqual(now, own_sr.state_timestamp)
|
||||
@ -6961,8 +6956,10 @@ class TestSharder(BaseTestSharder):
|
||||
with self._mock_sharder(
|
||||
conf={'shard_container_threshold': 2}) as sharder:
|
||||
with mock_timestamp_now():
|
||||
own_sr = update_own_shard_range_stats(
|
||||
broker, broker.get_own_shard_range())
|
||||
sharder._find_and_enable_sharding_candidates(
|
||||
broker, [broker.get_own_shard_range()])
|
||||
broker, [own_sr])
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||
self.assertEqual(now, own_sr.state_timestamp)
|
||||
@ -7341,7 +7338,7 @@ class TestSharder(BaseTestSharder):
|
||||
'found': 3,
|
||||
'top': [
|
||||
{
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 500000,
|
||||
'account': brokers[C3].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C3].container,
|
||||
@ -7351,7 +7348,7 @@ class TestSharder(BaseTestSharder):
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 3
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 2500000,
|
||||
'account': brokers[C2].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C2].container,
|
||||
@ -7361,7 +7358,7 @@ class TestSharder(BaseTestSharder):
|
||||
'node_index': 0,
|
||||
'compactible_ranges': 2
|
||||
}, {
|
||||
'object_count': mock.ANY,
|
||||
'object_count': 2999999,
|
||||
'account': brokers[C1].account,
|
||||
'meta_timestamp': mock.ANY,
|
||||
'container': brokers[C1].container,
|
||||
@ -8850,6 +8847,31 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
ranges[3])},
|
||||
overlapping_ranges)
|
||||
|
||||
def test_update_own_shard_range_stats(self):
|
||||
broker = self._make_broker()
|
||||
ts = next(self.ts_iter)
|
||||
broker.merge_items([
|
||||
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 9,
|
||||
'content_type': 'application/octet-stream', 'etag': 'not-really',
|
||||
'deleted': 0, 'storage_policy_index': 0,
|
||||
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
|
||||
for i in range(100)])
|
||||
|
||||
self.assertEqual(100, broker.get_info()['object_count'])
|
||||
self.assertEqual(900, broker.get_info()['bytes_used'])
|
||||
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.bytes_used)
|
||||
# own_sr is updated...
|
||||
update_own_shard_range_stats(broker, own_sr)
|
||||
self.assertEqual(100, own_sr.object_count)
|
||||
self.assertEqual(900, own_sr.bytes_used)
|
||||
# ...but not persisted
|
||||
own_sr = broker.get_own_shard_range()
|
||||
self.assertEqual(0, own_sr.object_count)
|
||||
self.assertEqual(0, own_sr.bytes_used)
|
||||
|
||||
|
||||
class TestContainerSharderConf(unittest.TestCase):
|
||||
def test_default(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user