Merge "sharding: Better-handle newlines in container names"

This commit is contained in:
Zuul 2020-01-05 20:02:30 +00:00 committed by Gerrit Code Review
commit fb538a9afe
17 changed files with 1059 additions and 53 deletions

View File

@ -59,19 +59,23 @@ def utf8encode(*args):
for s in args]
def native_str_keys(metadata):
def native_str_keys_and_values(metadata):
if six.PY2:
uni_keys = [k for k in metadata if isinstance(k, six.text_type)]
for k in uni_keys:
sv = metadata[k]
del metadata[k]
metadata[k.encode('utf-8')] = sv
metadata[k.encode('utf-8')] = [
x.encode('utf-8') if isinstance(x, six.text_type) else x
for x in sv]
else:
bin_keys = [k for k in metadata if isinstance(k, six.binary_type)]
for k in bin_keys:
sv = metadata[k]
del metadata[k]
metadata[k.decode('utf-8')] = sv
metadata[k.decode('utf-8')] = [
x.decode('utf-8') if isinstance(x, six.binary_type) else x
for x in sv]
ZERO_LIKE_VALUES = {None, '', 0, '0'}
@ -878,7 +882,7 @@ class DatabaseBroker(object):
metadata = self.get_raw_metadata()
if metadata:
metadata = json.loads(metadata)
native_str_keys(metadata)
native_str_keys_and_values(metadata)
else:
metadata = {}
return metadata
@ -940,7 +944,7 @@ class DatabaseBroker(object):
self.db_type)
md = row[0]
md = json.loads(md) if md else {}
native_str_keys(md)
native_str_keys_and_values(md)
except sqlite3.OperationalError as err:
if 'no such column: metadata' not in str(err):
raise

View File

@ -75,7 +75,7 @@ from six.moves import cPickle as pickle
from six.moves.configparser import (ConfigParser, NoSectionError,
NoOptionError, RawConfigParser)
from six.moves import range, http_client
from six.moves.urllib.parse import quote as _quote
from six.moves.urllib.parse import quote as _quote, unquote
from six.moves.urllib.parse import urlparse
from swift import gettext_ as _
@ -5699,6 +5699,9 @@ def get_redirect_data(response):
if 'Location' not in headers:
return None
location = urlparse(headers['Location']).path
if config_true_value(headers.get('X-Backend-Location-Is-Quoted',
'false')):
location = unquote(location)
account, container, _junk = split_path(location, 2, 3, True)
timestamp_val = headers.get('X-Backend-Redirect-Timestamp')
try:

View File

@ -22,6 +22,7 @@ from uuid import uuid4
import six
from six.moves import range
from six.moves.urllib.parse import unquote
import sqlite3
from eventlet import tpool
@ -2040,7 +2041,14 @@ class ContainerBroker(DatabaseBroker):
``container`` attributes respectively.
"""
path = self.get_sharding_sysmeta('Root')
path = self.get_sharding_sysmeta('Quoted-Root')
hdr = 'X-Container-Sysmeta-Shard-Quoted-Root'
if path:
path = unquote(path)
else:
path = self.get_sharding_sysmeta('Root')
hdr = 'X-Container-Sysmeta-Shard-Root'
if not path:
# Ensure account/container get populated
self._populate_instance_cache()
@ -2052,8 +2060,8 @@ class ContainerBroker(DatabaseBroker):
self._root_account, self._root_container = split_path(
'/' + path, 2, 2)
except ValueError:
raise ValueError("Expected X-Container-Sysmeta-Shard-Root to be "
"of the form 'account/container', got %r" % path)
raise ValueError("Expected %s to be of the form "
"'account/container', got %r" % (hdr, path))
@property
def root_account(self):

View File

@ -23,6 +23,7 @@ from swift import gettext_ as _
from eventlet import Timeout
import six
from six.moves.urllib.parse import quote
import swift.common.db
from swift.container.sync_store import ContainerSyncStore
@ -312,6 +313,11 @@ class ContainerController(BaseStorageServer):
"""
if not config_true_value(
req.headers.get('x-backend-accept-redirect', False)):
# We want to avoid fetching shard ranges for the (more
# time-sensitive) object-server update, so allow some misplaced
# objects to land between when we've started sharding and when the
# proxy learns about it. Note that this path is also used by old,
# pre-sharding updaters during a rolling upgrade.
return None
shard_ranges = broker.get_shard_ranges(
@ -324,7 +330,15 @@ class ContainerController(BaseStorageServer):
# in preference to the parent, which is the desired result.
containing_range = shard_ranges[0]
location = "/%s/%s" % (containing_range.name, obj_name)
headers = {'Location': location,
if location != quote(location) and not config_true_value(
req.headers.get('x-backend-accept-quoted-location', False)):
# Sender expects the destination to be unquoted, but it isn't safe
# to send unquoted. Eat the update for now and let the sharder
# move it later. Should only come up during rolling upgrades.
return None
headers = {'Location': quote(location),
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp':
containing_range.timestamp.internal}

View File

@ -656,7 +656,14 @@ class ContainerSharder(ContainerReplicator):
# Get the valid info into the broker.container, etc
shard_broker.get_info()
shard_broker.merge_shard_ranges(shard_range)
shard_broker.set_sharding_sysmeta('Root', root_path)
shard_broker.set_sharding_sysmeta('Quoted-Root', quote(root_path))
# NB: we *used* to do
# shard_broker.set_sharding_sysmeta('Root', root_path)
# but that isn't safe for container names with nulls or newlines (or
# possibly some other characters). We consciously *don't* make any
# attempt to set the old meta; during an upgrade, some shards may think
# they are in fact roots, but it cleans up well enough once everyone's
# upgraded.
shard_broker.update_metadata({
'X-Container-Sysmeta-Sharding':
('True', Timestamp.now().internal)})
@ -1129,8 +1136,16 @@ class ContainerSharder(ContainerReplicator):
shard_range.update_state(ShardRange.CREATED)
headers = {
'X-Backend-Storage-Policy-Index': broker.storage_policy_index,
'X-Container-Sysmeta-Shard-Root': broker.root_path,
'X-Container-Sysmeta-Shard-Quoted-Root': quote(
broker.root_path),
'X-Container-Sysmeta-Sharding': True}
# NB: we *used* to send along
# 'X-Container-Sysmeta-Shard-Root': broker.root_path
# but that isn't safe for container names with nulls or newlines
# (or possibly some other characters). We consciously *don't* make
# any attempt to set the old meta; during an upgrade, some shards
# may think they are in fact roots, but it cleans up well enough
# once everyone's upgraded.
success = self._send_shard_ranges(
shard_range.account, shard_range.container,
[shard_range], headers=headers)

View File

@ -17,6 +17,7 @@
import six
import six.moves.cPickle as pickle
from six.moves.urllib.parse import unquote
import json
import os
import multiprocessing
@ -366,7 +367,6 @@ class ObjectController(BaseStorageServer):
contdevices = [d.strip() for d in
headers_in.get('X-Container-Device', '').split(',')]
contpartition = headers_in.get('X-Container-Partition', '')
contpath = headers_in.get('X-Backend-Container-Path')
if len(conthosts) != len(contdevices):
# This shouldn't happen unless there's a bug in the proxy,
@ -379,6 +379,12 @@ class ObjectController(BaseStorageServer):
'devices': headers_in.get('X-Container-Device', '')})
return
contpath = headers_in.get('X-Backend-Quoted-Container-Path')
if contpath:
contpath = unquote(contpath)
else:
contpath = headers_in.get('X-Backend-Container-Path')
if contpath:
try:
# TODO: this is very late in request handling to be validating

View File

@ -358,6 +358,7 @@ class ObjectUpdater(Daemon):
headers_out.setdefault('X-Backend-Storage-Policy-Index',
str(int(policy)))
headers_out.setdefault('X-Backend-Accept-Redirect', 'true')
headers_out.setdefault('X-Backend-Accept-Quoted-Location', 'true')
container_path = update.get('container_path')
if container_path:
acct, cont = split_path('/' + container_path, minsegs=2)

View File

@ -343,7 +343,15 @@ class BaseObjectController(Controller):
headers[index].get('X-Container-Device'),
container['device'])
if container_path:
headers[index]['X-Backend-Container-Path'] = container_path
headers[index]['X-Backend-Quoted-Container-Path'] = quote(
container_path)
# NB: we used to send
# 'X-Backend-Container-Path': container_path
# but that isn't safe for container names with nulls or
# newlines (or possibly some other characters). We consciously
# *don't* make any attempt to set the old meta; during an
# upgrade, old object-servers will talk to the root which
# will eat the update and move it as a misplaced object.
def set_delete_at_headers(index, delete_at_node):
headers[index]['X-Delete-At-Container'] = delete_at_container

