Merge "container-updater: Always report zero objects/bytes used for shards"
This commit is contained in:
commit
fc9b9e55c5
@ -251,6 +251,13 @@ class ContainerUpdater(Daemon):
|
|||||||
return
|
return
|
||||||
if self.account_suppressions.get(info['account'], 0) > time.time():
|
if self.account_suppressions.get(info['account'], 0) > time.time():
|
||||||
return
|
return
|
||||||
|
|
||||||
|
if not broker.is_root_container():
|
||||||
|
# Don't double-up account stats.
|
||||||
|
# The sharder should get these stats to the root container,
|
||||||
|
# and the root's updater will get them to the right account.
|
||||||
|
info['object_count'] = info['bytes_used'] = 0
|
||||||
|
|
||||||
if info['put_timestamp'] > info['reported_put_timestamp'] or \
|
if info['put_timestamp'] > info['reported_put_timestamp'] or \
|
||||||
info['delete_timestamp'] > info['reported_delete_timestamp'] \
|
info['delete_timestamp'] > info['reported_delete_timestamp'] \
|
||||||
or info['object_count'] != info['reported_object_count'] or \
|
or info['object_count'] != info['reported_object_count'] or \
|
||||||
|
@ -132,9 +132,11 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
for ipport in ipports:
|
for ipport in ipports:
|
||||||
wait_for_server_to_hangup(ipport)
|
wait_for_server_to_hangup(ipport)
|
||||||
|
|
||||||
def put_objects(self, obj_names):
|
def put_objects(self, obj_names, contents=None):
|
||||||
for obj in obj_names:
|
for obj in obj_names:
|
||||||
client.put_object(self.url, self.token, self.container_name, obj)
|
client.put_object(self.url, token=self.token,
|
||||||
|
container=self.container_name, name=obj,
|
||||||
|
contents=contents)
|
||||||
|
|
||||||
def delete_objects(self, obj_names):
|
def delete_objects(self, obj_names):
|
||||||
for obj in obj_names:
|
for obj in obj_names:
|
||||||
@ -1206,7 +1208,9 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
shard_cont_count, shard_obj_count = int_client.get_account_info(
|
shard_cont_count, shard_obj_count = int_client.get_account_info(
|
||||||
orig_shard_ranges[0].account, [204])
|
orig_shard_ranges[0].account, [204])
|
||||||
self.assertEqual(2 * repeat[0], shard_cont_count)
|
self.assertEqual(2 * repeat[0], shard_cont_count)
|
||||||
self.assertEqual(len(obj_names), shard_obj_count)
|
# the shards account should always have zero object count to avoid
|
||||||
|
# double accounting
|
||||||
|
self.assertEqual(0, shard_obj_count)
|
||||||
|
|
||||||
# checking the listing also refreshes proxy container info cache so
|
# checking the listing also refreshes proxy container info cache so
|
||||||
# that the proxy becomes aware that container is sharded and will
|
# that the proxy becomes aware that container is sharded and will
|
||||||
@ -2060,3 +2064,37 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assert_container_state(node, 'sharded', 3)
|
self.assert_container_state(node, 'sharded', 3)
|
||||||
|
|
||||||
self.assert_container_listing(obj_names)
|
self.assert_container_listing(obj_names)
|
||||||
|
|
||||||
|
def test_sharded_account_updates(self):
|
||||||
|
# verify that .shards account updates have zero object count and bytes
|
||||||
|
# to avoid double accounting
|
||||||
|
all_obj_names = self._make_object_names(self.max_shard_size)
|
||||||
|
self.put_objects(all_obj_names, contents='xyz')
|
||||||
|
# Shard the container into 2 shards
|
||||||
|
client.post_container(self.url, self.admin_token, self.container_name,
|
||||||
|
headers={'X-Container-Sharding': 'on'})
|
||||||
|
for n in self.brain.node_numbers:
|
||||||
|
self.sharders.once(
|
||||||
|
number=n, additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
# sanity checks
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
shard_ranges = self.assert_container_state(node, 'sharded', 2)
|
||||||
|
self.assert_container_delete_fails()
|
||||||
|
self.assert_container_has_shard_sysmeta()
|
||||||
|
self.assert_container_post_ok('sharded')
|
||||||
|
self.assert_container_listing(all_obj_names)
|
||||||
|
# run the updaters to get account stats updated
|
||||||
|
self.updaters.once()
|
||||||
|
# check user account stats
|
||||||
|
metadata = self.internal_client.get_account_metadata(self.account)
|
||||||
|
self.assertEqual(1, int(metadata.get('x-account-container-count')))
|
||||||
|
self.assertEqual(self.max_shard_size,
|
||||||
|
int(metadata.get('x-account-object-count')))
|
||||||
|
self.assertEqual(3 * self.max_shard_size,
|
||||||
|
int(metadata.get('x-account-bytes-used')))
|
||||||
|
# check hidden .shards account stats
|
||||||
|
metadata = self.internal_client.get_account_metadata(
|
||||||
|
shard_ranges[0].account)
|
||||||
|
self.assertEqual(2, int(metadata.get('x-account-container-count')))
|
||||||
|
self.assertEqual(0, int(metadata.get('x-account-object-count')))
|
||||||
|
self.assertEqual(0, int(metadata.get('x-account-bytes-used')))
|
||||||
|
@ -208,6 +208,7 @@ class TestContainerUpdater(unittest.TestCase):
|
|||||||
cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
|
cb = ContainerBroker(os.path.join(subdir, 'hash.db'), account='a',
|
||||||
container='c')
|
container='c')
|
||||||
cb.initialize(normalize_timestamp(1), 0)
|
cb.initialize(normalize_timestamp(1), 0)
|
||||||
|
self.assertTrue(cb.is_root_container())
|
||||||
cu.run_once()
|
cu.run_once()
|
||||||
info = cb.get_info()
|
info = cb.get_info()
|
||||||
self.assertEqual(info['object_count'], 0)
|
self.assertEqual(info['object_count'], 0)
|
||||||
@ -344,5 +345,94 @@ class TestContainerUpdater(unittest.TestCase):
|
|||||||
self.assertEqual(info['reported_object_count'], 1)
|
self.assertEqual(info['reported_object_count'], 1)
|
||||||
self.assertEqual(info['reported_bytes_used'], 3)
|
self.assertEqual(info['reported_bytes_used'], 3)
|
||||||
|
|
||||||
|
def test_shard_container(self):
|
||||||
|
cu = self._get_container_updater()
|
||||||
|
cu.run_once()
|
||||||
|
containers_dir = os.path.join(self.sda1, DATADIR)
|
||||||
|
os.mkdir(containers_dir)
|
||||||
|
cu.run_once()
|
||||||
|
self.assertTrue(os.path.exists(containers_dir))
|
||||||
|
subdir = os.path.join(containers_dir, 'subdir')
|
||||||
|
os.mkdir(subdir)
|
||||||
|
cb = ContainerBroker(os.path.join(subdir, 'hash.db'),
|
||||||
|
account='.shards_a', container='c')
|
||||||
|
cb.initialize(normalize_timestamp(1), 0)
|
||||||
|
cb.set_sharding_sysmeta('Root', 'a/c')
|
||||||
|
self.assertFalse(cb.is_root_container())
|
||||||
|
cu.run_once()
|
||||||
|
info = cb.get_info()
|
||||||
|
self.assertEqual(info['object_count'], 0)
|
||||||
|
self.assertEqual(info['bytes_used'], 0)
|
||||||
|
self.assertEqual(info['reported_put_timestamp'], '0')
|
||||||
|
self.assertEqual(info['reported_delete_timestamp'], '0')
|
||||||
|
self.assertEqual(info['reported_object_count'], 0)
|
||||||
|
self.assertEqual(info['reported_bytes_used'], 0)
|
||||||
|
|
||||||
|
cb.put_object('o', normalize_timestamp(2), 3, 'text/plain',
|
||||||
|
'68b329da9893e34099c7d8ad5cb9c940')
|
||||||
|
# Fake us having already reported *bad* stats under swift 2.18.0
|
||||||
|
cb.reported('0', '0', 1, 3)
|
||||||
|
|
||||||
|
# Should fail with a bunch of connection-refused
|
||||||
|
cu.run_once()
|
||||||
|
info = cb.get_info()
|
||||||
|
self.assertEqual(info['object_count'], 1)
|
||||||
|
self.assertEqual(info['bytes_used'], 3)
|
||||||
|
self.assertEqual(info['reported_put_timestamp'], '0')
|
||||||
|
self.assertEqual(info['reported_delete_timestamp'], '0')
|
||||||
|
self.assertEqual(info['reported_object_count'], 1)
|
||||||
|
self.assertEqual(info['reported_bytes_used'], 3)
|
||||||
|
|
||||||
|
def accept(sock, addr, return_code):
|
||||||
|
try:
|
||||||
|
with Timeout(3):
|
||||||
|
inc = sock.makefile('rb')
|
||||||
|
out = sock.makefile('wb')
|
||||||
|
out.write('HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
|
||||||
|
return_code)
|
||||||
|
out.flush()
|
||||||
|
self.assertEqual(inc.readline(),
|
||||||
|
'PUT /sda1/2/.shards_a/c HTTP/1.1\r\n')
|
||||||
|
headers = {}
|
||||||
|
line = inc.readline()
|
||||||
|
while line and line != '\r\n':
|
||||||
|
headers[line.split(':')[0].lower()] = \
|
||||||
|
line.split(':')[1].strip()
|
||||||
|
line = inc.readline()
|
||||||
|
self.assertTrue('x-put-timestamp' in headers)
|
||||||
|
self.assertTrue('x-delete-timestamp' in headers)
|
||||||
|
self.assertTrue('x-object-count' in headers)
|
||||||
|
self.assertTrue('x-bytes-used' in headers)
|
||||||
|
except BaseException as err:
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
return err
|
||||||
|
return None
|
||||||
|
bindsock = listen_zero()
|
||||||
|
|
||||||
|
def spawn_accepts():
|
||||||
|
events = []
|
||||||
|
for _junk in range(2):
|
||||||
|
sock, addr = bindsock.accept()
|
||||||
|
events.append(spawn(accept, sock, addr, 201))
|
||||||
|
return events
|
||||||
|
|
||||||
|
spawned = spawn(spawn_accepts)
|
||||||
|
for dev in cu.get_account_ring().devs:
|
||||||
|
if dev is not None:
|
||||||
|
dev['port'] = bindsock.getsockname()[1]
|
||||||
|
cu.run_once()
|
||||||
|
for event in spawned.wait():
|
||||||
|
err = event.wait()
|
||||||
|
if err:
|
||||||
|
raise err
|
||||||
|
info = cb.get_info()
|
||||||
|
self.assertEqual(info['object_count'], 1)
|
||||||
|
self.assertEqual(info['bytes_used'], 3)
|
||||||
|
self.assertEqual(info['reported_put_timestamp'], '0000000001.00000')
|
||||||
|
self.assertEqual(info['reported_delete_timestamp'], '0')
|
||||||
|
self.assertEqual(info['reported_object_count'], 0)
|
||||||
|
self.assertEqual(info['reported_bytes_used'], 0)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user