Merge from trunk

gholt 2011-05-09 22:18:17 +00:00
commit 8bf12ddaad
34 changed files with 1161 additions and 176 deletions

View File

@ -27,6 +27,7 @@ Stephen Milton
Russ Nelson
Colin Nicholson
Andrew Clay Shafer
Scott Simpson
Monty Taylor
Caleb Tennis
FUJITA Tomonori

CHANGELOG (new file, 4 additions)
View File

@ -0,0 +1,4 @@
swift (x.x.x)
* Renamed swift-stats-populate to swift-dispersion-populate and
swift-stats-report to swift-dispersion-report.

View File

@ -65,4 +65,4 @@ if __name__ == '__main__':
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'Account creation failed: %s %s' % (resp.status, resp.reason)
exit('Account creation failed: %s %s' % (resp.status, resp.reason))

View File

@ -90,4 +90,4 @@ if __name__ == '__main__':
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'User creation failed: %s %s' % (resp.status, resp.reason)
exit('User creation failed: %s %s' % (resp.status, resp.reason))

View File

@ -25,7 +25,7 @@ from optparse import OptionParser
from sys import argv, exit
from time import sleep, time
from swift.common.client import Connection
from swift.common.client import Connection, ClientException
if __name__ == '__main__':
@ -65,7 +65,15 @@ if __name__ == '__main__':
while True:
if options.verbose:
print 'GET %s?marker=%s' % (container, marker)
objs = conn.get_container(container, marker=marker)[1]
try:
objs = conn.get_container(container, marker=marker)[1]
except ClientException, e:
if e.http_status == 404:
exit('Container %s not found. swauth-prep needs to be '
'rerun' % (container))
else:
exit('Object listing on container %s failed with status '
'code %d' % (container, e.http_status))
if objs:
marker = objs[-1]['name']
else:
@ -90,7 +98,13 @@ if __name__ == '__main__':
(container, obj['name'],
time() - detail['expires'])
print 'DELETE %s/%s' % (container, obj['name'])
conn.delete_object(container, obj['name'])
try:
conn.delete_object(container, obj['name'])
except ClientException, e:
if e.http_status != 404:
print 'DELETE of %s/%s failed with status ' \
'code %d' % (container, obj['name'],
e.http_status)
elif options.verbose:
print "%s/%s won't expire for %ds; skipping" % \
(container, obj['name'],

View File

@ -57,4 +57,4 @@ if __name__ == '__main__':
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'Account deletion failed: %s %s' % (resp.status, resp.reason)
exit('Account deletion failed: %s %s' % (resp.status, resp.reason))

View File

@ -57,4 +57,4 @@ if __name__ == '__main__':
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'User deletion failed: %s %s' % (resp.status, resp.reason)
exit('User deletion failed: %s %s' % (resp.status, resp.reason))

View File

@ -75,9 +75,9 @@ If the [user] is '.groups', the active groups for the account will be listed.
conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers,
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'List failed: %s %s' % (resp.status, resp.reason)
body = resp.read()
if resp.status // 100 != 2:
exit('List failed: %s %s' % (resp.status, resp.reason))
if options.plain_text:
info = json.loads(body)
for group in info[['accounts', 'users', 'groups'][len(args)]]:

View File

@ -56,4 +56,4 @@ if __name__ == '__main__':
ssl=(parsed.scheme == 'https'))
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'Auth subsystem prep failed: %s %s' % (resp.status, resp.reason)
exit('Auth subsystem prep failed: %s %s' % (resp.status, resp.reason))

View File

@ -70,4 +70,4 @@ Example: %prog -K swauthkey test storage local http://127.0.0.1:8080/v1/AUTH_018
conn.send(body)
resp = conn.getresponse()
if resp.status // 100 != 2:
print 'Service set failed: %s %s' % (resp.status, resp.reason)
exit('Service set failed: %s %s' % (resp.status, resp.reason))

View File

@ -58,7 +58,7 @@
</div>
<script type="text/javascript">$('#searchbox').show(0);</script>
<p class="triangle-border right">
Psst... hey. Did you know you can read <a href="http://swift.openstack.org/1.2">Swift 1.2 docs</a> or <a href="http://swift.openstack.org/1.1">Swift 1.1 docs</a> also?
Psst... hey. Did you know you can read <a href="http://swift.openstack.org/1.3">Swift 1.3 docs</a> or <a href="http://swift.openstack.org/1.2">Swift 1.2 docs</a> also?
</p>
{%- endif %}

View File

@ -58,7 +58,7 @@ Instructions for Building Debian Packages for Swift
apt-get install python-software-properties
add-apt-repository ppa:swift-core/ppa
apt-get update
apt-get install curl gcc bzr python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr python-webob python-eventlet python-greenlet debhelper python-sphinx python-all python-openssl python-pastedeploy bzr-builddeb
apt-get install curl gcc bzr python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr python-webob python-eventlet python-greenlet debhelper python-sphinx python-all python-openssl python-pastedeploy python-netifaces bzr-builddeb
* As you

View File

@ -11,8 +11,8 @@ virtual machine will emulate running a four node Swift cluster.
* Get the *Ubuntu 10.04 LTS (Lucid Lynx)* server image:
- Ubuntu Server ISO: http://releases.ubuntu.com/10.04/ubuntu-10.04.1-server-amd64.iso (682 MB)
- Ubuntu Live/Install: http://cdimage.ubuntu.com/releases/10.04/release/ubuntu-10.04-dvd-amd64.iso (4.1 GB)
- Ubuntu Server ISO: http://releases.ubuntu.com/lucid/ubuntu-10.04.2-server-amd64.iso (717 MB)
- Ubuntu Live/Install: http://cdimage.ubuntu.com/releases/lucid/release/ubuntu-10.04.2-dvd-amd64.iso (4.2 GB)
- Ubuntu Mirrors: https://launchpad.net/ubuntu/+cdmirrors
* Create guest virtual machine from the Ubuntu image.
@ -70,6 +70,7 @@ Using a loopback device for storage
If you want to use a loopback device instead of another partition, follow these instructions.
#. `mkdir /srv`
#. `dd if=/dev/zero of=/srv/swift-disk bs=1024 count=0 seek=1000000`
(modify seek to make a larger or smaller partition)
#. `mkfs.xfs -i size=1024 /srv/swift-disk`
@ -79,7 +80,6 @@ If you want to use a loopback device instead of another partition, follow these
#. `mount /mnt/sdb1`
#. `mkdir /mnt/sdb1/1 /mnt/sdb1/2 /mnt/sdb1/3 /mnt/sdb1/4`
#. `chown <your-user-name>:<your-group-name> /mnt/sdb1/*`
#. `mkdir /srv`
#. `for x in {1..4}; do ln -s /mnt/sdb1/$x /srv/$x; done`
#. `mkdir -p /etc/swift/object-server /etc/swift/container-server /etc/swift/account-server /srv/1/node/sdb1 /srv/2/node/sdb2 /srv/3/node/sdb3 /srv/4/node/sdb4 /var/run/swift`
#. `chown -R <your-user-name>:<your-group-name> /etc/swift /srv/[1-4]/ /var/run/swift` -- **Make sure to include the trailing slash after /srv/[1-4]/**
@ -555,7 +555,9 @@ Sample configuration files are provided with all defaults in line-by-line commen
Setting up scripts for running Swift
------------------------------------
#. Create `~/bin/resetswift.` If you are using a loopback device substitute `/dev/sdb1` with `/srv/swift-disk`::
#. Create `~/bin/resetswift.`
If you are using a loopback device substitute `/dev/sdb1` with `/srv/swift-disk`.
If you did not set up rsyslog for individual logging, remove the `find /var/log/swift...` line::
#!/bin/bash

View File

@ -1,5 +1,5 @@
import gettext
__version__ = '1.3-dev'
__version__ = '1.4-dev'
gettext.install('swift')

View File

@ -147,6 +147,16 @@ class BenchDELETE(Bench):
self.total = len(names)
self.msg = 'DEL'
def run(self):
Bench.run(self)
for container in self.containers:
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, e:
if e.http_status != 409:
self._log_status("Unable to delete container '%s'. " \
"Got http status '%d'." % (container, e.http_status))
def _run(self, thread):
if time.time() - self.heartbeat >= 15:
self.heartbeat = time.time()

View File

@ -20,6 +20,8 @@ import random
import math
import time
import shutil
import uuid
import errno
from eventlet import GreenPool, sleep, Timeout, TimeoutError
from eventlet.green import subprocess
@ -49,7 +51,13 @@ def quarantine_db(object_file, server_type):
quarantine_dir = os.path.abspath(os.path.join(object_dir, '..',
'..', '..', '..', 'quarantined', server_type + 's',
os.path.basename(object_dir)))
renamer(object_dir, quarantine_dir)
try:
renamer(object_dir, quarantine_dir)
except OSError, e:
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
raise
quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
renamer(object_dir, quarantine_dir)
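The quarantine change above falls back to a uuid-suffixed directory when the target already exists. A minimal standalone sketch of that pattern, with os.rename standing in for Swift's renamer() and the function name safe_quarantine_rename invented purely for illustration:

import errno
import os
import uuid


def safe_quarantine_rename(src_dir, quarantine_dir):
    # Try the plain rename first; if another process already quarantined
    # something to the same name, fall back to a uuid-suffixed directory.
    try:
        os.rename(src_dir, quarantine_dir)
        return quarantine_dir
    except OSError as err:
        if err.errno not in (errno.EEXIST, errno.ENOTEMPTY):
            raise
        unique_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex)
        os.rename(src_dir, unique_dir)
        return unique_dir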
class ReplConnection(BufferedHTTPConnection):

View File

@ -468,7 +468,7 @@ class Swauth(object):
{"account_id": "AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162",
"services": {"storage": {"default": "local",
"local": "http://127.0.0.1:8080/v1/AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162"}},
"local": "http://127.0.0.1:8080/v1/AUTH_018c3946"}},
"users": [{"name": "tester"}, {"name": "tester3"}]}
:param req: The webob.Request to process.
@ -522,7 +522,7 @@ class Swauth(object):
this::
"services": {"storage": {"default": "local",
"local": "http://127.0.0.1:8080/v1/AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162"}}
"local": "http://127.0.0.1:8080/v1/AUTH_018c3946"}}
Making use of this section is described in :func:`handle_get_token`.
@ -849,6 +849,12 @@ class Swauth(object):
raise Exception('Could not retrieve user object: %s %s' %
(path, resp.status))
body = resp.body
display_groups = [g['name'] for g in json.loads(body)['groups']]
if ('.admin' in display_groups and
not self.is_reseller_admin(req)) or \
('.reseller_admin' in display_groups and
not self.is_super_admin(req)):
return HTTPForbidden(request=req)
return Response(body=body)
def handle_put_user(self, req):

View File

@ -16,9 +16,6 @@
"""
The swift3 middleware will emulate the S3 REST api on top of swift.
The boto python library is necessary to use this middleware (install
the python-boto package if you use Ubuntu).
The following opperations are currently supported:
* GET Service
@ -239,7 +236,7 @@ class BucketController(Controller):
if 'acl' in args:
return get_acl(self.account_name)
objects = loads(''.join(list(body_iter)))
body = ('<?xml version="1.0" encoding="UTF-8"?>'
'<ListBucketResult '
@ -429,7 +426,7 @@ class Swift3Middleware(object):
self.app = app
def get_controller(self, path):
container, obj = split_path(path, 0, 2)
container, obj = split_path(path, 0, 2, True)
d = dict(container_name=container, object_name=obj)
if container and obj:
@ -438,32 +435,35 @@ class Swift3Middleware(object):
return BucketController, d
return ServiceController, d
def get_account_info(self, env, req):
try:
account, user, _junk = \
req.headers['Authorization'].split(' ')[-1].split(':')
except Exception:
return None, None
h = canonical_string(req)
token = base64.urlsafe_b64encode(h)
return '%s:%s' % (account, user), token
def __call__(self, env, start_response):
req = Request(env)
if not'Authorization' in req.headers:
if 'AWSAccessKeyId' in req.GET:
try:
req.headers['Date'] = req.GET['Expires']
req.headers['Authorization'] = \
'AWS %(AWSAccessKeyId)s:%(Signature)s' % req.GET
except KeyError:
return get_err_response('InvalidArgument')(env, start_response)
if not 'Authorization' in req.headers:
return self.app(env, start_response)
try:
account, signature = \
req.headers['Authorization'].split(' ')[-1].rsplit(':', 1)
except Exception:
return get_err_response('InvalidArgument')(env, start_response)
try:
controller, path_parts = self.get_controller(req.path)
except ValueError:
return get_err_response('InvalidURI')(env, start_response)
account_name, token = self.get_account_info(env, req)
if not account_name:
return get_err_response('InvalidArgument')(env, start_response)
token = base64.urlsafe_b64encode(canonical_string(req))
controller = controller(env, self.app, account, token, **path_parts)
controller = controller(env, self.app, account_name, token,
**path_parts)
if hasattr(controller, req.method):
res = getattr(controller, req.method)(env, start_response)
else:

View File

@ -776,7 +776,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None):
return conf
def write_pickle(obj, dest, tmp):
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
"""
Ensure that a pickle file gets written to disk. The file
is first written to a tmp location, ensure it is synced to disk, then
@ -784,11 +784,14 @@ def write_pickle(obj, dest, tmp):
:param obj: python object to be pickled
:param dest: path of final destination file
:param tmp: path to tmp to use
:param tmp: path to tmp to use, defaults to None
:param pickle_protocol: protocol to pickle the obj with, defaults to 0
"""
fd, tmppath = mkstemp(dir=tmp)
if tmp == None:
tmp = os.path.dirname(dest)
fd, tmppath = mkstemp(dir=tmp, suffix='.tmp')
with os.fdopen(fd, 'wb') as fo:
pickle.dump(obj, fo)
pickle.dump(obj, fo, pickle_protocol)
fo.flush()
os.fsync(fd)
renamer(tmppath, dest)
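For reference, a small usage sketch of the widened write_pickle() signature shown above, assuming Swift is importable; the destination path is a throwaway temp directory rather than a real async_pending location:

import os
import pickle
import tempfile

from swift.common.utils import write_pickle

dest_dir = tempfile.mkdtemp()          # stand-in for a real async_pending dir
dest = os.path.join(dest_dir, 'example.pickle')

# tmp=None -> the temp file is created in os.path.dirname(dest), fsync'd,
# then atomically renamed over dest; pickle_protocol is passed to pickle.dump.
write_pickle({'op': 'PUT', 'obj': 'o'}, dest, tmp=None, pickle_protocol=2)

with open(dest, 'rb') as f:
    assert pickle.load(f) == {'op': 'PUT', 'obj': 'o'}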

View File

@ -25,12 +25,6 @@ import mimetools
import eventlet
from eventlet import greenio, GreenPool, sleep, wsgi, listen
from paste.deploy import loadapp, appconfig
# Hook to ensure connection resets don't blow up our servers.
# Remove with next release of Eventlet that has it in the set already.
from errno import ECONNRESET
wsgi.ACCEPT_ERRNO.add(ECONNRESET)
from eventlet.green import socket, ssl
from swift.common.utils import get_logger, drop_privileges, \
@ -124,8 +118,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
# remaining tasks should not require elevated privileges
drop_privileges(conf.get('user', 'swift'))
# finally after binding to ports and privilege drop, run app __init__ code
app = loadapp('config:%s' % conf_file, global_conf={'log_name': log_name})
# Ensure the application can be loaded before proceeding.
loadapp('config:%s' % conf_file, global_conf={'log_name': log_name})
# redirect errors to logger and close stdio
capture_stdio(logger)
@ -135,6 +129,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs):
eventlet.hubs.use_hub('poll')
eventlet.patcher.monkey_patch(all=False, socket=True)
monkey_patch_mimetools()
app = loadapp('config:%s' % conf_file,
global_conf={'log_name': log_name})
pool = GreenPool(size=1024)
try:
wsgi.server(sock, app, NullLogger(), custom_pool=pool)

View File

@ -30,7 +30,7 @@ from eventlet.support.greenlets import GreenletExit
from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
renamer, compute_eta, get_logger
compute_eta, get_logger, write_pickle
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@ -105,9 +105,7 @@ def invalidate_hash(suffix_dir):
except Exception:
return
hashes[suffix] = None
with open(hashes_file + '.tmp', 'wb') as fp:
pickle.dump(hashes, fp, PICKLE_PROTOCOL)
renamer(hashes_file + '.tmp', hashes_file)
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
def get_hashes(partition_dir, recalculate=[], do_listdir=False,
@ -157,12 +155,20 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
modified = True
sleep()
if modified:
with open(hashes_file + '.tmp', 'wb') as fp:
pickle.dump(hashes, fp, PICKLE_PROTOCOL)
renamer(hashes_file + '.tmp', hashes_file)
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes
# Hack to work around Eventlet's tpool not catching and reraising Timeouts. We
# return the Timeout, Timeout if it's raised, the caller looks for it and
# reraises it if found.
def tpooled_get_hashes(*args, **kwargs):
try:
return get_hashes(*args, **kwargs)
except Timeout, err:
return err, err
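The comment above describes the workaround: since Eventlet's tpool did not re-raise Timeouts at the time, the tpool'd function hands the Timeout back as its return value and the caller re-raises it. A generic sketch of the same pattern, with a hypothetical slow_work() standing in for get_hashes():

from eventlet import Timeout, tpool


def slow_work():
    # placeholder for work normally pushed to a native thread via tpool
    return 'hashed', {'abc': 'hash-of-suffix'}


def tpooled_slow_work():
    # Return the Timeout instead of raising it, so it survives the tpool hop.
    try:
        return slow_work()
    except Timeout as err:
        return err, err


hashed, hashes = tpool.execute(tpooled_slow_work)
# Caller side of the "hack": re-raise anything that came back as an exception.
if isinstance(hashed, BaseException):
    raise hashed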
class ObjectReplicator(Daemon):
"""
Replicate objects.
@ -336,9 +342,12 @@ class ObjectReplicator(Daemon):
self.replication_count += 1
begin = time.time()
try:
hashed, local_hash = tpool.execute(get_hashes, job['path'],
hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
do_listdir=(self.replication_count % 10) == 0,
reclaim_age=self.reclaim_age)
# See tpooled_get_hashes "Hack".
if isinstance(hashed, BaseException):
raise hashed
self.suffix_hash += hashed
attempts_left = self.object_ring.replica_count - 1
nodes = itertools.chain(job['nodes'],
@ -368,8 +377,12 @@ class ObjectReplicator(Daemon):
local_hash[suffix] != remote_hash.get(suffix, -1)]
if not suffixes:
continue
hashed, local_hash = tpool.execute(get_hashes, job['path'],
recalculate=suffixes, reclaim_age=self.reclaim_age)
hashed, local_hash = tpool.execute(tpooled_get_hashes,
job['path'], recalculate=suffixes,
reclaim_age=self.reclaim_age)
# See tpooled_get_hashes "Hack".
if isinstance(hashed, BaseException):
raise hashed
suffixes = [suffix for suffix in local_hash if
local_hash[suffix] != remote_hash.get(suffix, -1)]
self.rsync(node, job, suffixes)
@ -496,6 +509,7 @@ class ObjectReplicator(Daemon):
self.partition_times = []
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
eventlet.sleep() # Give spawns a cycle
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs()

View File

@ -44,7 +44,7 @@ from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist
from swift.obj.replicator import get_hashes, invalidate_hash
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash
DATADIR = 'objects'
@ -649,8 +649,8 @@ class ObjectController(object):
response = Response(content_type=file.metadata['Content-Type'],
request=request, conditional_response=True)
for key, value in file.metadata.iteritems():
if key == 'X-Object-Manifest' or \
key.lower().startswith('x-object-meta-'):
if key.lower().startswith('x-object-meta-') or \
key.lower() in self.allowed_headers:
response.headers[key] = value
response.etag = file.metadata['ETag']
response.last_modified = float(file.metadata['X-Timestamp'])
@ -708,7 +708,11 @@ class ObjectController(object):
if not os.path.exists(path):
mkdirs(path)
suffixes = suffix.split('-') if suffix else []
_junk, hashes = tpool.execute(get_hashes, path, recalculate=suffixes)
_junk, hashes = tpool.execute(tpooled_get_hashes, path,
recalculate=suffixes)
# See tpooled_get_hashes "Hack".
if isinstance(hashes, BaseException):
raise hashes
return Response(body=pickle.dumps(hashes))
def __call__(self, env, start_response):

View File

@ -132,11 +132,23 @@ class ObjectUpdater(Daemon):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
for update in os.listdir(prefix_path):
last_obj_hash = None
for update in sorted(os.listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
continue
self.process_object_update(update_path, device)
try:
obj_hash, timestamp = update.split('-')
except ValueError:
self.logger.error(
_('ERROR async pending file with unexpected name %s')
% (update_path))
continue
if obj_hash == last_obj_hash:
os.unlink(update_path)
else:
self.process_object_update(update_path, device)
last_obj_hash = obj_hash
time.sleep(self.slowdown)
try:
os.rmdir(prefix_path)
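The sweep above relies on async pending names of the form <object hash>-<timestamp>, zero-padded the way normalize_timestamp() produces them, so a reverse sort puts the newest update for each hash first and older duplicates can simply be unlinked. A toy sketch of that de-duplication step with made-up names:

import os
import tempfile

prefix_path = tempfile.mkdtemp()
for name in ('abc123-0000001089.30000', 'abc123-0000000018.37000',
             'def456-0000000049.40000'):
    open(os.path.join(prefix_path, name), 'w').close()

last_obj_hash = None
kept = []
for update in sorted(os.listdir(prefix_path), reverse=True):
    obj_hash, timestamp = update.split('-')
    if obj_hash == last_obj_hash:
        # an older update for an object we already handled; drop it
        os.unlink(os.path.join(prefix_path, update))
    else:
        kept.append(update)          # newest update per object hash
        last_obj_hash = obj_hash

assert kept == ['def456-0000000049.40000', 'abc123-0000001089.30000']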

View File

@ -507,8 +507,7 @@ class Controller(object):
return resp.status, resp.reason, resp.read()
elif resp.status == 507:
self.error_limit(node)
except Exception:
self.error_limit(node)
except (Exception, Timeout):
self.exception_occurred(node, self.server_type,
_('Trying to %(method)s %(path)s') %
{'method': method, 'path': path})
@ -646,6 +645,7 @@ class Controller(object):
raise
res.app_iter = file_iter()
update_headers(res, source.getheaders())
update_headers(res, {'accept-ranges': 'bytes'})
res.status = source.status
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
@ -655,6 +655,7 @@ class Controller(object):
elif 200 <= source.status <= 399:
res = status_map[source.status](request=req)
update_headers(res, source.getheaders())
update_headers(res, {'accept-ranges': 'bytes'})
if req.method == 'HEAD':
res.content_length = source.getheader('Content-Length')
if source.getheader('Content-Type'):
@ -829,6 +830,7 @@ class ObjectController(Controller):
resp)
resp.content_length = content_length
resp.last_modified = last_modified
resp.headers['accept-ranges'] = 'bytes'
return resp
@ -1326,8 +1328,8 @@ class AccountController(Controller):
if value[0].lower().startswith('x-account-meta-'))
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.make_requests(req, self.app.account_ring, account_partition,
'PUT', req.path_info, [headers] * len(accounts))
return self.make_requests(req, self.app.account_ring,
account_partition, 'PUT', req.path_info, [headers] * len(accounts))
@public
def POST(self, req):
@ -1343,8 +1345,9 @@ class AccountController(Controller):
if value[0].lower().startswith('x-account-meta-'))
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.make_requests(req, self.app.account_ring, account_partition,
'POST', req.path_info, [headers] * len(accounts))
return self.make_requests(req, self.app.account_ring,
account_partition, 'POST', req.path_info,
[headers] * len(accounts))
@public
def DELETE(self, req):
@ -1357,8 +1360,9 @@ class AccountController(Controller):
'X-CF-Trans-Id': self.trans_id}
if self.app.memcache:
self.app.memcache.delete('account%s' % req.path_info.rstrip('/'))
return self.make_requests(req, self.app.account_ring, account_partition,
'DELETE', req.path_info, [headers] * len(accounts))
return self.make_requests(req, self.app.account_ring,
account_partition, 'DELETE', req.path_info,
[headers] * len(accounts))
class BaseApplication(object):
@ -1551,6 +1555,8 @@ class Application(BaseApplication):
if not client and 'x-forwarded-for' in req.headers:
# remote user for other lbs
client = req.headers['x-forwarded-for'].split(',')[0].strip()
if not client:
client = req.remote_addr
logged_headers = None
if self.log_headers:
logged_headers = '\n'.join('%s: %s' % (k, v)

View File

@ -30,6 +30,8 @@ from swift.common.exceptions import ChunkReadTimeout
from swift.common.utils import get_logger, readconf
from swift.common.daemon import Daemon
now = datetime.datetime.now
class BadFileDownload(Exception):
def __init__(self, status_code=None):
@ -234,31 +236,46 @@ class LogProcessorDaemon(Daemon):
self.log_processor_container = c.get('container_name',
'log_processing_data')
self.worker_count = int(c.get('worker_count', '1'))
self._keylist_mapping = None
self.processed_files_filename = 'processed_files.pickle.gz'
def run_once(self, *args, **kwargs):
for k in 'lookback_hours lookback_window'.split():
if kwargs[k] is not None:
setattr(self, k, kwargs[k])
def get_lookback_interval(self):
"""
:returns: lookback_start, lookback_end.
Both or just lookback_end can be None. Otherwise, returns strings
of the form 'YYYYMMDDHH'. The interval returned is used as bounds
when looking for logs to process.
A returned None means don't limit the log files examined on that
side of the interval.
"""
self.logger.info(_("Beginning log processing"))
start = time.time()
if self.lookback_hours == 0:
lookback_start = None
lookback_end = None
else:
delta_hours = datetime.timedelta(hours=self.lookback_hours)
lookback_start = datetime.datetime.now() - delta_hours
lookback_start = now() - delta_hours
lookback_start = lookback_start.strftime('%Y%m%d%H')
if self.lookback_window == 0:
lookback_end = None
else:
delta_window = datetime.timedelta(hours=self.lookback_window)
lookback_end = datetime.datetime.now() - \
lookback_end = now() - \
delta_hours + \
delta_window
lookback_end = lookback_end.strftime('%Y%m%d%H')
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
return lookback_start, lookback_end
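A worked example of the interval arithmetic, using the same values as the new unit test further down: with now() pinned to 2011-01-01, lookback_hours=120 and lookback_window=24 give ('2010122700', '2010122800'):

import datetime

now = datetime.datetime(2011, 1, 1)
lookback_hours, lookback_window = 120, 24

delta_hours = datetime.timedelta(hours=lookback_hours)
lookback_start = (now - delta_hours).strftime('%Y%m%d%H')

delta_window = datetime.timedelta(hours=lookback_window)
lookback_end = (now - delta_hours + delta_window).strftime('%Y%m%d%H')

assert (lookback_start, lookback_end) == ('2010122700', '2010122800')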
def get_processed_files_list(self):
"""
:returns: a set of files that have already been processed or returns
None on error.
Downloads the set from the stats account. Creates an empty set if
an existing file cannot be found.
"""
try:
# Note: this file (or data set) will grow without bound.
# In practice, if it becomes a problem (say, after many months of
@ -266,44 +283,52 @@ class LogProcessorDaemon(Daemon):
# entries. Automatically pruning on each run could be dangerous.
# There is not a good way to determine when an old entry should be
# pruned (lookback_hours could be set to anything and could change)
processed_files_stream = self.log_processor.get_object_data(
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz',
compressed=True)
buf = '\n'.join(x for x in processed_files_stream)
stream = self.log_processor.get_object_data(
self.log_processor_account,
self.log_processor_container,
self.processed_files_filename,
compressed=True)
buf = '\n'.join(x for x in stream)
if buf:
already_processed_files = cPickle.loads(buf)
files = cPickle.loads(buf)
else:
already_processed_files = set()
return None
except BadFileDownload, err:
if err.status_code == 404:
already_processed_files = set()
files = set()
else:
self.logger.error(_('Log processing unable to load list of '
'already processed log files'))
return
self.logger.debug(_('found %d processed files') % \
len(already_processed_files))
logs_to_process = self.log_processor.get_data_list(lookback_start,
lookback_end,
already_processed_files)
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
if not logs_to_process:
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
return
return None
return files
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
self.worker_count)
def get_aggregate_data(self, processed_files, input_data):
"""
Aggregates stats data by account/hour, summing as needed.
:param processed_files: set of processed files
:param input_data: is the output from multiprocess_collate/the plugins.
:returns: A dict containing data aggregated from the input_data
passed in.
The dict returned has tuple keys of the form:
(account, year, month, day, hour)
The dict returned has values that are dicts with items of this
form:
key:field_value
- key corresponds to something in one of the plugin's keylist
mapping, something like the tuple (source, level, verb, code)
- field_value is the sum of the field_values for the
corresponding values in the input
Both input_data and the dict returned are hourly aggregations of
stats.
Multiple values for the same (account, hour, tuple key) found in
input_data are summed in the dict returned.
"""
#reduce
aggr_data = {}
processed_files = already_processed_files
for item, data in results:
for item, data in input_data:
# since item contains the plugin and the log name, new plugins will
# "reprocess" the file and the results will be in the final csv.
processed_files.add(item)
@ -315,14 +340,30 @@ class LogProcessorDaemon(Daemon):
# processing plugins need to realize this
existing_data[i] = current + j
aggr_data[k] = existing_data
return aggr_data
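A simplified restatement of the aggregation step, using the same easy-to-read keys as the new test below (not the exact implementation): entries from different files that share an (account, hour) key are summed field by field, and every processed file name is added to processed_files:

import collections

processed_files = set()
input_data = [
    ('file1', {'acct1_time1': {'field1': 1, 'field2': 2},
               'acct1_time2': {'field1': 4}}),
    ('file2', {'acct1_time1': {'field1': 10}}),
]

aggr_data = {}
for item, data in input_data:
    processed_files.add(item)
    for k, d in data.items():
        existing = aggr_data.get(k, collections.defaultdict(int))
        for field, value in d.items():
            existing[field] += value
        aggr_data[k] = existing

assert aggr_data['acct1_time1'] == {'field1': 11, 'field2': 2}
assert aggr_data['acct1_time2'] == {'field1': 4}
assert processed_files == set(['file1', 'file2'])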
def get_final_info(self, aggr_data):
"""
Aggregates data from aggr_data based on the keylist mapping.
:param aggr_data: The results of the get_aggregate_data function.
:returns: a dict of further aggregated data
The dict returned has keys of the form:
(account, year, month, day, hour)
The dict returned has values that are dicts with items of this
form:
'field_name': field_value (int)
Data is aggregated as specified by the keylist mapping. The
keylist mapping specifies which keys to combine in aggr_data
and the final field_names for these combined keys in the dict
returned. Fields combined are summed.
"""
# group
# reduce a large number of keys in aggr_data[k] to a small number of
# output keys
keylist_mapping = self.log_processor.generate_keylist_mapping()
final_info = collections.defaultdict(dict)
for account, data in aggr_data.items():
for key, mapping in keylist_mapping.items():
for key, mapping in self.keylist_mapping.items():
if isinstance(mapping, (list, set)):
value = 0
for k in mapping:
@ -336,37 +377,154 @@ class LogProcessorDaemon(Daemon):
except KeyError:
value = 0
final_info[account][key] = value
return final_info
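To make the keylist mapping concrete, a small sketch following the conventions of the new tests: a list or set value sums those input fields, a plain string passes a single field through, and missing fields count as zero:

keylist_mapping = {
    'out_field1': ['field1', 'field2'],   # summed
    'out_field4': 'field4',               # passed through
}
hour_data = {'field1': 4, 'field2': 5}    # no field4 recorded this hour

final = {}
for out_key, mapping in keylist_mapping.items():
    if isinstance(mapping, (list, set)):
        final[out_key] = sum(hour_data.get(k, 0) for k in mapping)
    else:
        final[out_key] = hour_data.get(mapping, 0)

assert final == {'out_field1': 9, 'out_field4': 0}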
# output
sorted_keylist_mapping = sorted(keylist_mapping)
columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
out_buf = [columns]
def store_processed_files_list(self, processed_files):
"""
Stores the processed files list in the stats account.
:param processed_files: set of processed files
"""
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
self.processed_files_filename)
def get_output(self, final_info):
"""
:returns: a list of rows to appear in the csv file.
The first row contains the column headers for the rest of the
rows in the returned list.
Each row after the first row corresponds to an account's data
for that hour.
"""
sorted_keylist_mapping = sorted(self.keylist_mapping)
columns = ['data_ts', 'account'] + sorted_keylist_mapping
output = [columns]
for (account, year, month, day, hour), d in final_info.items():
data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
row = [data_ts]
row.append('%s' % account)
data_ts = '%04d/%02d/%02d %02d:00:00' % \
(int(year), int(month), int(day), int(hour))
row = [data_ts, '%s' % (account)]
for k in sorted_keylist_mapping:
row.append('%s' % d[k])
out_buf.append(','.join(row))
out_buf = '\n'.join(out_buf)
row.append(str(d[k]))
output.append(row)
return output
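And a short illustration of the rows get_output() builds: a header row, then one row per (account, hour) with the timestamp rendered as a zero-padded 'YYYY/MM/DD HH:00:00' string (the account and field names below are made up):

final_info = {
    ('acct1', '2011', '5', '9', '22'): {'errors': 0, 'requests': 42},
}

sorted_keys = sorted(['requests', 'errors'])          # ['errors', 'requests']
output = [['data_ts', 'account'] + sorted_keys]
for (account, year, month, day, hour), d in final_info.items():
    data_ts = '%04d/%02d/%02d %02d:00:00' % (
        int(year), int(month), int(day), int(hour))
    output.append([data_ts, account] + [str(d[k]) for k in sorted_keys])

assert output == [['data_ts', 'account', 'errors', 'requests'],
                  ['2011/05/09 22:00:00', 'acct1', '0', '42']]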
def store_output(self, output):
"""
Takes a list of rows and stores a csv file of the values in the
stats account.
:param output: list of rows to appear in the csv file
This csv file is the final product of this script.
"""
out_buf = '\n'.join([','.join(row) for row in output])
h = hashlib.md5(out_buf).hexdigest()
upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
f = cStringIO.StringIO(out_buf)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
upload_name)
self.log_processor_account,
self.log_processor_container,
upload_name)
# cleanup
s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
f = cStringIO.StringIO(s)
self.log_processor.internal_proxy.upload_file(f,
self.log_processor_account,
self.log_processor_container,
'processed_files.pickle.gz')
@property
def keylist_mapping(self):
"""
:returns: the keylist mapping.
The keylist mapping determines how the stats fields are aggregated
in the final aggregation step.
"""
if self._keylist_mapping == None:
self._keylist_mapping = \
self.log_processor.generate_keylist_mapping()
return self._keylist_mapping
def process_logs(self, logs_to_process, processed_files):
"""
:param logs_to_process: list of logs to process
:param processed_files: set of processed files
:returns: returns a list of rows of processed data.
The first row is the column headers. The rest of the rows contain
hourly aggregate data for the account specified in the row.
Files processed are added to the processed_files set.
When a large data structure is no longer needed, it is deleted in
an effort to conserve memory.
"""
# map
processor_args = (self.total_conf, self.logger)
results = multiprocess_collate(processor_args, logs_to_process,
self.worker_count)
# reduce
aggr_data = self.get_aggregate_data(processed_files, results)
del results
# group
# reduce a large number of keys in aggr_data[k] to a small
# number of output keys
final_info = self.get_final_info(aggr_data)
del aggr_data
# output
return self.get_output(final_info)
def run_once(self, *args, **kwargs):
"""
Process log files that fall within the lookback interval.
Upload resulting csv file to stats account.
Update processed files list and upload to stats account.
"""
for k in 'lookback_hours lookback_window'.split():
if k in kwargs and kwargs[k] is not None:
setattr(self, k, kwargs[k])
start = time.time()
self.logger.info(_("Beginning log processing"))
lookback_start, lookback_end = self.get_lookback_interval()
self.logger.debug('lookback_start: %s' % lookback_start)
self.logger.debug('lookback_end: %s' % lookback_end)
processed_files = self.get_processed_files_list()
if processed_files == None:
self.logger.error(_('Log processing unable to load list of '
'already processed log files'))
return
self.logger.debug(_('found %d processed files') %
len(processed_files))
logs_to_process = self.log_processor.get_data_list(lookback_start,
lookback_end, processed_files)
self.logger.info(_('loaded %d files to process') %
len(logs_to_process))
if logs_to_process:
output = self.process_logs(logs_to_process, processed_files)
self.store_output(output)
del output
self.store_processed_files_list(processed_files)
self.logger.info(_("Log processing done (%0.2f minutes)") %
((time.time() - start) / 60))
((time.time() - start) / 60))
def multiprocess_collate(processor_args, logs_to_process, worker_count):
@ -395,7 +553,7 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count):
except Queue.Empty:
time.sleep(.01)
else:
if not isinstance(data, BadFileDownload):
if not isinstance(data, Exception):
yield item, data
if not any(r.is_alive() for r in results) and out_queue.empty():
# all the workers are done and nothing is in the queue
@ -412,6 +570,8 @@ def collate_worker(processor_args, in_queue, out_queue):
break
try:
ret = p.process_one_file(*item)
except BadFileDownload, err:
except Exception, err:
item_string = '/'.join(item[1:])
p.logger.exception("Unable to process file '%s'" % (item_string))
ret = err
out_queue.put((item, ret))

View File

@ -150,7 +150,8 @@ class LogUploader(Daemon):
def upload_all_logs(self):
"""
Match files under log_dir to source_filename_pattern and upload to swift
Match files under log_dir to source_filename_pattern and upload to
swift
"""
pattern = self.validate_filename_pattern()
if not pattern:

View File

@ -15,13 +15,17 @@
# limitations under the License.
import unittest
import os
from os import kill
from signal import SIGTERM
from subprocess import Popen
from time import sleep
from uuid import uuid4
import eventlet
import sqlite3
from swift.common import client
from swift.common.utils import hash_path, readconf
from test.probe.common import get_to_final_state, kill_pids, reset_environment
@ -316,6 +320,61 @@ class TestContainerFailures(unittest.TestCase):
self.assert_(object2 in [o['name'] for o in
client.get_container(self.url, self.token, container)[1]])
def _get_db_file_path(self, obj_dir):
files = sorted(os.listdir(obj_dir), reverse=True)
for file in files:
if file.endswith('db'):
return os.path.join(obj_dir, file)
def _get_container_db_files(self, container):
opart, onodes = self.container_ring.get_nodes(self.account, container)
onode = onodes[0]
db_files = []
for onode in onodes:
node_id = (onode['port'] - 6000) / 10
device = onode['device']
hash_str = hash_path(self.account, container)
server_conf = readconf('/etc/swift/container-server/%s.conf' %
node_id)
devices = server_conf['app:container-server']['devices']
obj_dir = '%s/%s/containers/%s/%s/%s/' % (devices,
device, opart,
hash_str[-3:], hash_str)
db_files.append(self._get_db_file_path(obj_dir))
return db_files
def test_locked_container_dbs(self):
def run_test(num_locks, catch_503):
container = 'container-%s' % uuid4()
client.put_container(self.url, self.token, container)
db_files = self._get_container_db_files(container)
db_conns = []
for i in range(num_locks):
db_conn = sqlite3.connect(db_files[i])
db_conn.execute('begin exclusive transaction')
db_conns.append(db_conn)
if catch_503:
try:
client.delete_container(self.url, self.token, container)
except client.ClientException, e:
self.assertEquals(e.http_status, 503)
else:
client.delete_container(self.url, self.token, container)
pool = eventlet.GreenPool()
try:
with eventlet.Timeout(15):
p = pool.spawn(run_test, 1, False)
r = pool.spawn(run_test, 2, True)
q = pool.spawn(run_test, 3, True)
pool.waitall()
except eventlet.Timeout, e:
raise Exception(
"The server did not return a 503 on container db locks, "
"it just hangs: %s" % e)
if __name__ == '__main__':
unittest.main()

View File

@ -2354,8 +2354,7 @@ class TestAuth(unittest.TestCase):
"auth": "plaintext:key"})),
# GET of requested user object
('200 Ok', {}, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".admin"}],
{"groups": [{"name": "act:usr"}, {"name": "act"}],
"auth": "plaintext:key"}))]))
resp = Request.blank('/auth/v2/act/usr',
headers={'X-Auth-Admin-User': 'act:adm',
@ -2363,11 +2362,86 @@ class TestAuth(unittest.TestCase):
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.body, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".admin"}],
{"groups": [{"name": "act:usr"}, {"name": "act"}],
"auth": "plaintext:key"}))
self.assertEquals(self.test_auth.app.calls, 2)
def test_get_user_account_admin_fail_getting_account_admin(self):
self.test_auth.app = FakeApp(iter([
# GET of user object (account admin check)
('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
{"name": "test"}, {"name": ".admin"}],
"auth": "plaintext:key"})),
# GET of requested user object [who is an .admin as well]
('200 Ok', {}, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".admin"}],
"auth": "plaintext:key"})),
# GET of user object (reseller admin check [and fail here])
('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
{"name": "test"}, {"name": ".admin"}],
"auth": "plaintext:key"}))]))
resp = Request.blank('/auth/v2/act/usr',
headers={'X-Auth-Admin-User': 'act:adm',
'X-Auth-Admin-Key': 'key'}
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 403)
self.assertEquals(self.test_auth.app.calls, 3)
def test_get_user_account_admin_fail_getting_reseller_admin(self):
self.test_auth.app = FakeApp(iter([
# GET of user object (account admin check)
('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
{"name": "test"}, {"name": ".admin"}],
"auth": "plaintext:key"})),
# GET of requested user object [who is a .reseller_admin]
('200 Ok', {}, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".reseller_admin"}],
"auth": "plaintext:key"}))]))
resp = Request.blank('/auth/v2/act/usr',
headers={'X-Auth-Admin-User': 'act:adm',
'X-Auth-Admin-Key': 'key'}
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 403)
self.assertEquals(self.test_auth.app.calls, 2)
def test_get_user_reseller_admin_fail_getting_reseller_admin(self):
self.test_auth.app = FakeApp(iter([
# GET of user object (account admin check)
('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"},
{"name": "test"}, {"name": ".reseller_admin"}],
"auth": "plaintext:key"})),
# GET of requested user object [who also is a .reseller_admin]
('200 Ok', {}, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".reseller_admin"}],
"auth": "plaintext:key"}))]))
resp = Request.blank('/auth/v2/act/usr',
headers={'X-Auth-Admin-User': 'act:adm',
'X-Auth-Admin-Key': 'key'}
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 403)
self.assertEquals(self.test_auth.app.calls, 2)
def test_get_user_super_admin_succeed_getting_reseller_admin(self):
self.test_auth.app = FakeApp(iter([
# GET of requested user object
('200 Ok', {}, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".reseller_admin"}],
"auth": "plaintext:key"}))]))
resp = Request.blank('/auth/v2/act/usr',
headers={'X-Auth-Admin-User': '.super_admin',
'X-Auth-Admin-Key': 'supertest'}
).get_response(self.test_auth)
self.assertEquals(resp.status_int, 200)
self.assertEquals(resp.body, json.dumps(
{"groups": [{"name": "act:usr"}, {"name": "act"},
{"name": ".reseller_admin"}],
"auth": "plaintext:key"}))
self.assertEquals(self.test_auth.app.calls, 1)
def test_get_user_groups_not_found(self):
self.test_auth.app = FakeApp(iter([
# GET of account container (list objects)

View File

@ -207,16 +207,6 @@ class TestSwift3(unittest.TestCase):
code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue
self.assertEquals(code, 'InvalidArgument')
def test_bad_path(self):
req = Request.blank('/bucket/object/bad',
environ={'REQUEST_METHOD': 'GET'},
headers={'Authorization': 'AWS test:tester:hmac'})
resp = self.app(req.environ, start_response)
dom = xml.dom.minidom.parseString("".join(resp))
self.assertEquals(dom.firstChild.nodeName, 'Error')
code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue
self.assertEquals(code, 'InvalidURI')
def test_bad_method(self):
req = Request.blank('/',
environ={'REQUEST_METHOD': 'PUT'},
@ -594,5 +584,21 @@ class TestSwift3(unittest.TestCase):
self.assertEquals(swift3.canonical_string(req2),
swift3.canonical_string(req3))
def test_signed_urls(self):
class FakeApp(object):
def __call__(self, env, start_response):
self.req = Request(env)
start_response('200 OK')
start_response([])
app = FakeApp()
local_app = swift3.filter_factory({})(app)
req = Request.blank('/bucket/object?Signature=X&Expires=Y&'
'AWSAccessKeyId=Z', environ={'REQUEST_METHOD': 'GET'})
req.date = datetime.now()
req.content_type = 'text/plain'
resp = local_app(req.environ, lambda *args: None)
self.assertEquals(app.req.headers['Authorization'], 'AWS Z:X')
self.assertEquals(app.req.headers['Date'], 'Y')
if __name__ == '__main__':
unittest.main()

View File

@ -17,8 +17,10 @@ import unittest
from contextlib import contextmanager
import os
import logging
import errno
from swift.common import db_replicator
from swift.common import utils
from swift.common.utils import normalize_timestamp
from swift.container import server as container_server
@ -86,6 +88,8 @@ class ChangingMtimesOs:
class FakeBroker:
db_file = __file__
get_repl_missing_table = False
db_type = 'container'
def __init__(self, *args, **kwargs):
return None
@contextmanager
@ -104,6 +108,8 @@ class FakeBroker:
def merge_items(self, *args):
self.args = args
def get_replication_info(self):
if self.get_repl_missing_table:
raise Exception('no such table')
return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0}
def reclaim(self, item_timestamp, sync_timestamp):
pass
@ -202,6 +208,35 @@ class TestDBReplicator(unittest.TestCase):
replicator = TestReplicator({})
replicator._replicate_object('0', 'file', 'node_id')
def test_replicate_object_quarantine(self):
replicator = TestReplicator({})
was_db_file = replicator.brokerclass.db_file
try:
def mock_renamer(was, new, cause_colision=False):
if cause_colision and '-' not in new:
raise OSError(errno.EEXIST, "File already exists")
self.assertEquals('/a/b/c/d/e', was)
if '-' in new:
self.assert_(
new.startswith('/a/quarantined/containers/e-'))
else:
self.assertEquals('/a/quarantined/containers/e', new)
def mock_renamer_error(was, new):
return mock_renamer(was, new, cause_colision=True)
was_renamer = db_replicator.renamer
db_replicator.renamer = mock_renamer
db_replicator.lock_parent_directory = lock_parent_directory
replicator.brokerclass.get_repl_missing_table = True
replicator.brokerclass.db_file = '/a/b/c/d/e/hey'
replicator._replicate_object('0', 'file', 'node_id')
# try the double quarantine
db_replicator.renamer = mock_renamer_error
replicator._replicate_object('0', 'file', 'node_id')
finally:
replicator.brokerclass.db_file = was_db_file
db_replicator.renamer = was_renamer
# def test_dispatch(self):
# rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False)

View File

@ -343,6 +343,19 @@ class TestObjectController(unittest.TestCase):
"Content-Encoding" in resp.headers)
self.assertEquals(resp.headers['Content-Type'], 'application/x-test')
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'HEAD'})
resp = self.object_controller.HEAD(req)
self.assert_("X-Object-Meta-1" not in resp.headers and
"X-Object-Meta-Two" not in resp.headers and
"X-Object-Meta-3" in resp.headers and
"X-Object-Meta-4" in resp.headers and
"Foo" in resp.headers and
"Bar" in resp.headers and
"Baz" not in resp.headers and
"Content-Encoding" in resp.headers)
self.assertEquals(resp.headers['Content-Type'], 'application/x-test')
timestamp = normalize_timestamp(time())
req = Request.blank('/sda1/p/a/c/o',
environ={'REQUEST_METHOD': 'POST'},

View File

@ -20,14 +20,17 @@ import unittest
from gzip import GzipFile
from shutil import rmtree
from time import time
from distutils.dir_util import mkpath
from eventlet import spawn, TimeoutError, listen
from eventlet.timeout import Timeout
from swift.obj import updater as object_updater, server as object_server
from swift.obj.server import ASYNCDIR
from swift.common.ring import RingData
from swift.common import utils
from swift.common.utils import hash_path, normalize_timestamp, mkdirs
from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \
write_pickle
class TestObjectUpdater(unittest.TestCase):
@ -48,7 +51,7 @@ class TestObjectUpdater(unittest.TestCase):
os.mkdir(self.devices_dir)
self.sda1 = os.path.join(self.devices_dir, 'sda1')
os.mkdir(self.sda1)
os.mkdir(os.path.join(self.sda1,'tmp'))
os.mkdir(os.path.join(self.sda1, 'tmp'))
def tearDown(self):
rmtree(self.testdir, ignore_errors=1)
@ -70,6 +73,45 @@ class TestObjectUpdater(unittest.TestCase):
self.assertEquals(cu.node_timeout, 5)
self.assert_(cu.get_container_ring() is not None)
def test_object_sweep(self):
prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc')
mkpath(prefix_dir)
objects = {
'a': [1089.3, 18.37, 12.83, 1.3],
'b': [49.4, 49.3, 49.2, 49.1],
'c': [109984.123],
}
expected = set()
for o, timestamps in objects.iteritems():
ohash = hash_path('account', 'container', o)
for t in timestamps:
o_path = os.path.join(prefix_dir, ohash + '-' +
normalize_timestamp(t))
if t == timestamps[0]:
expected.add(o_path)
write_pickle({}, o_path)
seen = set()
class MockObjectUpdater(object_updater.ObjectUpdater):
def process_object_update(self, update_path, device):
seen.add(update_path)
os.unlink(update_path)
cu = MockObjectUpdater({
'devices': self.devices_dir,
'mount_check': 'false',
'swift_dir': self.testdir,
'interval': '1',
'concurrency': '1',
'node_timeout': '5',
})
cu.object_sweep(self.sda1)
self.assert_(not os.path.exists(prefix_dir))
self.assertEqual(expected, seen)
def test_run_once(self):
cu = object_updater.ObjectUpdater({
'devices': self.devices_dir,
@ -103,6 +145,7 @@ class TestObjectUpdater(unittest.TestCase):
self.assert_(os.path.exists(op_path))
bindsock = listen(('127.0.0.1', 0))
def accepter(sock, return_code):
try:
with Timeout(3):
@ -123,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase):
except BaseException, err:
return err
return None
def accept(return_codes):
codes = iter(return_codes)
try:
@ -139,7 +183,7 @@ class TestObjectUpdater(unittest.TestCase):
except BaseException, err:
return err
return None
event = spawn(accept, [201,500])
event = spawn(accept, [201, 500])
for dev in cu.get_container_ring().devs:
if dev is not None:
dev['port'] = bindsock.getsockname()[1]

View File

@ -161,8 +161,10 @@ def fake_http_connect(*code_iter, **kwargs):
self.body = body
def getresponse(self):
if 'raise_exc' in kwargs:
if kwargs.get('raise_exc'):
raise Exception('test')
if kwargs.get('raise_timeout_exc'):
raise TimeoutError()
return self
def getexpect(self):
@ -341,6 +343,14 @@ class TestController(unittest.TestCase):
self.assertEqual(p, partition)
self.assertEqual(n, nodes)
def test_make_requests(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200)
partition, nodes = self.controller.account_info(self.account)
proxy_server.http_connect = fake_http_connect(201,
raise_timeout_exc=True)
self.controller._make_request(nodes, partition, 'POST','/','','')
# tests if 200 is cached and used
def test_account_info_200(self):
with save_globals():
@ -966,6 +976,9 @@ class TestObjectController(unittest.TestCase):
if expected < 400:
self.assert_('x-works' in res.headers)
self.assertEquals(res.headers['x-works'], 'yes')
self.assert_('accept-ranges' in res.headers)
self.assertEquals(res.headers['accept-ranges'], 'bytes')
test_status_map((200, 404, 404), 200)
test_status_map((200, 500, 404), 200)
test_status_map((304, 500, 404), 304)
@ -1237,7 +1250,7 @@ class TestObjectController(unittest.TestCase):
resp = controller.best_response(req, [200] * 3, ['OK'] * 3, [''] * 3,
'Object', etag='68b329da9893e34099c7d8ad5cb9c940')
self.assertEquals(resp.etag, '68b329da9893e34099c7d8ad5cb9c940')
def test_proxy_passes_content_type(self):
with save_globals():
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'})
@ -1893,8 +1906,8 @@ class TestObjectController(unittest.TestCase):
_test_sockets
orig_update_request = prosrv.update_request
def broken_update_request(env, req):
raise Exception('fake')
def broken_update_request(*args, **kwargs):
raise Exception('fake: this should be printed')
prosrv.update_request = broken_update_request
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
@ -1925,6 +1938,35 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(headers[:len(exp)], exp)
self.assert_('\r\nContent-Length: 0\r\n' in headers)
def test_client_ip_logging(self):
# test that the client ip field in the log gets populated with the
# ip instead of being blank
(prosrv, acc1srv, acc2srv, con2srv, con2srv, obj1srv, obj2srv) = \
_test_servers
(prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \
_test_sockets
class Logger(object):
def info(self, msg):
self.msg = msg
orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger
prosrv.logger = prosrv.access_logger = Logger()
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write(
'GET /v1/a?format=json HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n'
'\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 200'
self.assertEquals(headers[:len(exp)], exp)
exp = '127.0.0.1 127.0.0.1'
self.assert_(exp in prosrv.logger.msg)
def test_chunked_put_logging(self):
# GET account with a query string to test that
# Application.log_request logs the query string. Also, throws
@ -2448,7 +2490,29 @@ class TestObjectController(unittest.TestCase):
self.assert_(res.client_disconnect)
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_response_get_accept_ranges_header(self):
with save_globals():
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'})
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
proxy_server.http_connect = fake_http_connect(200, 200, 200)
resp = controller.GET(req)
self.assert_('accept-ranges' in resp.headers)
self.assertEquals(resp.headers['accept-ranges'], 'bytes')
def test_response_head_accept_ranges_header(self):
with save_globals():
req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'HEAD'})
self.app.update_request(req)
controller = proxy_server.ObjectController(self.app, 'account',
'container', 'object')
proxy_server.http_connect = fake_http_connect(200, 200, 200)
resp = controller.HEAD(req)
self.assert_('accept-ranges' in resp.headers)
self.assertEquals(resp.headers['accept-ranges'], 'bytes')
def test_GET_calls_authorize(self):
called = [False]
@ -2788,6 +2852,28 @@ class TestContainerController(unittest.TestCase):
finally:
self.app.object_chunk_size = orig_object_chunk_size
def test_response_get_accept_ranges_header(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c?format=json')
self.app.update_request(req)
res = controller.GET(req)
self.assert_('accept-ranges' in res.headers)
self.assertEqual(res.headers['accept-ranges'], 'bytes')
def test_response_head_accept_ranges_header(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
controller = proxy_server.ContainerController(self.app, 'account',
'container')
req = Request.blank('/a/c?format=json')
self.app.update_request(req)
res = controller.HEAD(req)
self.assert_('accept-ranges' in res.headers)
self.assertEqual(res.headers['accept-ranges'], 'bytes')
def test_PUT_metadata(self):
self.metadata_helper('PUT')
@ -3093,7 +3179,28 @@ class TestAccountController(unittest.TestCase):
res.body
self.assert_(hasattr(res, 'bytes_transferred'))
self.assertEquals(res.bytes_transferred, 2)
def test_response_get_accept_ranges_header(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/a?format=json')
self.app.update_request(req)
res = controller.GET(req)
self.assert_('accept-ranges' in res.headers)
self.assertEqual(res.headers['accept-ranges'], 'bytes')
def test_response_head_accept_ranges_header(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200, 200, body='{}')
controller = proxy_server.AccountController(self.app, 'account')
req = Request.blank('/a?format=json')
self.app.update_request(req)
res = controller.HEAD(req)
res.body
self.assert_('accept-ranges' in res.headers)
self.assertEqual(res.headers['accept-ranges'], 'bytes')
def test_response_client_disconnect_attr(self):
with save_globals():
proxy_server.http_connect = fake_http_connect(200, 200, body='{}')

View File

@ -16,6 +16,10 @@
import unittest
from test.unit import tmpfile
import Queue
import datetime
import hashlib
import pickle
import time
from swift.common import internal_proxy
from swift.stats import log_processor
@ -26,7 +30,6 @@ class FakeUploadApp(object):
def __init__(self, *args, **kwargs):
pass
class DumbLogger(object):
def __getattr__(self, n):
return self.foo
@ -77,7 +80,7 @@ class DumbInternalProxy(object):
return self.code, data()
class TestLogProcessor(unittest.TestCase):
access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\
'09/Jul/2010/04/14/30 GET '\
'/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\
@ -85,7 +88,7 @@ class TestLogProcessor(unittest.TestCase):
'6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262'
stats_test_line = 'account,1,2,3'
proxy_config = {'log-processor': {
}
}
@ -339,7 +342,7 @@ use = egg:swift#proxy
def test_collate_worker_error(self):
def get_object_data(*a,**kw):
raise log_processor.BadFileDownload()
raise Exception()
orig_get_object_data = log_processor.LogProcessor.get_object_data
try:
log_processor.LogProcessor.get_object_data = get_object_data
@ -361,8 +364,7 @@ use = egg:swift#proxy
self.assertEquals(item, work_request)
# these only work for Py2.7+
#self.assertIsInstance(ret, log_processor.BadFileDownload)
self.assertTrue(isinstance(ret, log_processor.BadFileDownload),
type(ret))
self.assertTrue(isinstance(ret, Exception))
finally:
log_processor.LogProcessor.get_object_data = orig_get_object_data
@ -426,3 +428,407 @@ use = egg:swift#proxy
finally:
log_processor.LogProcessor._internal_proxy = None
log_processor.LogProcessor.get_object_data = orig_get_object_data
class TestLogProcessorDaemon(unittest.TestCase):
def test_get_lookback_interval(self):
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, lookback_hours, lookback_window):
self.lookback_hours = lookback_hours
self.lookback_window = lookback_window
try:
d = datetime.datetime
for x in [
[d(2011, 1, 1), 0, 0, None, None],
[d(2011, 1, 1), 120, 0, '2010122700', None],
[d(2011, 1, 1), 120, 24, '2010122700', '2010122800'],
[d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'],
[d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'],
[d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'],
]:
log_processor.now = lambda: x[0]
d = MockLogProcessorDaemon(x[1], x[2])
self.assertEquals((x[3], x[4]), d.get_lookback_interval())
finally:
log_processor.now = datetime.datetime.now
def test_get_processed_files_list(self):
class MockLogProcessor():
def __init__(self, stream):
self.stream = stream
def get_object_data(self, *args, **kwargs):
return self.stream
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, stream):
self.log_processor = MockLogProcessor(stream)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
file_list = set(['a', 'b', 'c'])
for s, l in [['', None],
[pickle.dumps(set()).split('\n'), set()],
[pickle.dumps(file_list).split('\n'), file_list],
]:
self.assertEquals(l,
MockLogProcessorDaemon(s).get_processed_files_list())
def test_get_processed_files_list_bad_file_downloads(self):
class MockLogProcessor():
def __init__(self, status_code):
self.err = log_processor.BadFileDownload(status_code)
def get_object_data(self, *a, **k):
raise self.err
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, status_code):
self.log_processor = MockLogProcessor(status_code)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
for c, l in [[404, set()], [503, None], [None, None]]:
self.assertEquals(l,
MockLogProcessorDaemon(c).get_processed_files_list())
def test_get_aggregate_data(self):
        # When run "for real", the various keys/values in the input and
        # output dictionaries are often not simple strings. For testing,
        # we can use keys that are easier to work with.
processed_files = set()
data_in = [
['file1', {
'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
],
['file2', {'acct1_time1': {'field1': 10}}],
]
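        # values for a key that appears in more than one file should be
        # summed (acct1_time1 field1: 1 + 10), and every input file name
        # should end up in processed_files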
expected_data_out = {
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
pass
d = MockLogProcessorDaemon()
data_out = d.get_aggregate_data(processed_files, data_in)
for k, v in expected_data_out.items():
self.assertEquals(v, data_out[k])
self.assertEquals(set(['file1', 'file2']), processed_files)
def test_get_final_info(self):
        # When run "for real", the various keys/values in the input and
        # output dictionaries are often not simple strings. For testing,
        # we can use keys/values that are easier to work with.
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self._keylist_mapping = {
'out_field1':['field1', 'field2', 'field3'],
'out_field2':['field2', 'field3'],
'out_field3':['field3'],
'out_field4':'field4',
'out_field5':['field6', 'field7', 'field8'],
'out_field6':['field6'],
'out_field7':'field7',
}
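        # each output field is the sum of the listed input fields; a bare
        # string maps that input field straight through, and anything
        # missing from the input counts as 0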
data_in = {
'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3,
'field4': 8, 'field5': 11},
'acct1_time2': {'field1': 4, 'field2': 5},
'acct2_time1': {'field1': 6, 'field2': 7},
'acct3_time3': {'field1': 8, 'field2': 9},
}
expected_data_out = {
'acct1_time1': {'out_field1': 16, 'out_field2': 5,
'out_field3': 3, 'out_field4': 8, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct1_time2': {'out_field1': 9, 'out_field2': 5,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct2_time1': {'out_field1': 13, 'out_field2': 7,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
'acct3_time3': {'out_field1': 17, 'out_field2': 9,
'out_field3': 0, 'out_field4': 0, 'out_field5': 0,
'out_field6': 0, 'out_field7': 0,},
}
self.assertEquals(expected_data_out,
MockLogProcessorDaemon().get_final_info(data_in))
def test_store_processed_files_list(self):
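        # the daemon should pickle the processed-files set and upload it to
        # the configured account/container under processed_files_filename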
class MockInternalProxy:
def __init__(self, test, daemon, processed_files):
self.test = test
self.daemon = daemon
self.processed_files = processed_files
def upload_file(self, f, account, container, filename):
self.test.assertEquals(self.processed_files,
pickle.loads(f.getvalue()))
self.test.assertEquals(self.daemon.log_processor_account,
account)
self.test.assertEquals(self.daemon.log_processor_container,
container)
self.test.assertEquals(self.daemon.processed_files_filename,
filename)
class MockLogProcessor:
def __init__(self, test, daemon, processed_files):
self.internal_proxy = MockInternalProxy(test, daemon,
processed_files)
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test, processed_files):
self.log_processor = \
MockLogProcessor(test, self, processed_files)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
processed_files = set(['a', 'b', 'c'])
MockLogProcessorDaemon(self, processed_files).\
store_processed_files_list(processed_files)
def test_get_output(self):
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self._keylist_mapping = {'a':None, 'b':None, 'c':None}
data_in = {
('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3},
('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30},
('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12},
('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25},
}
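        # expected output: a header row followed by one row per
        # (account, year, month, day, hour) key, with the timestamp
        # rendered as YYYY/MM/DD HH:00:00; row order is not guaranteed,
        # hence the membership checks below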
expected_data_out = [
['data_ts', 'account', 'a', 'b', 'c'],
['2010/01/01 00:00:00', 'acct1', '1', '2', '3'],
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
]
data_out = MockLogProcessorDaemon().get_output(data_in)
self.assertEquals(expected_data_out[0], data_out[0])
for row in data_out[1:]:
self.assert_(row in expected_data_out)
for row in expected_data_out[1:]:
self.assert_(row in data_out)
def test_store_output(self):
try:
real_strftime = time.strftime
mock_strftime_return = '2010/03/02/01/'
            def mock_strftime(fmt):
                self.assertEquals('%Y/%m/%d/%H/', fmt)
                return mock_strftime_return
log_processor.time.strftime = mock_strftime
data_in = [
['data_ts', 'account', 'a', 'b', 'c'],
['2010/10/10 10:00:00', 'acct1', '1', '2', '3'],
['2010/10/10 10:00:00', 'acct1', '10', '20', '30'],
['2008/03/06 09:00:00', 'acct2', '8', '9', '12'],
['2005/04/08 16:00:00', 'acct3', '1', '5', '25'],
]
expected_output = '\n'.join([','.join(row) for row in data_in])
h = hashlib.md5(expected_output).hexdigest()
expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h)
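            # the uploaded object name is the %Y/%m/%d/%H/ prefix plus the
            # md5 of the CSV payload, with a .csv.gz suffix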
class MockInternalProxy:
def __init__(self, test, daemon, expected_filename,
expected_output):
self.test = test
self.daemon = daemon
self.expected_filename = expected_filename
self.expected_output = expected_output
def upload_file(self, f, account, container, filename):
self.test.assertEquals(self.daemon.log_processor_account,
account)
self.test.assertEquals(self.daemon.log_processor_container,
container)
self.test.assertEquals(self.expected_filename, filename)
self.test.assertEquals(self.expected_output, f.getvalue())
class MockLogProcessor:
def __init__(self, test, daemon, expected_filename,
expected_output):
self.internal_proxy = MockInternalProxy(test, daemon,
expected_filename, expected_output)
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test, expected_filename, expected_output):
self.log_processor = MockLogProcessor(test, self,
expected_filename, expected_output)
self.log_processor_account = 'account'
self.log_processor_container = 'container'
self.processed_files_filename = 'filename'
MockLogProcessorDaemon(self, expected_filename, expected_output).\
store_output(data_in)
finally:
log_processor.time.strftime = real_strftime
def test_keylist_mapping(self):
        # A fairly minimal test to verify that the property is both
        # generated by a particular method and cached properly.
        # The method that actually generates the mapping is
        # tested elsewhere.
value_return = 'keylist_mapping'
class MockLogProcessor:
def __init__(self):
self.call_count = 0
def generate_keylist_mapping(self):
self.call_count += 1
return value_return
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self.log_processor = MockLogProcessor()
self._keylist_mapping = None
d = MockLogProcessorDaemon()
self.assertEquals(value_return, d.keylist_mapping)
self.assertEquals(value_return, d.keylist_mapping)
self.assertEquals(1, d.log_processor.call_count)
def test_process_logs(self):
try:
mock_logs_to_process = 'logs_to_process'
mock_processed_files = 'processed_files'
real_multiprocess_collate = log_processor.multiprocess_collate
multiprocess_collate_return = 'multiprocess_collate_return'
get_aggregate_data_return = 'get_aggregate_data_return'
get_final_info_return = 'get_final_info_return'
get_output_return = 'get_output_return'
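            # process_logs should feed multiprocess_collate's results through
            # get_aggregate_data, then get_final_info, then get_output,
            # returning the final output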
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test):
self.test = test
self.total_conf = 'total_conf'
self.logger = 'logger'
self.worker_count = 'worker_count'
def get_aggregate_data(self, processed_files, results):
self.test.assertEquals(mock_processed_files, processed_files)
self.test.assertEquals(multiprocess_collate_return, results)
return get_aggregate_data_return
def get_final_info(self, aggr_data):
self.test.assertEquals(get_aggregate_data_return, aggr_data)
return get_final_info_return
def get_output(self, final_info):
self.test.assertEquals(get_final_info_return, final_info)
return get_output_return
d = MockLogProcessorDaemon(self)
def mock_multiprocess_collate(processor_args, logs_to_process,
worker_count):
self.assertEquals(d.total_conf, processor_args[0])
self.assertEquals(d.logger, processor_args[1])
self.assertEquals(mock_logs_to_process, logs_to_process)
self.assertEquals(d.worker_count, worker_count)
return multiprocess_collate_return
log_processor.multiprocess_collate = mock_multiprocess_collate
output = d.process_logs(mock_logs_to_process, mock_processed_files)
self.assertEquals(get_output_return, output)
finally:
log_processor.multiprocess_collate = real_multiprocess_collate
def test_run_once_get_processed_files_list_returns_none(self):
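        # if no processed-files list can be retrieved, run_once should give
        # up before requesting a data listing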
class MockLogProcessor:
def get_data_list(self, lookback_start, lookback_end,
processed_files):
raise unittest.TestCase.failureException, \
'Method should not be called'
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self):
self.logger = DumbLogger()
self.log_processor = MockLogProcessor()
def get_lookback_interval(self):
return None, None
def get_processed_files_list(self):
return None
MockLogProcessorDaemon().run_once()
def test_run_once_no_logs_to_process(self):
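        # with an empty data listing, run_once should return before
        # process_logs is ever called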
class MockLogProcessor():
def __init__(self, daemon, test):
self.daemon = daemon
self.test = test
def get_data_list(self, lookback_start, lookback_end,
processed_files):
self.test.assertEquals(self.daemon.lookback_start,
lookback_start)
self.test.assertEquals(self.daemon.lookback_end,
lookback_end)
self.test.assertEquals(self.daemon.processed_files,
processed_files)
return []
class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
def __init__(self, test):
self.logger = DumbLogger()
self.log_processor = MockLogProcessor(self, test)
self.lookback_start = 'lookback_start'
self.lookback_end = 'lookback_end'
self.processed_files = ['a', 'b', 'c']
def get_lookback_interval(self):
return self.lookback_start, self.lookback_end
def get_processed_files_list(self):
return self.processed_files
            def process_logs(self, logs_to_process, processed_files):
                raise unittest.TestCase.failureException, \
                    'Method should not be called'
MockLogProcessorDaemon(self).run_once()