View File

@ -20,6 +20,7 @@ import uuid
from nose import SkipTest
import six
from six.moves.urllib.parse import quote
from swift.common import direct_client, utils
from swift.common.manager import Manager
@ -57,6 +58,7 @@ class ShardCollector(object):
class BaseTestContainerSharding(ReplProbeTest):
DELIM = '-'
def _maybe_skip_test(self):
try:
@ -101,10 +103,10 @@ class BaseTestContainerSharding(ReplProbeTest):
self._maybe_skip_test()
def _make_object_names(self, number):
return ['obj-%04d' % x for x in range(number)]
return ['obj%s%04d' % (self.DELIM, x) for x in range(number)]
def _setup_container_name(self):
self.container_name = 'container-%s' % uuid.uuid4()
self.container_name = 'container%s%s' % (self.DELIM, uuid.uuid4())
def setUp(self):
client.logger.setLevel(client.logging.WARNING)
@ -415,7 +417,8 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.max_shard_size // 2)]
def check_listing(objects, **params):
qs = '&'.join(['%s=%s' % param for param in params.items()])
qs = '&'.join('%s=%s' % (k, quote(str(v)))
for k, v in params.items())
headers, listing = client.get_container(
self.url, self.token, self.container_name, query_string=qs)
listing = [x['name'].encode('utf-8') if six.PY2 else x['name']
@ -468,12 +471,12 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
# delimiter
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=-')
self.assertEqual([{'subdir': 'obj-'}], listing)
query_string='delimiter=' + quote(self.DELIM))
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
headers, listing = client.get_container(
self.url, self.token, self.container_name,
query_string='delimiter=j-')
self.assertEqual([{'subdir': 'obj-'}], listing)
query_string='delimiter=j' + quote(self.DELIM))
self.assertEqual([{'subdir': 'obj' + self.DELIM}], listing)
limit = self.cluster_info['swift']['container_listing_limit']
exc = check_listing_fails(412, limit=limit + 1)
@ -546,13 +549,23 @@ class TestContainerShardingNonUTF8(BaseTestContainerSharding):
self.assert_container_post_ok('sharded')
class TestContainerShardingFunkyNames(TestContainerShardingNonUTF8):
    """
    Repeat the non-UTF8 sharding tests with names containing newlines and
    percent-escape-like sequences, which must survive quoting end-to-end.
    """
    # newline delimiter exercises the quoting of listing params
    DELIM = '\n'

    def _make_object_names(self, number):
        # names mix a literal newline with a '%Ff' that looks like (but is
        # not) a percent-escape
        return ['obj\n%04d%%Ff' % x for x in range(number)]

    def _setup_container_name(self):
        # container name also carries newlines and a percent-escape look-alike
        self.container_name = 'container\n%%Ff\n%s' % uuid.uuid4()
class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
def _make_object_names(self, number):
# override default with names that include non-ascii chars
name_length = self.cluster_info['swift']['max_object_name_length']
obj_names = []
for x in range(number):
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234-%04d' % x)
name = name.encode('utf8').ljust(name_length, b'o')
if not six.PY2:
name = name.decode('utf8')
@ -563,10 +576,11 @@ class TestContainerShardingUTF8(TestContainerShardingNonUTF8):
# override default with max length name that includes non-ascii chars
super(TestContainerShardingUTF8, self)._setup_container_name()
name_length = self.cluster_info['swift']['max_container_name_length']
cont_name = self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb'
self.conainer_name = cont_name.ljust(name_length, 'x')
if six.PY2:
self.conainer_name = self.container_name.encode('utf8')
cont_name = \
self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
if not six.PY2:
self.container_name = self.container_name.decode('utf8')
class TestContainerSharding(BaseTestContainerSharding):
@ -1114,7 +1128,9 @@ class TestContainerSharding(BaseTestContainerSharding):
shard_listings = self.direct_get_container(sr.account,
sr.container)
for node, (hdrs, listing) in shard_listings.items():
shard_listing_names = [o['name'] for o in listing]
shard_listing_names = [
o['name'].encode('utf-8') if six.PY2 else o['name']
for o in listing]
for obj in obj_names[4::5]:
if obj in sr:
self.assertIn(obj, shard_listing_names)
@ -1178,8 +1194,9 @@ class TestContainerSharding(BaseTestContainerSharding):
expected_shards=0, exp_obj_count=0):
# checks that shard range is consistent on all nodes
root_path = '%s/%s' % (self.account, self.container_name)
exp_shard_hdrs = {'X-Container-Sysmeta-Shard-Root': root_path,
'X-Backend-Sharding-State': expected_state}
exp_shard_hdrs = {
'X-Container-Sysmeta-Shard-Quoted-Root': quote(root_path),
'X-Backend-Sharding-State': expected_state}
object_counts = []
bytes_used = []
for node_id, node_data in node_data.items():
@ -2178,3 +2195,27 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual(2, int(metadata.get('x-account-container-count')))
self.assertEqual(0, int(metadata.get('x-account-object-count')))
self.assertEqual(0, int(metadata.get('x-account-bytes-used')))
class TestContainerShardingMoreUTF8(TestContainerSharding):
    """
    Repeat the full sharding tests with max-length object and container
    names that include non-ascii characters.
    """

    def _make_object_names(self, number):
        # override default with names that include non-ascii chars
        name_length = self.cluster_info['swift']['max_object_name_length']
        obj_names = []
        for x in range(number):
            name = (u'obj-\u00e4\u00ea\u00ec\u00f2\u00fb-%04d' % x)
            # pad to max length; padding is done on the encoded bytes so the
            # name length limit is exercised in bytes, not codepoints
            name = name.encode('utf8').ljust(name_length, b'o')
            if not six.PY2:
                name = name.decode('utf8')
            obj_names.append(name)
        return obj_names

    def _setup_container_name(self):
        # override default with max length name that includes non-ascii chars
        super(TestContainerShardingMoreUTF8, self)._setup_container_name()
        name_length = self.cluster_info['swift']['max_container_name_length']
        cont_name = \
            self.container_name + u'-\u00e4\u00ea\u00ec\u00f2\u00fb\u1234'
        # pad the utf8-encoded bytes, then decode back to str on py3
        self.container_name = cont_name.encode('utf8').ljust(name_length, b'x')
        if not six.PY2:
            self.container_name = self.container_name.decode('utf8')

View File

@ -526,13 +526,14 @@ class TestExampleBroker(unittest.TestCase):
# This is not obvious. The actual JSON in the database is the same:
# '{"test\\u062a": ["value\\u062a", "0000000001.00000"]}'
# The only difference is what reading it produces on py2 and py3.
# We use native strings for metadata keys (see native_str_keys()),
# so keys are different.
# We use native strings for metadata (see native_str_keys_and_values),
# so types are different.
if six.PY2:
key = u'test\u062a'.encode('utf-8')
value = u'value\u062a'.encode('utf-8')
else:
key = u'test\u062a'
value = u'value\u062a'
value = u'value\u062a'
metadata = {
key: [value, Timestamp(1).internal]
}

View File

