Add sharder daemon, manage_shard_ranges tool and probe tests

The sharder daemon visits container dbs and when necessary executes
the sharding workflow on the db.

The workflow is, in overview:

- perform an audit of the container for sharding purposes.

- move any misplaced objects that do not belong in the container
  to their correct shard.

- move shard ranges from FOUND state to CREATED state by creating
  shard containers.

- move shard ranges from CREATED to CLEAVED state by cleaving objects
  to shard dbs and replicating those dbs. By default this is done in
  batches of 2 shard ranges per visit.

Additionally, when the auto_shard option is True (NOT yet recommended
in production), the sharder will identify shard ranges for containers
that have exceeded the threshold for sharding, and will also manage
the sharding and shrinking of shard containers.

The manage_shard_ranges tool provides a means to manually identify
shard ranges and merge them to a container in order to trigger
sharding. This is currently the recommended way to shard a container.
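
As an illustration (not itself part of this change), the new unit tests
drive the tool by calling its main() entry point directly; the same
sub-commands are exposed by the swift-manage-shard-ranges console script
added in setup.cfg. The container db path below is hypothetical:

    # Minimal sketch, mirroring calls made in the new unit tests.
    from swift.cli.manage_shard_ranges import main

    db_file = '/srv/node/sda1/containers/0/abc/def/def.db'  # hypothetical path

    # Print candidate shard ranges of up to 500000 rows each (the default).
    main([db_file, 'find'])

    # Find ranges of ~100000 rows, merge them into the db and enable sharding.
    main([db_file, 'find_and_replace', '100000', '--enable'])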

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>

Change-Id: I7f192209d4d5580f5a0aa6838f9f04e436cf6b1f
Matthew Oliver 2018-05-02 10:47:51 +01:00 committed by Alistair Coles
parent 4a3efe61a9
commit 2641814010
24 changed files with 9640 additions and 48 deletions

bin/swift-container-sharder Executable file

@@ -0,0 +1,33 @@
#!/usr/bin/env python
# Copyright (c) 2010-2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from swift.container.sharder import ContainerSharder
from swift.common.utils import parse_options
from swift.common.daemon import run_daemon
from optparse import OptionParser
if __name__ == '__main__':
parser = OptionParser("%prog CONFIG [options]")
parser.add_option('-d', '--devices',
help='Shard containers only on given devices. '
'Comma-separated list. '
'Only has effect if --once is used.')
parser.add_option('-p', '--partitions',
help='Shard containers only in given partitions. '
'Comma-separated list. '
'Only has effect if --once is used.')
conf_file, options = parse_options(parser=parser, once=True)
run_daemon(ContainerSharder, conf_file, **options)


@@ -27,3 +27,13 @@ rsync_module = {replication_ip}::container{replication_port}
[container-auditor]
[container-sync]
[container-sharder]
auto_shard = true
rsync_module = {replication_ip}::container{replication_port}
# This is intentionally much smaller than the default of 1,000,000 so tests
# can run in a reasonable amount of time
shard_container_threshold = 100
# The probe tests make explicit assumptions about the batch sizes
shard_scanner_batch_size = 10
cleave_batch_size = 2


@@ -27,3 +27,13 @@ rsync_module = {replication_ip}::container{replication_port}
[container-auditor]
[container-sync]
[container-sharder]
auto_shard = true
rsync_module = {replication_ip}::container{replication_port}
# This is intentionally much smaller than the default of 1,000,000 so tests
# can run in a reasonable amount of time
shard_container_threshold = 100
# The probe tests make explicit assumptions about the batch sizes
shard_scanner_batch_size = 10
cleave_batch_size = 2


@@ -27,3 +27,13 @@ rsync_module = {replication_ip}::container{replication_port}
[container-auditor]
[container-sync]
[container-sharder]
auto_shard = true
rsync_module = {replication_ip}::container{replication_port}
# This is intentionally much smaller than the default of 1,000,000 so tests
# can run in a reasonable amount of time
shard_container_threshold = 100
# The probe tests make explicit assumptions about the batch sizes
shard_scanner_batch_size = 10
cleave_batch_size = 2


@@ -27,3 +27,13 @@ rsync_module = {replication_ip}::container{replication_port}
[container-auditor]
[container-sync]
[container-sharder]
auto_shard = true
rsync_module = {replication_ip}::container{replication_port}
# This is intentionally much smaller than the default of 1,000,000 so tests
# can run in a reasonable amount of time
shard_container_threshold = 100
# The probe tests make explicit assumptions about the batch sizes
shard_scanner_batch_size = 10
cleave_batch_size = 2


@@ -0,0 +1,24 @@
[DEFAULT]
[pipeline:main]
pipeline = catch_errors proxy-logging cache symlink proxy-server
[app:proxy-server]
use = egg:swift#proxy
account_autocreate = true
# See proxy-server.conf-sample for options
[filter:symlink]
use = egg:swift#symlink
# See proxy-server.conf-sample for options
[filter:cache]
use = egg:swift#memcache
# See proxy-server.conf-sample for options
[filter:proxy-logging]
use = egg:swift#proxy_logging
[filter:catch_errors]
use = egg:swift#catch_errors
# See proxy-server.conf-sample for options


@@ -69,6 +69,10 @@ bind_port = 6201
# Work only with ionice_class.
# ionice_class =
# ionice_priority =
#
# The prefix used for hidden auto-created accounts, for example accounts in
# which shard containers are created. Defaults to '.'.
# auto_create_account_prefix = .
[pipeline:main]
pipeline = healthcheck recon container-server
@@ -323,3 +327,117 @@ use = egg:swift#xprofile
#
# unwind the iterator of applications
# unwind = false
[container-sharder]
# You can override the default log routing for this app here (don't use set!):
# log_name = container-sharder
# log_facility = LOG_LOCAL0
# log_level = INFO
# log_address = /dev/log
#
# Container sharder specific settings
#
# If the auto_shard option is true then the sharder will automatically select
# containers to shard, scan for shard ranges, and select shards to shrink.
# The default is false.
# Warning: auto-sharding is still under development and should not be used in
# production; do not set this option to true in a production cluster.
# auto_shard = false
#
# When auto-sharding is enabled shard_container_threshold defines the object
# count at which a container with container-sharding enabled will start to
# shard. shard_container_threshold also indirectly determines the initial
# nominal size of shard containers, which is shard_container_threshold // 2, as
# well as determining the thresholds for shrinking and merging shard
# containers.
# shard_container_threshold = 1000000
#
# When auto-sharding is enabled shard_shrink_point defines the object count
# below which a 'donor' shard container will be considered for shrinking into
# another 'acceptor' shard container. shard_shrink_point is a percentage of
# shard_container_threshold e.g. the default value of 5 means 5% of the
# shard_container_threshold.
# shard_shrink_point = 5
#
# When auto-sharding is enabled shard_shrink_merge_point defines the maximum
# allowed size of an acceptor shard container after having a donor merged into
# it. Shard_shrink_merge_point is a percentage of shard_container_threshold.
# e.g. the default value of 75 means that the projected sum of a donor object
# count and acceptor count must be less than 75% of shard_container_threshold
# for the donor to be allowed to merge into the acceptor.
#
# For example, if the shard_container_threshold is 1 million,
# shard_shrink_point is 5, and shard_shrink_merge_point is 75 then a shard will
# be considered for shrinking if it has less than or equal to 50 thousand
# objects but will only merge into an acceptor if the combined object count
# would be less than or equal to 750 thousand objects.
# shard_shrink_merge_point = 75
#
# When auto-sharding is enabled shard_scanner_batch_size defines the maximum
# number of shard ranges that will be found each time the sharder daemon visits
# a sharding container. If necessary the sharder daemon will continue to search
# for more shard ranges each time it visits the container.
# shard_scanner_batch_size = 10
#
# cleave_batch_size defines the number of shard ranges that will be cleaved
# each time the sharder daemon visits a sharding container.
# cleave_batch_size = 2
#
# cleave_row_batch_size defines the size of batches of object rows read from a
# sharding container and merged to a shard container during cleaving.
# cleave_row_batch_size = 10000
#
# Defines the number of successfully replicated shard dbs required when
# cleaving a previously uncleaved shard range before the sharder will progress
# to the next shard range. The value should be less than or equal to the
# container ring replica count. The default of 'auto' causes the container ring
# quorum value to be used. This option only applies to the container-sharder
# replication and does not affect the number of shard container replicas that
# will eventually be replicated by the container-replicator.
# shard_replication_quorum = auto
#
# Defines the number of successfully replicated shard dbs required when
# cleaving a shard range that has been previously cleaved on another node
# before the sharder will progress to the next shard range. The value should be
# less than or equal to the container ring replica count. The default of 'auto'
# causes the shard_replication_quorum value to be used. This option only
# applies to the container-sharder replication and does not affect the number
# of shard container replicas that will eventually be replicated by the
# container-replicator.
# existing_shard_replication_quorum = auto
#
# The sharder uses an internal client to create and make requests to
# containers. The absolute path to the client config file can be configured.
# internal_client_conf_path = /etc/swift/internal-client.conf
#
# The number of times the internal client will retry requests.
# request_tries = 3
#
# Each time the sharder dumps stats to the recon cache file it includes a list
# of containers that appear to need sharding but are not yet sharding. By
# default this list is limited to the top 5 containers, ordered by object
# count. The limit may be changed by setting recon_candidates_limit to an
# integer value. A negative value implies no limit.
# recon_candidates_limit = 5
#
# Large databases tend to take a while to work with, but we want to make sure
# we write down our progress. Use a larger-than-normal broker timeout to make
# us less likely to bomb out on a LockTimeout.
# broker_timeout = 60
#
# Time in seconds to wait between sharder cycles
# interval = 30
#
# The container-sharder accepts the following configuration options as defined
# in the container-replicator section:
#
# per_diff = 1000
# max_diffs = 100
# concurrency = 8
# node_timeout = 10
# conn_timeout = 0.5
# reclaim_age = 604800
# rsync_compress = no
# rsync_module = {replication_ip}::container
# recon_cache_path = /var/cache/swift
#
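
To make the shrink-related percentages above concrete, a small
illustrative calculation (not part of this change) using the defaults
described in this sample:

    shard_container_threshold = 1000000
    shard_shrink_point = 5          # percent of shard_container_threshold
    shard_shrink_merge_point = 75   # percent of shard_container_threshold

    # A donor shard at or below this object count may be shrunk...
    shrink_threshold = shard_container_threshold * shard_shrink_point // 100          # 50000
    # ...but only if the projected donor + acceptor count stays at or below this.
    merge_size_limit = shard_container_threshold * shard_shrink_merge_point // 100    # 750000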


@@ -36,6 +36,7 @@ scripts =
bin/swift-container-info
bin/swift-container-replicator
bin/swift-container-server
bin/swift-container-sharder
bin/swift-container-sync
bin/swift-container-updater
bin/swift-container-reconciler
@@ -71,6 +72,9 @@ keystone =
keystonemiddleware>=4.17.0
[entry_points]
console_scripts =
swift-manage-shard-ranges = swift.cli.manage_shard_ranges:main
paste.app_factory =
proxy = swift.proxy.server:app_factory
object = swift.obj.server:app_factory


@@ -0,0 +1,370 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import argparse
import json
import sys
import time
from six.moves import input
from swift.common.utils import Timestamp, get_logger, ShardRange
from swift.container.backend import ContainerBroker, UNSHARDED
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
CleavingContext
def _load_and_validate_shard_data(args):
try:
with open(args.input, 'rb') as fd:
try:
data = json.load(fd)
if not isinstance(data, list):
raise ValueError('Shard data must be a list of dicts')
for k in ('lower', 'upper', 'index', 'object_count'):
for shard in data:
shard[k]
return data
except (TypeError, ValueError, KeyError) as err:
print('Failed to load valid shard range data: %r' % err,
file=sys.stderr)
exit(2)
except IOError as err:
print('Failed to open file %s: %s' % (args.input, err),
file=sys.stderr)
exit(2)
def _check_shard_ranges(own_shard_range, shard_ranges):
reasons = []
def reason(x, y):
if x != y:
reasons.append('%s != %s' % (x, y))
if not shard_ranges:
reasons.append('No shard ranges.')
else:
reason(own_shard_range.lower, shard_ranges[0].lower)
reason(own_shard_range.upper, shard_ranges[-1].upper)
for x, y in zip(shard_ranges, shard_ranges[1:]):
reason(x.upper, y.lower)
if reasons:
print('WARNING: invalid shard ranges: %s.' % reasons)
print('Aborting.')
exit(2)
def _check_own_shard_range(broker, args):
# TODO: this check is weak - if the shards prefix changes then we may not
# identify a shard container. The goal is to not inadvertently create an
# entire namespace default shard range for a shard container.
is_shard = broker.account.startswith(args.shards_account_prefix)
own_shard_range = broker.get_own_shard_range(no_default=is_shard)
if not own_shard_range:
print('WARNING: shard container missing own shard range.')
print('Aborting.')
exit(2)
return own_shard_range
def _find_ranges(broker, args, status_file=None):
start = last_report = time.time()
limit = 5 if status_file else -1
shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, limit=limit)
if shard_data:
while not last_found:
if last_report + 10 < time.time():
print('Found %d ranges in %gs; looking for more...' % (
len(shard_data), time.time() - start), file=status_file)
last_report = time.time()
# prefix doesn't matter since we aren't persisting it
found_ranges = make_shard_ranges(broker, shard_data, '.shards_')
more_shard_data, last_found = broker.find_shard_ranges(
args.rows_per_shard, existing_ranges=found_ranges, limit=5)
shard_data.extend(more_shard_data)
return shard_data, time.time() - start
def find_ranges(broker, args):
shard_data, delta_t = _find_ranges(broker, args, sys.stderr)
print(json.dumps(shard_data, sort_keys=True, indent=2))
print('Found %d ranges in %gs (total object count %s)' %
(len(shard_data), delta_t,
sum(r['object_count'] for r in shard_data)),
file=sys.stderr)
return 0
def show_shard_ranges(broker, args):
shard_ranges = broker.get_shard_ranges(
include_deleted=getattr(args, 'include_deleted', False))
shard_data = [dict(sr, state=sr.state_text)
for sr in shard_ranges]
if not shard_data:
print("No shard data found.", file=sys.stderr)
elif getattr(args, 'brief', False):
print("Existing shard ranges:", file=sys.stderr)
print(json.dumps([(sd['lower'], sd['upper']) for sd in shard_data],
sort_keys=True, indent=2))
else:
print("Existing shard ranges:", file=sys.stderr)
print(json.dumps(shard_data, sort_keys=True, indent=2))
return 0
def db_info(broker, args):
print('Sharding enabled = %s' % sharding_enabled(broker))
own_sr = broker.get_own_shard_range(no_default=True)
print('Own shard range: %s' %
(json.dumps(dict(own_sr, state=own_sr.state_text),
sort_keys=True, indent=2)
if own_sr else None))
db_state = broker.get_db_state()
print('db_state = %s' % db_state)
if db_state == 'sharding':
print('Retiring db id: %s' % broker.get_brokers()[0].get_info()['id'])
print('Cleaving context: %s' %
json.dumps(dict(CleavingContext.load(broker)),
sort_keys=True, indent=2))
print('Metadata:')
for k, (v, t) in broker.metadata.items():
print(' %s = %s' % (k, v))
def delete_shard_ranges(broker, args):
shard_ranges = broker.get_shard_ranges()
if not shard_ranges:
print("No shard ranges found to delete.")
return 0
while not args.force:
print('This will delete existing %d shard ranges.' % len(shard_ranges))
if broker.get_db_state() != UNSHARDED:
print('WARNING: Be very cautious about deleting existing shard '
'ranges. Deleting all ranges in this db does not guarantee '
'deletion of all ranges on all replicas of the db.')
print(' - this db is in state %s' % broker.get_db_state())
print(' - %d existing shard ranges have started sharding' %
[sr.state != ShardRange.FOUND
for sr in shard_ranges].count(True))
choice = input('Do you want to show the existing ranges [s], '
'delete the existing ranges [yes] '
'or quit without deleting [q]? ')
if choice == 's':
show_shard_ranges(broker, args)
continue
elif choice == 'q':
return 1
elif choice == 'yes':
break
else:
print('Please make a valid choice.')
print()
now = Timestamp.now()
for sr in shard_ranges:
sr.deleted = 1
sr.timestamp = now
broker.merge_shard_ranges(shard_ranges)
print('Deleted %s existing shard ranges.' % len(shard_ranges))
return 0
def _replace_shard_ranges(broker, args, shard_data, timeout=None):
own_shard_range = _check_own_shard_range(broker, args)
shard_ranges = make_shard_ranges(
broker, shard_data, args.shards_account_prefix)
_check_shard_ranges(own_shard_range, shard_ranges)
if args.verbose > 0:
print('New shard ranges to be injected:')
print(json.dumps([dict(sr) for sr in shard_ranges],
sort_keys=True, indent=2))
# Crank up the timeout in an effort to *make sure* this succeeds
with broker.updated_timeout(max(timeout, args.replace_timeout)):
delete_shard_ranges(broker, args)
broker.merge_shard_ranges(shard_ranges)
print('Injected %d shard ranges.' % len(shard_ranges))
print('Run container-replicator to replicate them to other nodes.')
if args.enable:
return enable_sharding(broker, args)
else:
print('Use the enable sub-command to enable sharding.')
return 0
def replace_shard_ranges(broker, args):
shard_data = _load_and_validate_shard_data(args)
return _replace_shard_ranges(broker, args, shard_data)
def find_replace_shard_ranges(broker, args):
shard_data, delta_t = _find_ranges(broker, args, sys.stdout)
# Since we're trying to one-shot this, and the previous step probably
# took a while, make the timeout for writing *at least* that long
return _replace_shard_ranges(broker, args, shard_data, timeout=delta_t)
def _enable_sharding(broker, own_shard_range, args):
if own_shard_range.update_state(ShardRange.SHARDING):
own_shard_range.epoch = Timestamp.now()
own_shard_range.state_timestamp = own_shard_range.epoch
with broker.updated_timeout(args.enable_timeout):
broker.merge_shard_ranges([own_shard_range])
broker.update_metadata({'X-Container-Sysmeta-Sharding':
('True', Timestamp.now().normal)})
return own_shard_range
def enable_sharding(broker, args):
own_shard_range = _check_own_shard_range(broker, args)
_check_shard_ranges(own_shard_range, broker.get_shard_ranges())
if own_shard_range.state == ShardRange.ACTIVE:
own_shard_range = _enable_sharding(broker, own_shard_range, args)
print('Container moved to state %r with epoch %s.' %
(own_shard_range.state_text, own_shard_range.epoch.internal))
elif own_shard_range.state == ShardRange.SHARDING:
if own_shard_range.epoch:
print('Container already in state %r with epoch %s.' %
(own_shard_range.state_text, own_shard_range.epoch.internal))
print('No action required.')
else:
print('Container already in state %r but missing epoch.' %
own_shard_range.state_text)
own_shard_range = _enable_sharding(broker, own_shard_range, args)
print('Container in state %r given epoch %s.' %
(own_shard_range.state_text, own_shard_range.epoch.internal))
else:
print('WARNING: container in state %s (should be active or sharding).'
% own_shard_range.state_text)
print('Aborting.')
return 2
print('Run container-sharder on all nodes to shard the container.')
return 0
def _add_find_args(parser):
parser.add_argument('rows_per_shard', nargs='?', type=int, default=500000)
def _add_replace_args(parser):
parser.add_argument(
'--shards_account_prefix', metavar='shards_account_prefix', type=str,
required=False, help='Prefix for shards account', default='.shards_')
parser.add_argument(
'--replace-timeout', type=int, default=600,
help='Minimum DB timeout to use when replacing shard ranges.')
parser.add_argument(
'--force', '-f', action='store_true', default=False,
help='Delete existing shard ranges; no questions asked.')
parser.add_argument(
'--enable', action='store_true', default=False,
help='Enable sharding after adding shard ranges.')
def _add_enable_args(parser):
parser.add_argument(
'--enable-timeout', type=int, default=300,
help='DB timeout to use when enabling sharding.')
def _make_parser():
parser = argparse.ArgumentParser(description='Manage shard ranges')
parser.add_argument('container_db')
parser.add_argument('--verbose', '-v', action='count',
help='Increase output verbosity')
subparsers = parser.add_subparsers(
help='Sub-command help', title='Sub-commands')
# find
find_parser = subparsers.add_parser(
'find', help='Find and display shard ranges')
_add_find_args(find_parser)
find_parser.set_defaults(func=find_ranges)
# delete
delete_parser = subparsers.add_parser(
'delete', help='Delete all existing shard ranges from db')
delete_parser.add_argument(
'--force', '-f', action='store_true', default=False,
help='Delete existing shard ranges; no questions asked.')
delete_parser.set_defaults(func=delete_shard_ranges)
# show
show_parser = subparsers.add_parser(
'show', help='Print shard range data')
show_parser.add_argument(
'--include_deleted', '-d', action='store_true', default=False,
help='Include deleted shard ranges in output.')
show_parser.add_argument(
'--brief', '-b', action='store_true', default=False,
help='Show only shard range bounds in output.')
show_parser.set_defaults(func=show_shard_ranges)
# info
info_parser = subparsers.add_parser(
'info', help='Print container db info')
info_parser.set_defaults(func=db_info)
# replace
replace_parser = subparsers.add_parser(
'replace',
help='Replace existing shard ranges. User will be prompted before '
'deleting any existing shard ranges.')
replace_parser.add_argument('input', metavar='input_file',
type=str, help='Name of file')
_add_replace_args(replace_parser)
replace_parser.set_defaults(func=replace_shard_ranges)
# find_and_replace
find_replace_parser = subparsers.add_parser(
'find_and_replace',
help='Find new shard ranges and replace existing shard ranges. '
'User will be prompted before deleting any existing shard ranges.'
)
_add_find_args(find_replace_parser)
_add_replace_args(find_replace_parser)
_add_enable_args(find_replace_parser)
find_replace_parser.set_defaults(func=find_replace_shard_ranges)
# enable
enable_parser = subparsers.add_parser(
'enable', help='Enable sharding and move db to sharding state.')
_add_enable_args(enable_parser)
enable_parser.set_defaults(func=enable_sharding)
_add_replace_args(enable_parser)
return parser
def main(args=None):
parser = _make_parser()
args = parser.parse_args(args)
logger = get_logger({}, name='ContainerBroker', log_to_console=True)
broker = ContainerBroker(args.container_db, logger=logger,
skip_commits=True)
broker.get_info()
print('Loaded db broker for %s.' % broker.path, file=sys.stderr)
return args.func(broker, args)
if __name__ == '__main__':
exit(main())
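
For reference, a hedged sketch (not part of this change) of preparing an
input file for the 'replace' sub-command above; the per-shard keys
('lower', 'upper', 'index', 'object_count') are those required by
_load_and_validate_shard_data, and the paths here are hypothetical:

    import json

    # Two ranges covering the whole namespace, lowest to highest.
    shard_data = [
        {'index': 0, 'lower': '', 'upper': 'obj49', 'object_count': 50},
        {'index': 1, 'lower': 'obj49', 'upper': '', 'object_count': 50},
    ]
    with open('/tmp/shards.json', 'w') as fd:
        json.dump(shard_data, fd)

    # Then, against a container db:
    #   swift-manage-shard-ranges /path/to/container.db replace /tmp/shards.json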

swift/cli/shard-info.py Normal file

@@ -0,0 +1,195 @@
# Copyright (c) 2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from collections import defaultdict
from swift.common import utils
from swift.common.db_replicator import roundrobin_datadirs
from swift.common.ring import ring
from swift.common.utils import Timestamp
from swift.container.backend import ContainerBroker, DATADIR
TAB = ' '
def broker_key(broker):
broker.get_info()
return broker.path
def container_type(broker):
return 'ROOT' if broker.is_root_container() else 'SHARD'
def collect_brokers(conf_path, names2nodes):
conf = utils.readconf(conf_path, 'container-replicator')
root = conf.get('devices', '/srv/node')
swift_dir = conf.get('swift_dir', '/etc/swift')
c_ring = ring.Ring(swift_dir, ring_name='container')
dirs = []
brokers = defaultdict(dict)
for node in c_ring.devs:
if node is None:
continue
datadir = os.path.join(root, node['device'], DATADIR)
if os.path.isdir(datadir):
dirs.append((datadir, node['id'], lambda *args: True))
for part, object_file, node_id in roundrobin_datadirs(dirs):
broker = ContainerBroker(object_file)
for node in c_ring.get_part_nodes(int(part)):
if node['id'] == node_id:
node_index = str(node['index'])
break
else:
node_index = 'handoff'
names2nodes[broker_key(broker)][(node_id, node_index)] = broker
return brokers
def print_broker_info(node, broker, indent_level=0):
indent = indent_level * TAB
info = broker.get_info()
raw_info = broker._get_info()
deleted_at = float(info['delete_timestamp'])
if deleted_at:
deleted_at = Timestamp(info['delete_timestamp']).isoformat
else:
deleted_at = ' - '
print('%s(%s) %s, objs: %s, bytes: %s, actual_objs: %s, put: %s, '
'deleted: %s' %
(indent, node[1][0], broker.get_db_state(),
info['object_count'], info['bytes_used'], raw_info['object_count'],
Timestamp(info['put_timestamp']).isoformat, deleted_at))
def print_db(node, broker, expect_type='ROOT', indent_level=0):
indent = indent_level * TAB
print('%s(%s) %s node id: %s, node index: %s' %
(indent, node[1][0], broker.db_file, node[0], node[1]))
actual_type = container_type(broker)
if actual_type != expect_type:
print('%s ERROR expected %s but found %s' %
(indent, expect_type, actual_type))
def print_own_shard_range(node, sr, indent_level):
indent = indent_level * TAB
range = '%r - %r' % (sr.lower, sr.upper)
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
'modified: %s (%s), %7s: %s (%s), deleted: %s epoch: %s' %
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
sr.timestamp.isoformat, sr.timestamp.internal,
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
sr.state_text, sr.state_timestamp.isoformat,
sr.state_timestamp.internal, sr.deleted,
sr.epoch.internal if sr.epoch else None))
def print_own_shard_range_info(node, shard_ranges, indent_level=0):
shard_ranges.sort(key=lambda x: x.deleted)
for sr in shard_ranges:
print_own_shard_range(node, sr, indent_level)
def print_shard_range(node, sr, indent_level):
indent = indent_level * TAB
range = '%r - %r' % (sr.lower, sr.upper)
print('%s(%s) %23s, objs: %3s, bytes: %3s, timestamp: %s (%s), '
'modified: %s (%s), %7s: %s (%s), deleted: %s %s' %
(indent, node[1][0], range, sr.object_count, sr.bytes_used,
sr.timestamp.isoformat, sr.timestamp.internal,
sr.meta_timestamp.isoformat, sr.meta_timestamp.internal,
sr.state_text, sr.state_timestamp.isoformat,
sr.state_timestamp.internal, sr.deleted, sr.name))
def print_shard_range_info(node, shard_ranges, indent_level=0):
shard_ranges.sort(key=lambda x: x.deleted)
for sr in shard_ranges:
print_shard_range(node, sr, indent_level)
def print_sharding_info(node, broker, indent_level=0):
indent = indent_level * TAB
print('%s(%s) %s' % (indent, node[1][0], broker.get_sharding_sysmeta()))
def print_container(name, name2nodes2brokers, expect_type='ROOT',
indent_level=0, used_names=None):
used_names = used_names or set()
indent = indent_level * TAB
node2broker = name2nodes2brokers[name]
ordered_by_index = sorted(node2broker.keys(), key=lambda x: x[1])
brokers = [(node, node2broker[node]) for node in ordered_by_index]
print('%sName: %s' % (indent, name))
if name in used_names:
print('%s (Details already listed)\n' % indent)
return
used_names.add(name)
print(indent + 'DB files:')
for node, broker in brokers:
print_db(node, broker, expect_type, indent_level=indent_level + 1)
print(indent + 'Info:')
for node, broker in brokers:
print_broker_info(node, broker, indent_level=indent_level + 1)
print(indent + 'Sharding info:')
for node, broker in brokers:
print_sharding_info(node, broker, indent_level=indent_level + 1)
print(indent + 'Own shard range:')
for node, broker in brokers:
shard_ranges = broker.get_shard_ranges(
include_deleted=True, include_own=True, exclude_others=True)
print_own_shard_range_info(node, shard_ranges,
indent_level=indent_level + 1)
print(indent + 'Shard ranges:')
shard_names = set()
for node, broker in brokers:
shard_ranges = broker.get_shard_ranges(include_deleted=True)
for sr_name in shard_ranges:
shard_names.add(sr_name.name)
print_shard_range_info(node, shard_ranges,
indent_level=indent_level + 1)
print(indent + 'Shards:')
for sr_name in shard_names:
print_container(sr_name, name2nodes2brokers, expect_type='SHARD',
indent_level=indent_level + 1, used_names=used_names)
print('\n')
def run(conf_paths):
# container_name -> (node id, node index) -> broker
name2nodes2brokers = defaultdict(dict)
for conf_path in conf_paths:
collect_brokers(conf_path, name2nodes2brokers)
print('First column on each line is (node index)\n')
for name, node2broker in name2nodes2brokers.items():
expect_root = False
for node, broker in node2broker.items():
expect_root = broker.is_root_container() or expect_root
if expect_root:
print_container(name, name2nodes2brokers)
if __name__ == '__main__':
conf_dir = '/etc/swift/container-server'
conf_paths = [os.path.join(conf_dir, p) for p in os.listdir(conf_dir)
if p.endswith(('conf', 'conf.d'))]
run(conf_paths)


@@ -34,7 +34,7 @@ PROC_DIR = '/proc'
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-reconciler',
'container-server', 'container-sharder', 'container-sync',
'container-updater', 'object-auditor', 'object-server',
'object-expirer', 'object-replicator',
'object-reconstructor', 'object-updater',
@@ -637,13 +637,16 @@ class Server(object):
{'server': self.server, 'pid': pid, 'conf': conf_file})
return 0
def spawn(self, conf_file, once=False, wait=True, daemon=True,
additional_args=None, **kwargs):
"""Launch a subprocess for this server.
:param conf_file: path to conf_file to use as first arg
:param once: boolean, add once argument to command
:param wait: boolean, if true capture stdout with a pipe
:param daemon: boolean, if false ask server to log to console
:param additional_args: list of additional arguments to pass
on the command line
:returns: the pid of the spawned process
"""
@@ -653,6 +656,10 @@ class Server(object):
if not daemon:
# ask the server to log to console
args.append('verbose')
if additional_args:
if isinstance(additional_args, str):
additional_args = [additional_args]
args.extend(additional_args)
# figure out what we're going to do with stdio
if not daemon:

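A hypothetical illustration (not from this change) of the new
additional_args parameter, using the spawn() signature shown above;
presumably this is what lets the probe tests pass daemon options such as
--partitions through to a single sharder pass:

    from swift.common.manager import Server

    sharder = Server('container-sharder')
    # Run one pass against a single partition; the conf path is hypothetical
    # and additional_args may also be given as a plain string.
    sharder.spawn('/etc/swift/container-server/1.conf',
                  once=True, additional_args=['--partitions=0'])
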

@@ -412,6 +412,21 @@ def config_positive_int_value(value):
return result
def config_float_value(value, minimum=None, maximum=None):
try:
val = float(value)
if minimum is not None and val < minimum:
raise ValueError()
if maximum is not None and val > maximum:
raise ValueError()
return val
except (TypeError, ValueError):
min_ = ', greater than %s' % minimum if minimum is not None else ''
max_ = ', less than %s' % maximum if maximum is not None else ''
raise ValueError('Config option must be a number%s%s, not "%s".' %
(min_, max_, value))
def config_auto_int_value(value, default):
"""
Returns default if value is None or 'auto'.

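A brief illustration of the new config_float_value helper shown above,
mirroring the unit test added later in this change:

    from swift.common.utils import config_float_value

    print(config_float_value('99.01'))                       # 99.01
    print(config_float_value('5', minimum=0, maximum=100))   # 5.0
    try:
        config_float_value('101', maximum=100)
    except ValueError as err:
        print(err)  # Config option must be a number, less than 100, not "101".
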

@@ -746,6 +746,43 @@ class ContainerBroker(DatabaseBroker):
'meta_timestamp': meta_timestamp}
self.put_record(record)
def remove_objects(self, lower, upper, max_row=None):
"""
Removes object records in the given namespace range from the object
table.
Note that objects are removed regardless of their storage_policy_index.
:param lower: defines the lower bound of object names that will be
removed; names greater than this value will be removed; names less
than or equal to this value will not be removed.
:param upper: defines the upper bound of object names that will be
removed; names less than or equal to this value will be removed;
names greater than this value will not be removed. The empty string
is interpreted as there being no upper bound.
:param max_row: if specified only rows less than or equal to max_row
will be removed
"""
query_conditions = []
query_args = []
if max_row is not None:
query_conditions.append('ROWID <= ?')
query_args.append(str(max_row))
if lower:
query_conditions.append('name > ?')
query_args.append(lower)
if upper:
query_conditions.append('name <= ?')
query_args.append(upper)
query = 'DELETE FROM object WHERE deleted in (0, 1)'
if query_conditions:
query += ' AND ' + ' AND '.join(query_conditions)
with self.get() as conn:
conn.execute(query, query_args)
conn.commit()
def _is_deleted_info(self, object_count, put_timestamp, delete_timestamp,
**kwargs):
"""

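A minimal sketch of the new ContainerBroker.remove_objects() method,
modelled on the unit test added later in this change (an in-memory broker
is used, so this is illustrative only):

    from swift.common.utils import Timestamp
    from swift.container.backend import ContainerBroker

    broker = ContainerBroker(':memory:', account='a', container='c')
    broker.initialize(Timestamp.now().internal, 0)
    ts = Timestamp.now().internal
    for name in ('obj01', 'obj02', 'obj03'):
        # name, timestamp, size, content_type, etag, deleted, policy index
        broker.put_object(name, ts, 0, 'text/plain', 'etag', 0, 0)
    broker._commit_puts()

    # Remove rows with '' < name <= 'obj02', i.e. obj01 and obj02 but not obj03.
    broker.remove_objects('', 'obj02')
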
swift/container/sharder.py Normal file

File diff suppressed because it is too large


@@ -2007,7 +2007,7 @@ class Controller(object):
:param req: original Request instance.
:param account: account in which `container` is stored.
:param container: container from which listing should be fetched.
:param headers: headers to be included with the request
:param params: query string parameters to be used.
:return: a tuple of (deserialized json data structure, swob Response)


@@ -21,6 +21,7 @@ from swift.common.utils import public, csv_append, Timestamp, \
config_true_value, ShardRange
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
from swift.common.http import HTTP_ACCEPTED, is_success
from swift.common.request_helpers import get_sys_meta_prefix
from swift.proxy.controllers.base import Controller, delay_denial, \
cors_validation, set_info_cache, clear_info_cache
from swift.common.storage_policy import POLICIES
@@ -136,6 +137,11 @@ class ContainerController(Controller):
for key in self.app.swift_owner_headers:
if key in resp.headers:
del resp.headers[key]
# Expose sharding state in reseller requests
if req.environ.get('reseller_request', False):
resp.headers['X-Container-Sharding'] = config_true_value(
resp.headers.get(get_sys_meta_prefix('container') + 'Sharding',
'False'))
return resp
def _get_from_shards(self, req, resp):
@@ -257,6 +263,10 @@ class ContainerController(Controller):
if not req.environ.get('swift_owner'):
for key in self.app.swift_owner_headers:
req.headers.pop(key, None)
if req.environ.get('reseller_request', False) and \
'X-Container-Sharding' in req.headers:
req.headers[get_sys_meta_prefix('container') + 'Sharding'] = \
str(config_true_value(req.headers['X-Container-Sharding']))
length_limit = self.get_name_length_limit()
if len(self.container_name) > length_limit:
resp = HTTPBadRequest(request=req)
@@ -305,6 +315,10 @@ class ContainerController(Controller):
if not req.environ.get('swift_owner'):
for key in self.app.swift_owner_headers:
req.headers.pop(key, None)
if req.environ.get('reseller_request', False) and \
'X-Container-Sharding' in req.headers:
req.headers[get_sys_meta_prefix('container') + 'Sharding'] = \
str(config_true_value(req.headers['X-Container-Sharding']))
account_partition, accounts, container_count = \
self.account_info(self.account_name, req)
if not accounts:

test/probe/test_sharder.py Normal file

File diff suppressed because it is too large


@@ -1343,3 +1343,46 @@ def unlink_files(paths):
except OSError as err:
if err.errno != errno.ENOENT:
raise
class FakeHTTPResponse(object):
def __init__(self, resp):
self.resp = resp
@property
def status(self):
return self.resp.status_int
@property
def data(self):
return self.resp.body
def attach_fake_replication_rpc(rpc, replicate_hook=None, errors=None):
class FakeReplConnection(object):
def __init__(self, node, partition, hash_, logger):
self.logger = logger
self.node = node
self.partition = partition
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
self.host = node['replication_ip']
def replicate(self, op, *sync_args):
print('REPLICATE: %s, %s, %r' % (self.path, op, sync_args))
resp = None
if errors and op in errors and errors[op]:
resp = errors[op].pop(0)
if not resp:
replicate_args = self.path.lstrip('/').split('/')
args = [op] + copy.deepcopy(list(sync_args))
with mock_check_drive(isdir=not rpc.mount_check,
ismount=rpc.mount_check):
swob_response = rpc.dispatch(replicate_args, args)
resp = FakeHTTPResponse(swob_response)
if replicate_hook:
replicate_hook(op, *sync_args)
return resp
return FakeReplConnection


@@ -0,0 +1,362 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import unicode_literals
import json
import os
import unittest
import mock
from shutil import rmtree
from tempfile import mkdtemp
from six.moves import cStringIO as StringIO
from swift.cli.manage_shard_ranges import main
from swift.common import utils
from swift.common.utils import Timestamp, ShardRange
from swift.container.backend import ContainerBroker
from test.unit import mock_timestamp_now
class TestManageShardRanges(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_find_shards')
utils.mkdirs(self.testdir)
rmtree(self.testdir)
self.shard_data = [
{'index': 0, 'lower': '', 'upper': 'obj09', 'object_count': 10},
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
'object_count': 10},
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
'object_count': 10},
{'index': 3, 'lower': 'obj29', 'upper': 'obj39',
'object_count': 10},
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
'object_count': 10},
{'index': 5, 'lower': 'obj49', 'upper': 'obj59',
'object_count': 10},
{'index': 6, 'lower': 'obj59', 'upper': 'obj69',
'object_count': 10},
{'index': 7, 'lower': 'obj69', 'upper': 'obj79',
'object_count': 10},
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
'object_count': 10},
{'index': 9, 'lower': 'obj89', 'upper': '', 'object_count': 10},
]
def tearDown(self):
rmtree(os.path.dirname(self.testdir))
def assert_starts_with(self, value, prefix):
self.assertTrue(value.startswith(prefix),
"%r does not start with %r" % (value, prefix))
def assert_formatted_json(self, output, expected):
try:
loaded = json.loads(output)
except ValueError as err:
self.fail('Invalid JSON: %s\n%r' % (err, output))
# Check this one first, for a prettier diff
self.assertEqual(loaded, expected)
formatted = json.dumps(expected, sort_keys=True, indent=2) + '\n'
self.assertEqual(output, formatted)
def _make_broker(self, account='a', container='c',
device='sda', part=0):
datadir = os.path.join(
self.testdir, device, 'containers', str(part), 'ash', 'hash')
db_file = os.path.join(datadir, 'hash.db')
broker = ContainerBroker(
db_file, account=account, container=container)
broker.initialize()
return broker
def test_find_shard_ranges(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
broker.merge_items([
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(100)])
# Default uses a large enough value that sharding isn't required
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find'])
self.assert_formatted_json(out.getvalue(), [])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 0 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '100'])
self.assert_formatted_json(out.getvalue(), [])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 0 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '99'])
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99},
{'index': 1, 'lower': 'obj98', 'upper': '', 'object_count': 1},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 2 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '10'])
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj09', 'object_count': 10},
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
'object_count': 10},
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
'object_count': 10},
{'index': 3, 'lower': 'obj29', 'upper': 'obj39',
'object_count': 10},
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
'object_count': 10},
{'index': 5, 'lower': 'obj49', 'upper': 'obj59',
'object_count': 10},
{'index': 6, 'lower': 'obj59', 'upper': 'obj69',
'object_count': 10},
{'index': 7, 'lower': 'obj69', 'upper': 'obj79',
'object_count': 10},
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
'object_count': 10},
{'index': 9, 'lower': 'obj89', 'upper': '', 'object_count': 10},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 10 ranges in ')
def test_info(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: None',
'db_state = unsharded',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
retiring_db_id = broker.get_info()['id']
broker.merge_shard_ranges(ShardRange('.shards/cc', Timestamp.now()))
epoch = Timestamp.now()
with mock_timestamp_now(epoch) as now:
broker.enable_sharding(epoch)
self.assertTrue(broker.set_sharding_state())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now(now):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: {',
' "bytes_used": 0, ',
' "deleted": 0, ',
' "epoch": "%s", ' % epoch.internal,
' "lower": "", ',
' "meta_timestamp": "%s", ' % now.internal,
' "name": "a/c", ',
' "object_count": 0, ',
' "state": "sharding", ',
' "state_timestamp": "%s", ' % now.internal,
' "timestamp": "%s", ' % now.internal,
' "upper": ""',
'}',
'db_state = sharding',
'Retiring db id: %s' % retiring_db_id,
'Cleaving context: {',
' "cleave_to_row": null, ',
' "cleaving_done": false, ',
' "cursor": "", ',
' "last_cleave_to_row": null, ',
' "max_row": -1, ',
' "misplaced_done": false, ',
' "ranges_done": 0, ',
' "ranges_todo": 0, ',
' "ref": "%s"' % retiring_db_id,
'}',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self.assertTrue(broker.set_sharded_state())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now(now):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: {',
' "bytes_used": 0, ',
' "deleted": 0, ',
' "epoch": "%s", ' % epoch.internal,
' "lower": "", ',
' "meta_timestamp": "%s", ' % now.internal,
' "name": "a/c", ',
' "object_count": 0, ',
' "state": "sharding", ',
' "state_timestamp": "%s", ' % now.internal,
' "timestamp": "%s", ' % now.internal,
' "upper": ""',
'}',
'db_state = sharded',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
def test_replace(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'wb') as fd:
json.dump(self.shard_data, fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'replace', input_file])
expected = [
'No shard ranges found to delete.',
'Injected 10 shard ranges.',
'Run container-replicator to replicate them to other nodes.',
'Use the enable sub-command to enable sharding.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self.assertEqual(
[(data['lower'], data['upper']) for data in self.shard_data],
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
def _assert_enabled(self, broker, epoch):
own_sr = broker.get_own_shard_range()
self.assertEqual(ShardRange.SHARDING, own_sr.state)
self.assertEqual(epoch, own_sr.epoch)
self.assertEqual(ShardRange.MIN, own_sr.lower)
self.assertEqual(ShardRange.MAX, own_sr.upper)
self.assertEqual(
'True', broker.metadata['X-Container-Sysmeta-Sharding'][0])
def test_enable(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
# no shard ranges
out = StringIO()
err = StringIO()
with self.assertRaises(SystemExit):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'enable'])
expected = ["WARNING: invalid shard ranges: ['No shard ranges.'].",
'Aborting.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
# success
shard_ranges = []
for data in self.shard_data:
path = ShardRange.make_path(
'.shards_a', 'c', 'c', Timestamp.now(), data['index'])
shard_ranges.append(
ShardRange(path, Timestamp.now(), data['lower'],
data['upper'], data['object_count']))
broker.merge_shard_ranges(shard_ranges)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now() as now:
main([broker.db_file, 'enable'])
expected = [
"Container moved to state 'sharding' with epoch %s." %
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
# already enabled
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'enable'])
expected = [
"Container already in state 'sharding' with epoch %s." %
now.internal,
'No action required.',
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
def test_find_replace_enable(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
broker.merge_items([
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(100)])
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now() as now:
main([broker.db_file, 'find_and_replace', '10', '--enable'])
expected = [
'No shard ranges found to delete.',
'Injected 10 shard ranges.',
'Run container-replicator to replicate them to other nodes.',
"Container moved to state 'sharding' with epoch %s." %
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
self.assertEqual(
[(data['lower'], data['upper']) for data in self.shard_data],
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])


@@ -28,7 +28,6 @@ from tempfile import mkdtemp, NamedTemporaryFile
import json
import mock
from copy import deepcopy
from mock import patch, call
from six.moves import reload_module
@@ -40,7 +39,7 @@ from swift.common.exceptions import DriveNotMounted
from swift.common.swob import HTTPException
from test import unit
from test.unit import FakeLogger, attach_fake_replication_rpc
from test.unit.common.test_db import ExampleBroker
@@ -2054,49 +2053,6 @@ class TestReplToNode(unittest.TestCase):
])
class FakeHTTPResponse(object):
def __init__(self, resp):
self.resp = resp
@property
def status(self):
return self.resp.status_int
@property
def data(self):
return self.resp.body
def attach_fake_replication_rpc(rpc, replicate_hook=None, errors=None):
class FakeReplConnection(object):
def __init__(self, node, partition, hash_, logger):
self.logger = logger
self.node = node
self.partition = partition
self.path = '/%s/%s/%s' % (node['device'], partition, hash_)
self.host = node['replication_ip']
def replicate(self, op, *sync_args):
print('REPLICATE: %s, %s, %r' % (self.path, op, sync_args))
resp = None
if errors and op in errors and errors[op]:
resp = errors[op].pop(0)
if not resp:
replicate_args = self.path.lstrip('/').split('/')
args = [op] + deepcopy(list(sync_args))
with unit.mock_check_drive(isdir=not rpc.mount_check,
ismount=rpc.mount_check):
swob_response = rpc.dispatch(replicate_args, args)
resp = FakeHTTPResponse(swob_response)
if replicate_hook:
replicate_hook(op, *sync_args)
return resp
return FakeReplConnection
class ExampleReplicator(db_replicator.Replicator):
server_type = 'fake'
brokerclass = ExampleBroker


@@ -2766,6 +2766,53 @@ cluster_dfw1 = http://dfw1.host/v1/
else:
self.assertEqual(expected, rv)
def test_config_float_value(self):
for args, expected in (
((99, None, None), 99.0),
((99.01, None, None), 99.01),
(('99', None, None), 99.0),
(('99.01', None, None), 99.01),
((99, 99, None), 99.0),
((99.01, 99.01, None), 99.01),
(('99', 99, None), 99.0),
(('99.01', 99.01, None), 99.01),
((99, None, 99), 99.0),
((99.01, None, 99.01), 99.01),
(('99', None, 99), 99.0),
(('99.01', None, 99.01), 99.01),
((-99, -99, -99), -99.0),
((-99.01, -99.01, -99.01), -99.01),
(('-99', -99, -99), -99.0),
(('-99.01', -99.01, -99.01), -99.01),):
actual = utils.config_float_value(*args)
self.assertEqual(expected, actual)
for val, minimum in ((99, 100),
('99', 100),
(-99, -98),
('-98.01', -98)):
with self.assertRaises(ValueError) as cm:
utils.config_float_value(val, minimum=minimum)
self.assertIn('greater than %s' % minimum, cm.exception.args[0])
self.assertNotIn('less than', cm.exception.args[0])
for val, maximum in ((99, 98),
('99', 98),
(-99, -100),
('-97.9', -98)):
with self.assertRaises(ValueError) as cm:
utils.config_float_value(val, maximum=maximum)
self.assertIn('less than %s' % maximum, cm.exception.args[0])
self.assertNotIn('greater than', cm.exception.args[0])
for val, minimum, maximum in ((99, 99, 98),
('99', 100, 100),
(99, 98, 98),):
with self.assertRaises(ValueError) as cm:
utils.config_float_value(val, minimum=minimum, maximum=maximum)
self.assertIn('greater than %s' % minimum, cm.exception.args[0])
self.assertIn('less than %s' % maximum, cm.exception.args[0])
def test_config_auto_int_value(self):
expectations = {
# (value, default) : expected,


@@ -2013,6 +2013,75 @@ class TestContainerBroker(unittest.TestCase):
self.assertEqual(info['reported_object_count'], 2)
self.assertEqual(info['reported_bytes_used'], 1123)
@with_tempdir
def test_remove_objects(self, tempdir):
objects = (('undeleted', Timestamp.now().internal, 0, 'text/plain',
EMPTY_ETAG, 0, 0),
('other_policy', Timestamp.now().internal, 0, 'text/plain',
EMPTY_ETAG, 0, 1),
('deleted', Timestamp.now().internal, 0, 'text/plain',
EMPTY_ETAG, 1, 0))
object_names = [o[0] for o in objects]
def get_rows(broker):
with broker.get() as conn:
cursor = conn.execute("SELECT * FROM object")
return [r[1] for r in cursor]
def do_setup():
db_path = os.path.join(
tempdir, 'part', 'suffix', 'hash', '%s.db' % uuid4())
broker = ContainerBroker(db_path, account='a', container='c')
broker.initialize(Timestamp.now().internal, 0)
for obj in objects:
# ensure row order matches put order
broker.put_object(*obj)
broker._commit_puts()
self.assertEqual(3, broker.get_max_row()) # sanity check
self.assertEqual(object_names, get_rows(broker)) # sanity check
return broker
broker = do_setup()
broker.remove_objects('', '')
self.assertFalse(get_rows(broker))
broker = do_setup()
broker.remove_objects('deleted', '')
self.assertEqual([object_names[2]], get_rows(broker))
broker = do_setup()
broker.remove_objects('', 'deleted', max_row=2)
self.assertEqual(object_names, get_rows(broker))
broker = do_setup()
broker.remove_objects('deleted', 'un')
self.assertEqual([object_names[0], object_names[2]], get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=-1)
self.assertEqual(object_names, get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=0)
self.assertEqual(object_names, get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=1)
self.assertEqual(object_names[1:], get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=2)
self.assertEqual(object_names[2:], get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=3)
self.assertFalse(get_rows(broker))
broker = do_setup()
broker.remove_objects('', '', max_row=99)
self.assertFalse(get_rows(broker))
def test_get_objects(self):
broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp('1').internal, 0)

File diff suppressed because it is too large


@@ -159,6 +159,91 @@ class TestContainerController(TestRingBase):
for key in owner_headers:
self.assertIn(key, resp.headers)
def test_reseller_admin(self):
reseller_internal_headers = {
get_sys_meta_prefix('container') + 'sharding': 'True'}
reseller_external_headers = {'x-container-sharding': 'on'}
controller = proxy_server.ContainerController(self.app, 'a', 'c')
# Normal users, even swift owners, can't set it
req = Request.blank('/v1/a/c', method='PUT',
headers=reseller_external_headers,
environ={'swift_owner': True})
with mocked_http_conn(*[201] * self.CONTAINER_REPLICAS) as mock_conn:
resp = req.get_response(self.app)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_internal_headers:
for captured in mock_conn.requests:
self.assertNotIn(key.title(), captured['headers'])
req = Request.blank('/v1/a/c', method='POST',
headers=reseller_external_headers,
environ={'swift_owner': True})
with mocked_http_conn(*[204] * self.CONTAINER_REPLICAS) as mock_conn:
resp = req.get_response(self.app)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_internal_headers:
for captured in mock_conn.requests:
self.assertNotIn(key.title(), captured['headers'])
req = Request.blank('/v1/a/c', environ={'swift_owner': True})
# Heck, they don't even get to know
with mock.patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, 200,
headers=reseller_internal_headers)):
resp = controller.HEAD(req)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_external_headers:
self.assertNotIn(key, resp.headers)
with mock.patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, 200,
headers=reseller_internal_headers)):
resp = controller.GET(req)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_external_headers:
self.assertNotIn(key, resp.headers)
# But reseller admins can set it
req = Request.blank('/v1/a/c', method='PUT',
headers=reseller_external_headers,
environ={'reseller_request': True})
with mocked_http_conn(*[201] * self.CONTAINER_REPLICAS) as mock_conn:
resp = req.get_response(self.app)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_internal_headers:
for captured in mock_conn.requests:
self.assertIn(key.title(), captured['headers'])
req = Request.blank('/v1/a/c', method='POST',
headers=reseller_external_headers,
environ={'reseller_request': True})
with mocked_http_conn(*[204] * self.CONTAINER_REPLICAS) as mock_conn:
resp = req.get_response(self.app)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_internal_headers:
for captured in mock_conn.requests:
self.assertIn(key.title(), captured['headers'])
# And see that they have
req = Request.blank('/v1/a/c', environ={'reseller_request': True})
with mock.patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, 200,
headers=reseller_internal_headers)):
resp = controller.HEAD(req)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_external_headers:
self.assertIn(key, resp.headers)
self.assertEqual(resp.headers[key], 'True')
with mock.patch('swift.proxy.controllers.base.http_connect',
fake_http_connect(200, 200,
headers=reseller_internal_headers)):
resp = controller.GET(req)
self.assertEqual(2, resp.status_int // 100)
for key in reseller_external_headers:
self.assertEqual(resp.headers[key], 'True')
def test_sys_meta_headers_PUT(self):
# check that headers in sys meta namespace make it through
# the container controller