sharder/replicator: emit stats for DBs created or existing
The number of container partitions that are in handoff locations can be used to track progress of a container ring rebalance. However, container DBs are sometimes deliberately created in handoff locations: the sharder creates shard container DBs while cleaving and also while moving misplaced objects; the container-replicator creates DBs to feed misplaced objects to the reconciler. These DBs can distorts insights into rebalance progress. Emitting stats will help quantify any such distortion. New sharder stats are: cleaved_db_created cleaved_db_exists misplaced_db_created misplaced_db_exists New container-replicator stats are: reconciler_db_created reconciler_db_exists Change-Id: Ia43e91a545a822cace41a0e814ab6c3bd89f8402
This commit is contained in:
parent
ec2bbc0e14
commit
517738ac9a
@ -377,7 +377,10 @@ class ContainerBroker(DatabaseBroker):
|
|||||||
:param put_timestamp: initial timestamp if broker needs to be
|
:param put_timestamp: initial timestamp if broker needs to be
|
||||||
initialized
|
initialized
|
||||||
:param storage_policy_index: the storage policy index
|
:param storage_policy_index: the storage policy index
|
||||||
:return: a :class:`swift.container.backend.ContainerBroker` instance
|
:return: a tuple of (``broker``, ``initialized``) where ``broker`` is
|
||||||
|
an instance of :class:`swift.container.backend.ContainerBroker` and
|
||||||
|
``initialized`` is True if the db file was initialized, False
|
||||||
|
otherwise.
|
||||||
"""
|
"""
|
||||||
hsh = hash_path(account, container)
|
hsh = hash_path(account, container)
|
||||||
db_dir = storage_directory(DATADIR, part, hsh)
|
db_dir = storage_directory(DATADIR, part, hsh)
|
||||||
@ -385,12 +388,14 @@ class ContainerBroker(DatabaseBroker):
|
|||||||
os.path.join(device_path, db_dir, hsh + '.db'), epoch)
|
os.path.join(device_path, db_dir, hsh + '.db'), epoch)
|
||||||
broker = ContainerBroker(db_path, account=account, container=container,
|
broker = ContainerBroker(db_path, account=account, container=container,
|
||||||
logger=logger)
|
logger=logger)
|
||||||
|
initialized = False
|
||||||
if not os.path.exists(broker.db_file):
|
if not os.path.exists(broker.db_file):
|
||||||
try:
|
try:
|
||||||
broker.initialize(put_timestamp, storage_policy_index)
|
broker.initialize(put_timestamp, storage_policy_index)
|
||||||
|
initialized = True
|
||||||
except DatabaseAlreadyExists:
|
except DatabaseAlreadyExists:
|
||||||
pass
|
pass
|
||||||
return broker
|
return broker, initialized
|
||||||
|
|
||||||
def get_db_state(self):
|
def get_db_state(self):
|
||||||
"""
|
"""
|
||||||
|
@ -191,10 +191,12 @@ class ContainerReplicator(db_replicator.Replicator):
|
|||||||
account = MISPLACED_OBJECTS_ACCOUNT
|
account = MISPLACED_OBJECTS_ACCOUNT
|
||||||
part = self.ring.get_part(account, container)
|
part = self.ring.get_part(account, container)
|
||||||
node = self.find_local_handoff_for_part(part)
|
node = self.find_local_handoff_for_part(part)
|
||||||
broker = ContainerBroker.create_broker(
|
broker, initialized = ContainerBroker.create_broker(
|
||||||
os.path.join(self.root, node['device']), part, account, container,
|
os.path.join(self.root, node['device']), part, account, container,
|
||||||
logger=self.logger, put_timestamp=timestamp,
|
logger=self.logger, put_timestamp=timestamp,
|
||||||
storage_policy_index=0)
|
storage_policy_index=0)
|
||||||
|
self.logger.increment('reconciler_db_created' if initialized
|
||||||
|
else 'reconciler_db_exists')
|
||||||
if self.reconciler_containers is not None:
|
if self.reconciler_containers is not None:
|
||||||
self.reconciler_containers[container] = part, broker, node['id']
|
self.reconciler_containers[container] = part, broker, node['id']
|
||||||
return broker
|
return broker
|
||||||
|
@ -1104,7 +1104,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
node = self.find_local_handoff_for_part(part)
|
node = self.find_local_handoff_for_part(part)
|
||||||
|
|
||||||
put_timestamp = Timestamp.now().internal
|
put_timestamp = Timestamp.now().internal
|
||||||
shard_broker = ContainerBroker.create_broker(
|
shard_broker, initialized = ContainerBroker.create_broker(
|
||||||
os.path.join(self.root, node['device']), part, shard_range.account,
|
os.path.join(self.root, node['device']), part, shard_range.account,
|
||||||
shard_range.container, epoch=shard_range.epoch,
|
shard_range.container, epoch=shard_range.epoch,
|
||||||
storage_policy_index=policy_index, put_timestamp=put_timestamp)
|
storage_policy_index=policy_index, put_timestamp=put_timestamp)
|
||||||
@ -1124,6 +1124,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
'X-Container-Sysmeta-Sharding':
|
'X-Container-Sysmeta-Sharding':
|
||||||
('True', Timestamp.now().internal)})
|
('True', Timestamp.now().internal)})
|
||||||
|
|
||||||
|
put_timestamp = put_timestamp if initialized else None
|
||||||
return part, shard_broker, node['id'], put_timestamp
|
return part, shard_broker, node['id'], put_timestamp
|
||||||
|
|
||||||
def _audit_root_container(self, broker):
|
def _audit_root_container(self, broker):
|
||||||
@ -1466,8 +1467,11 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if dest_shard_range not in dest_brokers:
|
if dest_shard_range not in dest_brokers:
|
||||||
part, dest_broker, node_id, _junk = self._get_shard_broker(
|
part, dest_broker, node_id, put_timestamp = \
|
||||||
|
self._get_shard_broker(
|
||||||
dest_shard_range, src_broker.root_path, policy_index)
|
dest_shard_range, src_broker.root_path, policy_index)
|
||||||
|
stat = 'db_exists' if put_timestamp is None else 'db_created'
|
||||||
|
self._increment_stat('misplaced', stat, statsd=True)
|
||||||
# save the broker info that was sampled prior to the *first*
|
# save the broker info that was sampled prior to the *first*
|
||||||
# yielded objects for this destination
|
# yielded objects for this destination
|
||||||
destination = {'part': part,
|
destination = {'part': part,
|
||||||
@ -1836,6 +1840,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
shard_part, shard_broker, node_id, put_timestamp = \
|
shard_part, shard_broker, node_id, put_timestamp = \
|
||||||
self._get_shard_broker(shard_range, broker.root_path,
|
self._get_shard_broker(shard_range, broker.root_path,
|
||||||
policy_index)
|
policy_index)
|
||||||
|
stat = 'db_exists' if put_timestamp is None else 'db_created'
|
||||||
|
self._increment_stat('cleaved', stat, statsd=True)
|
||||||
return self._cleave_shard_broker(
|
return self._cleave_shard_broker(
|
||||||
broker, cleaving_context, shard_range, own_shard_range,
|
broker, cleaving_context, shard_range, own_shard_range,
|
||||||
shard_broker, put_timestamp, shard_part, node_id)
|
shard_broker, put_timestamp, shard_part, node_id)
|
||||||
|
@ -3702,15 +3702,19 @@ class TestContainerBroker(unittest.TestCase):
|
|||||||
|
|
||||||
@with_tempdir
|
@with_tempdir
|
||||||
def test_create_broker(self, tempdir):
|
def test_create_broker(self, tempdir):
|
||||||
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c')
|
broker, init = ContainerBroker.create_broker(tempdir, 0, 'a', 'c')
|
||||||
hsh = hash_path('a', 'c')
|
hsh = hash_path('a', 'c')
|
||||||
expected_path = os.path.join(
|
expected_path = os.path.join(
|
||||||
tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db')
|
tempdir, 'containers', '0', hsh[-3:], hsh, hsh + '.db')
|
||||||
self.assertEqual(expected_path, broker.db_file)
|
self.assertEqual(expected_path, broker.db_file)
|
||||||
self.assertTrue(os.path.isfile(expected_path))
|
self.assertTrue(os.path.isfile(expected_path))
|
||||||
|
self.assertTrue(init)
|
||||||
|
broker, init = ContainerBroker.create_broker(tempdir, 0, 'a', 'c')
|
||||||
|
self.assertEqual(expected_path, broker.db_file)
|
||||||
|
self.assertFalse(init)
|
||||||
|
|
||||||
ts = Timestamp.now()
|
ts = Timestamp.now()
|
||||||
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c1',
|
broker, init = ContainerBroker.create_broker(tempdir, 0, 'a', 'c1',
|
||||||
put_timestamp=ts.internal)
|
put_timestamp=ts.internal)
|
||||||
hsh = hash_path('a', 'c1')
|
hsh = hash_path('a', 'c1')
|
||||||
expected_path = os.path.join(
|
expected_path = os.path.join(
|
||||||
@ -3719,15 +3723,17 @@ class TestContainerBroker(unittest.TestCase):
|
|||||||
self.assertTrue(os.path.isfile(expected_path))
|
self.assertTrue(os.path.isfile(expected_path))
|
||||||
self.assertEqual(ts.internal, broker.get_info()['put_timestamp'])
|
self.assertEqual(ts.internal, broker.get_info()['put_timestamp'])
|
||||||
self.assertEqual(0, broker.get_info()['storage_policy_index'])
|
self.assertEqual(0, broker.get_info()['storage_policy_index'])
|
||||||
|
self.assertTrue(init)
|
||||||
|
|
||||||
epoch = Timestamp.now()
|
epoch = Timestamp.now()
|
||||||
broker = ContainerBroker.create_broker(tempdir, 0, 'a', 'c3',
|
broker, init = ContainerBroker.create_broker(tempdir, 0, 'a', 'c3',
|
||||||
epoch=epoch)
|
epoch=epoch)
|
||||||
hsh = hash_path('a', 'c3')
|
hsh = hash_path('a', 'c3')
|
||||||
expected_path = os.path.join(
|
expected_path = os.path.join(
|
||||||
tempdir, 'containers', '0', hsh[-3:],
|
tempdir, 'containers', '0', hsh[-3:],
|
||||||
hsh, '%s_%s.db' % (hsh, epoch.internal))
|
hsh, '%s_%s.db' % (hsh, epoch.internal))
|
||||||
self.assertEqual(expected_path, broker.db_file)
|
self.assertEqual(expected_path, broker.db_file)
|
||||||
|
self.assertTrue(init)
|
||||||
|
|
||||||
@with_tempdir
|
@with_tempdir
|
||||||
def test_pending_file_name(self, tempdir):
|
def test_pending_file_name(self, tempdir):
|
||||||
|
@ -883,6 +883,10 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
|||||||
daemon = self._run_once(node)
|
daemon = self._run_once(node)
|
||||||
# push to remote, and third node was missing (also maybe reconciler)
|
# push to remote, and third node was missing (also maybe reconciler)
|
||||||
self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])
|
self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, self.logger.get_stats_counts().get('reconciler_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
self.logger.get_stats_counts().get('reconciler_db_exists'))
|
||||||
|
|
||||||
# grab the rsynced instance of remote_broker
|
# grab the rsynced instance of remote_broker
|
||||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||||
@ -902,7 +906,12 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
|||||||
|
|
||||||
# and we should have also enqueued these rows in a single reconciler,
|
# and we should have also enqueued these rows in a single reconciler,
|
||||||
# since we forced the object timestamps to be in the same hour.
|
# since we forced the object timestamps to be in the same hour.
|
||||||
|
self.logger.clear()
|
||||||
reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
|
reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
|
||||||
|
self.assertFalse(
|
||||||
|
self.logger.get_stats_counts().get('reconciler_db_created'))
|
||||||
|
self.assertEqual(
|
||||||
|
1, self.logger.get_stats_counts().get('reconciler_db_exists'))
|
||||||
# but it may not be on the same node as us anymore though...
|
# but it may not be on the same node as us anymore though...
|
||||||
reconciler = self._get_broker(reconciler.account,
|
reconciler = self._get_broker(reconciler.account,
|
||||||
reconciler.container, node_index=0)
|
reconciler.container, node_index=0)
|
||||||
|
@ -1254,11 +1254,16 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertFalse(sharder._cleave(broker))
|
self.assertFalse(sharder._cleave(broker))
|
||||||
|
|
||||||
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'min_time': mock.ANY, 'max_time': mock.ANY}
|
'min_time': mock.ANY, 'max_time': mock.ANY,
|
||||||
|
'db_created': 1, 'db_exists': 0}
|
||||||
stats = self._assert_stats(expected, sharder, 'cleaved')
|
stats = self._assert_stats(expected, sharder, 'cleaved')
|
||||||
self.assertIsInstance(stats['min_time'], float)
|
self.assertIsInstance(stats['min_time'], float)
|
||||||
self.assertIsInstance(stats['max_time'], float)
|
self.assertIsInstance(stats['max_time'], float)
|
||||||
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('cleaved_db_exists'))
|
||||||
self.assertEqual(SHARDING, broker.get_db_state())
|
self.assertEqual(SHARDING, broker.get_db_state())
|
||||||
sharder._replicate_object.assert_called_once_with(
|
sharder._replicate_object.assert_called_once_with(
|
||||||
0, expected_shard_dbs[0], 0)
|
0, expected_shard_dbs[0], 0)
|
||||||
@ -1316,6 +1321,15 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder._replicate_object.assert_called_once_with(
|
sharder._replicate_object.assert_called_once_with(
|
||||||
0, expected_shard_dbs[1], 0)
|
0, expected_shard_dbs[1], 0)
|
||||||
|
|
||||||
|
expected = {'attempted': 1, 'success': 0, 'failure': 1,
|
||||||
|
'min_time': mock.ANY, 'max_time': mock.ANY,
|
||||||
|
'db_created': 1, 'db_exists': 0}
|
||||||
|
self._assert_stats(expected, sharder, 'cleaved')
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('cleaved_db_exists'))
|
||||||
|
|
||||||
# cleaving state is unchanged
|
# cleaving state is unchanged
|
||||||
updated_shard_ranges = broker.get_shard_ranges()
|
updated_shard_ranges = broker.get_shard_ranges()
|
||||||
self.assertEqual(4, len(updated_shard_ranges))
|
self.assertEqual(4, len(updated_shard_ranges))
|
||||||
@ -1344,11 +1358,16 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertFalse(sharder._cleave(broker))
|
self.assertFalse(sharder._cleave(broker))
|
||||||
|
|
||||||
expected = {'attempted': 2, 'success': 2, 'failure': 0,
|
expected = {'attempted': 2, 'success': 2, 'failure': 0,
|
||||||
'min_time': mock.ANY, 'max_time': mock.ANY}
|
'min_time': mock.ANY, 'max_time': mock.ANY,
|
||||||
|
'db_created': 1, 'db_exists': 1}
|
||||||
stats = self._assert_stats(expected, sharder, 'cleaved')
|
stats = self._assert_stats(expected, sharder, 'cleaved')
|
||||||
self.assertIsInstance(stats['min_time'], float)
|
self.assertIsInstance(stats['min_time'], float)
|
||||||
self.assertIsInstance(stats['max_time'], float)
|
self.assertIsInstance(stats['max_time'], float)
|
||||||
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_created'))
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_exists'))
|
||||||
|
|
||||||
self.assertEqual(SHARDING, broker.get_db_state())
|
self.assertEqual(SHARDING, broker.get_db_state())
|
||||||
sharder._replicate_object.assert_has_calls(
|
sharder._replicate_object.assert_has_calls(
|
||||||
@ -1405,11 +1424,16 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertFalse(sharder._cleave(broker))
|
self.assertFalse(sharder._cleave(broker))
|
||||||
|
|
||||||
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'min_time': mock.ANY, 'max_time': mock.ANY}
|
'min_time': mock.ANY, 'max_time': mock.ANY,
|
||||||
|
'db_created': 1, 'db_exists': 0}
|
||||||
stats = self._assert_stats(expected, sharder, 'cleaved')
|
stats = self._assert_stats(expected, sharder, 'cleaved')
|
||||||
self.assertIsInstance(stats['min_time'], float)
|
self.assertIsInstance(stats['min_time'], float)
|
||||||
self.assertIsInstance(stats['max_time'], float)
|
self.assertIsInstance(stats['max_time'], float)
|
||||||
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('cleaved_db_exists'))
|
||||||
|
|
||||||
self.assertEqual(SHARDING, broker.get_db_state())
|
self.assertEqual(SHARDING, broker.get_db_state())
|
||||||
sharder._replicate_object.assert_called_once_with(
|
sharder._replicate_object.assert_called_once_with(
|
||||||
@ -1473,11 +1497,16 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertTrue(sharder._cleave(broker))
|
self.assertTrue(sharder._cleave(broker))
|
||||||
|
|
||||||
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'min_time': mock.ANY, 'max_time': mock.ANY}
|
'min_time': mock.ANY, 'max_time': mock.ANY,
|
||||||
|
'db_created': 1, 'db_exists': 0}
|
||||||
stats = self._assert_stats(expected, sharder, 'cleaved')
|
stats = self._assert_stats(expected, sharder, 'cleaved')
|
||||||
self.assertIsInstance(stats['min_time'], float)
|
self.assertIsInstance(stats['min_time'], float)
|
||||||
self.assertIsInstance(stats['max_time'], float)
|
self.assertIsInstance(stats['max_time'], float)
|
||||||
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
self.assertLessEqual(stats['min_time'], stats['max_time'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts().get('cleaved_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('cleaved_db_exists'))
|
||||||
|
|
||||||
sharder._replicate_object.assert_called_once_with(
|
sharder._replicate_object.assert_called_once_with(
|
||||||
0, expected_shard_dbs[4], 0)
|
0, expected_shard_dbs[4], 0)
|
||||||
@ -3308,7 +3337,8 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder._move_misplaced_objects(broker)
|
sharder._move_misplaced_objects(broker)
|
||||||
sharder._replicate_object.assert_not_called()
|
sharder._replicate_object.assert_not_called()
|
||||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'found': 0, 'placed': 0, 'unplaced': 0}
|
'found': 0, 'placed': 0, 'unplaced': 0,
|
||||||
|
'db_created': 0, 'db_exists': 0}
|
||||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_found'))
|
sharder.logger.get_stats_counts().get('misplaced_found'))
|
||||||
@ -3316,6 +3346,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# sharding - no misplaced objects
|
# sharding - no misplaced objects
|
||||||
self.assertTrue(broker.set_sharding_state())
|
self.assertTrue(broker.set_sharding_state())
|
||||||
@ -3329,6 +3363,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# pretend we cleaved up to end of second shard range
|
# pretend we cleaved up to end of second shard range
|
||||||
context = CleavingContext.load(broker)
|
context = CleavingContext.load(broker)
|
||||||
@ -3344,6 +3382,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# sharding - misplaced objects
|
# sharding - misplaced objects
|
||||||
for obj in objects:
|
for obj in objects:
|
||||||
@ -3361,6 +3403,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
sharder.logger.get_stats_counts().get('misplaced_unplaced'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
self.assertFalse(os.path.exists(expected_shard_dbs[0]))
|
self.assertFalse(os.path.exists(expected_shard_dbs[0]))
|
||||||
self.assertFalse(os.path.exists(expected_shard_dbs[1]))
|
self.assertFalse(os.path.exists(expected_shard_dbs[1]))
|
||||||
self.assertFalse(os.path.exists(expected_shard_dbs[2]))
|
self.assertFalse(os.path.exists(expected_shard_dbs[2]))
|
||||||
@ -3376,12 +3422,17 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder._replicate_object.assert_called_once_with(
|
sharder._replicate_object.assert_called_once_with(
|
||||||
0, expected_shard_dbs[1], 0)
|
0, expected_shard_dbs[1], 0)
|
||||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'found': 1, 'placed': 2, 'unplaced': 0}
|
'found': 1, 'placed': 2, 'unplaced': 0,
|
||||||
|
'db_created': 1, 'db_exists': 0}
|
||||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
2, sharder.logger.get_stats_counts()['misplaced_placed'])
|
2, sharder.logger.get_stats_counts()['misplaced_placed'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts()['misplaced_db_created'])
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# check misplaced objects were moved
|
# check misplaced objects were moved
|
||||||
self._check_objects(objects[:2], expected_shard_dbs[1])
|
self._check_objects(objects[:2], expected_shard_dbs[1])
|
||||||
@ -3409,7 +3460,8 @@ class TestSharder(BaseTestSharder):
|
|||||||
with self._mock_sharder(conf={'cleave_row_batch_size': 2}) as sharder:
|
with self._mock_sharder(conf={'cleave_row_batch_size': 2}) as sharder:
|
||||||
sharder._move_misplaced_objects(broker)
|
sharder._move_misplaced_objects(broker)
|
||||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'found': 1, 'placed': 4, 'unplaced': 0}
|
'found': 1, 'placed': 4, 'unplaced': 0,
|
||||||
|
'db_created': 3, 'db_exists': 0}
|
||||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||||
sharder._replicate_object.assert_has_calls(
|
sharder._replicate_object.assert_has_calls(
|
||||||
[mock.call(0, db, 0) for db in expected_shard_dbs[2:4]],
|
[mock.call(0, db, 0) for db in expected_shard_dbs[2:4]],
|
||||||
@ -3420,6 +3472,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
4, sharder.logger.get_stats_counts()['misplaced_placed'])
|
4, sharder.logger.get_stats_counts()['misplaced_placed'])
|
||||||
|
self.assertEqual(
|
||||||
|
3, sharder.logger.get_stats_counts()['misplaced_db_created'])
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# check misplaced objects were moved
|
# check misplaced objects were moved
|
||||||
self._check_objects(new_objects, expected_shard_dbs[0])
|
self._check_objects(new_objects, expected_shard_dbs[0])
|
||||||
@ -3436,12 +3492,17 @@ class TestSharder(BaseTestSharder):
|
|||||||
sharder._move_misplaced_objects(broker)
|
sharder._move_misplaced_objects(broker)
|
||||||
sharder._replicate_object.assert_not_called()
|
sharder._replicate_object.assert_not_called()
|
||||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'found': 0, 'placed': 0, 'unplaced': 0}
|
'found': 0, 'placed': 0, 'unplaced': 0,
|
||||||
|
'db_created': 0, 'db_exists': 0}
|
||||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_found'))
|
sharder.logger.get_stats_counts().get('misplaced_found'))
|
||||||
self.assertFalse(
|
self.assertFalse(
|
||||||
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
sharder.logger.get_stats_counts().get('misplaced_placed'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_created'))
|
||||||
|
self.assertFalse(
|
||||||
|
sharder.logger.get_stats_counts().get('misplaced_db_exists'))
|
||||||
|
|
||||||
# and then more misplaced updates arrive
|
# and then more misplaced updates arrive
|
||||||
newer_objects = [
|
newer_objects = [
|
||||||
@ -3462,13 +3523,21 @@ class TestSharder(BaseTestSharder):
|
|||||||
for db in (expected_shard_dbs[0], expected_shard_dbs[-1])],
|
for db in (expected_shard_dbs[0], expected_shard_dbs[-1])],
|
||||||
any_order=True
|
any_order=True
|
||||||
)
|
)
|
||||||
|
# shard broker for first shard range was already created but not
|
||||||
|
# removed due to mocked _replicate_object so expect one created and one
|
||||||
|
# existed db stat...
|
||||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||||
'found': 1, 'placed': 3, 'unplaced': 0}
|
'found': 1, 'placed': 3, 'unplaced': 0,
|
||||||
|
'db_created': 1, 'db_exists': 1}
|
||||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
1, sharder.logger.get_stats_counts()['misplaced_found'])
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
3, sharder.logger.get_stats_counts()['misplaced_placed'])
|
3, sharder.logger.get_stats_counts()['misplaced_placed'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts()['misplaced_db_created'])
|
||||||
|
self.assertEqual(
|
||||||
|
1, sharder.logger.get_stats_counts()['misplaced_db_exists'])
|
||||||
|
|
||||||
# check new misplaced objects were moved
|
# check new misplaced objects were moved
|
||||||
self._check_objects(newer_objects[:1] + new_objects,
|
self._check_objects(newer_objects[:1] + new_objects,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user