@ -339,7 +339,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertTrue(broker.empty())
@with_tempdir
def test_empty_shard_container(self, tempdir):
def test_empty_old_style_shard_container(self, tempdir):
# Test ContainerBroker.empty for a shard container where shard range
# usage should not be considered
db_path = os.path.join(
@ -418,6 +418,86 @@ class TestContainerBroker(unittest.TestCase):
broker.merge_shard_ranges([own_sr])
self.assertTrue(broker.empty())
@with_tempdir
def test_empty_shard_container(self, tempdir):
    """
    Test ContainerBroker.empty for a shard container (identified by the
    Quoted-Root sysmeta) where shard range usage should not be considered.
    """
    db_path = os.path.join(
        tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
    broker = ContainerBroker(db_path, account='.shards_a', container='cc')
    broker.initialize(next(self.ts).internal, 0)
    # setting Quoted-Root marks this broker as a shard of root 'a/c'
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    self.assertFalse(broker.is_root_container())

    def check_object_counted(broker_to_test, broker_with_object):
        # merging an object row into broker_with_object should make
        # broker_to_test report non-empty...
        obj = {'name': 'o', 'created_at': next(self.ts).internal,
               'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
               'deleted': 0}
        broker_with_object.merge_items([dict(obj)])
        self.assertFalse(broker_to_test.empty())
        # and delete it
        obj.update({'created_at': next(self.ts).internal, 'deleted': 1})
        broker_with_object.merge_items([dict(obj)])
        self.assertTrue(broker_to_test.empty())

    self.assertTrue(broker.empty())
    check_object_counted(broker, broker)

    # own shard range is not considered for object count
    own_sr = broker.get_own_shard_range()
    self.assertEqual(0, own_sr.object_count)
    broker.merge_shard_ranges([own_sr])
    self.assertTrue(broker.empty())

    # actual object rows do count, even when the own shard range's stats
    # have been updated to match
    broker.put_object('o', next(self.ts).internal, 0, 'text/plain',
                      EMPTY_ETAG)
    own_sr = broker.get_own_shard_range()
    self.assertEqual(1, own_sr.object_count)
    broker.merge_shard_ranges([own_sr])
    self.assertFalse(broker.empty())
    broker.delete_object('o', next(self.ts).internal)
    self.assertTrue(broker.empty())

    def check_shard_ranges_not_counted():
        # other (non-own) shard ranges never make a shard broker non-empty,
        # whatever their state or stats
        sr = ShardRange('.shards_a/shard_c', next(self.ts), object_count=0)
        sr.update_meta(13, 99, meta_timestamp=next(self.ts))
        for state in ShardRange.STATES:
            sr.update_state(state, state_timestamp=next(self.ts))
            broker.merge_shard_ranges([sr])
            self.assertTrue(broker.empty())

        # empty other shard ranges do not influence result
        sr.update_meta(0, 0, meta_timestamp=next(self.ts))
        for state in ShardRange.STATES:
            sr.update_state(state, state_timestamp=next(self.ts))
            broker.merge_shard_ranges([sr])
            self.assertTrue(broker.empty())

    check_shard_ranges_not_counted()

    # move to sharding state
    broker.enable_sharding(next(self.ts))
    self.assertTrue(broker.set_sharding_state())

    # check object in retiring db is considered
    check_object_counted(broker, broker.get_brokers()[0])
    self.assertTrue(broker.empty())
    # as well as misplaced objects in fresh db
    check_object_counted(broker, broker)
    check_shard_ranges_not_counted()

    # move to sharded state
    self.assertTrue(broker.set_sharded_state())
    self.assertTrue(broker.empty())
    check_object_counted(broker, broker)
    check_shard_ranges_not_counted()

    # own shard range still has no influence
    own_sr = broker.get_own_shard_range()
    own_sr.update_meta(3, 4, meta_timestamp=next(self.ts))
    broker.merge_shard_ranges([own_sr])
    self.assertTrue(broker.empty())
def test_reclaim(self):
broker = ContainerBroker(':memory:', account='test_account',
container='test_container')
@ -3361,7 +3441,7 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual('myaccount/mycontainer', broker.path)
@with_tempdir
def test_root_account_container_path(self, tempdir):
def test_old_style_root_account_container_path(self, tempdir):
db_path = os.path.join(tempdir, 'container.db')
broker = ContainerBroker(
db_path, account='root_a', container='root_c')
@ -3442,6 +3522,88 @@ class TestContainerBroker(unittest.TestCase):
check_validation('/root_a/root_c/blah')
check_validation('/')
@with_tempdir
def test_root_account_container_path(self, tempdir):
    """
    Root account/container resolution via the (new-style) Quoted-Root
    sysmeta: a root broker behaves as root with or without the sysmeta,
    and a shard broker resolves its root from the sysmeta; malformed
    values raise ValueError naming the Quoted-Root header.
    """
    db_path = os.path.join(tempdir, 'container.db')
    broker = ContainerBroker(
        db_path, account='root_a', container='root_c')
    broker.initialize(next(self.ts).internal, 1)
    # make sure we can cope with uninitialized account and container
    broker.account = broker.container = None
    self.assertEqual('root_a', broker.root_account)
    self.assertEqual('root_c', broker.root_container)
    self.assertEqual('root_a/root_c', broker.root_path)
    self.assertTrue(broker.is_root_container())
    self.assertEqual('root_a', broker.account)  # sanity check
    self.assertEqual('root_c', broker.container)  # sanity check

    # we don't expect root containers to have this sysmeta set but if it is
    # the broker should still behave like a root container
    metadata = {
        'X-Container-Sysmeta-Shard-Quoted-Root':
            ('root_a/root_c', next(self.ts).internal)}
    broker = ContainerBroker(
        db_path, account='root_a', container='root_c')
    broker.update_metadata(metadata)
    broker.account = broker.container = None
    self.assertEqual('root_a', broker.root_account)
    self.assertEqual('root_c', broker.root_container)
    self.assertEqual('root_a/root_c', broker.root_path)
    self.assertTrue(broker.is_root_container())

    # if root is marked deleted, it still considers itself to be a root
    broker.delete_db(next(self.ts).internal)
    self.assertEqual('root_a', broker.root_account)
    self.assertEqual('root_c', broker.root_container)
    self.assertEqual('root_a/root_c', broker.root_path)
    self.assertTrue(broker.is_root_container())
    # check the values are not just being cached
    broker = ContainerBroker(db_path)
    self.assertEqual('root_a', broker.root_account)
    self.assertEqual('root_c', broker.root_container)
    self.assertEqual('root_a/root_c', broker.root_path)
    self.assertTrue(broker.is_root_container())

    # check a shard container
    db_path = os.path.join(tempdir, 'shard_container.db')
    broker = ContainerBroker(
        db_path, account='.shards_root_a', container='c_shard')
    broker.initialize(next(self.ts).internal, 1)
    # now the metadata is significant...
    metadata = {
        'X-Container-Sysmeta-Shard-Quoted-Root':
            ('root_a/root_c', next(self.ts).internal)}
    broker.update_metadata(metadata)
    broker.account = broker.container = None
    broker._root_account = broker._root_container = None
    self.assertEqual('root_a', broker.root_account)
    self.assertEqual('root_c', broker.root_container)
    self.assertEqual('root_a/root_c', broker.root_path)
    self.assertFalse(broker.is_root_container())

    # check validation
    def check_validation(root_value):
        # a malformed Quoted-Root value should raise ValueError from both
        # root_account and root_container properties
        metadata = {
            'X-Container-Sysmeta-Shard-Quoted-Root':
                (root_value, next(self.ts).internal)}
        broker.update_metadata(metadata)
        broker.account = broker.container = None
        broker._root_account = broker._root_container = None
        with self.assertRaises(ValueError) as cm:
            broker.root_account
        self.assertIn('Expected X-Container-Sysmeta-Shard-Quoted-Root',
                      str(cm.exception))
        with self.assertRaises(ValueError):
            broker.root_container

    check_validation('root_a')
    check_validation('/root_a')
    check_validation('/root_a/root_c')
    check_validation('/root_a/root_c/blah')
    check_validation('/')
def test_resolve_shard_range_states(self):
self.assertIsNone(ContainerBroker.resolve_shard_range_states(None))
self.assertIsNone(ContainerBroker.resolve_shard_range_states([]))
@ -4422,7 +4584,8 @@ class TestContainerBroker(unittest.TestCase):
do_test(orig_state, ts, test_state, ts_newer, test_state,
ts_newer)
def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir):
def _check_object_stats_when_old_style_sharded(
self, a, c, root_a, root_c, tempdir):
# common setup and assertions for root and shard containers
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
@ -4449,6 +4612,51 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(14, broker.get_info()['bytes_used'])
return broker
@with_tempdir
def test_object_stats_old_style_root_container(self, tempdir):
    # A root broker (set up via the old-style 'Root' sysmeta helper): once
    # in sharded state its reported stats come from shard ranges —
    # presumably the two ranges merged by the setup helper sum to 120
    # objects / 1999 bytes (TODO confirm against the helper, which is only
    # partially visible here).
    broker = self._check_object_stats_when_old_style_sharded(
        'a', 'c', 'a', 'c', tempdir)
    self.assertTrue(broker.is_root_container())  # sanity
    self.assertTrue(broker.set_sharded_state())
    self.assertEqual(120, broker.get_info()['object_count'])
    self.assertEqual(1999, broker.get_info()['bytes_used'])
@with_tempdir
def test_object_stats_old_style_shard_container(self, tempdir):
    # A shard broker (old-style 'Root' sysmeta): unlike a root, once
    # sharded it does not report stats from its shard ranges — counts
    # drop to zero.
    broker = self._check_object_stats_when_old_style_sharded(
        '.shard_a', 'c-blah', 'a', 'c', tempdir)
    self.assertFalse(broker.is_root_container())  # sanity
    self.assertTrue(broker.set_sharded_state())
    self.assertEqual(0, broker.get_info()['object_count'])
    self.assertEqual(0, broker.get_info()['bytes_used'])
def _check_object_stats_when_sharded(self, a, c, root_a, root_c, tempdir):
    """
    Common setup and assertions for root and shard containers: create a
    broker marked as a shard of root_a/root_c (via new-style Quoted-Root
    sysmeta), give it one object row, move it to sharding state and merge
    two ACTIVE shard ranges totalling 120 objects / 1999 bytes.
    Returns the broker, still reporting its own single object while in
    sharding state.
    """
    # common setup and assertions for root and shard containers
    db_path = os.path.join(
        tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
    broker = ContainerBroker(
        db_path, account=a, container=c)
    broker.initialize(next(self.ts).internal, 0)
    broker.set_sharding_sysmeta('Quoted-Root', '%s/%s' % (root_a, root_c))
    broker.merge_items([{'name': 'obj', 'size': 14, 'etag': 'blah',
                         'content_type': 'text/plain', 'deleted': 0,
                         'created_at': Timestamp.now().internal}])
    self.assertEqual(1, broker.get_info()['object_count'])
    self.assertEqual(14, broker.get_info()['bytes_used'])

    broker.enable_sharding(next(self.ts))
    self.assertTrue(broker.set_sharding_state())
    # two shard ranges covering ''..'m' and 'm'..'' with known stats
    sr_1 = ShardRange(
        '%s/%s1' % (root_a, root_c), Timestamp.now(), lower='', upper='m',
        object_count=99, bytes_used=999, state=ShardRange.ACTIVE)
    sr_2 = ShardRange(
        '%s/%s2' % (root_a, root_c), Timestamp.now(), lower='m', upper='',
        object_count=21, bytes_used=1000, state=ShardRange.ACTIVE)
    broker.merge_shard_ranges([sr_1, sr_2])
    # while sharding, stats still reflect the broker's own object row
    self.assertEqual(1, broker.get_info()['object_count'])
    self.assertEqual(14, broker.get_info()['bytes_used'])
    return broker
@with_tempdir
def test_object_stats_root_container(self, tempdir):
broker = self._check_object_stats_when_sharded(

View File

@ -924,7 +924,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_misplaced_rows_replicate_and_enqueue_from_shard(self):
def test_misplaced_rows_replicate_and_enqueue_from_old_style_shard(self):
# force all timestamps to fall in same hour
ts = (Timestamp(t) for t in
itertools.count(int(time.time()) // 3600 * 3600))
@ -1009,6 +1009,91 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_misplaced_rows_replicate_and_enqueue_from_shard(self):
    """
    Misplaced rows in a shard broker (new-style Quoted-Root sysmeta) are
    replicated and enqueued to the reconciler under the *root* container
    path.
    """
    # force all timestamps to fall in same hour
    ts = (Timestamp(t) for t in
          itertools.count(int(time.time()) // 3600 * 3600))
    policy = random.choice(list(POLICIES))
    broker = self._get_broker('.shards_a', 'some-other-c', node_index=0)
    broker.initialize(next(ts).internal, policy.idx)
    # mark the broker as a shard of root 'a/c'
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    remote_policy = random.choice([p for p in POLICIES if p is not
                                   policy])
    remote_broker = self._get_broker(
        '.shards_a', 'some-other-c', node_index=1)
    remote_broker.initialize(next(ts).internal, remote_policy.idx)

    # add a misplaced row to *local* broker
    obj_put_timestamp = next(ts).internal
    broker.put_object(
        'o', obj_put_timestamp, 0, 'content-type',
        'etag', storage_policy_index=remote_policy.idx)
    misplaced = broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(misplaced), 1)
    # since this row is misplaced it doesn't show up in count
    self.assertEqual(broker.get_info()['object_count'], 0)

    # add another misplaced row to *local* broker with composite timestamp
    ts_data = next(ts)
    ts_ctype = next(ts)
    ts_meta = next(ts)
    broker.put_object(
        'o2', ts_data.internal, 0, 'content-type',
        'etag', storage_policy_index=remote_policy.idx,
        ctype_timestamp=ts_ctype.internal, meta_timestamp=ts_meta.internal)
    misplaced = broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(misplaced), 2)
    # since this row is misplaced it doesn't show up in count
    self.assertEqual(broker.get_info()['object_count'], 0)

    # replicate
    part, node = self._get_broker_part_node(broker)
    daemon = self._run_once(node)
    # push to remote, and third node was missing (also maybe reconciler)
    self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])

    # grab the rsynced instance of remote_broker
    remote_broker = self._get_broker(
        '.shards_a', 'some-other-c', node_index=1)

    # remote has misplaced rows too now
    misplaced = remote_broker.get_misplaced_since(-1, 10)
    self.assertEqual(len(misplaced), 2)

    # and the correct policy_index and object_count
    info = remote_broker.get_info()
    expectations = {
        'object_count': 0,
        'storage_policy_index': policy.idx,
    }
    for key, value in expectations.items():
        self.assertEqual(info[key], value)

    # and we should have also enqueued these rows in a single reconciler,
    # since we forced the object timestamps to be in the same hour.
    reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
    # but it may not be on the same node as us anymore though...
    reconciler = self._get_broker(reconciler.account,
                                  reconciler.container, node_index=0)
    self.assertEqual(reconciler.get_info()['object_count'], 2)
    objects = reconciler.list_objects_iter(
        10, '', None, None, None, None, storage_policy_index=0)
    self.assertEqual(len(objects), 2)
    # NB: reconciler work is for the *root* container!
    expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
                'application/x-put', obj_put_timestamp)
    self.assertEqual(objects[0], expected)
    # the second object's listing has ts_meta as its last modified time
    # but its full composite timestamp is in the hash field.
    expected = ('%s:/a/c/o2' % remote_policy.idx, ts_meta.internal, 0,
                'application/x-put',
                encode_timestamps(ts_data, ts_ctype, ts_meta))
    self.assertEqual(objects[1], expected)

    # having safely enqueued to the reconciler we can advance
    # our sync pointer
    self.assertEqual(broker.get_reconciler_sync(), 2)
def test_multiple_out_sync_reconciler_enqueue_normalize(self):
ts = (Timestamp(t).internal for t in
itertools.count(int(time.time())))

View File

@ -78,10 +78,25 @@ class BaseTestSharder(unittest.TestCase):
broker.initialize()
return broker
def _make_old_style_sharding_broker(self, account='a', container='c',
shard_bounds=(('', 'middle'),
('middle', ''))):
broker = self._make_broker(account=account, container=container)
broker.set_sharding_sysmeta('Root', 'a/c')
old_db_id = broker.get_info()['id']
broker.enable_sharding(next(self.ts_iter))
shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.CLEAVED)
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.set_sharding_state())
broker = ContainerBroker(broker.db_file, account='a', container='c')
self.assertNotEqual(old_db_id, broker.get_info()['id']) # sanity check
return broker
def _make_sharding_broker(self, account='a', container='c',
shard_bounds=(('', 'middle'), ('middle', ''))):
broker = self._make_broker(account=account, container=container)
broker.set_sharding_sysmeta('Root', 'a/c')
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
old_db_id = broker.get_info()['id']
broker.enable_sharding(next(self.ts_iter))
shard_ranges = self._make_shard_ranges(
@ -2279,7 +2294,7 @@ class TestSharder(BaseTestSharder):
'.shards_', 'shard_c', (('l', 'mid'), ('mid', 'u')))
self.assertEqual(1, broker.get_own_shard_range().deleted)
def test_identify_sharding_candidate(self):
def test_identify_sharding_old_style_candidate(self):
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
for broker in brokers:
broker.set_sharding_sysmeta('Root', 'a/c')
@ -2333,6 +2348,60 @@ class TestSharder(BaseTestSharder):
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
def test_identify_sharding_candidate(self):
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
for broker in brokers:
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
node = {'index': 2}
# containers are all empty
with self._mock_sharder() as sharder:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
expected_stats = {}
self._assert_stats(expected_stats, sharder, 'sharding_candidates')
objects = [
['obj%3d' % i, next(self.ts_iter).internal, i, 'text/plain',
'etag%s' % i, 0] for i in range(160)]
# one container has 100 objects, which is below the sharding threshold
for obj in objects[:100]:
brokers[0].put_object(*obj)
conf = {'recon_cache_path': self.tempdir}
with self._mock_sharder(conf=conf) as sharder:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
self.assertFalse(sharder.sharding_candidates)
expected_recon = {
'found': 0,
'top': []}
sharder._report_stats()
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
# reduce the sharding threshold and the container is reported
conf = {'shard_container_threshold': 100,
'recon_cache_path': self.tempdir}
with self._mock_sharder(conf=conf) as sharder:
with mock_timestamp_now() as now:
for broker in brokers:
sharder._identify_sharding_candidate(broker, node)
stats_0 = {'path': brokers[0].db_file,
'node_index': 2,
'account': 'a',
'container': 'c000',
'root': 'a/c',
'object_count': 100,
'meta_timestamp': now.internal,
'file_size': os.stat(brokers[0].db_file).st_size}
self.assertEqual([stats_0], sharder.sharding_candidates)
expected_recon = {
'found': 1,
'top': [stats_0]}
sharder._report_stats()
self._assert_recon_stats(
expected_recon, sharder, 'sharding_candidates')
# repeat with handoff node and db_file error
with self._mock_sharder(conf=conf) as sharder:
with mock.patch('os.stat', side_effect=OSError('test error')):
@ -3489,7 +3558,7 @@ class TestSharder(BaseTestSharder):
self._check_objects([expected], expected_shard_dbs[0])
self._check_objects([], broker.db_file)
def _setup_find_ranges(self, account, cont, lower, upper):
def _setup_old_style_find_ranges(self, account, cont, lower, upper):
broker = self._make_broker(account=account, container=cont)
own_sr = ShardRange('%s/%s' % (account, cont), Timestamp.now(),
lower, upper)
@ -3503,6 +3572,106 @@ class TestSharder(BaseTestSharder):
broker.put_object(*obj)
return broker, objects
def _check_old_style_find_shard_ranges_none_found(self, broker, objects):
    """Assert that _find_shard_ranges finds nothing for an
    under-threshold broker, both with the default threshold and with a
    lowered threshold that makes split_size exactly equal the object
    count.
    """
    failure_stats = {'attempted': 1, 'success': 0, 'failure': 1,
                     'found': 0, 'min_time': mock.ANY,
                     'max_time': mock.ANY}

    def assert_nothing_found(sharder, found_count):
        # assertions common to both scan attempts
        self.assertEqual(0, found_count)
        self.assertFalse(broker.get_shard_ranges())
        stats = self._assert_stats(failure_stats, sharder, 'scanned')
        self.assertGreaterEqual(stats['max_time'], stats['min_time'])

    # default threshold: db is well below the split size
    with self._mock_sharder() as sharder:
        found = sharder._find_shard_ranges(broker)
    self.assertGreater(sharder.split_size, len(objects))
    assert_nothing_found(sharder, found)

    # lowered threshold: split size equals the object count, which is
    # still not enough to produce any shard ranges
    with self._mock_sharder(
            conf={'shard_container_threshold': 200}) as sharder:
        found = sharder._find_shard_ranges(broker)
    self.assertEqual(sharder.split_size, len(objects))
    assert_nothing_found(sharder, found)
