merged trunk to fix conflict

Chuck Thier 2011-03-03 10:50:56 -06:00
commit 975094cb57
67 changed files with 3676 additions and 547 deletions

bin/st
View File

@ -1723,7 +1723,7 @@ Example:
error_thread.abort = True
while error_thread.isAlive():
error_thread.join(0.01)
except Exception:
except (SystemExit, Exception):
for thread in threading_enumerate():
thread.abort = True
raise
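
A note on the bin/st change above: since Python 2.5, SystemExit derives from BaseException rather than Exception, so a bare `except Exception:` never fires when a thread calls exit(), the abort flags are never set, and the remaining threads hang. The pattern in miniature (the abort attribute is st's own convention, shown here only for illustration):

import threading

try:
    raise SystemExit(1)  # e.g. triggered by an error-reporting thread
except (SystemExit, Exception):
    # flag every worker to stop, whether exiting deliberately or crashing
    for thread in threading.enumerate():
        thread.abort = True
    raise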

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':
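
This import swap from the stdlib urlparse to swift.common.utils.urlparse recurs across the bin/ scripts below, and pairs with the utils.py hunk near the end of this diff that imports urlparse as stdlib_urlparse alongside ParseResult. The wrapper's body is not part of this diff; one plausible shape, assuming a simple memoizing wrapper, would be:

from urlparse import urlparse as stdlib_urlparse

_parsed_cache = {}

def urlparse(url):
    # ParseResult tuples are immutable, so cached instances are safe to share
    if url not in _parsed_cache:
        _parsed_cache[url] = stdlib_urlparse(url)
    return _parsed_cache[url]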

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -22,9 +22,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -18,9 +18,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -22,9 +22,9 @@ import gettext
from optparse import OptionParser
from os.path import basename
from sys import argv, exit
from urlparse import urlparse
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.utils import urlparse
if __name__ == '__main__':

View File

@ -23,4 +23,4 @@ if __name__ == '__main__':
# currently AccountStat only supports run_once
options['once'] = True
run_daemon(AccountStat, conf_file, section_name='log-processor-stats',
**options)
log_name="account-stats", **options)

View File

@ -23,16 +23,18 @@ import sqlite3
if __name__ == '__main__':
gettext.install('swift', unicode=1)
if len(argv) != 4 or argv[1] != '-K':
exit('Syntax: %s -K <super_admin_key> <path to auth.db>' % argv[0])
_junk, _junk, super_admin_key, auth_db = argv
# This version will not attempt to prep swauth
# call(['swauth-prep', '-K', super_admin_key])
if len(argv) != 2:
exit('Syntax: %s <path_to_auth.db>' % argv[0])
_junk, auth_db = argv
conn = sqlite3.connect(auth_db)
for account, cfaccount, user, password, admin, reseller_admin in \
conn.execute('SELECT account, cfaccount, user, password, admin, '
'reseller_admin FROM account'):
cmd = ['swauth-add-user', '-K', super_admin_key, '-s',
try:
listing = conn.execute('SELECT account, cfaccount, user, password, '
'admin, reseller_admin FROM account')
except sqlite3.OperationalError, err:
listing = conn.execute('SELECT account, cfaccount, user, password, '
'"f", "f" FROM account')
for account, cfaccount, user, password, admin, reseller_admin in listing:
cmd = ['swauth-add-user', '-K', '<your_swauth_key>', '-s',
cfaccount.split('_', 1)[1]]
if admin == 't':
cmd.append('-a')
@ -40,9 +42,3 @@ if __name__ == '__main__':
cmd.append('-r')
cmd.extend([account, user, password])
print ' '.join(cmd)
# For this version, the script will only print out the commands
# call(cmd)
print '----------------------------------------------------------------'
print ' Assuming the above worked perfectly, you should copy and paste '
print ' those lines into your ~/bin/recreateaccounts script.'
print '----------------------------------------------------------------'
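
The try/except above lets the migration script read auth.db files created before the admin and reseller_admin columns existed: when the first SELECT raises sqlite3.OperationalError, the fallback selects the constant 'f' in their place so each row still unpacks into six values. The same pattern in isolation (table and columns here are illustrative):

import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE account (user TEXT)')  # old schema: no admin column
try:
    rows = conn.execute('SELECT user, admin FROM account')
except sqlite3.OperationalError:
    # substitute a constant so callers unpack every row identically
    rows = conn.execute("SELECT user, 'f' FROM account")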

View File

@ -22,7 +22,7 @@ import uuid
from optparse import OptionParser
from swift.common.bench import BenchController
from swift.common.utils import readconf, LogAdapter, NamedFormatter
from swift.common.utils import readconf, LogAdapter
# The defaults should be sufficient to run swift-bench on a SAIO
CONF_DEFAULTS = {
@ -125,9 +125,9 @@ if __name__ == '__main__':
options.log_level.lower(), logging.INFO))
loghandler = logging.StreamHandler()
logger.addHandler(loghandler)
logger = LogAdapter(logger)
logformat = NamedFormatter('swift-bench', logger,
fmt='%(server)s %(asctime)s %(levelname)s %(message)s')
logger = LogAdapter(logger, 'swift-bench')
logformat = logging.Formatter('%(server)s %(asctime)s %(levelname)s '
'%(message)s')
loghandler.setFormatter(logformat)
controller = BenchController(logger, options)
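
The swift-bench change above tracks the utils.py rewrite later in this commit: LogAdapter is now a logging.LoggerAdapter subclass whose process() injects the server name (and transaction id) as record attributes, so a plain logging.Formatter can reference %(server)s directly. A minimal sketch of the resulting wiring:

import logging
from swift.common.utils import LogAdapter

logger = logging.getLogger('bench-demo')
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    '%(server)s %(asctime)s %(levelname)s %(message)s'))
logger.addHandler(handler)

adapted = LogAdapter(logger, 'swift-bench')
adapted.info('starting run')  # emits: swift-bench <timestamp> INFO starting run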

View File

@ -99,7 +99,8 @@ if __name__ == '__main__':
device_dir = conf.get('device_dir', '/srv/node')
minutes = int(conf.get('minutes', 60))
error_limit = int(conf.get('error_limit', 1))
logger = get_logger(conf, 'drive-audit')
conf['log_name'] = conf.get('log_name', 'drive-audit')
logger = get_logger(conf, log_route='drive-audit')
devices = get_devices(device_dir, logger)
logger.debug("Devices found: %s" % str(devices))
if not devices:

bin/swift-init Executable file → Normal file
View File

@ -14,180 +14,60 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import errno
import glob
import os
import resource
import signal
import sys
import time
from optparse import OptionParser
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-server', 'container-updater',
'object-auditor', 'object-server', 'object-replicator', 'object-updater',
'proxy-server', 'account-replicator', 'auth-server', 'account-reaper']
GRACEFUL_SHUTDOWN_SERVERS = ['account-server', 'container-server',
'object-server', 'proxy-server', 'auth-server']
MAX_DESCRIPTORS = 32768
MAX_MEMORY = (1024 * 1024 * 1024) * 2 # 2 GB
from swift.common.manager import Server, Manager, UnknownCommandError
_junk, server, command = sys.argv
if server == 'all':
servers = ALL_SERVERS
else:
if '-' not in server:
server = '%s-server' % server
servers = [server]
command = command.lower()
USAGE = """%prog <server> [<server> ...] <command> [options]
def pid_files(server):
if os.path.exists('/var/run/swift/%s.pid' % server):
pid_files = ['/var/run/swift/%s.pid' % server]
else:
pid_files = glob.glob('/var/run/swift/%s/*.pid' % server)
for pid_file in pid_files:
pid = int(open(pid_file).read().strip())
yield pid_file, pid
Commands:
""" + '\n'.join(["%16s: %s" % x for x in Manager.list_commands()])
def do_start(server, once=False):
server_type = '-'.join(server.split('-')[:-1])
for pid_file, pid in pid_files(server):
if os.path.exists('/proc/%s' % pid):
print "%s appears to already be running: %s" % (server, pid_file)
return
else:
print "Removing stale pid file %s" % pid_file
os.unlink(pid_file)
def main():
parser = OptionParser(USAGE)
parser.add_option('-v', '--verbose', action="store_true",
default=False, help="display verbose output")
parser.add_option('-w', '--no-wait', action="store_false", dest="wait",
default=True, help="won't wait for server to start "
"before returning")
parser.add_option('-o', '--once', action="store_true",
default=False, help="only run one pass of daemon")
# this is a negative option, default is options.daemon = True
parser.add_option('-n', '--no-daemon', action="store_false", dest="daemon",
default=True, help="start server interactively")
parser.add_option('-g', '--graceful', action="store_true",
default=False, help="send SIGHUP to supporting servers")
parser.add_option('-c', '--config-num', metavar="N", type="int",
dest="number", default=0,
help="send command to the Nth server only")
options, args = parser.parse_args()
if len(args) < 2:
parser.print_help()
print 'ERROR: specify server(s) and command'
return 1
command = args[-1]
servers = args[:-1]
# this is just a silly swap for me cause I always try to "start main"
commands = dict(Manager.list_commands()).keys()
if command not in commands and servers[0] in commands:
servers.append(command)
command = servers.pop(0)
manager = Manager(servers)
try:
resource.setrlimit(resource.RLIMIT_NOFILE,
(MAX_DESCRIPTORS, MAX_DESCRIPTORS))
resource.setrlimit(resource.RLIMIT_DATA,
(MAX_MEMORY, MAX_MEMORY))
except ValueError:
print "Unable to increase file descriptor limit. Running as non-root?"
os.environ['PYTHON_EGG_CACHE'] = '/tmp'
status = manager.run_command(command, **options.__dict__)
except UnknownCommandError:
parser.print_help()
print 'ERROR: unknown command, %s' % command
status = 1
def write_pid_file(pid_file, pid):
dir, file = os.path.split(pid_file)
if not os.path.exists(dir):
try:
os.makedirs(dir)
except OSError, err:
if err.errno == errno.EACCES:
sys.exit('Unable to create %s. Running as non-root?' % dir)
fp = open(pid_file, 'w')
fp.write('%d\n' % pid)
fp.close()
return 1 if status else 0
def launch(ini_file, pid_file):
cmd = 'swift-%s' % server
args = [server, ini_file]
if once:
print 'Running %s once' % server
args.append('once')
else:
print 'Starting %s' % server
pid = os.fork()
if pid == 0:
os.setsid()
with open(os.devnull, 'r+b') as nullfile:
for desc in (0, 1, 2): # close stdio
try:
os.dup2(nullfile.fileno(), desc)
except OSError:
pass
try:
if once:
os.execlp('swift-%s' % server, server,
ini_file, 'once')
else:
os.execlp('swift-%s' % server, server, ini_file)
except OSError:
print 'unable to launch %s' % server
sys.exit(0)
else:
write_pid_file(pid_file, pid)
ini_file = '/etc/swift/%s-server.conf' % server_type
if os.path.exists(ini_file):
# single config file over-rides config dirs
pid_file = '/var/run/swift/%s.pid' % server
launch_args = [(ini_file, pid_file)]
elif os.path.exists('/etc/swift/%s-server/' % server_type):
# found config directory, searching for config file(s)
launch_args = []
for num, ini_file in enumerate(glob.glob('/etc/swift/%s-server/*.conf' \
% server_type)):
pid_file = '/var/run/swift/%s/%d.pid' % (server, num)
# start a server for each ini_file found
launch_args.append((ini_file, pid_file))
else:
# maybe there's a config file(s) out there, but I couldn't find it!
print 'Unable to locate config file for %s. %s does not exist?' % \
(server, ini_file)
return
# start all servers
for ini_file, pid_file in launch_args:
launch(ini_file, pid_file)
def do_stop(server, graceful=False):
if graceful and server in GRACEFUL_SHUTDOWN_SERVERS:
sig = signal.SIGHUP
else:
sig = signal.SIGTERM
did_anything = False
pfiles = pid_files(server)
for pid_file, pid in pfiles:
did_anything = True
try:
print 'Stopping %s pid: %s signal: %s' % (server, pid, sig)
os.kill(pid, sig)
except OSError:
print "Process %d not running" % pid
try:
os.unlink(pid_file)
except OSError:
pass
for pid_file, pid in pfiles:
for _junk in xrange(150): # 15 seconds
if not os.path.exists('/proc/%s' % pid):
break
time.sleep(0.1)
else:
print 'Waited 15 seconds for pid %s (%s) to die; giving up' % \
(pid, pid_file)
if not did_anything:
print 'No %s running' % server
if command == 'start':
for server in servers:
do_start(server)
if command == 'stop':
for server in servers:
do_stop(server)
if command == 'shutdown':
for server in servers:
do_stop(server, graceful=True)
if command == 'restart':
for server in servers:
do_stop(server)
for server in servers:
do_start(server)
if command == 'reload' or command == 'force-reload':
for server in servers:
do_stop(server, graceful=True)
do_start(server)
if command == 'once':
for server in servers:
do_start(server, once=True)
if __name__ == "__main__":
sys.exit(main())
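
bin/swift-init is now a thin CLI over the new swift.common.manager module (added in full further down). Driving the same machinery from Python is straightforward; a sketch, assuming the default /etc/swift config locations:

from swift.common.manager import Manager, UnknownCommandError

# 'all', 'main', 'rest' and globs like 'object-*' expand against ALL_SERVERS
manager = Manager(['proxy-server', 'object-server'])
try:
    status = manager.run_command('start', daemon=True, wait=True)
except UnknownCommandError:
    status = 1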

View File

@ -34,7 +34,7 @@ if __name__ == '__main__':
uploader_conf.update(plugin_conf)
# pre-configure logger
logger = utils.get_logger(uploader_conf, plugin,
logger = utils.get_logger(uploader_conf, log_route='log-uploader',
log_to_console=options.get('verbose', False))
# currently LogUploader only supports run_once
options['once'] = True

View File

@ -19,7 +19,7 @@ from errno import EEXIST
from gzip import GzipFile
from os import mkdir
from os.path import basename, dirname, exists, join as pathjoin
from sys import argv, exit
from sys import argv, exit, modules
from textwrap import wrap
from time import time
@ -48,6 +48,8 @@ The <search-value> can be of the form:
/sdb1 Matches devices with the device name sdb1
_shiny Matches devices with shiny in the meta data
_"snet: 5.6.7.8" Matches devices with snet: 5.6.7.8 in the meta data
[::1] Matches devices in any zone with the ip ::1
z1-[::1]:5678 Matches devices in zone 1 with the ip ::1 and port 5678
Most specific example:
d74z1-1.2.3.4:5678/sdb1_"snet: 5.6.7.8"
Nerd explanation:
@ -76,6 +78,13 @@ The <search-value> can be of the form:
i += 1
match.append(('ip', search_value[:i]))
search_value = search_value[i:]
elif len(search_value) and search_value[0] == '[':
i = 1
while i < len(search_value) and search_value[i] != ']':
i += 1
i += 1
match.append(('ip', search_value[:i].lstrip('[').rstrip(']')))
search_value = search_value[i:]
if search_value.startswith(':'):
i = 1
while i < len(search_value) and search_value[i].isdigit():
@ -110,6 +119,16 @@ The <search-value> can be of the form:
return devs
def format_device(dev):
"""
Format a device for display.
"""
if ':' in dev['ip']:
return 'd%(id)sz%(zone)s-[%(ip)s]:%(port)s/%(device)s_"%(meta)s"' % dev
else:
return 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
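
The bracket handling added above teaches both the search-value parser and the add command to read RFC 2732 style literals like [::1]:5678, since a bare colon can no longer separate host from port once the host is IPv6. The logic, distilled into a hypothetical standalone helper:

def split_host_port(rest):
    # rest is '1.2.3.4:6000' or '[::1]:6000'
    if rest.startswith('['):
        i = rest.index(']')
        ip = rest[1:i]       # strip the brackets
        rest = rest[i + 1:]
    else:
        i = 0
        while i < len(rest) and rest[i] in '0123456789.':
            i += 1
        ip, rest = rest[:i], rest[i:]
    return ip, int(rest.lstrip(':'))

print split_host_port('[::1]:5678')    # ('::1', 5678)
print split_host_port('1.2.3.4:6000')  # ('1.2.3.4', 6000)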
class Commands:
def unknown():
@ -134,9 +153,9 @@ swift-ring-builder <builder_file> create <part_power> <replicas>
except OSError, err:
if err.errno != EEXIST:
raise
pickle.dump(builder, open(pathjoin(backup_dir,
pickle.dump(builder.to_dict(), open(pathjoin(backup_dir,
'%d.' % time() + basename(argv[1])), 'wb'), protocol=2)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
def default():
@ -235,10 +254,18 @@ swift-ring-builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta>
print 'Invalid add value: %s' % argv[3]
exit(EXIT_ERROR)
i = 1
while i < len(rest) and rest[i] in '0123456789.':
if rest[i] == '[':
i += 1
ip = rest[1:i]
rest = rest[i:]
while i < len(rest) and rest[i] != ']':
i += 1
i += 1
ip = rest[1:i].lstrip('[').rstrip(']')
rest = rest[i:]
else:
while i < len(rest) and rest[i] in '0123456789.':
i += 1
ip = rest[1:i]
rest = rest[i:]
if not rest.startswith(':'):
print 'Invalid add value: %s' % argv[3]
@ -279,9 +306,13 @@ swift-ring-builder <builder_file> add z<zone>-<ip>:<port>/<device_name>_<meta>
builder.add_dev({'id': next_dev_id, 'zone': zone, 'ip': ip,
'port': port, 'device': device_name, 'weight': weight,
'meta': meta})
print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
if ':' in ip:
print 'Device z%s-[%s]:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
else:
print 'Device z%s-%s:%s/%s_"%s" with %s weight got id %s' % \
(zone, ip, port, device_name, meta, weight, next_dev_id)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
def set_weight():
@ -314,7 +345,7 @@ swift-ring-builder <builder_file> set_weight <search-value> <weight>
builder.set_dev_weight(dev['id'], weight)
print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
'weight set to %(weight)s' % dev
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
def set_info():
@ -342,6 +373,13 @@ swift-ring-builder <builder_file> set_info <search-value>
i += 1
change.append(('ip', change_value[:i]))
change_value = change_value[i:]
elif len(change_value) and change_value[0] == '[':
i = 1
while i < len(change_value) and change_value[i] != ']':
i += 1
i += 1
change.append(('ip', change_value[:i].lstrip('[').rstrip(']')))
change_value = change_value[i:]
if change_value.startswith(':'):
i = 1
while i < len(change_value) and change_value[i].isdigit():
@ -366,15 +404,13 @@ swift-ring-builder <builder_file> set_info <search-value>
if len(devs) > 1:
print 'Matched more than one device:'
for dev in devs:
print ' d%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_' \
'"%(meta)s"' % dev
print ' %s' % format_device(dev)
if raw_input('Are you sure you want to update the info for '
'these %s devices? (y/N) ' % len(devs)) != 'y':
print 'Aborting device modifications'
exit(EXIT_ERROR)
for dev in devs:
orig_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
orig_dev_string = format_device(dev)
test_dev = dict(dev)
for key, value in change:
test_dev[key] = value
@ -390,10 +426,8 @@ swift-ring-builder <builder_file> set_info <search-value>
exit(EXIT_ERROR)
for key, value in change:
dev[key] = value
new_dev_string = \
'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s"' % dev
print 'Device %s is now %s' % (orig_dev_string, new_dev_string)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
print 'Device %s is now %s' % (orig_dev_string, format_device(dev))
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
def remove():
@ -429,7 +463,7 @@ swift-ring-builder <builder_file> remove <search-value>
print 'd%(id)sz%(zone)s-%(ip)s:%(port)s/%(device)s_"%(meta)s" ' \
'marked for removal and will be removed next rebalance.' \
% dev
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
def rebalance():
@ -461,13 +495,14 @@ swift-ring-builder <builder_file> rebalance
% builder.min_part_hours
print '-' * 79
ts = time()
pickle.dump(builder.get_ring(),
pickle.dump(builder.get_ring().to_dict(),
GzipFile(pathjoin(backup_dir, '%d.' % ts +
basename(ring_file)), 'wb'), protocol=2)
pickle.dump(builder, open(pathjoin(backup_dir,
pickle.dump(builder.to_dict(), open(pathjoin(backup_dir,
'%d.' % ts + basename(argv[1])), 'wb'), protocol=2)
pickle.dump(builder.get_ring(), GzipFile(ring_file, 'wb'), protocol=2)
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.get_ring().to_dict(), GzipFile(ring_file, 'wb'),
protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
def validate():
@ -494,15 +529,15 @@ swift-ring-builder <builder_file> write_ring
'"rebalance"?'
else:
print 'Warning: Writing an empty ring'
pickle.dump(ring_data,
pickle.dump(ring_data.to_dict(),
GzipFile(pathjoin(backup_dir, '%d.' % time() +
basename(ring_file)), 'wb'), protocol=2)
pickle.dump(ring_data, GzipFile(ring_file, 'wb'), protocol=2)
pickle.dump(ring_data.to_dict(), GzipFile(ring_file, 'wb'), protocol=2)
exit(EXIT_RING_CHANGED)
def pretend_min_part_hours_passed():
builder.pretend_min_part_hours_passed()
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
def set_min_part_hours():
@ -518,7 +553,7 @@ swift-ring-builder <builder_file> set_min_part_hours <hours>
builder.change_min_part_hours(int(argv[3]))
print 'The minimum number of hours before a partition can be ' \
'reassigned is now set to %s' % argv[3]
pickle.dump(builder, open(argv[1], 'wb'), protocol=2)
pickle.dump(builder.to_dict(), open(argv[1], 'wb'), protocol=2)
exit(EXIT_RING_UNCHANGED)
@ -544,7 +579,17 @@ if __name__ == '__main__':
exit(EXIT_RING_UNCHANGED)
if exists(argv[1]):
builder = pickle.load(open(argv[1], 'rb'))
try:
builder = pickle.load(open(argv[1], 'rb'))
if not hasattr(builder, 'devs'):
builder_dict = builder
builder = RingBuilder(1, 1, 1)
builder.copy_from(builder_dict)
except ImportError: # Happens with really old builder pickles
modules['swift.ring_builder'] = \
modules['swift.common.ring.builder']
builder = RingBuilder(1, 1, 1)
builder.copy_from(pickle.load(open(argv[1], 'rb')))
for dev in builder.devs:
if dev and 'meta' not in dev:
dev['meta'] = ''
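
All of the pickle.dump calls above now serialize builder.to_dict() rather than the RingBuilder instance itself, and loading goes through copy_from(). Pickling a plain dict decouples the on-disk builder file from the class's module path, which is exactly the breakage the ImportError fallback above works around for pre-rename pickles. A round-trip sketch, assuming RingBuilder is importable from swift.common.ring and with illustrative constructor arguments:

import cPickle as pickle
from swift.common.ring import RingBuilder

builder = RingBuilder(18, 3, 1)  # part_power, replicas, min_part_hours
pickle.dump(builder.to_dict(), open('object.builder', 'wb'), protocol=2)

loaded = pickle.load(open('object.builder', 'rb'))
if not hasattr(loaded, 'devs'):  # a plain dict, not a legacy instance
    builder = RingBuilder(1, 1, 1)
    builder.copy_from(loaded)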

View File

@ -107,6 +107,7 @@ Instructions for Deploying Debian Packages for Swift
apt-get install rsync python-openssl python-setuptools python-webob
python-simplejson python-xattr python-greenlet python-eventlet
python-netifaces
#. Install base packages::

View File

@ -255,7 +255,7 @@ Configuring each node
Sample configuration files are provided with all defaults in line-by-line comments.
#. If your going to use the DevAuth (the default swift-auth-server), create
#. If you're going to use the DevAuth (the default swift-auth-server), create
`/etc/swift/auth-server.conf` (you can skip this if you're going to use
Swauth)::
@ -584,7 +584,6 @@ Setting up scripts for running Swift
#!/bin/bash
swift-init all stop
sleep 5
sudo umount /mnt/sdb1
sudo mkfs.xfs -f -i size=1024 /dev/sdb1
sudo mount /mnt/sdb1
@ -626,12 +625,9 @@ Setting up scripts for running Swift
#!/bin/bash
swift-init main start
# The auth-server line is only needed for DevAuth:
swift-init auth-server start
swift-init proxy-server start
swift-init account-server start
swift-init container-server start
swift-init object-server start
#. For Swauth (not needed for DevAuth), create `~/bin/recreateaccounts`::
@ -653,15 +649,7 @@ Setting up scripts for running Swift
# /etc/swift/auth-server.conf). This swift-auth-recreate-accounts line
# is only needed for DevAuth:
swift-auth-recreate-accounts -K devauth
swift-init object-updater start
swift-init container-updater start
swift-init object-replicator start
swift-init container-replicator start
swift-init account-replicator start
swift-init object-auditor start
swift-init container-auditor start
swift-init account-auditor start
swift-init account-reaper start
swift-init rest start
#. `chmod +x ~/bin/*`
#. `remakerings`

View File

@ -116,6 +116,13 @@ MemCacheD
:members:
:show-inheritance:
Manager
=========
.. automodule:: swift.common.manager
:members:
:show-inheritance:
Ratelimit
=========

View File

@ -164,4 +164,4 @@ earlier. This file will have one entry per account per hour for each account
with activity in that hour. One .csv file should be produced per hour. Note
that the stats will be delayed by at least two hours by default. This can be
changed with the new_log_cutoff variable in the config file. See
`log-processing.conf-sample` for more details.
`log-processor.conf-sample` for more details.

View File

@ -24,6 +24,9 @@ use = egg:swift#proxy
# set log_name = proxy-server
# set log_facility = LOG_LOCAL0
# set log_level = INFO
# set access_log_name = proxy-server
# set access_log_facility = LOG_LOCAL0
# set access_log_level = INFO
# set log_headers = False
# recheck_account_existence = 60
# recheck_container_existence = 60

View File

@ -7,13 +7,16 @@ pid file = /var/run/rsyncd.pid
max connections = 2
path = /srv/node
read only = false
lock file = /var/lock/account.lock
[container]
max connections = 4
path = /srv/node
read only = false
lock file = /var/lock/container.lock
[object]
max connections = 8
path = /srv/node
read only = false
lock file = /var/lock/object.lock

View File

@ -28,7 +28,7 @@ class AccountAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'account-auditor')
self.logger = get_logger(conf, log_route='account-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -53,7 +53,7 @@ class AccountReaper(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='account-reaper')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -42,7 +42,7 @@ class AccountController(object):
"""WSGI controller for the account server."""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='account-server')
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -20,7 +20,6 @@ from contextlib import contextmanager
from time import gmtime, strftime, time
from urllib import unquote, quote
from uuid import uuid4
from urlparse import urlparse
from hashlib import md5, sha1
import hmac
import base64
@ -32,7 +31,7 @@ from webob.exc import HTTPBadRequest, HTTPConflict, HTTPForbidden, \
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.db import get_db_connection
from swift.common.utils import get_logger, split_path
from swift.common.utils import get_logger, split_path, urlparse
class AuthController(object):
@ -90,7 +89,7 @@ class AuthController(object):
"""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='auth-server')
self.super_admin_key = conf.get('super_admin_key')
if not self.super_admin_key:
msg = _('No super_admin_key set in conf file! Exiting.')

View File

@ -16,13 +16,12 @@
import uuid
import time
import random
from urlparse import urlparse
from contextlib import contextmanager
import eventlet.pools
from eventlet.green.httplib import CannotSendRequest
from swift.common.utils import TRUE_VALUES
from swift.common.utils import TRUE_VALUES, urlparse
from swift.common import client
from swift.common import direct_client

View File

@ -26,7 +26,7 @@ class Daemon(object):
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(conf, 'swift-daemon')
self.logger = utils.get_logger(conf, log_route='daemon')
def run_once(self):
"""Override this to run the script once"""
@ -39,8 +39,8 @@ class Daemon(object):
def run(self, once=False, **kwargs):
"""Run the daemon"""
utils.validate_configuration()
utils.capture_stdio(self.logger, **kwargs)
utils.drop_privileges(self.conf.get('user', 'swift'))
utils.capture_stdio(self.logger, **kwargs)
def kill_children(*args):
signal.signal(signal.SIGTERM, signal.SIG_IGN)
@ -84,7 +84,7 @@ def run_daemon(klass, conf_file, section_name='',
logger = kwargs.pop('logger')
else:
logger = utils.get_logger(conf, conf.get('log_name', section_name),
log_to_console=kwargs.pop('verbose', False))
log_to_console=kwargs.pop('verbose', False), log_route=section_name)
try:
klass(conf).run(once=once, **kwargs)
except KeyboardInterrupt:

View File

@ -92,7 +92,7 @@ class Replicator(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='replicator')
self.root = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

swift/common/manager.py Normal file
View File

@ -0,0 +1,605 @@
# Copyright (c) 2010-2011 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import functools
import errno
import os
import resource
import signal
import sys
import time
import subprocess
import re
from swift.common.utils import search_tree, remove_file, write_file
SWIFT_DIR = '/etc/swift'
RUN_DIR = '/var/run/swift'
# auth-server has been removed from ALL_SERVERS, start it explicitly
ALL_SERVERS = ['account-auditor', 'account-server', 'container-auditor',
'container-replicator', 'container-server', 'container-updater',
'object-auditor', 'object-server', 'object-replicator', 'object-updater',
'proxy-server', 'account-replicator', 'account-reaper']
MAIN_SERVERS = ['proxy-server', 'account-server', 'container-server',
'object-server']
REST_SERVERS = [s for s in ALL_SERVERS if s not in MAIN_SERVERS]
GRACEFUL_SHUTDOWN_SERVERS = MAIN_SERVERS + ['auth-server']
START_ONCE_SERVERS = REST_SERVERS
KILL_WAIT = 15 # seconds to wait for servers to die
MAX_DESCRIPTORS = 32768
MAX_MEMORY = (1024 * 1024 * 1024) * 2 # 2 GB
def setup_env():
"""Try to increase resource limits of the OS. Move PYTHON_EGG_CACHE to /tmp
"""
try:
resource.setrlimit(resource.RLIMIT_NOFILE,
(MAX_DESCRIPTORS, MAX_DESCRIPTORS))
resource.setrlimit(resource.RLIMIT_DATA,
(MAX_MEMORY, MAX_MEMORY))
except ValueError:
print _("WARNING: Unable to increase file descriptor limit. "
"Running as non-root?")
os.environ['PYTHON_EGG_CACHE'] = '/tmp'
def command(func):
"""
Decorator to declare which methods are accessible as commands, commands
always return 1 or 0, where 0 should indicate success.
:param func: function to make public
"""
func.publicly_accessible = True
@functools.wraps(func)
def wrapped(*a, **kw):
rv = func(*a, **kw)
return 1 if rv else 0
return wrapped
def watch_server_pids(server_pids, interval=1, **kwargs):
"""Monitor a collection of server pids yeilding back those pids that
aren't responding to signals.
:param server_pids: a dict, lists of pids [int,...] keyed on
Server objects
"""
status = {}
start = time.time()
end = start + interval
server_pids = dict(server_pids) # make a copy
while True:
for server, pids in server_pids.items():
for pid in pids:
try:
# let pid stop if it wants to
os.waitpid(pid, os.WNOHANG)
except OSError, e:
if e.errno not in (errno.ECHILD, errno.ESRCH):
raise # else no such child/process
# check running pids for server
status[server] = server.get_running_pids(**kwargs)
for pid in pids:
# original pids no longer in running pids!
if pid not in status[server]:
yield server, pid
# update active pids list using running_pids
server_pids[server] = status[server]
if not [p for server, pids in status.items() for p in pids]:
# no more running pids
break
if time.time() > end:
break
else:
time.sleep(0.1)
class UnknownCommandError(Exception):
pass
class Manager():
"""Main class for performing commands on groups of servers.
:param servers: list of server names as strings
"""
def __init__(self, servers):
server_names = set()
for server in servers:
if server == 'all':
server_names.update(ALL_SERVERS)
elif server == 'main':
server_names.update(MAIN_SERVERS)
elif server == 'rest':
server_names.update(REST_SERVERS)
elif '*' in server:
# convert glob to regex
server_names.update([s for s in ALL_SERVERS if
re.match(server.replace('*', '.*'), s)])
else:
server_names.add(server)
self.servers = set()
for name in server_names:
self.servers.add(Server(name))
@command
def status(self, **kwargs):
"""display status of tracked pids for server
"""
status = 0
for server in self.servers:
status += server.status(**kwargs)
return status
@command
def start(self, **kwargs):
"""starts a server
"""
setup_env()
status = 0
for server in self.servers:
server.launch(**kwargs)
if not kwargs.get('daemon', True):
for server in self.servers:
try:
status += server.interact(**kwargs)
except KeyboardInterrupt:
print _('\nuser quit')
self.stop(**kwargs)
break
elif kwargs.get('wait', True):
for server in self.servers:
status += server.wait(**kwargs)
return status
@command
def no_wait(self, **kwargs):
"""spawn server and return immediately
"""
kwargs['wait'] = False
return self.start(**kwargs)
@command
def no_daemon(self, **kwargs):
"""start a server interactively
"""
kwargs['daemon'] = False
return self.start(**kwargs)
@command
def once(self, **kwargs):
"""start server and run one pass on supporting daemons
"""
kwargs['once'] = True
return self.start(**kwargs)
@command
def stop(self, **kwargs):
"""stops a server
"""
server_pids = {}
for server in self.servers:
signaled_pids = server.stop(**kwargs)
if not signaled_pids:
print _('No %s running') % server
else:
server_pids[server] = signaled_pids
# all signaled_pids, i.e. list(itertools.chain(*server_pids.values()))
signaled_pids = [p for server, pids in server_pids.items()
for p in pids]
# keep track of the pids yielded back as killed for all servers
killed_pids = set()
for server, killed_pid in watch_server_pids(server_pids,
interval=KILL_WAIT, **kwargs):
print _("%s (%s) appears to have stopped") % (server, killed_pid)
killed_pids.add(killed_pid)
if not killed_pids.symmetric_difference(signaled_pids):
# all processes have been stopped
return 0
# reached interval n watch_pids w/o killing all servers
for server, pids in server_pids.items():
if not killed_pids.issuperset(pids):
# some pids of this server were not killed
print _('Waited %s seconds for %s to die; giving up') % (
KILL_WAIT, server)
return 1
@command
def shutdown(self, **kwargs):
"""allow current requests to finish on supporting servers
"""
kwargs['graceful'] = True
status = 0
status += self.stop(**kwargs)
return status
@command
def restart(self, **kwargs):
"""stops then restarts server
"""
status = 0
status += self.stop(**kwargs)
status += self.start(**kwargs)
return status
@command
def reload(self, **kwargs):
"""graceful shutdown then restart on supporting servers
"""
kwargs['graceful'] = True
status = 0
for server in self.servers:
m = Manager([server.server])
status += m.stop(**kwargs)
status += m.start(**kwargs)
return status
@command
def force_reload(self, **kwargs):
"""alias for reload
"""
return self.reload(**kwargs)
def get_command(self, cmd):
"""Find and return the decorated method named like cmd
:param cmd: the command to get, a string, if not found raises
UnknownCommandError
"""
cmd = cmd.lower().replace('-', '_')
try:
f = getattr(self, cmd)
except AttributeError:
raise UnknownCommandError(cmd)
if not hasattr(f, 'publicly_accessible'):
raise UnknownCommandError(cmd)
return f
@classmethod
def list_commands(cls):
"""Get all publicly accessible commands
:returns: a list of string tuples (cmd, help), the method names who are
decorated as commands
"""
get_method = lambda cmd: getattr(cls, cmd)
return sorted([(x.replace('_', '-'), get_method(x).__doc__.strip())
for x in dir(cls) if
getattr(get_method(x), 'publicly_accessible', False)])
def run_command(self, cmd, **kwargs):
"""Find the named command and run it
:param cmd: the command name to run
"""
f = self.get_command(cmd)
return f(**kwargs)
class Server():
"""Manage operations on a server or group of servers of similar type
:param server: name of server
"""
def __init__(self, server):
if '-' not in server:
server = '%s-server' % server
self.server = server.lower()
self.type = server.rsplit('-', 1)[0]
self.cmd = 'swift-%s' % server
self.procs = []
def __str__(self):
return self.server
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, repr(str(self)))
def __hash__(self):
return hash(str(self))
def __eq__(self, other):
try:
return self.server == other.server
except AttributeError:
return False
def get_pid_file_name(self, conf_file):
"""Translate conf_file to a corresponding pid_file
:param conf_file: a conf_file for this server, a string
:returns: the pid_file for this conf_file
"""
return conf_file.replace(
os.path.normpath(SWIFT_DIR), RUN_DIR, 1).replace(
'%s-server' % self.type, self.server, 1).rsplit(
'.conf', 1)[0] + '.pid'
def get_conf_file_name(self, pid_file):
"""Translate pid_file to a corresponding conf_file
:param pid_file: a pid_file for this server, a string
:returns: the conf_file for this pid_file
"""
return pid_file.replace(
os.path.normpath(RUN_DIR), SWIFT_DIR, 1).replace(
self.server, '%s-server' % self.type, 1).rsplit(
'.pid', 1)[0] + '.conf'
def conf_files(self, **kwargs):
"""Get conf files for this server
:param: number, if supplied will only lookup the nth server
:returns: list of conf files
"""
found_conf_files = search_tree(SWIFT_DIR, '%s-server*' % self.type,
'.conf')
number = kwargs.get('number')
if number:
try:
conf_files = [found_conf_files[number - 1]]
except IndexError:
conf_files = []
else:
conf_files = found_conf_files
if not conf_files:
# maybe there's a config file(s) out there, but I couldn't find it!
if not kwargs.get('quiet'):
print _('Unable to locate config %sfor %s') % (
('number %s ' % number if number else ''), self.server)
if kwargs.get('verbose') and not kwargs.get('quiet'):
if found_conf_files:
print _('Found configs:')
for i, conf_file in enumerate(found_conf_files):
print ' %d) %s' % (i + 1, conf_file)
return conf_files
def pid_files(self, **kwargs):
"""Get pid files for this server
:param: number, if supplied will only lookup the nth server
:returns: list of pid files
"""
pid_files = search_tree(RUN_DIR, '%s*' % self.server, '.pid')
if kwargs.get('number', 0):
conf_files = self.conf_files(**kwargs)
# filter pid_files to match the index of numbered conf_file
pid_files = [pid_file for pid_file in pid_files if
self.get_conf_file_name(pid_file) in conf_files]
return pid_files
def iter_pid_files(self, **kwargs):
"""Generator, yields (pid_file, pids)
"""
for pid_file in self.pid_files(**kwargs):
yield pid_file, int(open(pid_file).read().strip())
def signal_pids(self, sig, **kwargs):
"""Send a signal to pids for this server
:param sig: signal to send
:returns: a dict mapping pids (ints) to pid_files (paths)
"""
pids = {}
for pid_file, pid in self.iter_pid_files(**kwargs):
try:
if sig != signal.SIG_DFL:
print _('Signal %s pid: %s signal: %s') % (self.server,
pid, sig)
os.kill(pid, sig)
except OSError, e:
if e.errno == errno.ESRCH:
# pid does not exist
if kwargs.get('verbose'):
print _("Removing stale pid file %s") % pid_file
remove_file(pid_file)
else:
# process exists
pids[pid] = pid_file
return pids
def get_running_pids(self, **kwargs):
"""Get running pids
:returns: a dict mapping pids (ints) to pid_files (paths)
"""
return self.signal_pids(signal.SIG_DFL, **kwargs) # send noop
def kill_running_pids(self, **kwargs):
"""Kill running pids
:param graceful: if True, attempt SIGHUP on supporting servers
:returns: a dict mapping pids (ints) to pid_files (paths)
"""
graceful = kwargs.get('graceful')
if graceful and self.server in GRACEFUL_SHUTDOWN_SERVERS:
sig = signal.SIGHUP
else:
sig = signal.SIGTERM
return self.signal_pids(sig, **kwargs)
def status(self, pids=None, **kwargs):
"""Display status of server
:param: pids, if not supplied pids will be populated automatically
:param: number, if supplied will only lookup the nth server
:returns: 1 if server is not running, 0 otherwise
"""
if pids is None:
pids = self.get_running_pids(**kwargs)
if not pids:
number = kwargs.get('number', 0)
if number:
kwargs['quiet'] = True
conf_files = self.conf_files(**kwargs)
if conf_files:
print _("%s #%d not running (%s)") % (self.server, number,
conf_files[0])
else:
print _("No %s running") % self.server
return 1
for pid, pid_file in pids.items():
conf_file = self.get_conf_file_name(pid_file)
print _("%s running (%s - %s)") % (self.server, pid, conf_file)
return 0
def spawn(self, conf_file, once=False, wait=True, daemon=True, **kwargs):
"""Launch a subprocess for this server.
:param conf_file: path to conf_file to use as first arg
:param once: boolean, add once argument to command
:param wait: boolean, if true capture stdout with a pipe
:param daemon: boolean, if true ask server to log to console
:returns: the pid of the spawned process
"""
args = [self.cmd, conf_file]
if once:
args.append('once')
if not daemon:
# ask the server to log to console
args.append('verbose')
# figure out what we're going to do with stdio
if not daemon:
# do nothing, this process is open until the spawns close anyway
re_out = None
re_err = None
else:
re_err = subprocess.STDOUT
if wait:
# we're going to need to block on this...
re_out = subprocess.PIPE
else:
re_out = open(os.devnull, 'w+b')
proc = subprocess.Popen(args, stdout=re_out, stderr=re_err)
pid_file = self.get_pid_file_name(conf_file)
write_file(pid_file, proc.pid)
self.procs.append(proc)
return proc.pid
def wait(self, **kwargs):
"""
wait on spawned procs to start
"""
status = 0
for proc in self.procs:
# wait for process to close its stdout
output = proc.stdout.read()
if output:
print output
proc.communicate()
if proc.returncode:
status += 1
return status
def interact(self, **kwargs):
"""
wait on spawned procs to terminate
"""
status = 0
for proc in self.procs:
# wait for process to terminate
proc.communicate()
if proc.returncode:
status += 1
return status
def launch(self, **kwargs):
"""
Collect conf files and attempt to spawn the processes for this server
"""
conf_files = self.conf_files(**kwargs)
if not conf_files:
return []
pids = self.get_running_pids(**kwargs)
already_started = False
for pid, pid_file in pids.items():
conf_file = self.get_conf_file_name(pid_file)
# for legacy compat you can't start other servers if one server is
# already running (unless -n specifies which one you want), this
# restriction could potentially be lifted, and launch could start
# any unstarted instances
if conf_file in conf_files:
already_started = True
print _("%s running (%s - %s)") % (self.server, pid, conf_file)
elif not kwargs.get('number', 0):
already_started = True
print _("%s running (%s - %s)") % (self.server, pid, pid_file)
if already_started:
print _("%s already started...") % self.server
return []
if self.server not in START_ONCE_SERVERS:
kwargs['once'] = False
pids = {}
for conf_file in conf_files:
if kwargs.get('once'):
msg = _('Running %s once') % self.server
else:
msg = _('Starting %s') % self.server
print '%s...(%s)' % (msg, conf_file)
try:
pid = self.spawn(conf_file, **kwargs)
except OSError, e:
if e.errno == errno.ENOENT:
# TODO: should I check if self.cmd exists earlier?
print _("%s does not exist") % self.cmd
break
pids[pid] = conf_file
return pids
def stop(self, **kwargs):
"""Send stop signals to pids for this server
:returns: a dict mapping pids (ints) to pid_files (paths)
"""
return self.kill_running_pids(**kwargs)

View File

@ -38,13 +38,17 @@ TRY_COUNT = 3
# will be considered failed for ERROR_LIMIT_DURATION seconds.
ERROR_LIMIT_COUNT = 10
ERROR_LIMIT_TIME = 60
ERROR_LIMIT_DURATION = 300
ERROR_LIMIT_DURATION = 60
def md5hash(key):
return md5(key).hexdigest()
class MemcacheConnectionError(Exception):
pass
class MemcacheRing(object):
"""
Simple, consistent-hashed memcache client.
@ -180,6 +184,7 @@ class MemcacheRing(object):
:param delta: amount to add to the value of key (or set as the value
if the key is not found) will be cast to an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
key = md5hash(key)
command = 'incr'
@ -209,6 +214,7 @@ class MemcacheRing(object):
return ret
except Exception, e:
self._exception_occurred(server, e)
raise MemcacheConnectionError("No Memcached connections succeeded.")
def decr(self, key, delta=1, timeout=0):
"""
@ -220,6 +226,7 @@ class MemcacheRing(object):
value to 0 if the key is not found) will be cast to
an int
:param timeout: ttl in memcache
:raises MemcacheConnectionError:
"""
self.incr(key, delta=-delta, timeout=timeout)
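
With this change incr (and therefore decr) raises MemcacheConnectionError once every candidate server has failed, instead of silently returning None. Callers that can tolerate a dead cache are expected to catch it, as the ratelimit middleware elsewhere in this commit does; a sketch:

from swift.common.memcached import MemcacheRing, MemcacheConnectionError

cache = MemcacheRing(['127.0.0.1:11211'])
try:
    count = cache.incr('requests', delta=1, timeout=60)
except MemcacheConnectionError:
    count = None  # cache is down; degrade instead of failing the request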

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from urlparse import urlparse
from swift.common.utils import urlparse
def clean_acl(name, value):

View File

@ -26,11 +26,7 @@ class CatchErrorMiddleware(object):
def __init__(self, app, conf):
self.app = app
# if the application already has a logger we should use that one
self.logger = getattr(app, 'logger', None)
if not self.logger:
# and only call get_logger if we have to
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='catch-errors')
def __call__(self, env, start_response):
try:

View File

@ -53,7 +53,7 @@ class CNAMELookupMiddleware(object):
self.storage_domain = '.' + self.storage_domain
self.lookup_depth = int(conf.get('lookup_depth', '1'))
self.memcache = None
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='cname-lookup')
def __call__(self, env, start_response):
if not self.storage_domain:

View File

@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound
from swift.common.utils import split_path, cache_from_env, get_logger
from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
class MaxSleepTimeHitError(Exception):
@ -39,7 +40,7 @@ class RateLimitMiddleware(object):
if logger:
self.logger = logger
else:
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='ratelimit')
self.account_ratelimit = float(conf.get('account_ratelimit', 0))
self.max_sleep_time_seconds = \
float(conf.get('max_sleep_time_seconds', 60))
@ -136,28 +137,31 @@ class RateLimitMiddleware(object):
:param max_rate: maximum rate allowed in requests per second
:raises: MaxSleepTimeHitError if max sleep time is exceeded.
'''
now_m = int(round(time.time() * self.clock_accuracy))
time_per_request_m = int(round(self.clock_accuracy / max_rate))
running_time_m = self.memcache_client.incr(key,
delta=time_per_request_m)
need_to_sleep_m = 0
if (now_m - running_time_m >
self.rate_buffer_seconds * self.clock_accuracy):
next_avail_time = int(now_m + time_per_request_m)
self.memcache_client.set(key, str(next_avail_time),
serialize=False)
else:
need_to_sleep_m = \
max(running_time_m - now_m - time_per_request_m, 0)
try:
now_m = int(round(time.time() * self.clock_accuracy))
time_per_request_m = int(round(self.clock_accuracy / max_rate))
running_time_m = self.memcache_client.incr(key,
delta=time_per_request_m)
need_to_sleep_m = 0
if (now_m - running_time_m >
self.rate_buffer_seconds * self.clock_accuracy):
next_avail_time = int(now_m + time_per_request_m)
self.memcache_client.set(key, str(next_avail_time),
serialize=False)
else:
need_to_sleep_m = \
max(running_time_m - now_m - time_per_request_m, 0)
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
# treat as no-op decrement time
self.memcache_client.decr(key, delta=time_per_request_m)
raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
need_to_sleep_m)
max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy
if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01:
# treat as no-op decrement time
self.memcache_client.decr(key, delta=time_per_request_m)
raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" %
need_to_sleep_m)
return float(need_to_sleep_m) / self.clock_accuracy
return float(need_to_sleep_m) / self.clock_accuracy
except MemcacheConnectionError:
return 0
def handle_ratelimit(self, req, account_name, container_name, obj_name):
'''

View File

@ -21,7 +21,6 @@ from httplib import HTTPConnection, HTTPSConnection
from time import gmtime, strftime, time
from traceback import format_exc
from urllib import quote, unquote
from urlparse import urlparse
from uuid import uuid4
from hashlib import md5, sha1
import hmac
@ -36,7 +35,7 @@ from webob.exc import HTTPAccepted, HTTPBadRequest, HTTPConflict, \
from swift.common.bufferedhttp import http_connect_raw as http_connect
from swift.common.middleware.acl import clean_acl, parse_acl, referrer_allowed
from swift.common.utils import cache_from_env, get_logger, split_path
from swift.common.utils import cache_from_env, get_logger, split_path, urlparse
class Swauth(object):
@ -51,7 +50,7 @@ class Swauth(object):
def __init__(self, app, conf):
self.app = app
self.conf = conf
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='swauth')
self.log_headers = conf.get('log_headers') == 'True'
self.reseller_prefix = conf.get('reseller_prefix', 'AUTH').strip()
if self.reseller_prefix and self.reseller_prefix[-1] != '_':
@ -269,7 +268,7 @@ class Swauth(object):
user_groups = (req.remote_user or '').split(',')
if '.reseller_admin' in user_groups and \
account != self.reseller_prefix and \
account[len(self.reseller_prefix)].isalnum():
account[len(self.reseller_prefix)] != '.':
return None
if account in user_groups and \
(req.method not in ('DELETE', 'PUT') or container):
@ -475,7 +474,7 @@ class Swauth(object):
explained above.
"""
account = req.path_info_pop()
if req.path_info or not account.isalnum():
if req.path_info or not account or account[0] == '.':
return HTTPBadRequest(request=req)
if not self.is_account_admin(req, account):
return HTTPForbidden(request=req)
@ -551,7 +550,7 @@ class Swauth(object):
if not self.is_reseller_admin(req):
return HTTPForbidden(request=req)
account = req.path_info_pop()
if req.path_info != '/.services' or not account.isalnum():
if req.path_info != '/.services' or not account or account[0] == '.':
return HTTPBadRequest(request=req)
try:
new_services = json.loads(req.body)
@ -597,7 +596,7 @@ class Swauth(object):
if not self.is_reseller_admin(req):
return HTTPForbidden(request=req)
account = req.path_info_pop()
if req.path_info or not account.isalnum():
if req.path_info or not account or account[0] == '.':
return HTTPBadRequest(request=req)
# Ensure the container in the main auth account exists (this
# container represents the new account)
@ -679,7 +678,7 @@ class Swauth(object):
if not self.is_reseller_admin(req):
return HTTPForbidden(request=req)
account = req.path_info_pop()
if req.path_info or not account.isalnum():
if req.path_info or not account or account[0] == '.':
return HTTPBadRequest(request=req)
# Make sure the account has no users and get the account_id
marker = ''
@ -799,8 +798,8 @@ class Swauth(object):
"""
account = req.path_info_pop()
user = req.path_info_pop()
if req.path_info or not account.isalnum() or \
(not user.isalnum() and user != '.groups'):
if req.path_info or not account or account[0] == '.' or not user or \
(user[0] == '.' and user != '.groups'):
return HTTPBadRequest(request=req)
if not self.is_account_admin(req, account):
return HTTPForbidden(request=req)
@ -874,8 +873,8 @@ class Swauth(object):
req.headers.get('x-auth-user-reseller-admin') == 'true'
if reseller_admin:
admin = True
if req.path_info or not account.isalnum() or not user.isalnum() or \
not key:
if req.path_info or not account or account[0] == '.' or not user or \
user[0] == '.' or not key:
return HTTPBadRequest(request=req)
if reseller_admin:
if not self.is_super_admin(req):
@ -923,7 +922,8 @@ class Swauth(object):
# Validate path info
account = req.path_info_pop()
user = req.path_info_pop()
if req.path_info or not account.isalnum() or not user.isalnum():
if req.path_info or not account or account[0] == '.' or not user or \
user[0] == '.':
return HTTPBadRequest(request=req)
if not self.is_account_admin(req, account):
return HTTPForbidden(request=req)
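
Each of the swauth hunks above replaces account.isalnum() (and the matching user checks) with an emptiness plus leading-dot test: names may now contain hyphens, underscores and the like, while the '.' prefix stays reserved for internal entries such as .services and .groups. The rule as a hypothetical predicate:

def valid_swauth_name(name):
    # '.'-prefixed names are reserved for swauth-internal objects
    return bool(name) and not name.startswith('.')

print valid_swauth_name('acct-1')     # True; isalnum() would have rejected it
print valid_swauth_name('.services')  # False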

View File

@ -16,6 +16,9 @@
"""
The swift3 middleware will emulate the S3 REST api on top of swift.
The boto python library is necessary to use this middleware (install
the python-boto package if you use Ubuntu).
The following operations are currently supported:
* GET Service
@ -55,6 +58,7 @@ import rfc822
import hmac
import base64
import errno
import boto.utils
from xml.sax.saxutils import escape as xml_escape
import cgi
@ -378,31 +382,18 @@ class Swift3Middleware(object):
return ServiceController, d
def get_account_info(self, env, req):
if req.headers.get("content-md5"):
md5 = req.headers.get("content-md5")
else:
md5 = ""
if req.headers.get("content-type"):
content_type = req.headers.get("content-type")
else:
content_type = ""
if req.headers.get("date"):
date = req.headers.get("date")
else:
date = ""
h = req.method + "\n" + md5 + "\n" + content_type + "\n" + date + "\n"
for header in req.headers:
if header.startswith("X-Amz-"):
h += header.lower() + ":" + str(req.headers[header]) + "\n"
h += req.path
try:
account, user, _junk = \
req.headers['Authorization'].split(' ')[-1].split(':')
except Exception:
return None, None
headers = {}
for key in req.headers:
if type(req.headers[key]) == str:
headers[key] = req.headers[key]
h = boto.utils.canonical_string(req.method, req.path_qs, headers)
token = base64.urlsafe_b64encode(h)
return '%s:%s' % (account, user), token
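
Rather than hand-assembling the S3 string-to-sign (and previously ignoring query-string sub-resources), get_account_info now delegates to boto.utils.canonical_string, which implements the full Amazon spec including sorted x-amz-* headers. Verifying a request signature against that canonical string would then look roughly like the standard S3 check below (the secret/signature plumbing is hypothetical):

import base64
import hmac
from hashlib import sha1
import boto.utils

def check_signature(secret, method, path_qs, headers, client_signature):
    # rebuild the string-to-sign exactly as the client's S3 SDK built it
    to_sign = boto.utils.canonical_string(method, path_qs, headers)
    expected = base64.b64encode(hmac.new(secret, to_sign, sha1).digest())
    return expected == client_signature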

View File

@ -69,6 +69,49 @@ class RingBuilder(object):
self._remove_devs = []
self._ring = None
def copy_from(self, builder):
if hasattr(builder, 'devs'):
self.part_power = builder.part_power
self.replicas = builder.replicas
self.min_part_hours = builder.min_part_hours
self.parts = builder.parts
self.devs = builder.devs
self.devs_changed = builder.devs_changed
self.version = builder.version
self._replica2part2dev = builder._replica2part2dev
self._last_part_moves_epoch = builder._last_part_moves_epoch
self._last_part_moves = builder._last_part_moves
self._last_part_gather_start = builder._last_part_gather_start
self._remove_devs = builder._remove_devs
else:
self.part_power = builder['part_power']
self.replicas = builder['replicas']
self.min_part_hours = builder['min_part_hours']
self.parts = builder['parts']
self.devs = builder['devs']
self.devs_changed = builder['devs_changed']
self.version = builder['version']
self._replica2part2dev = builder['_replica2part2dev']
self._last_part_moves_epoch = builder['_last_part_moves_epoch']
self._last_part_moves = builder['_last_part_moves']
self._last_part_gather_start = builder['_last_part_gather_start']
self._remove_devs = builder['_remove_devs']
self._ring = None
def to_dict(self):
return {'part_power': self.part_power,
'replicas': self.replicas,
'min_part_hours': self.min_part_hours,
'parts': self.parts,
'devs': self.devs,
'devs_changed': self.devs_changed,
'version': self.version,
'_replica2part2dev': self._replica2part2dev,
'_last_part_moves_epoch': self._last_part_moves_epoch,
'_last_part_moves': self._last_part_moves,
'_last_part_gather_start': self._last_part_gather_start,
'_remove_devs': self._remove_devs}
def change_min_part_hours(self, min_part_hours):
"""
Changes the value used to decide if a given partition can be moved

View File

@ -18,7 +18,7 @@ from gzip import GzipFile
from os.path import getmtime
from struct import unpack_from
from time import time
from swift.common.utils import hash_path
from swift.common.utils import hash_path, validate_configuration
class RingData(object):
@ -29,6 +29,11 @@ class RingData(object):
self._replica2part2dev_id = replica2part2dev_id
self._part_shift = part_shift
def to_dict(self):
return {'devs': self.devs,
'replica2part2dev_id': self._replica2part2dev_id,
'part_shift': self._part_shift}
class Ring(object):
"""
@ -39,6 +44,8 @@ class Ring(object):
"""
def __init__(self, pickle_gz_path, reload_time=15):
# can't use the ring unless HASH_PATH_SUFFIX is set
validate_configuration()
self.pickle_gz_path = pickle_gz_path
self.reload_time = reload_time
self._reload(force=True)
@ -47,6 +54,9 @@ class Ring(object):
self._rtime = time() + self.reload_time
if force or self.has_changed():
ring_data = pickle.load(GzipFile(self.pickle_gz_path, 'rb'))
if not hasattr(ring_data, 'devs'):
ring_data = RingData(ring_data['replica2part2dev_id'],
ring_data['devs'], ring_data['part_shift'])
self._mtime = getmtime(self.pickle_gz_path)
self.devs = ring_data.devs
self.zone2devs = {}
@ -139,4 +149,12 @@ class Ring(object):
zones.remove(self.devs[part2dev_id[part]]['zone'])
while zones:
zone = zones.pop(part % len(zones))
yield self.zone2devs[zone][part % len(self.zone2devs[zone])]
weighted_node = None
for i in xrange(len(self.zone2devs[zone])):
node = self.zone2devs[zone][(part + i) %
len(self.zone2devs[zone])]
if node.get('weight'):
weighted_node = node
break
if weighted_node:
yield weighted_node
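
The get_more_nodes change above makes the extra-handoff generator skip devices whose weight is zero or unset, walking forward through the zone until it finds one that can actually accept data. In isolation:

devs = [{'id': 0, 'weight': 0.0}, {'id': 1, 'weight': 100.0}]
part = 7
chosen = None
for i in xrange(len(devs)):
    node = devs[(part + i) % len(devs)]
    if node.get('weight'):  # 0.0 and a missing key are both falsy
        chosen = node
        break
print chosen  # {'id': 1, 'weight': 100.0}; the zero-weight device is passed over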

View File

@ -34,6 +34,8 @@ from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from optparse import OptionParser
from tempfile import mkstemp
import cPickle as pickle
import glob
from urlparse import urlparse as stdlib_urlparse, ParseResult
import eventlet
from eventlet import greenio, GreenPool, sleep, Timeout, listen
@ -48,6 +50,10 @@ import logging
logging.thread = eventlet.green.thread
logging.threading = eventlet.green.threading
logging._lock = logging.threading.RLock()
# setup notice level logging
NOTICE = 25
logging._levelNames[NOTICE] = 'NOTICE'
SysLogHandler.priority_map['NOTICE'] = 'notice'
# These are lazily pulled from libc elsewhere
_sys_fallocate = None
@ -284,7 +290,8 @@ class LoggerFileObject(object):
return self
class LogAdapter(object):
# double inheritance to support property with setter
class LogAdapter(logging.LoggerAdapter, object):
"""
A Logger like object which performs some reformatting on calls to
:meth:`exception`. Can be used to store a threadlocal transaction id.
@ -292,11 +299,10 @@ class LogAdapter(object):
_txn_id = threading.local()
def __init__(self, logger):
self.logger = logger
for proxied_method in ('debug', 'log', 'warn', 'warning', 'error',
'critical', 'info'):
setattr(self, proxied_method, getattr(logger, proxied_method))
def __init__(self, logger, server):
logging.LoggerAdapter.__init__(self, logger, {})
self.server = server
setattr(self, 'warn', self.warning)
@property
def txn_id(self):
@ -310,15 +316,34 @@ class LogAdapter(object):
def getEffectiveLevel(self):
return self.logger.getEffectiveLevel()
def exception(self, msg, *args):
def process(self, msg, kwargs):
"""
Add extra info to message
"""
kwargs['extra'] = {'server': self.server, 'txn_id': self.txn_id}
return msg, kwargs
def notice(self, msg, *args, **kwargs):
"""
Convenience function for syslog priority LOG_NOTICE. The Python
logging level is set to 25, just above INFO. SysLogHandler is
monkey-patched to map this level to the LOG_NOTICE syslog
priority.
"""
self.log(NOTICE, msg, *args, **kwargs)
def _exception(self, msg, *args, **kwargs):
logging.LoggerAdapter.exception(self, msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
_junk, exc, _junk = sys.exc_info()
call = self.logger.error
call = self.error
emsg = ''
if isinstance(exc, OSError):
if exc.errno in (errno.EIO, errno.ENOSPC):
emsg = str(exc)
else:
call = self.logger.exception
call = self._exception
elif isinstance(exc, socket.error):
if exc.errno == errno.ECONNREFUSED:
emsg = _('Connection refused')
@ -327,7 +352,7 @@ class LogAdapter(object):
elif exc.errno == errno.ETIMEDOUT:
emsg = _('Connection timeout')
else:
call = self.logger.exception
call = self._exception
elif isinstance(exc, eventlet.Timeout):
emsg = exc.__class__.__name__
if hasattr(exc, 'seconds'):
@ -336,53 +361,25 @@ class LogAdapter(object):
if exc.msg:
emsg += ' %s' % exc.msg
else:
call = self.logger.exception
call('%s: %s' % (msg, emsg), *args)
call = self._exception
call('%s: %s' % (msg, emsg), *args, **kwargs)
class NamedFormatter(logging.Formatter):
class TxnFormatter(logging.Formatter):
"""
NamedFormatter is used to add additional information to log messages.
Normally it will simply add the server name as an attribute on the
LogRecord and the default format string will include it at the
begining of the log message. Additionally, if the transaction id is
available and not already included in the message, NamedFormatter will
add it.
NamedFormatter may be initialized with a format string which makes use
of the standard LogRecord attributes. In addition the format string
may include the following mapping key:
+----------------+---------------------------------------------+
| Format | Description |
+================+=============================================+
| %(server)s | Name of the swift server doing logging |
+----------------+---------------------------------------------+
:param server: the swift server name, a string.
:param logger: a Logger or :class:`LogAdapter` instance, additional
context may be pulled from attributes on this logger if
available.
:param fmt: the format string used to construct the message, if none is
supplied it defaults to ``"%(server)s %(message)s"``
Custom logging.Formatter will append txn_id to a log message if the record
has one and the message does not.
"""
def __init__(self, server, logger,
fmt="%(server)s %(message)s"):
logging.Formatter.__init__(self, fmt)
self.server = server
self.logger = logger
def format(self, record):
record.server = self.server
msg = logging.Formatter.format(self, record)
if self.logger.txn_id and (record.levelno != logging.INFO or
self.logger.txn_id not in msg):
msg = "%s (txn: %s)" % (msg, self.logger.txn_id)
if (record.txn_id and record.levelno != logging.INFO and
record.txn_id not in msg):
msg = "%s (txn: %s)" % (msg, record.txn_id)
return msg
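Since the record-level txn_id now arrives through LogAdapter.process() (the 'extra' dict), the formatter no longer needs a back-reference to the logger. A hedged sketch of that flow, with a throwaway formatter name:

    import logging

    class DemoTxnFormatter(logging.Formatter):
        # same idea as TxnFormatter: append the record's txn_id when
        # present and not already part of the message
        def format(self, record):
            msg = logging.Formatter.format(self, record)
            txn_id = getattr(record, 'txn_id', None)
            if txn_id and record.levelno != logging.INFO and \
                    txn_id not in msg:
                msg = '%s (txn: %s)' % (msg, txn_id)
            return msg

    logger = logging.getLogger('demo')
    handler = logging.StreamHandler()
    handler.setFormatter(DemoTxnFormatter('%(message)s'))
    logger.addHandler(handler)
    logger.error('disk failure', extra={'txn_id': 'tx123'})
    # -> disk failure (txn: tx123)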
def get_logger(conf, name=None, log_to_console=False, log_route=None):
def get_logger(conf, name=None, log_to_console=False, log_route=None,
fmt="%(server)s %(message)s"):
"""
Get the current system logger using config settings.
@ -395,51 +392,52 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None):
:param conf: Configuration dict to read settings from
:param name: Name of the logger
:param log_to_console: Add handler which writes to console on stderr
:param log_route: Route for the logging, not emitted to the log, just used
to separate logging configurations
:param fmt: Override log format
"""
if not conf:
conf = {}
if not hasattr(get_logger, 'root_logger_configured'):
get_logger.root_logger_configured = True
get_logger(conf, name, log_to_console, log_route='root')
if name is None:
name = conf.get('log_name', 'swift')
if not log_route:
log_route = name
if log_route == 'root':
logger = logging.getLogger()
else:
logger = logging.getLogger(log_route)
logger.propagate = False
if not hasattr(get_logger, 'handler4facility'):
get_logger.handler4facility = {}
facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
SysLogHandler.LOG_LOCAL0)
if facility in get_logger.handler4facility:
logger.removeHandler(get_logger.handler4facility[facility])
get_logger.handler4facility[facility].close()
del get_logger.handler4facility[facility]
if log_to_console:
# check if a previous call to get_logger already added a console logger
if hasattr(get_logger, 'console') and get_logger.console:
logger.removeHandler(get_logger.console)
get_logger.console = logging.StreamHandler(sys.__stderr__)
logger.addHandler(get_logger.console)
get_logger.handler4facility[facility] = \
SysLogHandler(address='/dev/log', facility=facility)
logger = logging.getLogger(log_route)
logger.propagate = False
# all new handlers will get the same formatter
formatter = TxnFormatter(fmt)
# get_logger will only ever add one SysLog Handler to a logger
if not hasattr(get_logger, 'handler4logger'):
get_logger.handler4logger = {}
if logger in get_logger.handler4logger:
logger.removeHandler(get_logger.handler4logger[logger])
get_logger.handler4logger[logger] = \
get_logger.handler4facility[facility]
logger.addHandler(get_logger.handler4facility[facility])
# facility for this logger will be set by last call wins
facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'),
SysLogHandler.LOG_LOCAL0)
handler = SysLogHandler(address='/dev/log', facility=facility)
handler.setFormatter(formatter)
logger.addHandler(handler)
get_logger.handler4logger[logger] = handler
# setup console logging
if log_to_console or hasattr(get_logger, 'console_handler4logger'):
# remove pre-existing console handler for this logger
if not hasattr(get_logger, 'console_handler4logger'):
get_logger.console_handler4logger = {}
if logger in get_logger.console_handler4logger:
logger.removeHandler(get_logger.console_handler4logger[logger])
console_handler = logging.StreamHandler(sys.__stderr__)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
get_logger.console_handler4logger[logger] = console_handler
# set the level for the logger
logger.setLevel(
getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO))
adapted_logger = LogAdapter(logger)
formatter = NamedFormatter(name, adapted_logger)
get_logger.handler4facility[facility].setFormatter(formatter)
if hasattr(get_logger, 'console'):
get_logger.console.setFormatter(formatter)
adapted_logger = LogAdapter(logger, name)
return adapted_logger
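With handlers now tracked per logger (rather than per facility), two routes can keep different facilities without stealing each other's syslog handler. A usage sketch, assuming the module is importable:

    from swift.common.utils import get_logger

    proxy_logger = get_logger({'log_facility': 'LOG_LOCAL0'},
                              name='proxy-server', log_route='proxy-server')
    access_logger = get_logger({'log_facility': 'LOG_LOCAL1'},
                               name='proxy-access', log_route='proxy-access')
    # each logger keeps its own SysLogHandler; a second call with the
    # same log_route replaces that route's handler instead of leaking it
    proxy_logger.notice('reloading ring')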
@ -472,8 +470,12 @@ def capture_stdio(logger, **kwargs):
# collect stdio file desc not in use for logging
stdio_fds = [0, 1, 2]
if hasattr(get_logger, 'console'):
stdio_fds.remove(get_logger.console.stream.fileno())
for _junk, handler in getattr(get_logger,
'console_handler4logger', {}).items():
try:
stdio_fds.remove(handler.stream.fileno())
except ValueError:
pass # fd not in list
with open(os.devnull, 'r+b') as nullfile:
# close stdio (excludes fds open for logging)
@ -788,6 +790,60 @@ def write_pickle(obj, dest, tmp):
renamer(tmppath, dest)
def search_tree(root, glob_match, ext):
"""Look in root, for any files/dirs matching glob, recurively traversing
any found directories looking for files ending with ext
:param root: start of search path
:param glob_match: glob to match in root, matching dirs are traversed with
os.walk
:param ext: only files that end in ext will be returned
:returns: list of full paths to matching files, sorted
"""
found_files = []
for path in glob.glob(os.path.join(root, glob_match)):
if path.endswith(ext):
found_files.append(path)
else:
for root, dirs, files in os.walk(path):
for file in files:
if file.endswith(ext):
found_files.append(os.path.join(root, file))
return sorted(found_files)
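A small usage sketch of search_tree under an assumed layout (the matching directory is walked, the non-matching file is skipped):

    import os
    from tempfile import mkdtemp
    from swift.common.utils import search_tree

    root = mkdtemp()
    os.makedirs(os.path.join(root, 'proxy-server'))
    open(os.path.join(root, 'proxy-server', 'proxy.conf'), 'w').close()
    open(os.path.join(root, 'object.builder'), 'w').close()
    print search_tree(root, 'proxy*', '.conf')
    # -> ['<root>/proxy-server/proxy.conf']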
def write_file(path, contents):
"""Write contents to file at path
:param path: any path, subdirs will be created as needed
:param contents: data to write to file, will be converted to string
"""
dirname, name = os.path.split(path)
if not os.path.exists(dirname):
try:
os.makedirs(dirname)
except OSError, err:
if err.errno == errno.EACCES:
sys.exit('Unable to create %s. Running as '
'non-root?' % dirname)
with open(path, 'w') as f:
f.write('%s' % contents)
def remove_file(path):
"""Quiet wrapper for os.unlink, OSErrors are suppressed
:param path: first and only argument passed to os.unlink
"""
try:
os.unlink(path)
except OSError:
pass
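And a quick sketch of the two helpers together (paths here are purely illustrative):

    from swift.common.utils import write_file, remove_file

    write_file('/tmp/swift-demo/sub/pid', 12345)  # creates sub/ as needed
    remove_file('/tmp/swift-demo/sub/pid')        # unlinks the file
    remove_file('/tmp/swift-demo/sub/pid')        # second call: OSError suppressed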
def audit_location_generator(devices, datadir, mount_check=True, logger=None):
'''
Given a devices path and a data directory, yield (path, device,
@ -864,3 +920,35 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
elif running_time - now > time_per_request:
eventlet.sleep((running_time - now) / clock_accuracy)
return running_time + time_per_request
class ModifiedParseResult(ParseResult):
"Parse results class for urlparse."
@property
def hostname(self):
netloc = self.netloc.split('@', 1)[-1]
if netloc.startswith('['):
return netloc[1:].split(']')[0]
elif ':' in netloc:
return netloc.rsplit(':')[0]
return netloc
@property
def port(self):
netloc = self.netloc.split('@', 1)[-1]
if netloc.startswith('['):
netloc = netloc.rsplit(']')[1]
if ':' in netloc:
return int(netloc.rsplit(':')[1])
return None
def urlparse(url):
"""
urlparse augmentation.
This is necessary because urlparse can't handle RFC 2732 URLs.
:param url: URL to parse.
"""
return ModifiedParseResult(*stdlib_urlparse(url))
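A sketch of what the wrapper buys over the stdlib parser, assuming the module is importable:

    from swift.common.utils import urlparse

    p = urlparse('http://[fe80::1]:8080/v1/AUTH_test')
    print p.hostname   # -> fe80::1  (brackets stripped per RFC 2732)
    print p.port       # -> 8080

    p = urlparse('http://user:pass@127.0.0.1/')
    print p.hostname   # -> 127.0.0.1 (credentials dropped by the '@' split)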

View File

@ -68,11 +68,15 @@ def get_socket(conf, default_port=8080):
"""
bind_addr = (conf.get('bind_ip', '0.0.0.0'),
int(conf.get('bind_port', default_port)))
address_family = [addr[0] for addr in socket.getaddrinfo(bind_addr[0],
bind_addr[1], socket.AF_UNSPEC, socket.SOCK_STREAM)
if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]
sock = None
retry_until = time.time() + 30
while not sock and time.time() < retry_until:
try:
sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096)))
sock = listen(bind_addr, backlog=int(conf.get('backlog', 4096)),
family=address_family)
if 'cert_file' in conf:
sock = ssl.wrap_socket(sock, certfile=conf['cert_file'],
keyfile=conf['key_file'])
@ -113,10 +117,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
logger = kwargs.pop('logger')
else:
logger = get_logger(conf, log_name,
log_to_console=kwargs.pop('verbose', False))
log_to_console=kwargs.pop('verbose', False), log_route='wsgi')
# redirect errors to logger and close stdio
capture_stdio(logger)
# bind to address and port
sock = get_socket(conf, default_port=kwargs.get('default_port', 8080))
# remaining tasks should not require elevated privileges
@ -125,6 +127,9 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
# finally after binding to ports and privilege drop, run app __init__ code
app = loadapp('config:%s' % conf_file, global_conf={'log_name': log_name})
# redirect errors to logger and close stdio
capture_stdio(logger)
def run_server():
wsgi.HttpProtocol.default_request_version = "HTTP/1.0"
eventlet.hubs.use_hub('poll')
@ -168,10 +173,10 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
run_server()
logger.info('Child %d exiting normally' % os.getpid())
logger.notice('Child %d exiting normally' % os.getpid())
return
else:
logger.info('Started child %s' % pid)
logger.notice('Started child %s' % pid)
children.append(pid)
try:
pid, status = os.wait()
@ -182,8 +187,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
except KeyboardInterrupt:
logger.info('User quit')
logger.notice('User quit')
break
greenio.shutdown_safe(sock)
sock.close()
logger.info('Exited')
logger.notice('Exited')
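The get_socket change above resolves the configured bind_ip first so eventlet's listen() can open an AF_INET6 socket for IPv6 addresses. The family-detection expression in isolation:

    import socket

    def detect_family(ip, port):
        # same getaddrinfo filter the patched get_socket() uses
        return [addr[0] for addr in socket.getaddrinfo(ip, port,
                    socket.AF_UNSPEC, socket.SOCK_STREAM)
                if addr[0] in (socket.AF_INET, socket.AF_INET6)][0]

    print detect_family('127.0.0.1', 8080) == socket.AF_INET   # True
    print detect_family('::1', 8080) == socket.AF_INET6        # True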

View File

@ -28,7 +28,7 @@ class ContainerAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'container-auditor')
self.logger = get_logger(conf, log_route='container-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -49,7 +49,7 @@ class ContainerController(object):
save_headers = ['x-container-read', 'x-container-write']
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='container-server')
self.root = conf.get('devices', '/srv/node/')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -89,7 +89,7 @@ class ContainerController(object):
account_partition = req.headers.get('X-Account-Partition')
account_device = req.headers.get('X-Account-Device')
if all([account_host, account_partition, account_device]):
account_ip, account_port = account_host.split(':')
account_ip, account_port = account_host.rsplit(':', 1)
new_path = '/' + '/'.join([account, container])
info = broker.get_info()
account_headers = {'x-put-timestamp': info['put_timestamp'],
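The split-to-rsplit change here (and the matching one in the object server below) matters once a host field can carry an unbracketed IPv6 address; only the last colon separates the port:

    host = 'fe80::5054:ff:fe12:3456:6001'
    ip, port = host.rsplit(':', 1)
    # -> ('fe80::5054:ff:fe12:3456', '6001')
    # host.split(':') would yield seven fields and raise ValueError
    # on the two-target unpacking above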

View File

@ -37,7 +37,7 @@ class ContainerUpdater(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'container-updater')
self.logger = get_logger(conf, log_route='container-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -31,7 +31,7 @@ class ObjectAuditor(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'object-auditor')
self.logger = get_logger(conf, log_route='object-auditor')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -207,7 +207,7 @@ class ObjectReplicator(Daemon):
:param logger: logging object
"""
self.conf = conf
self.logger = get_logger(conf, 'object-replicator')
self.logger = get_logger(conf, log_route='object-replicator')
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -266,7 +266,7 @@ class ObjectController(object):
<source-dir>/etc/object-server.conf-sample or
/etc/swift/object-server.conf-sample.
"""
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='object-server')
self.devices = conf.get('devices', '/srv/node/')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
@ -301,7 +301,7 @@ class ObjectController(object):
full_path = '/%s/%s/%s' % (account, container, obj)
try:
with ConnectionTimeout(self.conn_timeout):
ip, port = host.split(':')
ip, port = host.rsplit(':', 1)
conn = http_connect(ip, port, contdevice, partition, op,
full_path, headers_out)
with Timeout(self.node_timeout):

View File

@ -35,7 +35,7 @@ class ObjectUpdater(Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = get_logger(conf, 'object-updater')
self.logger = get_logger(conf, log_route='object-updater')
self.devices = conf.get('devices', '/srv/node')
self.mount_check = conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')

View File

@ -624,7 +624,7 @@ class Controller(object):
res.bytes_transferred += len(chunk)
except GeneratorExit:
res.client_disconnect = True
self.app.logger.info(_('Client disconnected on read'))
self.app.logger.warn(_('Client disconnected on read'))
except (Exception, TimeoutError):
self.exception_occurred(node, _('Object'),
_('Trying to read during GET of %s') % req.path)
@ -1054,7 +1054,7 @@ class ObjectController(Controller):
if req.headers.get('transfer-encoding') and chunk == '':
break
except ChunkReadTimeout, err:
self.app.logger.info(
self.app.logger.warn(
_('ERROR Client read timeout (%ss)'), err.seconds)
return HTTPRequestTimeout(request=req)
except Exception:
@ -1064,7 +1064,7 @@ class ObjectController(Controller):
return Response(status='499 Client Disconnect')
if req.content_length and req.bytes_transferred < req.content_length:
req.client_disconnect = True
self.app.logger.info(
self.app.logger.warn(
_('Client disconnected without sending enough data'))
return Response(status='499 Client Disconnect')
statuses = []
@ -1606,12 +1606,20 @@ class BaseApplication(object):
def __init__(self, conf, memcache=None, logger=None, account_ring=None,
container_ring=None, object_ring=None):
if logger is None:
self.logger = get_logger(conf)
else:
self.logger = logger
if conf is None:
conf = {}
if logger is None:
self.logger = get_logger(conf, log_route='proxy-server')
access_log_conf = {}
for key in ('log_facility', 'log_name', 'log_level'):
value = conf.get('access_' + key, conf.get(key, None))
if value:
access_log_conf[key] = value
self.access_logger = get_logger(access_log_conf,
log_route='proxy-access')
else:
self.logger = self.access_logger = logger
swift_dir = conf.get('swift_dir', '/etc/swift')
self.node_timeout = int(conf.get('node_timeout', 10))
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
@ -1790,7 +1798,7 @@ class Application(BaseApplication):
if getattr(req, 'client_disconnect', False) or \
getattr(response, 'client_disconnect', False):
status_int = 499
self.logger.info(' '.join(quote(str(x)) for x in (
self.access_logger.info(' '.join(quote(str(x)) for x in (
client or '-',
req.remote_addr or '-',
time.strftime('%d/%b/%Y/%H/%M/%S', time.gmtime()),
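The access_log_conf loop gives every access_*-prefixed key priority and falls back to the plain key, so an unsplit config keeps working. The fallback on its own:

    conf = {'log_facility': 'LOG_LOCAL0',
            'access_log_facility': 'LOG_LOCAL1',
            'log_level': 'INFO'}
    access_log_conf = {}
    for key in ('log_facility', 'log_name', 'log_level'):
        value = conf.get('access_' + key, conf.get(key, None))
        if value:
            access_log_conf[key] = value
    print access_log_conf
    # -> {'log_facility': 'LOG_LOCAL1', 'log_level': 'INFO'}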

View File

@ -34,7 +34,7 @@ class AccessLogProcessor(object):
conf.get('service_ips', '').split(',')\
if x.strip()]
self.warn_percent = float(conf.get('warn_percent', '0.8'))
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='access-processor')
def log_line_parser(self, raw_log):
'''given a raw access log line, return a dict of the good parts'''

View File

@ -48,7 +48,8 @@ class AccountStat(Daemon):
self.devices = server_conf.get('devices', '/srv/node')
self.mount_check = server_conf.get('mount_check', 'true').lower() in \
('true', 't', '1', 'on', 'yes', 'y')
self.logger = get_logger(stats_conf, 'swift-account-stats-logger')
self.logger = \
get_logger(stats_conf, log_route='account-stats')
def run_once(self):
self.logger.info(_("Gathering account stats"))

View File

@ -40,7 +40,7 @@ class LogProcessor(object):
def __init__(self, conf, logger):
if isinstance(logger, tuple):
self.logger = get_logger(*logger)
self.logger = get_logger(*logger, log_route='log-processor')
else:
self.logger = logger
@ -159,11 +159,10 @@ class LogProcessor(object):
def get_object_data(self, swift_account, container_name, object_name,
compressed=False):
'''reads an object and yields its lines'''
code, o = self.internal_proxy.get_object(swift_account,
container_name,
object_name)
code, o = self.internal_proxy.get_object(swift_account, container_name,
object_name)
if code < 200 or code >= 300:
return
raise BadFileDownload()
last_part = ''
last_compressed_part = ''
# magic in the following zlib.decompressobj argument is courtesy of
@ -226,7 +225,7 @@ class LogProcessorDaemon(Daemon):
c = conf.get('log-processor')
super(LogProcessorDaemon, self).__init__(c)
self.total_conf = conf
self.logger = get_logger(c)
self.logger = get_logger(c, log_route='log-processor')
self.log_processor = LogProcessor(conf, self.logger)
self.lookback_hours = int(c.get('lookback_hours', '120'))
self.lookback_window = int(c.get('lookback_window',
@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon):
already_processed_files = cPickle.loads(buf)
else:
already_processed_files = set()
except Exception:
except BadFileDownload:
already_processed_files = set()
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon):
def multiprocess_collate(processor_args, logs_to_process, worker_count):
'''yield hourly data from logs_to_process'''
'''
yield hourly data from logs_to_process
Every item that this function yields will be added to the processed files
list.
'''
results = []
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
for x in logs_to_process:
in_queue.put(x)
for _junk in range(worker_count):
in_queue.put(None)
count = 0
in_queue.put(None) # tell the worker to end
while True:
try:
item, data = out_queue.get_nowait()
count += 1
if data:
yield item, data
if count >= len(logs_to_process):
# this implies that one result will come from every request
break
except Queue.Empty:
time.sleep(.1)
for r in results:
r.join()
time.sleep(.01)
else:
if not isinstance(data, BadFileDownload):
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
break
def collate_worker(processor_args, in_queue, out_queue):
'''worker process for multiprocess_collate'''
p = LogProcessor(*processor_args)
while True:
item = in_queue.get()
if item is None:
# no more work to process
break
try:
item = in_queue.get_nowait()
if item is None:
break
except Queue.Empty:
time.sleep(.1)
else:
ret = p.process_one_file(*item)
out_queue.put((item, ret))
except BadFileDownload, err:
ret = err
out_queue.put((item, ret))
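The reworked collation no longer assumes one result per request: workers push BadFileDownload instances instead of dying, and the parent drains out_queue until every worker is dead and the queue is empty. A self-contained sketch of that shutdown pattern (doubling stands in for process_one_file):

    import multiprocessing
    import Queue
    import time

    def worker(in_q, out_q):
        while True:
            try:
                item = in_q.get_nowait()
                if item is None:      # sentinel: no more work
                    break
            except Queue.Empty:
                time.sleep(.01)
            else:
                out_q.put((item, item * 2))  # stand-in for real work

    if __name__ == '__main__':
        in_q, out_q = multiprocessing.Queue(), multiprocessing.Queue()
        workers = [multiprocessing.Process(target=worker, args=(in_q, out_q))
                   for _junk in range(2)]
        for w in workers:
            w.start()
        for item in (1, 2, 3):
            in_q.put(item)
        for _junk in workers:
            in_q.put(None)            # one sentinel per worker
        while True:
            try:
                item, data = out_q.get_nowait()
                print item, data
            except Queue.Empty:
                time.sleep(.01)
            if not any(w.is_alive() for w in workers) and out_q.empty():
                break                 # workers done and queue drained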

View File

@ -64,8 +64,9 @@ class LogUploader(Daemon):
self.container_name = container_name
self.filename_format = source_filename_format
self.internal_proxy = InternalProxy(proxy_server_conf)
log_name = 'swift-log-uploader-%s' % plugin_name
self.logger = utils.get_logger(uploader_conf, plugin_name)
log_name = '%s-log-uploader' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name,
log_route=plugin_name)
def run_once(self):
self.logger.info(_("Uploading logs"))
@ -78,7 +79,7 @@ class LogUploader(Daemon):
i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
i.sort()
year_offset = month_offset = day_offset = hour_offset = None
base_offset = len(self.log_dir)
base_offset = len(self.log_dir.rstrip('/')) + 1
for start, c in i:
offset = base_offset + start
if c == '%Y':
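The base_offset fix normalizes the directory prefix, so the %Y/%m/%d/%H offsets land on the same bytes whether or not log_dir was configured with a trailing slash:

    path = '/var/log/swift/2010031413.access.log'
    for log_dir in ('/var/log/swift', '/var/log/swift/'):
        base_offset = len(log_dir.rstrip('/')) + 1
        print path[base_offset:base_offset + 10]   # -> 2010031413 both times
    # the old len(self.log_dir) was one byte short whenever log_dir
    # lacked the trailing slash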

View File

@ -20,7 +20,7 @@ class StatsLogProcessor(object):
"""Transform account storage stat logs"""
def __init__(self, conf):
self.logger = get_logger(conf)
self.logger = get_logger(conf, log_route='stats-processor')
def process(self, obj_stream, data_object_account, data_object_container,
data_object_name):

View File

@ -4,6 +4,8 @@ import os
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
def readuntil2crlfs(fd):
@ -68,6 +70,27 @@ xattr.setxattr = _setxattr
xattr.getxattr = _getxattr
@contextmanager
def temptree(files, contents=''):
# generate enough contents to fill the files
c = len(files)
contents = (list(contents) + [''] * c)[:c]
tempdir = mkdtemp()
for path, content in zip(files, contents):
if os.path.isabs(path):
path = '.' + path
new_path = os.path.join(tempdir, path)
subdir = os.path.dirname(new_path)
if not os.path.exists(subdir):
os.makedirs(subdir)
with open(new_path, 'w') as f:
f.write(str(content))
try:
yield tempdir
finally:
rmtree(tempdir)
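Usage sketch for the new temptree helper (the later search_tree and write_file tests lean on it):

    import os

    with temptree(['etc/a.conf', '/abs/b.conf'], contents=['x']) as t:
        print open(os.path.join(t, 'etc', 'a.conf')).read()  # -> x
        # absolute paths are re-rooted under the temp dir
        assert os.path.exists(os.path.join(t, 'abs', 'b.conf'))
    # the whole tree is rmtree'd on exit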
class MockTrue(object):
"""
Instances of MockTrue evaluate like True

View File

@ -456,7 +456,7 @@ class TestAuthServer(unittest.TestCase):
def test_basic_logging(self):
log = StringIO()
log_handler = StreamHandler(log)
logger = get_logger(self.conf, 'auth')
logger = get_logger(self.conf, 'auth-server', log_route='auth-server')
logger.logger.addHandler(log_handler)
try:
auth_server.http_connect = fake_http_connect(201)
@ -534,7 +534,7 @@ class TestAuthServer(unittest.TestCase):
orig_Request = auth_server.Request
log = StringIO()
log_handler = StreamHandler(log)
logger = get_logger(self.conf, 'auth')
logger = get_logger(self.conf, 'auth-server', log_route='auth-server')
logger.logger.addHandler(log_handler)
try:
auth_server.Request = request_causing_exception

View File

@ -21,12 +21,14 @@ from webob import Request
from swift.common.middleware import ratelimit
from swift.proxy.server import get_container_memcache_key
from swift.common.memcached import MemcacheConnectionError
class FakeMemcache(object):
def __init__(self):
self.store = {}
self.error_on_incr = False
def get(self, key):
return self.store.get(key)
@ -36,6 +38,8 @@ class FakeMemcache(object):
return True
def incr(self, key, delta=1, timeout=0):
if self.error_on_incr:
raise MemcacheConnectionError('Memcache restarting')
self.store[key] = int(self.store.setdefault(key, 0)) + int(delta)
if self.store[key] < 0:
self.store[key] = 0
@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase):
start_response)
self._run(make_app_call, num_calls, current_rate)
def test_restarting_memcache(self):
current_rate = 2
num_calls = 5
conf_dict = {'account_ratelimit': current_rate}
self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp())
ratelimit.http_connect = mock_http_connect(204)
req = Request.blank('/v/a')
req.environ['swift.cache'] = FakeMemcache()
req.environ['swift.cache'].error_on_incr = True
make_app_call = lambda: self.test_ratelimit(req.environ,
start_response)
begin = time.time()
self._run(make_app_call, num_calls, current_rate, check_time=False)
time_took = time.time() - begin
self.assert_(round(time_took, 1) == 0)  # memcache error -> no limiting
if __name__ == '__main__':
unittest.main()

View File

@ -2576,6 +2576,23 @@ class TestAuth(unittest.TestCase):
{"groups": [{"name": "act:usr"}, {"name": "act"}],
"auth": "plaintext:key"})
def test_put_user_special_chars_success(self):
self.test_auth.app = FakeApp(iter([
('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),
# PUT of user object
('201 Created', {}, '')]))
resp = Request.blank('/auth/v2/act/u_s-r',
environ={'REQUEST_METHOD': 'PUT'},
headers={'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'supertest',
'X-Auth-User-Key': 'key'}
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 201)
self.assertEquals(self.test_auth.app.calls, 2)
self.assertEquals(json.loads(self.test_auth.app.request.body),
{"groups": [{"name": "act:u_s-r"}, {"name": "act"}],
"auth": "plaintext:key"})
def test_put_user_account_admin_success(self):
self.test_auth.app = FakeApp(iter([
('200 Ok', {'X-Container-Meta-Account-Id': 'AUTH_cfa'}, ''),

View File

@ -50,7 +50,8 @@ class TestRing(unittest.TestCase):
os.mkdir(self.testdir)
self.testgz = os.path.join(self.testdir, 'ring.gz')
self.intended_replica2part2dev_id = [[0, 2, 0, 2], [2, 0, 2, 0]]
self.intended_devs = [{'id': 0, 'zone': 0}, None, {'id': 2, 'zone': 2}]
self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0}, None,
{'id': 2, 'zone': 2, 'weight': 1.0}]
self.intended_part_shift = 30
self.intended_reload_time = 15
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
@ -69,10 +70,17 @@ class TestRing(unittest.TestCase):
self.assertEquals(self.ring.devs, self.intended_devs)
self.assertEquals(self.ring.reload_time, self.intended_reload_time)
self.assertEquals(self.ring.pickle_gz_path, self.testgz)
# test invalid endcap
_orig_hash_path_suffix = utils.HASH_PATH_SUFFIX
try:
utils.HASH_PATH_SUFFIX = ''
self.assertRaises(SystemExit, ring.Ring, self.testgz)
finally:
utils.HASH_PATH_SUFFIX = _orig_hash_path_suffix
def test_has_changed(self):
self.assertEquals(self.ring.has_changed(), False)
os.utime(self.testgz, (time()+60, time()+60))
os.utime(self.testgz, (time() + 60, time() + 60))
self.assertEquals(self.ring.has_changed(), True)
def test_reload(self):
@ -80,7 +88,7 @@ class TestRing(unittest.TestCase):
self.ring = ring.Ring(self.testgz, reload_time=0.001)
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 3)
self.intended_devs.append({'id': 3, 'zone': 3})
self.intended_devs.append({'id': 3, 'zone': 3, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@ -93,7 +101,7 @@ class TestRing(unittest.TestCase):
self.ring = ring.Ring(self.testgz, reload_time=0.001)
orig_mtime = self.ring._mtime
self.assertEquals(len(self.ring.devs), 4)
self.intended_devs.append({'id': 4, 'zone': 4})
self.intended_devs.append({'id': 4, 'zone': 4, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@ -108,7 +116,7 @@ class TestRing(unittest.TestCase):
orig_mtime = self.ring._mtime
part, nodes = self.ring.get_nodes('a')
self.assertEquals(len(self.ring.devs), 5)
self.intended_devs.append({'id': 5, 'zone': 5})
self.intended_devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
pickle.dump(ring.RingData(self.intended_replica2part2dev_id,
self.intended_devs, self.intended_part_shift),
GzipFile(self.testgz, 'wb'))
@ -127,57 +135,71 @@ class TestRing(unittest.TestCase):
self.assertRaises(TypeError, self.ring.get_nodes)
part, nodes = self.ring.get_nodes('a')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a4')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('aa')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c1')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c0')
self.assertEquals(part, 3)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c3')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o1')
self.assertEquals(part, 1)
self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}])
self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0},
{'id': 0, 'zone': 0, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o5')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o0')
self.assertEquals(part, 0)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
def test_get_more_nodes(self):
# Yes, these tests are deliberately very fragile. We want to make sure
# that if someone changes the results the ring produces, they know it.
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
self.ring.devs.append({'id': 3, 'zone': 0})
self.ring.devs.append({'id': 3, 'zone': 0, 'weight': 1.0})
self.ring.zone2devs[0].append(self.ring.devs[3])
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [])
@ -186,18 +208,36 @@ class TestRing(unittest.TestCase):
self.ring.zone2devs[3] = [self.ring.devs[3]]
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3}])
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}])
self.ring.devs.append(None)
self.ring.devs.append({'id': 5, 'zone': 5})
self.ring.devs.append({'id': 5, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5] = [self.ring.devs[5]]
part, nodes = self.ring.get_nodes('a', 'c', 'o2')
self.assertEquals(part, 2)
self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}])
self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0},
{'id': 2, 'zone': 2, 'weight': 1.0}])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3}, {'id': 5, 'zone': 5}])
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.ring.devs.append({'id': 6, 'zone': 5, 'weight': 1.0})
self.ring.zone2devs[5].append(self.ring.devs[6])
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 5, 'zone': 5, 'weight': 1.0}])
self.ring.devs[5]['weight'] = 0
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0},
{'id': 6, 'zone': 5, 'weight': 1.0}])
self.ring.devs[3]['weight'] = 0
self.ring.devs.append({'id': 7, 'zone': 6, 'weight': 0.0})
self.ring.zone2devs[6] = [self.ring.devs[7]]
nodes = list(self.ring.get_more_nodes(part))
self.assertEquals(nodes, [{'id': 6, 'zone': 5, 'weight': 1.0}])
if __name__ == '__main__':

View File

@ -28,7 +28,7 @@ class MyDaemon(daemon.Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(None, 'server')
self.logger = utils.get_logger(None, 'server', log_route='server')
MyDaemon.forever_called = False
MyDaemon.once_called = False
@ -99,7 +99,7 @@ user = %s
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server')
logger = utils.get_logger(None, 'server', log_route='server')
daemon.run_daemon(MyDaemon, conf_file, logger=logger)
self.assert_('user quit' in sio.getvalue().lower())

File diff suppressed because it is too large

View File

@ -50,6 +50,7 @@ class MockMemcached(object):
self.cache = {}
self.down = False
self.exc_on_delete = False
self.read_return_none = False
def sendall(self, string):
if self.down:
@ -110,6 +111,8 @@ class MockMemcached(object):
else:
self.outbuf += 'NOT_FOUND\r\n'
def readline(self):
if self.read_return_none:
return None
if self.down:
raise Exception('mock is down')
if '\n' in self.outbuf:
@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase):
self.assertEquals(memcache_client.get('some_key'), '6')
memcache_client.incr('some_key', delta=-15)
self.assertEquals(memcache_client.get('some_key'), '0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.incr, 'some_key', delta=-15)
def test_decr(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase):
self.assertEquals(memcache_client.get('some_key'), '11')
memcache_client.decr('some_key', delta=15)
self.assertEquals(memcache_client.get('some_key'), '0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.decr, 'some_key', delta=15)
def test_retry(self):
logging.getLogger().addHandler(NullLoggingHandler())

View File

@ -16,9 +16,11 @@
""" Tests for swift.common.utils """
from __future__ import with_statement
from test.unit import temptree
import logging
import mimetools
import os
import errno
import socket
import sys
import time
@ -31,6 +33,8 @@ from tempfile import NamedTemporaryFile
from eventlet import sleep
from swift.common.exceptions import TimeoutError, MessageTimeout, \
ConnectionTimeout
from swift.common import utils
@ -76,6 +80,17 @@ class MockSys():
__stderr__ = sys.__stderr__
def reset_loggers():
if hasattr(utils.get_logger, 'handler4logger'):
for logger, handler in utils.get_logger.handler4logger.items():
logger.removeHandler(handler)
delattr(utils.get_logger, 'handler4logger')
if hasattr(utils.get_logger, 'console_handler4logger'):
for logger, h in utils.get_logger.console_handler4logger.items():
logger.removeHandler(h)
delattr(utils.get_logger, 'console_handler4logger')
class TestUtils(unittest.TestCase):
""" Tests for swift.common.utils """
@ -289,23 +304,152 @@ Error: unable to locate %s
sio = StringIO()
logger = logging.getLogger('server')
logger.addHandler(logging.StreamHandler(sio))
logger = utils.get_logger(None, 'server')
logger = utils.get_logger(None, 'server', log_route='server')
logger.warn('test1')
self.assertEquals(sio.getvalue(), 'test1\n')
logger.debug('test2')
self.assertEquals(sio.getvalue(), 'test1\n')
logger = utils.get_logger({'log_level': 'DEBUG'}, 'server')
logger = utils.get_logger({'log_level': 'DEBUG'}, 'server',
log_route='server')
logger.debug('test3')
self.assertEquals(sio.getvalue(), 'test1\ntest3\n')
# Doesn't really test that the log facility is truly being used all the
# way to syslog, but it exercises the code.
logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'}, 'server')
logger = utils.get_logger({'log_facility': 'LOG_LOCAL3'}, 'server',
log_route='server')
logger.warn('test4')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure debug doesn't log by default
logger.debug('test5')
self.assertEquals(sio.getvalue(),
'test1\ntest3\ntest4\n')
# make sure notice lvl logs by default
logger.notice('test6')
def test_clean_logger_exception(self):
# setup stream logging
sio = StringIO()
logger = utils.get_logger(None)
handler = logging.StreamHandler(sio)
logger.logger.addHandler(handler)
def strip_value(sio):
v = sio.getvalue()
sio.truncate(0)
return v
def log_exception(exc):
try:
raise exc
except (Exception, TimeoutError):
logger.exception('blah')
try:
# establish base case
self.assertEquals(strip_value(sio), '')
logger.info('test')
self.assertEquals(strip_value(sio), 'test\n')
self.assertEquals(strip_value(sio), '')
logger.info('test')
logger.info('test')
self.assertEquals(strip_value(sio), 'test\ntest\n')
self.assertEquals(strip_value(sio), '')
# test OSError
for en in (errno.EIO, errno.ENOSPC):
log_exception(OSError(en, 'my %s error message' % en))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my %s error message' % en in log_msg)
# unfiltered
log_exception(OSError())
self.assert_('Traceback' in strip_value(sio))
# test socket.error
log_exception(socket.error(errno.ECONNREFUSED,
'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('errno.ECONNREFUSED message test' not in log_msg)
self.assert_('Connection refused' in log_msg)
log_exception(socket.error(errno.EHOSTUNREACH,
'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my error message' not in log_msg)
self.assert_('Host unreachable' in log_msg)
log_exception(socket.error(errno.ETIMEDOUT, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('my error message' not in log_msg)
self.assert_('Connection timeout' in log_msg)
# unfiltered
log_exception(socket.error(0, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' in log_msg)
self.assert_('my error message' in log_msg)
# test eventlet.Timeout
log_exception(ConnectionTimeout(42, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('ConnectionTimeout' in log_msg)
self.assert_('(42s)' in log_msg)
self.assert_('my error message' not in log_msg)
log_exception(MessageTimeout(42, 'my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' not in log_msg)
self.assert_('MessageTimeout' in log_msg)
self.assert_('(42s)' in log_msg)
self.assert_('my error message' in log_msg)
# test unhandled
log_exception(Exception('my error message'))
log_msg = strip_value(sio)
self.assert_('Traceback' in log_msg)
self.assert_('my error message' in log_msg)
finally:
logger.logger.removeHandler(handler)
reset_loggers()
def test_txn_formatter(self):
# setup stream logging
sio = StringIO()
logger = utils.get_logger(None)
handler = logging.StreamHandler(sio)
handler.setFormatter(utils.TxnFormatter())
logger.logger.addHandler(handler)
def strip_value(sio):
v = sio.getvalue()
sio.truncate(0)
return v
try:
self.assertFalse(logger.txn_id)
logger.error('my error message')
log_msg = strip_value(sio)
self.assert_('my error message' in log_msg)
self.assert_('txn' not in log_msg)
logger.txn_id = '12345'
logger.error('test')
log_msg = strip_value(sio)
self.assert_('txn' in log_msg)
self.assert_('12345' in log_msg)
# test no txn on info message
self.assertEquals(logger.txn_id, '12345')
logger.info('test')
log_msg = strip_value(sio)
self.assert_('txn' not in log_msg)
self.assert_('12345' not in log_msg)
# test txn already in message
self.assertEquals(logger.txn_id, '12345')
logger.warn('test 12345 test')
self.assertEquals(strip_value(sio), 'test 12345 test\n')
finally:
logger.logger.removeHandler(handler)
reset_loggers()
def test_storage_directory(self):
self.assertEquals(utils.storage_directory('objects', '1', 'ABCDEF'),
@ -391,56 +535,71 @@ log_name = yarr'''
logger = utils.get_logger(None, 'dummy')
# mock utils system modules
utils.sys = MockSys()
utils.os = MockOs()
_orig_sys = utils.sys
_orig_os = utils.os
try:
utils.sys = MockSys()
utils.os = MockOs()
# basic test
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [0, 1, 2])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# basic test
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [0, 1, 2])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# reset; test same args, but exc when trying to close stdio
utils.os = MockOs(raise_funcs=('dup2',))
utils.sys = MockSys()
# reset; test same args, but exc when trying to close stdio
utils.os = MockOs(raise_funcs=('dup2',))
utils.sys = MockSys()
# test unable to close stdio
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# test unable to close stdio
utils.capture_stdio(logger)
self.assert_(utils.sys.excepthook is not None)
self.assertEquals(utils.os.closed_fds, [])
self.assert_(utils.sys.stdout is not None)
self.assert_(utils.sys.stderr is not None)
# reset; test some other args
logger = utils.get_logger(None, log_to_console=True)
utils.os = MockOs()
utils.sys = MockSys()
# reset; test some other args
logger = utils.get_logger(None, log_to_console=True)
utils.os = MockOs()
utils.sys = MockSys()
# test console log
utils.capture_stdio(logger, capture_stdout=False,
capture_stderr=False)
self.assert_(utils.sys.excepthook is not None)
# when logging to console, stderr remains open
self.assertEquals(utils.os.closed_fds, [0, 1])
logger.logger.removeHandler(utils.get_logger.console)
# stdio not captured
self.assertFalse(hasattr(utils.sys, 'stdout'))
self.assertFalse(hasattr(utils.sys, 'stderr'))
# test console log
utils.capture_stdio(logger, capture_stdout=False,
capture_stderr=False)
self.assert_(utils.sys.excepthook is not None)
# when logging to console, stderr remains open
self.assertEquals(utils.os.closed_fds, [0, 1])
reset_loggers()
# stdio not captured
self.assertFalse(hasattr(utils.sys, 'stdout'))
self.assertFalse(hasattr(utils.sys, 'stderr'))
reset_loggers()
finally:
utils.sys = _orig_sys
utils.os = _orig_os
def test_get_logger_console(self):
reload(utils) # reset get_logger attrs
reset_loggers()
logger = utils.get_logger(None)
self.assertFalse(hasattr(utils.get_logger, 'console'))
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assertFalse(console_handlers)
logger = utils.get_logger(None, log_to_console=True)
self.assert_(hasattr(utils.get_logger, 'console'))
self.assert_(isinstance(utils.get_logger.console,
logging.StreamHandler))
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assert_(console_handlers)
# make sure you can't have two console handlers
old_handler = utils.get_logger.console
self.assertEquals(len(console_handlers), 1)
old_handler = console_handlers[0]
logger = utils.get_logger(None, log_to_console=True)
self.assertNotEquals(utils.get_logger.console, old_handler)
logger.logger.removeHandler(utils.get_logger.console)
console_handlers = [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler)]
self.assertEquals(len(console_handlers), 1)
new_handler = console_handlers[0]
self.assertNotEquals(new_handler, old_handler)
reset_loggers()
def test_ratelimit_sleep(self):
running_time = 0
@ -468,6 +627,28 @@ log_name = yarr'''
total += i
self.assertTrue(abs(50 - (time.time() - start) * 100) < 10)
def test_urlparse(self):
parsed = utils.urlparse('http://127.0.0.1/')
self.assertEquals(parsed.scheme, 'http')
self.assertEquals(parsed.hostname, '127.0.0.1')
self.assertEquals(parsed.path, '/')
parsed = utils.urlparse('http://127.0.0.1:8080/')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('https://127.0.0.1/')
self.assertEquals(parsed.scheme, 'https')
parsed = utils.urlparse('http://[::1]/')
self.assertEquals(parsed.hostname, '::1')
parsed = utils.urlparse('http://[::1]:8080/')
self.assertEquals(parsed.hostname, '::1')
self.assertEquals(parsed.port, 8080)
parsed = utils.urlparse('www.example.com')
self.assertEquals(parsed.hostname, '')
def test_ratelimit_sleep_with_sleep(self):
running_time = 0
start = time.time()
@ -480,5 +661,86 @@ log_name = yarr'''
self.assertTrue(abs(100 - (time.time() - start) * 100) < 10)
def test_search_tree(self):
# file match & ext miss
with temptree(['asdf.conf', 'blarg.conf', 'asdf.cfg']) as t:
asdf = utils.search_tree(t, 'a*', '.conf')
self.assertEquals(len(asdf), 1)
self.assertEquals(asdf[0],
os.path.join(t, 'asdf.conf'))
# multi-file match & glob miss & sort
with temptree(['application.bin', 'apple.bin', 'apropos.bin']) as t:
app_bins = utils.search_tree(t, 'app*', 'bin')
self.assertEquals(len(app_bins), 2)
self.assertEquals(app_bins[0],
os.path.join(t, 'apple.bin'))
self.assertEquals(app_bins[1],
os.path.join(t, 'application.bin'))
# test file in folder & ext miss & glob miss
files = (
'sub/file1.ini',
'sub/file2.conf',
'sub.bin',
'bus.ini',
'bus/file3.ini',
)
with temptree(files) as t:
sub_ini = utils.search_tree(t, 'sub*', '.ini')
self.assertEquals(len(sub_ini), 1)
self.assertEquals(sub_ini[0],
os.path.join(t, 'sub/file1.ini'))
# test multi-file in folder & sub-folder & ext miss & glob miss
files = (
'folder_file.txt',
'folder/1.txt',
'folder/sub/2.txt',
'folder2/3.txt',
'Folder3/4.txt',
'folder.rc',
)
with temptree(files) as t:
folder_texts = utils.search_tree(t, 'folder*', '.txt')
self.assertEquals(len(folder_texts), 4)
f1 = os.path.join(t, 'folder_file.txt')
f2 = os.path.join(t, 'folder/1.txt')
f3 = os.path.join(t, 'folder/sub/2.txt')
f4 = os.path.join(t, 'folder2/3.txt')
for f in [f1, f2, f3, f4]:
self.assert_(f in folder_texts)
def test_write_file(self):
with temptree([]) as t:
file_name = os.path.join(t, 'test')
utils.write_file(file_name, 'test')
with open(file_name, 'r') as f:
contents = f.read()
self.assertEquals(contents, 'test')
# and also subdirs
file_name = os.path.join(t, 'subdir/test2')
utils.write_file(file_name, 'test2')
with open(file_name, 'r') as f:
contents = f.read()
self.assertEquals(contents, 'test2')
# but can't over-write files
file_name = os.path.join(t, 'subdir/test2/test3')
self.assertRaises(IOError, utils.write_file, file_name,
'test3')
def test_remove_file(self):
with temptree([]) as t:
file_name = os.path.join(t, 'blah.pid')
# assert no raise
self.assertEquals(os.path.exists(file_name), False)
self.assertEquals(utils.remove_file(file_name), None)
with open(file_name, 'w') as f:
f.write('1')
self.assert_(os.path.exists(file_name))
self.assertEquals(utils.remove_file(file_name), None)
self.assertFalse(os.path.exists(file_name))
if __name__ == '__main__':
unittest.main()

View File

@ -14,7 +14,7 @@
# limitations under the License.
# TODO: Tests
from test import unit as _setup_mocks
from test import unit
import unittest
import tempfile
import os
@ -57,6 +57,7 @@ class TestAuditor(unittest.TestCase):
def tearDown(self):
rmtree(os.path.dirname(self.testdir), ignore_errors=1)
unit.xattr_data = {}
def test_object_audit_extra_data(self):
self.auditor = auditor.ObjectAuditor(self.conf)

View File

@ -16,6 +16,7 @@
from __future__ import with_statement
import cPickle as pickle
import logging
from logging.handlers import SysLogHandler
import os
import sys
import unittest
@ -465,8 +466,138 @@ class TestController(unittest.TestCase):
test(404, 507, 503)
test(503, 503, 503)
class TestProxyServer(unittest.TestCase):
def test_access_log(self):
class MyApp(proxy_server.Application):
def handle_request(self, req):
resp = Response(request=req)
req.response = resp
req.start_time = time()
return resp
def start_response(*args):
pass
class MockLogger():
def __init__(self):
self.buffer = StringIO()
def info(self, msg, args=None):
if args:
msg = msg % args
self.buffer.write(msg)
def strip_value(self):
rv = self.buffer.getvalue()
self.buffer.truncate(0)
return rv
class SnarfStream(object):
# I can't seem to subclass cStringIO
def __init__(self, *args, **kwargs):
self.sio = StringIO()
def strip_value(self):
rv = self.getvalue().strip()
self.truncate(0)
return rv
def __getattr__(self, name):
try:
return object.__getattr__(self, name)
except AttributeError:
try:
return getattr(self.sio, name)
except AttributeError:
return self.__getattribute__(name)
snarf = SnarfStream()
_orig_get_logger = proxy_server.get_logger
def mock_get_logger(*args, **kwargs):
if kwargs.get('log_route') != 'proxy-access':
return _orig_get_logger(*args, **kwargs)
kwargs['log_route'] = 'snarf'
logger = _orig_get_logger(*args, **kwargs)
if [h for h in logger.logger.handlers if
isinstance(h, logging.StreamHandler) and h.stream is snarf]:
# snarf handler already setup!
return logger
formatter = logger.logger.handlers[0].formatter
formatter._fmt += ' %(levelname)s'
snarf_handler = logging.StreamHandler(snarf)
snarf_handler.setFormatter(formatter)
logger.logger.addHandler(snarf_handler)
return logger
def test_conf(conf):
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
req = Request.blank('')
app(req.environ, start_response)
try:
proxy_server.get_logger = mock_get_logger
test_conf({})
line = snarf.strip_value()
print line
self.assert_(line.startswith('swift'))
self.assert_(line.endswith('INFO'))
test_conf({'log_name': 'snarf-test'})
line = snarf.strip_value()
print line
self.assert_(line.startswith('snarf-test'))
self.assert_(line.endswith('INFO'))
test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR'})
line = snarf.strip_value()
print line
self.assertFalse(line)
test_conf({'log_name': 'snarf-test', 'log_level': 'ERROR',
'access_log_name': 'access-test',
'access_log_level': 'INFO'})
line = snarf.strip_value()
print line
self.assert_(line.startswith('access-test'))
self.assert_(line.endswith('INFO'))
# test facility
def get_facility(logger):
h = [h for h in logger.logger.handlers if
isinstance(h, SysLogHandler)][0]
return h.facility
conf = {'log_facility': 'LOG_LOCAL0'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL0)
conf = {'log_facility': 'LOG_LOCAL0',
'access_log_facility': 'LOG_LOCAL1'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL1)
conf = {'access_log_facility': 'LOG_LOCAL1'}
app = MyApp(conf, memcache=FakeMemcache(), account_ring=FakeRing(),
container_ring=FakeRing(), object_ring=FakeRing())
self.assertEquals(get_facility(app.logger),
SysLogHandler.LOG_LOCAL0)
self.assertEquals(get_facility(app.access_logger),
SysLogHandler.LOG_LOCAL1)
finally:
proxy_server.get_logger = _orig_get_logger
def test_unhandled_exception(self):
class MyApp(proxy_server.Application):
@ -1805,8 +1936,8 @@ class TestObjectController(unittest.TestCase):
def info(self, msg):
self.msg = msg
orig_logger = prosrv.logger
prosrv.logger = Logger()
orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger
prosrv.logger = prosrv.access_logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
@ -1822,11 +1953,9 @@ class TestObjectController(unittest.TestCase):
prosrv.logger.msg)
exp = 'host1'
self.assertEquals(prosrv.logger.msg[:len(exp)], exp)
prosrv.logger = orig_logger
# Turn on header logging.
orig_logger = prosrv.logger
prosrv.logger = Logger()
prosrv.logger = prosrv.access_logger = Logger()
prosrv.log_headers = True
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
@ -1840,7 +1969,7 @@ class TestObjectController(unittest.TestCase):
self.assert_('Goofy-Header%3A%20True' in prosrv.logger.msg,
prosrv.logger.msg)
prosrv.log_headers = False
prosrv.logger = orig_logger
prosrv.logger, prosrv.access_logger = orig_logger, orig_access_logger
def test_chunked_put_utf8_all_the_way_down(self):
# Test UTF-8 Unicode all the way through the system

View File

@ -15,9 +15,11 @@
import unittest
from test.unit import tmpfile
import Queue
from swift.common import internal_proxy
from swift.stats import log_processor
from swift.common.exceptions import ChunkReadTimeout
class FakeUploadApp(object):
@ -33,6 +35,11 @@ class DumbLogger(object):
pass
class DumbInternalProxy(object):
def __init__(self, code=200, timeout=False, bad_compressed=False):
self.code = code
self.timeout = timeout
self.bad_compressed = bad_compressed
def get_container_list(self, account, container, marker=None,
end_marker=None):
n = '2010/03/14/13/obj1'
@ -46,22 +53,28 @@ class DumbInternalProxy(object):
return []
def get_object(self, account, container, object_name):
code = 200
if object_name.endswith('.gz'):
# same data as below, compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
if self.bad_compressed:
# invalid compressed data
def data():
yield '\xff\xff\xff\xff\xff\xff\xff'
else:
# 'obj\ndata', compressed with gzip -9
def data():
yield '\x1f\x8b\x08'
yield '\x08"\xd79L'
yield '\x02\x03te'
yield 'st\x00\xcbO'
yield '\xca\xe2JI,I'
yield '\xe4\x02\x00O\xff'
yield '\xa3Y\t\x00\x00\x00'
else:
def data():
yield 'obj\n'
if self.timeout:
raise ChunkReadTimeout
yield 'data'
return code, data()
return self.code, data()
class TestLogProcessor(unittest.TestCase):
@ -159,6 +172,19 @@ use = egg:swift#proxy
'prefix_query': 0}}
self.assertEquals(result, expected)
def test_process_one_access_file_error(self):
access_proxy_config = self.proxy_config.copy()
access_proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
p = log_processor.LogProcessor(access_proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
self.assertRaises(log_processor.BadFileDownload, p.process_one_file,
'access', 'a', 'c', 'o')
def test_get_container_listing(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy()
@@ -193,6 +219,18 @@ use = egg:swift#proxy
result = list(p.get_object_data('a', 'c', 'o.gz', True))
self.assertEquals(result, expected)
def test_get_object_data_errors(self):
p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
p._internal_proxy = DumbInternalProxy(code=500)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(bad_compressed=True)
result = p.get_object_data('a', 'c', 'o.gz', True)
self.assertRaises(log_processor.BadFileDownload, list, result)
p._internal_proxy = DumbInternalProxy(timeout=True)
result = p.get_object_data('a', 'c', 'o')
self.assertRaises(log_processor.BadFileDownload, list, result)
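# Aside (not in the original diff): all three failure modes above -- a
# non-2xx status, corrupt gzip data, and a ChunkReadTimeout mid-read -- are
# expected to surface as BadFileDownload; a hedged sketch of that
# normalisation, not the real get_object_data:
def _get_object_data_sketch(proxy, account, container, obj):
    code, chunks = proxy.get_object(account, container, obj)
    if code < 200 or code >= 300:
        raise log_processor.BadFileDownload()
    try:
        for chunk in chunks:  # decompression/timeout errors surface here
            yield chunk
    except (ChunkReadTimeout, Exception):
        raise log_processor.BadFileDownload()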
def test_get_stat_totals(self):
stats_proxy_config = self.proxy_config.copy()
stats_proxy_config.update({
@@ -262,3 +300,130 @@ use = egg:swift#proxy
# these only work for Py2.7+
#self.assertIsInstance(k, str)
self.assertTrue(isinstance(k, str), type(k))
def test_collate_worker(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
expected = {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}}
self.assertEquals(ret, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
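# Aside (not part of the original change): the queue protocol this test
# drives -- consume work tuples from q_in until a None sentinel, emit
# (item, result-or-exception) pairs on q_out. A minimal sketch under those
# assumptions, not Swift's actual collate_worker:
def _collate_worker_sketch(processor, q_in, q_out):
    while True:
        item = q_in.get()
        if item is None:  # sentinel: shut the worker down
            return
        try:
            ret = processor.process_one_file(*item)
        except log_processor.BadFileDownload, err:  # Py2 except syntax
            ret = err
        q_out.put((item, ret))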
def test_collate_worker_error(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
q_in = Queue.Queue()
q_out = Queue.Queue()
work_request = ('access', 'a','c','o')
q_in.put(work_request)
q_in.put(None)
log_processor.collate_worker(processor_args, q_in, q_out)
item, ret = q_out.get()
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
type(ret))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate(self):
try:
log_processor.LogProcessor._internal_proxy = DumbInternalProxy()
def get_object_data(*a,**kw):
return [self.access_test_line]
orig_get_object_data = log_processor.LogProcessor.get_object_data
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = [(item, {('acct', '2010', '07', '09', '04'):
{('public', 'object', 'GET', '2xx'): 1,
('public', 'bytes_out'): 95,
'marker_query': 0,
'format_query': 1,
'delimiter_query': 0,
'path_query': 0,
('public', 'bytes_in'): 6,
'prefix_query': 0}})]
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
def test_multiprocess_collate_errors(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
proxy_config = self.proxy_config.copy()
proxy_config.update({
'log-processor-access': {
'source_filename_format':'%Y%m%d%H*',
'class_path':
'swift.stats.access_processor.AccessLogProcessor'
}})
processor_args = (proxy_config, DumbLogger())
item = ('access', 'a','c','o')
logs_to_process = [item]
results = log_processor.multiprocess_collate(processor_args,
logs_to_process,
1)
results = list(results)
expected = []
self.assertEquals(results, expected)
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
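# Aside (not in the original diff): taken together, the two tests above pin
# down multiprocess_collate's contract -- fan the work out to N workers and
# yield only successful (item, result) pairs, dropping BadFileDownload
# failures. A hedged consumer sketch under that assumption:
#
#   for item, data in log_processor.multiprocess_collate(args, todo, 2):
#       merge(data)  # merge() is hypothetical; only clean downloads get here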

View File

@@ -13,16 +13,154 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: Tests
# TODO: More tests
import unittest
import os
from datetime import datetime
from tempfile import mkdtemp
from shutil import rmtree
from swift.stats import log_uploader
import logging
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger()
DEFAULT_GLOB = '%Y%m%d%H'
class TestLogUploader(unittest.TestCase):
def test_placeholder(self):
pass
def test_upload_all_logs(self):
class MockInternalProxy():
def create_container(self, *args, **kwargs):
pass
class MonkeyLogUploader(log_uploader.LogUploader):
def __init__(self, conf, logger=LOGGER):
self.log_dir = conf['log_dir']
self.filename_format = conf.get('filename_format',
DEFAULT_GLOB)
self.new_log_cutoff = 0
self.logger = logger
self.internal_proxy = MockInternalProxy()
self.swift_account = ''
self.container_name = ''
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s%0.2d' % (today_str, i))
for ts in time_strs:
open(os.path.join(tmpdir, ts), 'w').close()
conf = {'log_dir': tmpdir}
uploader = MonkeyLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s%0.2d' %
(today_str, i)), d)
self.assertEquals(file_date, expected)
finally:
rmtree(tmpdir)
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s-%0.2d00' % (today_str, i))
for ts in time_strs:
open(os.path.join(tmpdir, 'swift-blah_98764.%s-2400.tar.gz' %
ts), 'w').close()
open(os.path.join(tmpdir, 'swift.blah_98764.%s-2400.tar.gz' % ts),
'w').close()
open(os.path.join(tmpdir, 'swift-blah_98764.%s-2400.tar.g' % ts),
'w').close()
open(os.path.join(tmpdir,
'swift-blah_201102160100.%s-2400.tar.gz' %
'201102160100'), 'w').close()
conf = {
'log_dir': '%s/' % tmpdir,
'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz',
}
uploader = MonkeyLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
filename, date_dict = file_date
filename = os.path.basename(filename)
self.assert_(today_str in filename, filename)
self.assert_(filename.startswith('swift'), filename)
self.assert_(filename.endswith('tar.gz'), filename)
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
self.assertEquals(d, date_dict)
finally:
rmtree(tmpdir)
tmpdir = mkdtemp()
try:
today = datetime.now()
year = today.year
month = today.month
day = today.day
today_str = today.strftime('%Y%m%d')
time_strs = []
for i in range(24):
time_strs.append('%s%0.2d' % (today_str, i))
for i, ts in enumerate(time_strs):
open(os.path.join(tmpdir, '%s.%s.log' % (i, ts)), 'w').close()
conf = {
'log_dir': tmpdir,
'filename_format': '*.%Y%m%d%H.log',
}
uploader = MonkeyLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' %
(i, today_str, i)), d)
# TODO: support wildcards before the date pattern
# (i.e. relative offsets)
#print file_date
#self.assertEquals(file_date, expected)
finally:
rmtree(tmpdir)
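# Aside (not part of the original change): the filename_format cases above
# amount to turning strftime fields into fixed-width globs; a minimal,
# hypothetical sketch of that matching, not log_uploader's actual code:
def _candidate_logs(log_dir, fmt=DEFAULT_GLOB):
    import glob  # os is already imported at module scope above
    pattern = fmt
    for field, width in (('%Y', 4), ('%m', 2), ('%d', 2), ('%H', 2)):
        pattern = pattern.replace(field, '[0-9]' * width)
    return sorted(glob.glob(os.path.join(log_dir, pattern)))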
if __name__ == '__main__':