Merge "Remove swift-bench"
This commit is contained in:
commit
204110fa10
189
bin/swift-bench
189
bin/swift-bench
@ -1,189 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import signal
|
||||
import uuid
|
||||
from optparse import OptionParser
|
||||
|
||||
from swift.common.bench import (BenchController, DistributedBenchController,
|
||||
create_containers, delete_containers)
|
||||
from swift.common.utils import readconf, LogAdapter, config_true_value
|
||||
|
||||
# The defaults should be sufficient to run swift-bench on a SAIO
|
||||
CONF_DEFAULTS = {
|
||||
'auth': os.environ.get('ST_AUTH', ''),
|
||||
'user': os.environ.get('ST_USER', ''),
|
||||
'key': os.environ.get('ST_KEY', ''),
|
||||
'auth_version': '1.0',
|
||||
'use_proxy': 'yes',
|
||||
'put_concurrency': '10',
|
||||
'get_concurrency': '10',
|
||||
'del_concurrency': '10',
|
||||
'concurrency': '', # set all 3 in one shot
|
||||
'object_sources': '', # set of file contents to read and use for PUTs
|
||||
'lower_object_size': '10', # bounded random size used if these differ
|
||||
'upper_object_size': '10',
|
||||
'object_size': '1', # only if not object_sources and lower == upper
|
||||
'num_objects': '1000',
|
||||
'num_gets': '10000',
|
||||
'delete': 'yes',
|
||||
'container_name': uuid.uuid4().hex, # really "container name base"
|
||||
'num_containers': '20',
|
||||
'url': '', # used when use_proxy = no or overrides auth X-Storage-Url
|
||||
'account': '', # used when use_proxy = no
|
||||
'devices': 'sdb1', # space-sep list
|
||||
'log_level': 'INFO',
|
||||
'timeout': '10',
|
||||
'delay': '0',
|
||||
'bench_clients': [],
|
||||
}
|
||||
|
||||
SAIO_DEFAULTS = {
|
||||
'auth': 'http://localhost:8080/auth/v1.0',
|
||||
'user': 'test:tester',
|
||||
'key': 'testing',
|
||||
}
|
||||
|
||||
if __name__ == '__main__':
|
||||
usage = "usage: %prog [OPTIONS] [CONF_FILE]"
|
||||
usage += """\n\nConf file with SAIO defaults:
|
||||
|
||||
[bench]
|
||||
auth = http://localhost:8080/auth/v1.0
|
||||
user = test:tester
|
||||
key = testing
|
||||
concurrency = 10
|
||||
object_size = 1
|
||||
num_objects = 1000
|
||||
num_gets = 10000
|
||||
delete = yes
|
||||
auth_version = 1.0
|
||||
"""
|
||||
parser = OptionParser(usage=usage)
|
||||
parser.add_option('', '--saio', dest='saio', action='store_true',
|
||||
default=False, help='Run benchmark with SAIO defaults')
|
||||
parser.add_option('-A', '--auth', dest='auth',
|
||||
help='URL for obtaining an auth token')
|
||||
parser.add_option('-U', '--user', dest='user',
|
||||
help='User name for obtaining an auth token')
|
||||
parser.add_option('-K', '--key', dest='key',
|
||||
help='Key for obtaining an auth token')
|
||||
parser.add_option('-b', '--bench-clients', action='append',
|
||||
metavar='<ip>:<port>',
|
||||
help=('A string of the form "<ip>:<port>" which matches '
|
||||
'the arguments supplied to a swift-bench-client '
|
||||
'process. This argument must be specified '
|
||||
'once per swift-bench-client you want to '
|
||||
'utilize.'))
|
||||
parser.add_option('-u', '--url', dest='url',
|
||||
help='Storage URL')
|
||||
parser.add_option('-c', '--concurrency', dest='concurrency',
|
||||
help=('Number of concurrent connections to use. For '
|
||||
'finer-grained control, see --get-concurrency, '
|
||||
'--put-concurrency, and --delete-concurrency.'))
|
||||
parser.add_option('--get-concurrency', dest='get_concurrency',
|
||||
help='Number of concurrent GET requests')
|
||||
parser.add_option('--put-concurrency', dest='put_concurrency',
|
||||
help='Number of concurrent PUT requests')
|
||||
parser.add_option('--delete-concurrency', dest='delete_concurrency',
|
||||
help='Number of concurrent DELETE requests')
|
||||
parser.add_option('-s', '--object-size', dest='object_size',
|
||||
help='Size of objects to PUT (in bytes)')
|
||||
parser.add_option('-l', '--lower-object-size', dest='lower_object_size',
|
||||
help=('Lower size of objects (in bytes); '
|
||||
'--object-size will be upper-object-size'))
|
||||
parser.add_option('-n', '--num-objects', dest='num_objects',
|
||||
help='Number of objects to PUT')
|
||||
parser.add_option('-g', '--num-gets', dest='num_gets',
|
||||
help='Number of GET operations to perform')
|
||||
parser.add_option('-C', '--num-containers', dest='num_containers',
|
||||
help='Number of containers to distribute objects among')
|
||||
parser.add_option('-x', '--no-delete', dest='delete', action='store_false',
|
||||
help='If set, will not delete the objects created')
|
||||
parser.add_option('-V', '--auth_version', dest='auth_version',
|
||||
help='Authentication version')
|
||||
parser.add_option('-d', '--delay', dest='delay',
|
||||
help='Delay before delete requests in seconds')
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
options, args = parser.parse_args()
|
||||
if options.saio:
|
||||
CONF_DEFAULTS.update(SAIO_DEFAULTS)
|
||||
if getattr(options, 'lower_object_size', None):
|
||||
if options.object_size <= options.lower_object_size:
|
||||
raise ValueError('--lower-object-size (%s) must be '
|
||||
'< --object-size (%s)' %
|
||||
(options.lower_object_size, options.object_size))
|
||||
CONF_DEFAULTS['upper_object_size'] = options.object_size
|
||||
if args:
|
||||
conf = args[0]
|
||||
if not os.path.exists(conf):
|
||||
sys.exit("No such conf file: %s" % conf)
|
||||
conf = readconf(conf, 'bench', log_name='swift-bench',
|
||||
defaults=CONF_DEFAULTS)
|
||||
conf['bench_clients'] = []
|
||||
else:
|
||||
conf = CONF_DEFAULTS
|
||||
parser.set_defaults(**conf)
|
||||
options, _junk = parser.parse_args()
|
||||
if options.concurrency is not '':
|
||||
options.put_concurrency = options.concurrency
|
||||
options.get_concurrency = options.concurrency
|
||||
options.del_concurrency = options.concurrency
|
||||
options.containers = ['%s_%d' % (options.container_name, i)
|
||||
for i in xrange(int(options.num_containers))]
|
||||
|
||||
# Normalize boolean option to a config parameter value
|
||||
if config_true_value(str(options.delete).lower()):
|
||||
options.delete = 'yes'
|
||||
else:
|
||||
options.delete = 'no'
|
||||
|
||||
def sigterm(signum, frame):
|
||||
sys.exit('Termination signal received.')
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.propagate = False
|
||||
logger.setLevel({
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL}.get(
|
||||
options.log_level.lower(), logging.INFO))
|
||||
loghandler = logging.StreamHandler()
|
||||
logger.addHandler(loghandler)
|
||||
logger = LogAdapter(logger, 'swift-bench')
|
||||
logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
|
||||
'%(message)s')
|
||||
loghandler.setFormatter(logformat)
|
||||
|
||||
if options.use_proxy:
|
||||
create_containers(logger, options)
|
||||
|
||||
controller_class = DistributedBenchController if options.bench_clients \
|
||||
else BenchController
|
||||
controller = controller_class(logger, options)
|
||||
controller.run()
|
||||
|
||||
if options.use_proxy and options.delete == 'yes':
|
||||
delete_containers(logger, options)
|
@ -1,59 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import sys
|
||||
import signal
|
||||
from optparse import OptionParser
|
||||
|
||||
from swift.common.bench import BenchServer
|
||||
from swift.common.utils import LogAdapter
|
||||
|
||||
if __name__ == '__main__':
|
||||
usage = "usage: %prog <ip> <port>"
|
||||
usage += "\n\nRun a client for distributed swift-bench runs."
|
||||
parser = OptionParser(usage=usage)
|
||||
parser.add_option('-o', '--log-level', dest='log_level',
|
||||
default='info',
|
||||
help='Logging level (debug, info, etc)')
|
||||
|
||||
if len(sys.argv) != 3:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
options, args = parser.parse_args()
|
||||
|
||||
logger = logging.getLogger()
|
||||
logger.setLevel({
|
||||
'debug': logging.DEBUG,
|
||||
'info': logging.INFO,
|
||||
'warning': logging.WARNING,
|
||||
'error': logging.ERROR,
|
||||
'critical': logging.CRITICAL}.get(
|
||||
options.log_level.lower(), logging.INFO))
|
||||
loghandler = logging.StreamHandler()
|
||||
logger.addHandler(loghandler)
|
||||
logger = LogAdapter(logger, 'swift-bench-client')
|
||||
logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
|
||||
'%(message)s')
|
||||
loghandler.setFormatter(logformat)
|
||||
|
||||
def sigterm(signum, frame):
|
||||
sys.exit('Termination signal received.')
|
||||
signal.signal(signal.SIGTERM, sigterm)
|
||||
signal.signal(signal.SIGINT, sigterm)
|
||||
|
||||
server = BenchServer(logger, args[0], args[1])
|
||||
server.run()
|
@ -1,60 +0,0 @@
|
||||
[bench]
|
||||
# auth = http://localhost:8080/auth/v1.0
|
||||
# user = test:tester
|
||||
# key = testing
|
||||
# auth_version = 1.0
|
||||
# log-level = INFO
|
||||
# timeout = 10
|
||||
|
||||
# You can configure PUT, GET, and DELETE concurrency independently or set all
|
||||
# three with "concurrency"
|
||||
# put_concurrency = 10
|
||||
# get_concurrency = 10
|
||||
# del_concurrency = 10
|
||||
# concurrency =
|
||||
|
||||
# A space-sep list of files whose contents will be read and randomly chosen
|
||||
# as the body (object contents) for each PUT.
|
||||
# object_sources =
|
||||
|
||||
# If object_sources is not set and lower_object_size != upper_object_size,
|
||||
# each PUT will randomly select an object size between the two values. Units
|
||||
# are bytes.
|
||||
# lower_object_size = 10
|
||||
# upper_object_size = 10
|
||||
|
||||
# If object_sources is not set and lower_object_size == upper_object_size,
|
||||
# every object PUT will contain this many bytes.
|
||||
# object_size = 1
|
||||
|
||||
# num_objects = 1000
|
||||
# num_gets = 10000
|
||||
# num_containers = 20
|
||||
|
||||
# The base name for created containers.
|
||||
# container_name = (randomly-chosen uuid4)
|
||||
|
||||
# Should swift-bench benchmark DELETEing the created objects and then delete
|
||||
# all created containers?
|
||||
# delete = yes
|
||||
|
||||
# Without use_proxy, swift-bench will talk directly to the backend Swift
|
||||
# servers. Doing that will require "url", "account", and at least one
|
||||
# "devices" entry.
|
||||
# use_proxy = yes
|
||||
|
||||
# If use_proxy = yes, this will override any returned X-Storage-Url returned
|
||||
# by authenticaion (the account name will still be extracted from
|
||||
# X-Storage-Url though and may NOT be set with the "account" conf var). If
|
||||
# use_proxy = no, this setting is required and used as the X-Storage-Url when
|
||||
# deleting containers and as a source for IP and port for back-end Swift server
|
||||
# connections. The IP and port specified in this setting must have local
|
||||
# storage access to every device specified in "devices".
|
||||
# url =
|
||||
|
||||
# Only used (and required) when use_proxy = no.
|
||||
# account =
|
||||
|
||||
# A space-sep list of devices names; only relevant (and required) when
|
||||
# use_proxy = no.
|
||||
# devices = sdb1
|
@ -31,8 +31,6 @@ scripts =
|
||||
bin/swift-account-reaper
|
||||
bin/swift-account-replicator
|
||||
bin/swift-account-server
|
||||
bin/swift-bench
|
||||
bin/swift-bench-client
|
||||
bin/swift-config
|
||||
bin/swift-container-auditor
|
||||
bin/swift-container-replicator
|
||||
|
@ -1,494 +0,0 @@
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import re
|
||||
import sys
|
||||
import uuid
|
||||
import time
|
||||
import random
|
||||
import signal
|
||||
import socket
|
||||
import logging
|
||||
from contextlib import contextmanager
|
||||
from swift import gettext_ as _
|
||||
from optparse import Values
|
||||
|
||||
import eventlet
|
||||
import eventlet.pools
|
||||
from eventlet.green.httplib import CannotSendRequest
|
||||
|
||||
from swift.common.utils import config_true_value, LogAdapter
|
||||
import swiftclient as client
|
||||
from swift.common import direct_client
|
||||
from swift.common.http import HTTP_CONFLICT
|
||||
from swift.common.utils import json
|
||||
|
||||
|
||||
def _func_on_containers(logger, conf, concurrency_key, func):
|
||||
"""Run a function on each container with concurrency."""
|
||||
|
||||
bench = Bench(logger, conf, [])
|
||||
pool = eventlet.GreenPool(int(getattr(conf, concurrency_key)))
|
||||
for container in conf.containers:
|
||||
pool.spawn_n(func, bench.url, bench.token, container)
|
||||
pool.waitall()
|
||||
|
||||
|
||||
def delete_containers(logger, conf):
|
||||
"""Utility function to delete benchmark containers."""
|
||||
|
||||
def _deleter(url, token, container):
|
||||
try:
|
||||
client.delete_container(url, token, container)
|
||||
except client.ClientException as e:
|
||||
if e.http_status != HTTP_CONFLICT:
|
||||
logger.warn("Unable to delete container '%s'. "
|
||||
"Got http status '%d'."
|
||||
% (container, e.http_status))
|
||||
|
||||
_func_on_containers(logger, conf, 'del_concurrency', _deleter)
|
||||
|
||||
|
||||
def create_containers(logger, conf):
|
||||
"""Utility function to create benchmark containers."""
|
||||
|
||||
_func_on_containers(logger, conf, 'put_concurrency', client.put_container)
|
||||
|
||||
|
||||
class SourceFile(object):
|
||||
"""
|
||||
Iterable, file-like object to lazily emit a bunch of zeros in
|
||||
reasonable-size chunks.
|
||||
|
||||
swift.common.direct_client wants iterables, but swiftclient wants
|
||||
file-like objects where hasattr(thing, 'read') is true. Therefore,
|
||||
this class can do both.
|
||||
"""
|
||||
|
||||
def __init__(self, size, chunk_size=1024 * 64):
|
||||
self.pos = 0
|
||||
self.size = size
|
||||
self.chunk_size = chunk_size
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __len__(self):
|
||||
return self.size
|
||||
|
||||
def next(self):
|
||||
if self.pos >= self.size:
|
||||
raise StopIteration
|
||||
chunk_size = min(self.size - self.pos, self.chunk_size)
|
||||
yield '0' * chunk_size
|
||||
self.pos += chunk_size
|
||||
|
||||
def read(self, desired_size):
|
||||
chunk_size = min(self.size - self.pos, desired_size)
|
||||
self.pos += chunk_size
|
||||
return '0' * chunk_size
|
||||
|
||||
|
||||
class ConnectionPool(eventlet.pools.Pool):
|
||||
|
||||
def __init__(self, url, size):
|
||||
self.url = url
|
||||
eventlet.pools.Pool.__init__(self, size, size)
|
||||
|
||||
def create(self):
|
||||
return client.http_connection(self.url)
|
||||
|
||||
|
||||
class BenchServer(object):
|
||||
"""
|
||||
A BenchServer binds to an IP/port and listens for bench jobs. A bench
|
||||
job consists of the normal conf "dict" encoded in JSON, terminated with an
|
||||
EOF. The log level is at least INFO, but DEBUG may also be specified in
|
||||
the conf dict.
|
||||
|
||||
The server will wait forever for jobs, running them one at a time.
|
||||
"""
|
||||
def __init__(self, logger, bind_ip, bind_port):
|
||||
self.logger = logger
|
||||
self.bind_ip = bind_ip
|
||||
self.bind_port = int(bind_port)
|
||||
|
||||
def run(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.logger.info('Binding to %s:%s', self.bind_ip, self.bind_port)
|
||||
s.bind((self.bind_ip, self.bind_port))
|
||||
s.listen(20)
|
||||
while True:
|
||||
client, address = s.accept()
|
||||
self.logger.debug('Accepting connection from %s:%s', *address)
|
||||
client_file = client.makefile('rb+', 1)
|
||||
json_data = client_file.read()
|
||||
conf = Values(json.loads(json_data))
|
||||
|
||||
self.logger.info(
|
||||
'Starting run for %s:%s [put/get/del_concurrency: %s/%s/%s, '
|
||||
'num_objects: %s, num_gets: %s]', address[0], address[1],
|
||||
conf.put_concurrency, conf.get_concurrency,
|
||||
conf.del_concurrency, conf.num_objects, conf.num_gets)
|
||||
|
||||
logger = logging.getLogger('bench-server')
|
||||
level = logging.DEBUG if conf.log_level.lower() == 'debug' \
|
||||
else logging.INFO
|
||||
logger.setLevel(level)
|
||||
loghandler = logging.StreamHandler(client_file)
|
||||
logformat = logging.Formatter(
|
||||
'%(server)s %(asctime)s %(levelname)s %(message)s')
|
||||
loghandler.setFormatter(logformat)
|
||||
logger.addHandler(loghandler)
|
||||
logger = LogAdapter(logger, 'swift-bench-server')
|
||||
|
||||
controller = BenchController(logger, conf)
|
||||
try:
|
||||
controller.run()
|
||||
except socket.error:
|
||||
logger.warning('Socket error', exc_info=1)
|
||||
|
||||
logger.logger.removeHandler(loghandler)
|
||||
client_file.close()
|
||||
client.close()
|
||||
|
||||
self.logger.info('...bench run completed; waiting for next run.')
|
||||
|
||||
|
||||
class Bench(object):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
self.logger = logger
|
||||
self.aborted = False
|
||||
self.user = conf.user
|
||||
self.key = conf.key
|
||||
self.auth_url = conf.auth
|
||||
self.use_proxy = config_true_value(conf.use_proxy)
|
||||
self.auth_version = conf.auth_version
|
||||
self.logger.info("Auth version: %s" % self.auth_version)
|
||||
if self.use_proxy:
|
||||
url, token = client.get_auth(self.auth_url, self.user, self.key,
|
||||
auth_version=self.auth_version)
|
||||
self.token = token
|
||||
self.account = url.split('/')[-1]
|
||||
if conf.url == '':
|
||||
self.url = url
|
||||
else:
|
||||
self.url = conf.url
|
||||
else:
|
||||
self.token = 'SlapChop!'
|
||||
self.account = conf.account
|
||||
self.url = conf.url
|
||||
self.ip, self.port = self.url.split('/')[2].split(':')
|
||||
|
||||
self.object_size = int(conf.object_size)
|
||||
self.object_sources = conf.object_sources
|
||||
self.lower_object_size = int(conf.lower_object_size)
|
||||
self.upper_object_size = int(conf.upper_object_size)
|
||||
self.files = []
|
||||
if self.object_sources:
|
||||
self.object_sources = self.object_sources.split()
|
||||
self.files = [file(f, 'rb').read() for f in self.object_sources]
|
||||
|
||||
self.put_concurrency = int(conf.put_concurrency)
|
||||
self.get_concurrency = int(conf.get_concurrency)
|
||||
self.del_concurrency = int(conf.del_concurrency)
|
||||
self.total_objects = int(conf.num_objects)
|
||||
self.total_gets = int(conf.num_gets)
|
||||
self.timeout = int(conf.timeout)
|
||||
self.devices = conf.devices.split()
|
||||
self.names = names
|
||||
self.conn_pool = ConnectionPool(self.url,
|
||||
max(self.put_concurrency,
|
||||
self.get_concurrency,
|
||||
self.del_concurrency))
|
||||
|
||||
def _log_status(self, title):
|
||||
total = time.time() - self.beginbeat
|
||||
self.logger.info(_('%(complete)s %(title)s [%(fail)s failures], '
|
||||
'%(rate).01f/s'),
|
||||
{'title': title, 'complete': self.complete,
|
||||
'fail': self.failures,
|
||||
'rate': (float(self.complete) / total)})
|
||||
|
||||
@contextmanager
|
||||
def connection(self):
|
||||
try:
|
||||
hc = self.conn_pool.get()
|
||||
try:
|
||||
yield hc
|
||||
except CannotSendRequest:
|
||||
self.logger.info(_("CannotSendRequest. Skipping..."))
|
||||
try:
|
||||
hc.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.failures += 1
|
||||
hc = self.conn_pool.create()
|
||||
finally:
|
||||
self.conn_pool.put(hc)
|
||||
|
||||
def run(self):
|
||||
pool = eventlet.GreenPool(self.concurrency)
|
||||
self.beginbeat = self.heartbeat = time.time()
|
||||
self.heartbeat -= 13 # just to get the first report quicker
|
||||
self.failures = 0
|
||||
self.complete = 0
|
||||
for i in xrange(self.total):
|
||||
if self.aborted:
|
||||
break
|
||||
pool.spawn_n(self._run, i)
|
||||
pool.waitall()
|
||||
self._log_status(self.msg + ' **FINAL**')
|
||||
|
||||
def _run(self, thread):
|
||||
return
|
||||
|
||||
|
||||
class DistributedBenchController(object):
|
||||
"""
|
||||
This class manages a distributed swift-bench run. For this Controller
|
||||
class to make sense, the conf.bench_clients list must contain at least one
|
||||
entry.
|
||||
|
||||
The idea is to split the configured load between one or more
|
||||
swift-bench-client processes, each of which use eventlet for concurrency.
|
||||
We deliberately take a simple, naive approach with these limitations:
|
||||
1) Concurrency, num_objects, and num_gets are spread evenly between the
|
||||
swift-bench-client processes. With a low concurrency to
|
||||
swift-bench-client count ratio, rounding may result in a greater
|
||||
than desired aggregate concurrency.
|
||||
2) Each swift-bench-client process runs independently so some may
|
||||
finish up before others, i.e. the target aggregate concurrency is
|
||||
not necessarily present the whole time. This may bias aggregate
|
||||
reported rates lower than a more efficient architecture.
|
||||
3) Because of #2, some swift-bench-client processes may be running GETs
|
||||
while others are still runinng their PUTs. Because of this
|
||||
potential skew, distributed runs will not isolate one operation at a
|
||||
time like a single swift-bench run will.
|
||||
3) Reported aggregate rates are simply the sum of each
|
||||
swift-bench-client process reported FINAL number. That's probably
|
||||
inaccurate somehow.
|
||||
"""
|
||||
|
||||
def __init__(self, logger, conf):
|
||||
self.logger = logger
|
||||
# ... INFO 1000 PUTS **FINAL** [0 failures], 34.9/s
|
||||
self.final_re = re.compile(
|
||||
'INFO (\d+) (.*) \*\*FINAL\*\* \[(\d+) failures\], (\d+\.\d+)/s')
|
||||
self.clients = conf.bench_clients
|
||||
del conf.bench_clients
|
||||
for key, minval in [('put_concurrency', 1),
|
||||
('get_concurrency', 1),
|
||||
('del_concurrency', 1),
|
||||
('num_objects', 0),
|
||||
('num_gets', 0)]:
|
||||
setattr(conf, key,
|
||||
max(minval, int(getattr(conf, key)) / len(self.clients)))
|
||||
self.conf = conf
|
||||
|
||||
def run(self):
|
||||
eventlet.patcher.monkey_patch(socket=True)
|
||||
pool = eventlet.GreenPool(size=len(self.clients))
|
||||
pile = eventlet.GreenPile(pool)
|
||||
for client in self.clients:
|
||||
pile.spawn(self.do_run, client)
|
||||
results = {
|
||||
'PUTS': dict(count=0, failures=0, rate=0.0),
|
||||
'GETS': dict(count=0, failures=0, rate=0.0),
|
||||
'DEL': dict(count=0, failures=0, rate=0.0),
|
||||
}
|
||||
for result in pile:
|
||||
for k, v in result.iteritems():
|
||||
target = results[k]
|
||||
target['count'] += int(v['count'])
|
||||
target['failures'] += int(v['failures'])
|
||||
target['rate'] += float(v['rate'])
|
||||
for k in ['PUTS', 'GETS', 'DEL']:
|
||||
v = results[k]
|
||||
self.logger.info('%d %s **FINAL** [%d failures], %.1f/s' % (
|
||||
v['count'], k, v['failures'], v['rate']))
|
||||
|
||||
def do_run(self, client):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
ip, port = client.split(':')
|
||||
s.connect((ip, int(port)))
|
||||
s.sendall(json.dumps(self.conf.__dict__))
|
||||
s.shutdown(socket.SHUT_WR)
|
||||
s_file = s.makefile('rb', 1)
|
||||
result = {}
|
||||
for line in s_file:
|
||||
match = self.final_re.search(line)
|
||||
if match:
|
||||
g = match.groups()
|
||||
result[g[1]] = {
|
||||
'count': g[0],
|
||||
'failures': g[2],
|
||||
'rate': g[3],
|
||||
}
|
||||
else:
|
||||
sys.stderr.write('%s %s' % (client, line))
|
||||
return result
|
||||
|
||||
|
||||
class BenchController(object):
|
||||
|
||||
def __init__(self, logger, conf):
|
||||
self.logger = logger
|
||||
self.conf = conf
|
||||
self.names = []
|
||||
self.delete = config_true_value(conf.delete)
|
||||
self.gets = int(conf.num_gets)
|
||||
self.aborted = False
|
||||
self.delay = int(self.conf.delay)
|
||||
|
||||
def sigint1(self, signum, frame):
|
||||
if self.delete:
|
||||
print >>sys.stderr, (
|
||||
'SIGINT received; finishing up and running DELETE.\n'
|
||||
'Send one more SIGINT to exit *immediately*.')
|
||||
self.aborted = True
|
||||
if self.running and not isinstance(self.running, BenchDELETE):
|
||||
self.running.aborted = True
|
||||
signal.signal(signal.SIGINT, self.sigint2)
|
||||
else:
|
||||
self.sigint2(signum, frame)
|
||||
|
||||
def sigint2(self, signum, frame):
|
||||
sys.exit('Final SIGINT received.')
|
||||
|
||||
def run(self):
|
||||
eventlet.patcher.monkey_patch(socket=True)
|
||||
signal.signal(signal.SIGINT, self.sigint1)
|
||||
puts = BenchPUT(self.logger, self.conf, self.names)
|
||||
self.running = puts
|
||||
puts.run()
|
||||
if self.gets and not self.aborted:
|
||||
gets = BenchGET(self.logger, self.conf, self.names)
|
||||
self.running = gets
|
||||
gets.run()
|
||||
if self.delete:
|
||||
if self.delay != 0:
|
||||
self.logger.info('Delay before '
|
||||
'DELETE request %s sec'
|
||||
% self.delay)
|
||||
time.sleep(self.delay)
|
||||
dels = BenchDELETE(self.logger, self.conf, self.names)
|
||||
self.running = dels
|
||||
dels.run()
|
||||
|
||||
|
||||
class BenchDELETE(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.del_concurrency
|
||||
self.total = len(names)
|
||||
self.msg = 'DEL'
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('DEL')
|
||||
device, partition, name, container_name = self.names.pop()
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.delete_object(self.url, self.token,
|
||||
container_name, name, http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_delete_object(node, partition,
|
||||
self.account,
|
||||
container_name, name)
|
||||
except client.ClientException as e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchGET(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.get_concurrency
|
||||
self.total = self.total_gets
|
||||
self.msg = 'GETS'
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('GETS')
|
||||
device, partition, name, container_name = random.choice(self.names)
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.get_object(self.url, self.token,
|
||||
container_name, name, http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_get_object(node, partition,
|
||||
self.account,
|
||||
container_name, name)
|
||||
except client.ClientException as e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
self.complete += 1
|
||||
|
||||
|
||||
class BenchPUT(Bench):
|
||||
|
||||
def __init__(self, logger, conf, names):
|
||||
Bench.__init__(self, logger, conf, names)
|
||||
self.concurrency = self.put_concurrency
|
||||
self.total = self.total_objects
|
||||
self.msg = 'PUTS'
|
||||
self.containers = conf.containers
|
||||
|
||||
def _run(self, thread):
|
||||
if time.time() - self.heartbeat >= 15:
|
||||
self.heartbeat = time.time()
|
||||
self._log_status('PUTS')
|
||||
name = uuid.uuid4().hex
|
||||
if self.object_sources:
|
||||
source = random.choice(self.files)
|
||||
elif self.upper_object_size > self.lower_object_size:
|
||||
source = SourceFile(random.randint(self.lower_object_size,
|
||||
self.upper_object_size))
|
||||
else:
|
||||
source = SourceFile(self.object_size)
|
||||
device = random.choice(self.devices)
|
||||
partition = str(random.randint(1, 3000))
|
||||
container_name = random.choice(self.containers)
|
||||
with self.connection() as conn:
|
||||
try:
|
||||
if self.use_proxy:
|
||||
client.put_object(self.url, self.token,
|
||||
container_name, name, source,
|
||||
content_length=len(source),
|
||||
http_conn=conn)
|
||||
else:
|
||||
node = {'ip': self.ip, 'port': self.port, 'device': device}
|
||||
direct_client.direct_put_object(node, partition,
|
||||
self.account,
|
||||
container_name, name,
|
||||
source,
|
||||
content_length=len(source))
|
||||
except client.ClientException as e:
|
||||
self.logger.debug(str(e))
|
||||
self.failures += 1
|
||||
else:
|
||||
self.names.append((device, partition, name, container_name))
|
||||
self.complete += 1
|
@ -1,27 +0,0 @@
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# TODO(gholt): Tests
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestBench(unittest.TestCase):
|
||||
def test_placeholder(self):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user