def test_old_style_find_shard_ranges_none_found_root(self):
    """No candidate ranges found in an under-threshold root container."""
    setup = self._setup_old_style_find_ranges('a', 'c', '', '')
    self._check_old_style_find_shard_ranges_none_found(*setup)
def test_old_style_find_shard_ranges_none_found_shard(self):
    """No candidate ranges found in an under-threshold shard container."""
    setup = self._setup_old_style_find_ranges(
        '.shards_a', 'c', 'lower', 'upper')
    self._check_old_style_find_shard_ranges_none_found(*setup)
def _check_old_style_find_shard_ranges_finds_two(
        self, account, cont, lower, upper):
    """Assert that _find_shard_ranges splits a 100-object broker into
    two ranges (99 + 1 objects) on the first scan and is a no-op on a
    repeat scan.

    :param account: broker account name
    :param cont: broker container name
    :param lower: lower bound of the broker's own shard range
    :param upper: upper bound of the broker's own shard range
    """
    def check_ranges():
        # closure over broker/objects/now bound after the first scan
        self.assertEqual(2, len(broker.get_shard_ranges()))
        expected_ranges = [
            ShardRange(
                ShardRange.make_path('.int_shards_a', 'c', cont, now, 0),
                now, lower, objects[98][0], 99),
            ShardRange(
                ShardRange.make_path('.int_shards_a', 'c', cont, now, 1),
                now, objects[98][0], upper, 1),
        ]
        self._assert_shard_ranges_equal(expected_ranges,
                                        broker.get_shard_ranges())

    # first invocation finds both ranges
    broker, objects = self._setup_old_style_find_ranges(
        account, cont, lower, upper)
    # threshold 199 -> split_size 99, so 100 objects yield two ranges
    with self._mock_sharder(conf={'shard_container_threshold': 199,
                                  'auto_create_account_prefix': '.int_'}
                            ) as sharder:
        with mock_timestamp_now() as now:
            num_found = sharder._find_shard_ranges(broker)
    self.assertEqual(99, sharder.split_size)
    self.assertEqual(2, num_found)
    check_ranges()
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
                      'found': 2, 'min_time': mock.ANY,
                      'max_time': mock.ANY}
    stats = self._assert_stats(expected_stats, sharder, 'scanned')
    self.assertGreaterEqual(stats['max_time'], stats['min_time'])

    # second invocation finds none
    with self._mock_sharder(conf={'shard_container_threshold': 199,
                                  'auto_create_account_prefix': '.int_'}
                            ) as sharder:
        num_found = sharder._find_shard_ranges(broker)
    self.assertEqual(0, num_found)
    # previously found ranges are unchanged by the repeat scan
    self.assertEqual(2, len(broker.get_shard_ranges()))
    check_ranges()
    expected_stats = {'attempted': 0, 'success': 0, 'failure': 0,
                      'found': 0, 'min_time': mock.ANY,
                      'max_time': mock.ANY}
    stats = self._assert_stats(expected_stats, sharder, 'scanned')
    self.assertGreaterEqual(stats['max_time'], stats['min_time'])
def test_old_style_find_shard_ranges_finds_two_root(self):
    """Two ranges are found for an over-threshold root container."""
    args = ('a', 'c', '', '')
    self._check_old_style_find_shard_ranges_finds_two(*args)
def test_old_style_find_shard_ranges_finds_two_shard(self):
    """Two ranges are found for an over-threshold shard container."""
    args = ('.shards_a', 'c_', 'l', 'u')
    self._check_old_style_find_shard_ranges_finds_two(*args)
def _setup_find_ranges(self, account, cont, lower, upper):
    """Create a broker with its own shard range, quoted-root sysmeta and
    100 objects; return ``(broker, objects)``.
    """
    broker = self._make_broker(account=account, container=cont)
    path = '%s/%s' % (account, cont)
    broker.merge_shard_ranges(
        [ShardRange(path, Timestamp.now(), lower, upper)])
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    objects = []
    for i in range(100):
        # some of these are misplaced objects
        obj = ['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
               'etag%s' % i, 0]
        broker.put_object(*obj)
        objects.append(obj)
    return broker, objects
def _check_find_shard_ranges_none_found(self, broker, objects):
with self._mock_sharder() as sharder:
num_found = sharder._find_shard_ranges(broker)
@ -4144,7 +4313,7 @@ class TestSharder(BaseTestSharder):
self._assert_stats(expected_stats, sharder, 'audit_root')
mocked.assert_not_called()
def test_audit_shard_container(self):
def test_audit_old_style_shard_container(self):
broker = self._make_broker(account='.shards_a', container='shard_c')
broker.set_sharding_sysmeta('Root', 'a/c')
# include overlaps to verify correct match for updating own shard range
@ -4282,6 +4451,144 @@ class TestSharder(BaseTestSharder):
assert_ok()
self.assertTrue(broker.is_deleted())
def test_audit_shard_container(self):
    """Exercise _audit_container for a shard broker whose root is
    identified by the new quoted-root sysmeta, stepping through: bad
    account name, missing own shard range, own range with no match in
    root, failed root response, and finally matching/updated/deleted
    own shard ranges.
    """
    broker = self._make_broker(account='.shards_a', container='shard_c')
    broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
    # include overlaps to verify correct match for updating own shard range
    shard_bounds = (
        ('a', 'j'), ('k', 't'), ('k', 's'), ('l', 's'), ('s', 'z'))
    shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE)
    shard_ranges[1].name = broker.path
    expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}

    def call_audit_container(exc=None):
        # run the audit with the root-audit path mocked out and the
        # internal client returning the canned shard ranges (or raising
        # exc, if given)
        with self._mock_sharder() as sharder:
            sharder.logger = debug_logger()
            with mock.patch.object(sharder, '_audit_root_container') \
                    as mocked, mock.patch.object(
                    sharder, 'int_client') as mock_swift:
                mock_response = mock.MagicMock()
                mock_response.headers = {'x-backend-record-type':
                                         'shard'}
                mock_response.body = json.dumps(
                    [dict(sr) for sr in shard_ranges])
                mock_swift.make_request.return_value = mock_response
                mock_swift.make_request.side_effect = exc
                mock_swift.make_path = (lambda a, c:
                                        '/v1/%s/%s' % (a, c))
                sharder.reclaim_age = 0
                sharder._audit_container(broker)
        # a shard container must never be audited as a root
        mocked.assert_not_called()
        return sharder, mock_swift

    # bad account name
    broker.account = 'bad_account'
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
    self.assertIn('account not in shards namespace', lines[0])
    self.assertNotIn('root has no matching shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self.assertIn('Audit failed for shard %s' % broker.db_file, lines[1])
    self.assertIn('missing own shard range', lines[1])
    self.assertFalse(lines[2:])
    self.assertFalse(broker.is_deleted())

    # missing own shard range
    # get_info() refreshes cached info after the account fixup above
    broker.get_info()
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
    self.assertIn('missing own shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self.assertFalse(lines[1:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())

    # create own shard range, no match in root
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
    own_shard_range = broker.get_own_shard_range()  # get the default
    own_shard_range.lower = 'j'
    own_shard_range.upper = 'k'
    broker.merge_shard_ranges([own_shard_range])
    sharder, mock_swift = call_audit_container()
    lines = sharder.logger.get_lines_for_level('warning')
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
    self.assertNotIn('account not in shards namespace', lines[0])
    self.assertNotIn('missing own shard range', lines[0])
    self.assertIn('root has no matching shard range', lines[0])
    self.assertNotIn('unable to get shard ranges from root', lines[0])
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertFalse(lines[1:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())
    # the audit fetches ranges from the root scoped to the own range
    expected_headers = {'X-Backend-Record-Type': 'shard',
                        'X-Newest': 'true',
                        'X-Backend-Include-Deleted': 'True',
                        'X-Backend-Override-Deleted': 'true'}
    params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
    mock_swift.make_request.assert_called_once_with(
        'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
        params=params)

    # create own shard range, failed response from root
    expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
    own_shard_range = broker.get_own_shard_range()  # get the default
    own_shard_range.lower = 'j'
    own_shard_range.upper = 'k'
    broker.merge_shard_ranges([own_shard_range])
    sharder, mock_swift = call_audit_container(
        exc=internal_client.UnexpectedResponse('bad', 'resp'))
    lines = sharder.logger.get_lines_for_level('warning')
    self.assertIn('Failed to get shard ranges', lines[0])
    self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
    self.assertNotIn('account not in shards namespace', lines[1])
    self.assertNotIn('missing own shard range', lines[1])
    self.assertNotIn('root has no matching shard range', lines[1])
    self.assertIn('unable to get shard ranges from root', lines[1])
    self._assert_stats(expected_stats, sharder, 'audit_shard')
    self.assertFalse(lines[2:])
    self.assertFalse(sharder.logger.get_lines_for_level('error'))
    self.assertFalse(broker.is_deleted())
    mock_swift.make_request.assert_called_once_with(
        'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
        params=params)

    def assert_ok():
        # audit passes cleanly and queries root over the matched bounds
        sharder, mock_swift = call_audit_container()
        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
        self.assertFalse(sharder.logger.get_lines_for_level('error'))
        self._assert_stats(expected_stats, sharder, 'audit_shard')
        params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
        mock_swift.make_request.assert_called_once_with(
            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
            params=params)

    # make own shard range match one in root, but different state
    shard_ranges[1].timestamp = Timestamp.now()
    broker.merge_shard_ranges([shard_ranges[1]])
    now = Timestamp.now()
    shard_ranges[1].update_state(ShardRange.SHARDING, state_timestamp=now)
    assert_ok()
    self.assertFalse(broker.is_deleted())
    # own shard range state is updated from root version
    own_shard_range = broker.get_own_shard_range()
    self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
    self.assertEqual(now, own_shard_range.state_timestamp)

    own_shard_range.update_state(ShardRange.SHARDED,
                                 state_timestamp=Timestamp.now())
    broker.merge_shard_ranges([own_shard_range])
    assert_ok()

    # a deleted own shard range allows the broker to be deleted
    own_shard_range.deleted = 1
    own_shard_range.timestamp = Timestamp.now()
    broker.merge_shard_ranges([own_shard_range])
    assert_ok()
    self.assertTrue(broker.is_deleted())
def test_find_and_enable_sharding_candidates(self):
broker = self._make_broker()
broker.enable_sharding(next(self.ts_iter))
@ -4760,6 +5067,116 @@ class TestCleavingContext(BaseTestSharder):
else:
self.fail("Deleted context 'Context-%s' not found")
def test_store_old_style(self):
    """Storing a context persists it as JSON sysmeta, timestamped with
    the current time, on an old-style sharding broker.
    """
    broker = self._make_old_style_sharding_broker()
    db_ref = broker.get_brokers()[0].get_info()['id']
    mod_time = Timestamp.now()
    context = CleavingContext(db_ref, 'curs', 12, 11, 2, True, True, 2, 4)
    with mock_timestamp_now(mod_time):
        context.store(broker)
    meta_key = 'X-Container-Sysmeta-Shard-Context-%s' % db_ref
    stored_value, stored_ts = broker.metadata[meta_key]
    self.assertEqual(
        {'ref': db_ref,
         'cursor': 'curs',
         'max_row': 12,
         'cleave_to_row': 11,
         'last_cleave_to_row': 2,
         'cleaving_done': True,
         'misplaced_done': True,
         'ranges_done': 2,
         'ranges_todo': 4},
        json.loads(stored_value))
    # last modified is the metadata timestamp
    self.assertEqual(stored_ts, mod_time.internal)
def test_store_add_row_load_old_style(self):
    """Adding a row to the older db changes only max_row in the
    reloaded context.
    """
    broker = self._make_old_style_sharding_broker()
    old_broker = broker.get_brokers()[0]
    db_ref = old_broker.get_info()['id']

    def add_row():
        # merge one object row into the retiring db
        old_broker.merge_items([old_broker._record_to_dict(
            ('obj', next(self.ts_iter).internal, 0, 'text/plain',
             'etag', 1))])

    add_row()
    self.assertEqual(1, old_broker.get_max_row())  # sanity check
    CleavingContext(db_ref, 'curs', 1, 1, 0, True, True).store(broker)
    # adding a row changes max row
    add_row()
    reloaded = CleavingContext.load(broker)
    self.assertEqual(db_ref, reloaded.ref)
    self.assertEqual('curs', reloaded.cursor)
    self.assertEqual(2, reloaded.max_row)
    self.assertEqual(1, reloaded.cleave_to_row)
    self.assertEqual(0, reloaded.last_cleave_to_row)
    self.assertTrue(reloaded.misplaced_done)
    self.assertTrue(reloaded.cleaving_done)
def test_store_reclaim_load_old_style(self):
    """Reclaiming rows from the older db does not change the reloaded
    context.
    """
    broker = self._make_old_style_sharding_broker()
    old_broker = broker.get_brokers()[0]
    db_ref = old_broker.get_info()['id']
    # merge one object row into the retiring db
    old_broker.merge_items([old_broker._record_to_dict(
        ('obj', next(self.ts_iter).internal, 0, 'text/plain', 'etag', 1))])
    self.assertEqual(1, old_broker.get_max_row())  # sanity check
    CleavingContext(db_ref, 'curs', 1, 1, 0, True, True).store(broker)
    self.assertEqual(1, len(old_broker.get_objects()))
    # reclaim the row from the retiring db
    now = next(self.ts_iter).internal
    broker.get_brokers()[0].reclaim(now, now)
    self.assertFalse(old_broker.get_objects())
    reloaded = CleavingContext.load(broker)
    self.assertEqual(db_ref, reloaded.ref)
    self.assertEqual('curs', reloaded.cursor)
    self.assertEqual(1, reloaded.max_row)
    self.assertEqual(1, reloaded.cleave_to_row)
    self.assertEqual(0, reloaded.last_cleave_to_row)
    self.assertTrue(reloaded.misplaced_done)
    self.assertTrue(reloaded.cleaving_done)
def test_store_modify_db_id_load_old_style(self):
    """Changing the db id changes the context ref, so a previously
    stored context no longer matches and load() returns a fresh default
    context.
    """
    broker = self._make_old_style_sharding_broker()
    old_broker = broker.get_brokers()[0]
    old_db_id = old_broker.get_info()['id']
    ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True)
    ctx.store(broker)
    # re-id the db so the stored context's ref no longer matches
    old_broker.newid('fake_remote_id')
    new_db_id = old_broker.get_info()['id']
    self.assertNotEqual(old_db_id, new_db_id)
    new_ctx = CleavingContext.load(broker)
    self.assertEqual(new_db_id, new_ctx.ref)
    self.assertEqual('', new_ctx.cursor)
    # note max_row is dynamically updated during load
    self.assertEqual(-1, new_ctx.max_row)
    # assertIsNone is the idiomatic unittest check and gives a clearer
    # failure message than assertEqual(None, ...)
    self.assertIsNone(new_ctx.cleave_to_row)
    self.assertIsNone(new_ctx.last_cleave_to_row)
    self.assertFalse(new_ctx.misplaced_done)
    self.assertFalse(new_ctx.cleaving_done)
def test_load_modify_store_load_old_style(self):
    """A loaded context can be modified, re-stored and reloaded with the
    modifications intact.
    """
    broker = self._make_old_style_sharding_broker()
    db_ref = broker.get_brokers()[0].get_info()['id']
    ctx = CleavingContext.load(broker)
    self.assertEqual(db_ref, ctx.ref)
    self.assertEqual('', ctx.cursor)  # sanity check
    # mutate and persist
    ctx.cursor = 'curs'
    ctx.misplaced_done = True
    ctx.store(broker)
    reloaded = CleavingContext.load(broker)
    self.assertEqual(db_ref, reloaded.ref)
    self.assertEqual('curs', reloaded.cursor)
    self.assertTrue(reloaded.misplaced_done)
def test_store(self):
broker = self._make_sharding_broker()
old_db_id = broker.get_brokers()[0].get_info()['id']

View File

@ -350,7 +350,7 @@ class TestContainerUpdater(unittest.TestCase):
self.assertEqual(info['reported_object_count'], 1)
self.assertEqual(info['reported_bytes_used'], 3)
def test_shard_container(self):
def test_old_style_shard_container(self):
cu = self._get_container_updater()
cu.run_once()
containers_dir = os.path.join(self.sda1, DATADIR)
@ -439,5 +439,94 @@ class TestContainerUpdater(unittest.TestCase):
self.assertEqual(info['reported_object_count'], 0)
self.assertEqual(info['reported_bytes_used'], 0)
def test_shard_container(self):
    """Verify the container updater reports a shard container (marked by
    the new quoted-root sysmeta) to its account: stats are tracked
    locally, updates fail gracefully when no account server answers, and
    a successful PUT to the account carries the expected headers but
    does NOT mark the shard's stats as reported.
    """
    cu = self._get_container_updater()
    cu.run_once()
    containers_dir = os.path.join(self.sda1, DATADIR)
    os.mkdir(containers_dir)
    cu.run_once()
    self.assertTrue(os.path.exists(containers_dir))
    subdir = os.path.join(containers_dir, 'subdir')
    os.mkdir(subdir)
    # a broker in the shards namespace with quoted-root sysmeta is a
    # shard, not a root
    cb = ContainerBroker(os.path.join(subdir, 'hash.db'),
                         account='.shards_a', container='c')
    cb.initialize(normalize_timestamp(1), 0)
    cb.set_sharding_sysmeta('Quoted-Root', 'a/c')
    self.assertFalse(cb.is_root_container())
    cu.run_once()
    info = cb.get_info()
    self.assertEqual(info['object_count'], 0)
    self.assertEqual(info['bytes_used'], 0)
    self.assertEqual(info['reported_put_timestamp'], '0')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    self.assertEqual(info['reported_object_count'], 0)
    self.assertEqual(info['reported_bytes_used'], 0)

    cb.put_object('o', normalize_timestamp(2), 3, 'text/plain',
                  '68b329da9893e34099c7d8ad5cb9c940')
    # Fake us having already reported *bad* stats under swift 2.18.0
    cb.reported('0', '0', 1, 3)

    # Should fail with a bunch of connection-refused
    cu.run_once()
    info = cb.get_info()
    self.assertEqual(info['object_count'], 1)
    self.assertEqual(info['bytes_used'], 3)
    self.assertEqual(info['reported_put_timestamp'], '0')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    self.assertEqual(info['reported_object_count'], 1)
    self.assertEqual(info['reported_bytes_used'], 3)

    def accept(sock, addr, return_code):
        # minimal fake account server: answer one PUT and check that the
        # update carries the expected stats headers
        try:
            with Timeout(3):
                inc = sock.makefile('rb')
                out = sock.makefile('wb')
                out.write(b'HTTP/1.1 %d OK\r\nContent-Length: 0\r\n\r\n' %
                          return_code)
                out.flush()
                self.assertEqual(inc.readline(),
                                 b'PUT /sda1/2/.shards_a/c HTTP/1.1\r\n')
                headers = {}
                line = inc.readline()
                while line and line != b'\r\n':
                    headers[line.split(b':')[0].lower()] = \
                        line.split(b':')[1].strip()
                    line = inc.readline()
                self.assertIn(b'x-put-timestamp', headers)
                self.assertIn(b'x-delete-timestamp', headers)
                self.assertIn(b'x-object-count', headers)
                self.assertIn(b'x-bytes-used', headers)
        except BaseException as err:
            import traceback
            traceback.print_exc()
            return err
        return None

    bindsock = listen_zero()

    def spawn_accepts():
        # one accept per account replica update
        events = []
        for _junk in range(2):
            sock, addr = bindsock.accept()
            events.append(spawn(accept, sock, addr, 201))
        return events

    spawned = spawn(spawn_accepts)
    # point every account device at the fake server
    for dev in cu.get_account_ring().devs:
        if dev is not None:
            dev['port'] = bindsock.getsockname()[1]
    cu.run_once()
    # surface any assertion failures raised inside the accept greenlets
    for event in spawned.wait():
        err = event.wait()
        if err:
            raise err
    info = cb.get_info()
    self.assertEqual(info['object_count'], 1)
    self.assertEqual(info['bytes_used'], 3)
    self.assertEqual(info['reported_put_timestamp'], '0000000001.00000')
    self.assertEqual(info['reported_delete_timestamp'], '0')
    # NOTE: object stats are not marked reported for a shard container
    self.assertEqual(info['reported_object_count'], 0)
    self.assertEqual(info['reported_bytes_used'], 0)
# allow running this test module directly
if __name__ == '__main__':
    unittest.main()

View File

@ -1066,6 +1066,7 @@ class TestObjectController(unittest.TestCase):
# User-Agent is updated.
expected_post_headers['User-Agent'] = 'object-updater %s' % os.getpid()
expected_post_headers['X-Backend-Accept-Redirect'] = 'true'
expected_post_headers['X-Backend-Accept-Quoted-Location'] = 'true'
self.assertDictEqual(expected_post_headers, actual_headers)
self.assertFalse(
os.listdir(os.path.join(
@ -1078,7 +1079,8 @@ class TestObjectController(unittest.TestCase):
self._test_PUT_then_POST_async_pendings(
POLICIES[1], update_etag='override_etag')
def _check_PUT_redirected_async_pending(self, container_path=None):
def _check_PUT_redirected_async_pending(self, container_path=None,
old_style=False):
# When container update is redirected verify that the redirect location
# is persisted in the async pending file.
policy = POLICIES[0]
@ -1097,8 +1099,10 @@ class TestObjectController(unittest.TestCase):
'X-Container-Device': 'cdevice'}
if container_path:
# the proxy may include this header
put_headers['X-Backend-Container-Path'] = container_path
# the proxy may include either header
hdr = ('X-Backend-Container-Path' if old_style
else 'X-Backend-Quoted-Container-Path')
put_headers[hdr] = container_path
expected_update_path = '/cdevice/99/%s/o' % container_path
else:
expected_update_path = '/cdevice/99/a/c/o'
@ -1176,6 +1180,10 @@ class TestObjectController(unittest.TestCase):
def test_PUT_redirected_async_pending_with_container_path(self):
    """Redirect location is persisted when the new-style container-path
    header is used.
    """
    self._check_PUT_redirected_async_pending(
        container_path='.another/c')
def test_PUT_redirected_async_pending_with_old_style_container_path(self):
    """Redirect location is persisted when the old-style container-path
    header is used.
    """
    self._check_PUT_redirected_async_pending(
        old_style=True, container_path='.another/c')
def test_POST_quarantine_zbyte(self):
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
@ -5379,7 +5387,7 @@ class TestObjectController(unittest.TestCase):
'X-Backend-Container-Update-Override-Content-Type': 'ignored',
'X-Backend-Container-Update-Override-Foo': 'ignored'})
def test_PUT_container_update_to_shard(self):
def test_PUT_container_update_to_old_style_shard(self):
# verify that alternate container update path is respected when
# included in request headers
def do_test(container_path, expected_path, expected_container_path):
@ -5469,6 +5477,96 @@ class TestObjectController(unittest.TestCase):
do_test('too/many/parts', 'a/c', None)
do_test('/leading/slash', 'a/c', None)
def test_PUT_container_update_to_shard(self):
    # verify that alternate container update path is respected when
    # included in request headers
    def do_test(container_path, expected_path, expected_container_path):
        """PUT an object with the quoted container-path header set to
        container_path; assert the live update goes to expected_path and
        the async pending records expected_container_path (falling back
        to the root 'a/c' when the path is absent or malformed).
        """
        policy = random.choice(list(POLICIES))
        container_updates = []

        def capture_updates(
                ip, port, method, path, headers, *args, **kwargs):
            container_updates.append((ip, port, method, path, headers))

        pickle_async_update_args = []

        def fake_pickle_async_update(*args):
            pickle_async_update_args.append(args)

        diskfile_mgr = self.object_controller._diskfile_router[policy]
        diskfile_mgr.pickle_async_update = fake_pickle_async_update
        ts_put = next(self.ts)
        headers = {
            'X-Timestamp': ts_put.internal,
            'X-Trans-Id': '123',
            'X-Container-Host': 'chost:cport',
            'X-Container-Partition': 'cpartition',
            'X-Container-Device': 'cdevice',
            'Content-Type': 'text/plain',
            'X-Object-Sysmeta-Ec-Frag-Index': 0,
            'X-Backend-Storage-Policy-Index': int(policy),
        }
        if container_path is not None:
            headers['X-Backend-Quoted-Container-Path'] = container_path

        req = Request.blank('/sda1/0/a/c/o', method='PUT',
                            headers=headers, body='')
        # container server answers 500, forcing an async pending
        with mocked_http_conn(
                500, give_connect=capture_updates) as fake_conn:
            with fake_spawn():
                resp = req.get_response(self.object_controller)
        with self.assertRaises(StopIteration):
            next(fake_conn.code_iter)
        self.assertEqual(resp.status_int, 201)
        self.assertEqual(len(container_updates), 1)
        # verify expected path used in update request
        ip, port, method, path, headers = container_updates[0]
        self.assertEqual(ip, 'chost')
        self.assertEqual(port, 'cport')
        self.assertEqual(method, 'PUT')
        self.assertEqual(path, '/cdevice/cpartition/%s/o' % expected_path)

        # verify that the picked update *always* has root container
        self.assertEqual(1, len(pickle_async_update_args))
        (objdevice, account, container, obj, data, timestamp,
         policy) = pickle_async_update_args[0]
        self.assertEqual(objdevice, 'sda1')
        self.assertEqual(account, 'a')  # NB user account
        self.assertEqual(container, 'c')  # NB root container
        self.assertEqual(obj, 'o')
        self.assertEqual(timestamp, ts_put.internal)
        # NOTE(review): the unpacking above shadows the outer 'policy',
        # so this assertion compares a value with itself and is vacuous;
        # renaming the unpacked variable would make it meaningful —
        # confirm intent before changing.
        self.assertEqual(policy, policy)
        expected_data = {
            'headers': HeaderKeyDict({
                'X-Size': '0',
                'User-Agent': 'object-server %s' % os.getpid(),
                'X-Content-Type': 'text/plain',
                'X-Timestamp': ts_put.internal,
                'X-Trans-Id': '123',
                'Referer': 'PUT http://localhost/sda1/0/a/c/o',
                'X-Backend-Storage-Policy-Index': int(policy),
                'X-Etag': 'd41d8cd98f00b204e9800998ecf8427e'}),
            'obj': 'o',
            'account': 'a',
            'container': 'c',
            'op': 'PUT'}
        if expected_container_path:
            expected_data['container_path'] = expected_container_path
        self.assertEqual(expected_data, data)

    do_test('a_shard/c_shard', 'a_shard/c_shard', 'a_shard/c_shard')
    do_test('', 'a/c', None)
    do_test(None, 'a/c', None)
    # TODO: should these cases trigger a 400 response rather than
    # defaulting to root path?
    do_test('garbage', 'a/c', None)
    do_test('/', 'a/c', None)
    do_test('/no-acct', 'a/c', None)
    do_test('no-cont/', 'a/c', None)
    do_test('too/many/parts', 'a/c', None)
    do_test('/leading/slash', 'a/c', None)
def test_container_update_async(self):
policy = random.choice(list(POLICIES))
req = Request.blank(

View File

@ -711,19 +711,24 @@ class TestObjectUpdater(unittest.TestCase):
'X-Backend-Storage-Policy-Index': str(int(policies[0])),
'User-Agent': 'object-updater %s' % os.getpid(),
'X-Backend-Accept-Redirect': 'true',
'X-Backend-Accept-Quoted-Location': 'true',
}
# always expect X-Backend-Accept-Redirect to be true
# always expect X-Backend-Accept-Redirect and
# X-Backend-Accept-Quoted-Location to be true
do_test(headers_out, expected, container_path='.shards_a/shard_c')
do_test(headers_out, expected)
# ...unless X-Backend-Accept-Redirect is already set
# ...unless they're already set
expected['X-Backend-Accept-Redirect'] = 'false'
expected['X-Backend-Accept-Quoted-Location'] = 'false'
headers_out_2 = dict(headers_out)
headers_out_2['X-Backend-Accept-Redirect'] = 'false'
headers_out_2['X-Backend-Accept-Quoted-Location'] = 'false'
do_test(headers_out_2, expected)
# updater should add policy header if missing
expected['X-Backend-Accept-Redirect'] = 'true'
expected['X-Backend-Accept-Quoted-Location'] = 'true'
headers_out['X-Backend-Storage-Policy-Index'] = None
do_test(headers_out, expected)
@ -747,7 +752,8 @@ class TestObjectUpdater(unittest.TestCase):
'X-Timestamp': timestamp.internal,
'X-Backend-Storage-Policy-Index': str(int(policy)),
'User-Agent': 'object-updater %s' % os.getpid(),
'X-Backend-Accept-Redirect': 'true'}
'X-Backend-Accept-Redirect': 'true',
'X-Backend-Accept-Quoted-Location': 'true'}
for request in requests:
self.assertEqual('PUT', request['method'])
self.assertDictEqual(expected_headers, request['headers'])
@ -954,9 +960,11 @@ class TestObjectUpdater(unittest.TestCase):
# 1st round of redirects, newest redirect should be chosen
(301, {'Location': '/.shards_a/c_shard_old/o',
'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}),
(301, {'Location': '/.shards_a/c_shard_new/o',
(301, {'Location': '/.shards_a/c%5Fshard%5Fnew/o',
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp': ts_redirect_2.internal}),
(301, {'Location': '/.shards_a/c_shard_old/o',
(301, {'Location': '/.shards_a/c%5Fshard%5Fold/o',
'X-Backend-Location-Is-Quoted': 'true',
'X-Backend-Redirect-Timestamp': ts_redirect_1.internal}),
# 2nd round of redirects
(301, {'Location': '/.shards_a/c_shard_newer/o',

View File

@ -3832,7 +3832,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_range.name
'X-Backend-Quoted-Container-Path': shard_range.name
},
}
check_request(request, **expectations)
@ -3943,7 +3943,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)
@ -4045,7 +4045,7 @@ class TestReplicatedObjectController(
'Host': 'localhost:80',
'Referer': '%s http://localhost/v1/a/c/o' % method,
'X-Backend-Storage-Policy-Index': '1',
'X-Backend-Container-Path': shard_ranges[1].name
'X-Backend-Quoted-Container-Path': shard_ranges[1].name
},
}
check_request(request, **expectations)