diff --git a/AUTHORS b/AUTHORS index bf834db788..f6287945c8 100644 --- a/AUTHORS +++ b/AUTHORS @@ -27,6 +27,7 @@ Stephen Milton Russ Nelson Colin Nicholson Andrew Clay Shafer +Scott Simpson Monty Taylor Caleb Tennis FUJITA Tomonori diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000000..97fe05d552 --- /dev/null +++ b/CHANGELOG @@ -0,0 +1,4 @@ +swift (x.x.x) + + * Renamed swift-stats-populate to swift-dispersion-populate and + swift-stats-report to swift-dispersion-report. diff --git a/bin/swauth-add-account b/bin/swauth-add-account index 2b91b6292d..b8591c3425 100755 --- a/bin/swauth-add-account +++ b/bin/swauth-add-account @@ -65,4 +65,4 @@ if __name__ == '__main__': ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'Account creation failed: %s %s' % (resp.status, resp.reason) + exit('Account creation failed: %s %s' % (resp.status, resp.reason)) diff --git a/bin/swauth-add-user b/bin/swauth-add-user index 23144df41b..7b3dc129d3 100755 --- a/bin/swauth-add-user +++ b/bin/swauth-add-user @@ -90,4 +90,4 @@ if __name__ == '__main__': ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'User creation failed: %s %s' % (resp.status, resp.reason) + exit('User creation failed: %s %s' % (resp.status, resp.reason)) diff --git a/bin/swauth-cleanup-tokens b/bin/swauth-cleanup-tokens index 3ca86cd990..3b09072f40 100755 --- a/bin/swauth-cleanup-tokens +++ b/bin/swauth-cleanup-tokens @@ -25,7 +25,7 @@ from optparse import OptionParser from sys import argv, exit from time import sleep, time -from swift.common.client import Connection +from swift.common.client import Connection, ClientException if __name__ == '__main__': @@ -65,7 +65,15 @@ if __name__ == '__main__': while True: if options.verbose: print 'GET %s?marker=%s' % (container, marker) - objs = conn.get_container(container, marker=marker)[1] + try: + objs = conn.get_container(container, marker=marker)[1] + except ClientException, e: + if e.http_status == 404: + exit('Container %s not found. 
swauth-prep needs to be ' + 'rerun' % (container)) + else: + exit('Object listing on container %s failed with status ' + 'code %d' % (container, e.http_status)) if objs: marker = objs[-1]['name'] else: @@ -90,7 +98,13 @@ if __name__ == '__main__': (container, obj['name'], time() - detail['expires']) print 'DELETE %s/%s' % (container, obj['name']) - conn.delete_object(container, obj['name']) + try: + conn.delete_object(container, obj['name']) + except ClientException, e: + if e.http_status != 404: + print 'DELETE of %s/%s failed with status ' \ + 'code %d' % (container, obj['name'], + e.http_status) elif options.verbose: print "%s/%s won't expire for %ds; skipping" % \ (container, obj['name'], diff --git a/bin/swauth-delete-account b/bin/swauth-delete-account index 66bdf2bbe1..45aba4c502 100755 --- a/bin/swauth-delete-account +++ b/bin/swauth-delete-account @@ -57,4 +57,4 @@ if __name__ == '__main__': ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'Account deletion failed: %s %s' % (resp.status, resp.reason) + exit('Account deletion failed: %s %s' % (resp.status, resp.reason)) diff --git a/bin/swauth-delete-user b/bin/swauth-delete-user index de3ac3b12b..95025bc195 100755 --- a/bin/swauth-delete-user +++ b/bin/swauth-delete-user @@ -57,4 +57,4 @@ if __name__ == '__main__': ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'User deletion failed: %s %s' % (resp.status, resp.reason) + exit('User deletion failed: %s %s' % (resp.status, resp.reason)) diff --git a/bin/swauth-list b/bin/swauth-list index 3f9ae5ea49..bbf5bfe9f1 100755 --- a/bin/swauth-list +++ b/bin/swauth-list @@ -75,9 +75,9 @@ If the [user] is '.groups', the active groups for the account will be listed. conn = http_connect(parsed.hostname, parsed.port, 'GET', path, headers, ssl=(parsed.scheme == 'https')) resp = conn.getresponse() - if resp.status // 100 != 2: - print 'List failed: %s %s' % (resp.status, resp.reason) body = resp.read() + if resp.status // 100 != 2: + exit('List failed: %s %s' % (resp.status, resp.reason)) if options.plain_text: info = json.loads(body) for group in info[['accounts', 'users', 'groups'][len(args)]]: diff --git a/bin/swauth-prep b/bin/swauth-prep index a7b912e60c..456cf3e4c8 100755 --- a/bin/swauth-prep +++ b/bin/swauth-prep @@ -56,4 +56,4 @@ if __name__ == '__main__': ssl=(parsed.scheme == 'https')) resp = conn.getresponse() if resp.status // 100 != 2: - print 'Auth subsystem prep failed: %s %s' % (resp.status, resp.reason) + exit('Auth subsystem prep failed: %s %s' % (resp.status, resp.reason)) diff --git a/bin/swauth-set-account-service b/bin/swauth-set-account-service index 0317546df5..acdba77962 100755 --- a/bin/swauth-set-account-service +++ b/bin/swauth-set-account-service @@ -70,4 +70,4 @@ Example: %prog -K swauthkey test storage local http://127.0.0.1:8080/v1/AUTH_018 conn.send(body) resp = conn.getresponse() if resp.status // 100 != 2: - print 'Service set failed: %s %s' % (resp.status, resp.reason) + exit('Service set failed: %s %s' % (resp.status, resp.reason)) diff --git a/doc/source/_theme/layout.html b/doc/source/_theme/layout.html index f0573a3b43..75d970baa8 100644 --- a/doc/source/_theme/layout.html +++ b/doc/source/_theme/layout.html @@ -58,7 +58,7 @@ </div> <script type="text/javascript">$('#searchbox').show(0);</script> <p class="triangle-border right"> - Psst... hey. 
Did you know you can read <a href="http://swift.openstack.org/1.2">Swift 1.2 docs</a> or <a href="http://swift.openstack.org/1.1">Swift 1.1 docs</a> also? + Psst... hey. Did you know you can read <a href="http://swift.openstack.org/1.3">Swift 1.3 docs</a> or <a href="http://swift.openstack.org/1.2">Swift 1.2 docs</a> also? </p> {%- endif %} diff --git a/doc/source/debian_package_guide.rst b/doc/source/debian_package_guide.rst index e8086adc16..eef9180a90 100644 --- a/doc/source/debian_package_guide.rst +++ b/doc/source/debian_package_guide.rst @@ -58,7 +58,7 @@ Instructions for Building Debian Packages for Swift apt-get install python-software-properties add-apt-repository ppa:swift-core/ppa apt-get update - apt-get install curl gcc bzr python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr python-webob python-eventlet python-greenlet debhelper python-sphinx python-all python-openssl python-pastedeploy bzr-builddeb + apt-get install curl gcc bzr python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr python-webob python-eventlet python-greenlet debhelper python-sphinx python-all python-openssl python-pastedeploy python-netifaces bzr-builddeb * As you diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 9d0bce0a19..ce2a09e270 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -11,8 +11,8 @@ virtual machine will emulate running a four node Swift cluster. * Get the *Ubuntu 10.04 LTS (Lucid Lynx)* server image: - - Ubuntu Server ISO: http://releases.ubuntu.com/10.04/ubuntu-10.04.1-server-amd64.iso (682 MB) - - Ubuntu Live/Install: http://cdimage.ubuntu.com/releases/10.04/release/ubuntu-10.04-dvd-amd64.iso (4.1 GB) + - Ubuntu Server ISO: http://releases.ubuntu.com/lucid/ubuntu-10.04.2-server-amd64.iso (717 MB) + - Ubuntu Live/Install: http://cdimage.ubuntu.com/releases/lucid/release/ubuntu-10.04.2-dvd-amd64.iso (4.2 GB) - Ubuntu Mirrors: https://launchpad.net/ubuntu/+cdmirrors * Create guest virtual machine from the Ubuntu image. @@ -70,6 +70,7 @@ Using a loopback device for storage If you want to use a loopback device instead of another partition, follow these instructions. + #. `mkdir /srv` #. `dd if=/dev/zero of=/srv/swift-disk bs=1024 count=0 seek=1000000` (modify seek to make a larger or smaller partition) #. `mkfs.xfs -i size=1024 /srv/swift-disk` @@ -79,7 +80,6 @@ If you want to use a loopback device instead of another partition, follow these #. `mount /mnt/sdb1` #. `mkdir /mnt/sdb1/1 /mnt/sdb1/2 /mnt/sdb1/3 /mnt/sdb1/4` #. `chown <your-user-name>:<your-group-name> /mnt/sdb1/*` - #. `mkdir /srv` #. `for x in {1..4}; do ln -s /mnt/sdb1/$x /srv/$x; done` #. `mkdir -p /etc/swift/object-server /etc/swift/container-server /etc/swift/account-server /srv/1/node/sdb1 /srv/2/node/sdb2 /srv/3/node/sdb3 /srv/4/node/sdb4 /var/run/swift` #. `chown -R <your-user-name>:<your-group-name> /etc/swift /srv/[1-4]/ /var/run/swift` -- **Make sure to include the trailing slash after /srv/[1-4]/** @@ -555,7 +555,9 @@ Sample configuration files are provided with all defaults in line-by-line commen Setting up scripts for running Swift ------------------------------------ - #. Create `~/bin/resetswift.` If you are using a loopback device substitute `/dev/sdb1` with `/srv/swift-disk`:: + #. Create `~/bin/resetswift.` + If you are using a loopback device substitute `/dev/sdb1` with `/srv/swift-disk`. 
+ If you did not set up rsyslog for individual logging, remove the `find /var/log/swift...` line:: #!/bin/bash diff --git a/swift/__init__.py b/swift/__init__.py index 25a1c6b8c7..441e13d5e8 100644 --- a/swift/__init__.py +++ b/swift/__init__.py @@ -1,5 +1,5 @@ import gettext -__version__ = '1.3-dev' +__version__ = '1.4-dev' gettext.install('swift') diff --git a/swift/common/bench.py b/swift/common/bench.py index 482c2d77aa..c500493187 100644 --- a/swift/common/bench.py +++ b/swift/common/bench.py @@ -147,6 +147,16 @@ class BenchDELETE(Bench): self.total = len(names) self.msg = 'DEL' + def run(self): + Bench.run(self) + for container in self.containers: + try: + client.delete_container(self.url, self.token, container) + except client.ClientException, e: + if e.http_status != 409: + self._log_status("Unable to delete container '%s'. " \ + "Got http status '%d'." % (container, e.http_status)) + def _run(self, thread): if time.time() - self.heartbeat >= 15: self.heartbeat = time.time() diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 25bb8c810d..838cfc21df 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -20,6 +20,8 @@ import random import math import time import shutil +import uuid +import errno from eventlet import GreenPool, sleep, Timeout, TimeoutError from eventlet.green import subprocess @@ -49,7 +51,13 @@ def quarantine_db(object_file, server_type): quarantine_dir = os.path.abspath(os.path.join(object_dir, '..', '..', '..', '..', 'quarantined', server_type + 's', os.path.basename(object_dir))) - renamer(object_dir, quarantine_dir) + try: + renamer(object_dir, quarantine_dir) + except OSError, e: + if e.errno not in (errno.EEXIST, errno.ENOTEMPTY): + raise + quarantine_dir = "%s-%s" % (quarantine_dir, uuid.uuid4().hex) + renamer(object_dir, quarantine_dir) class ReplConnection(BufferedHTTPConnection): diff --git a/swift/common/middleware/swauth.py b/swift/common/middleware/swauth.py index 76ad4495f4..328799a19a 100644 --- a/swift/common/middleware/swauth.py +++ b/swift/common/middleware/swauth.py @@ -468,7 +468,7 @@ class Swauth(object): {"account_id": "AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162", "services": {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162"}}, + "local": "http://127.0.0.1:8080/v1/AUTH_018c3946"}}, "users": [{"name": "tester"}, {"name": "tester3"}]} :param req: The webob.Request to process. @@ -522,7 +522,7 @@ class Swauth(object): this:: "services": {"storage": {"default": "local", - "local": "http://127.0.0.1:8080/v1/AUTH_018c3946-23f8-4efb-a8fb-b67aae8e4162"}} + "local": "http://127.0.0.1:8080/v1/AUTH_018c3946"}} Making use of this section is described in :func:`handle_get_token`. @@ -849,6 +849,12 @@ class Swauth(object): raise Exception('Could not retrieve user object: %s %s' % (path, resp.status)) body = resp.body + display_groups = [g['name'] for g in json.loads(body)['groups']] + if ('.admin' in display_groups and + not self.is_reseller_admin(req)) or \ + ('.reseller_admin' in display_groups and + not self.is_super_admin(req)): + return HTTPForbidden(request=req) return Response(body=body) def handle_put_user(self, req): diff --git a/swift/common/middleware/swift3.py b/swift/common/middleware/swift3.py index a41c8cb695..df1ca3f755 100644 --- a/swift/common/middleware/swift3.py +++ b/swift/common/middleware/swift3.py @@ -16,9 +16,6 @@ """ The swift3 middleware will emulate the S3 REST api on top of swift. 
-The boto python library is necessary to use this middleware (install -the python-boto package if you use Ubuntu). - The following opperations are currently supported: * GET Service @@ -239,7 +236,7 @@ class BucketController(Controller): if 'acl' in args: return get_acl(self.account_name) - + objects = loads(''.join(list(body_iter))) body = ('<?xml version="1.0" encoding="UTF-8"?>' '<ListBucketResult ' @@ -429,7 +426,7 @@ class Swift3Middleware(object): self.app = app def get_controller(self, path): - container, obj = split_path(path, 0, 2) + container, obj = split_path(path, 0, 2, True) d = dict(container_name=container, object_name=obj) if container and obj: @@ -438,32 +435,35 @@ class Swift3Middleware(object): return BucketController, d return ServiceController, d - def get_account_info(self, env, req): - try: - account, user, _junk = \ - req.headers['Authorization'].split(' ')[-1].split(':') - except Exception: - return None, None - - h = canonical_string(req) - token = base64.urlsafe_b64encode(h) - return '%s:%s' % (account, user), token - def __call__(self, env, start_response): req = Request(env) - if not'Authorization' in req.headers: + + if 'AWSAccessKeyId' in req.GET: + try: + req.headers['Date'] = req.GET['Expires'] + req.headers['Authorization'] = \ + 'AWS %(AWSAccessKeyId)s:%(Signature)s' % req.GET + except KeyError: + return get_err_response('InvalidArgument')(env, start_response) + + if not 'Authorization' in req.headers: return self.app(env, start_response) + + try: + account, signature = \ + req.headers['Authorization'].split(' ')[-1].rsplit(':', 1) + except Exception: + return get_err_response('InvalidArgument')(env, start_response) + try: controller, path_parts = self.get_controller(req.path) except ValueError: return get_err_response('InvalidURI')(env, start_response) - account_name, token = self.get_account_info(env, req) - if not account_name: - return get_err_response('InvalidArgument')(env, start_response) + token = base64.urlsafe_b64encode(canonical_string(req)) + + controller = controller(env, self.app, account, token, **path_parts) - controller = controller(env, self.app, account_name, token, - **path_parts) if hasattr(controller, req.method): res = getattr(controller, req.method)(env, start_response) else: diff --git a/swift/common/utils.py b/swift/common/utils.py index 2c8bd1d622..4ee57db8f7 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -776,7 +776,7 @@ def readconf(conf, section_name=None, log_name=None, defaults=None): return conf -def write_pickle(obj, dest, tmp): +def write_pickle(obj, dest, tmp=None, pickle_protocol=0): """ Ensure that a pickle file gets written to disk. 
The file is first written to a tmp location, ensure it is synced to disk, then @@ -784,11 +784,14 @@ def write_pickle(obj, dest, tmp): :param obj: python object to be pickled :param dest: path of final destination file - :param tmp: path to tmp to use + :param tmp: path to tmp to use, defaults to None + :param pickle_protocol: protocol to pickle the obj with, defaults to 0 """ - fd, tmppath = mkstemp(dir=tmp) + if tmp == None: + tmp = os.path.dirname(dest) + fd, tmppath = mkstemp(dir=tmp, suffix='.tmp') with os.fdopen(fd, 'wb') as fo: - pickle.dump(obj, fo) + pickle.dump(obj, fo, pickle_protocol) fo.flush() os.fsync(fd) renamer(tmppath, dest) diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 5f4494b736..275a94cf63 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -25,12 +25,6 @@ import mimetools import eventlet from eventlet import greenio, GreenPool, sleep, wsgi, listen from paste.deploy import loadapp, appconfig - -# Hook to ensure connection resets don't blow up our servers. -# Remove with next release of Eventlet that has it in the set already. -from errno import ECONNRESET -wsgi.ACCEPT_ERRNO.add(ECONNRESET) - from eventlet.green import socket, ssl from swift.common.utils import get_logger, drop_privileges, \ @@ -124,8 +118,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs): # remaining tasks should not require elevated privileges drop_privileges(conf.get('user', 'swift')) - # finally after binding to ports and privilege drop, run app __init__ code - app = loadapp('config:%s' % conf_file, global_conf={'log_name': log_name}) + # Ensure the application can be loaded before proceeding. + loadapp('config:%s' % conf_file, global_conf={'log_name': log_name}) # redirect errors to logger and close stdio capture_stdio(logger) @@ -135,6 +129,8 @@ def run_wsgi(conf_file, app_section, *args, **kwargs): eventlet.hubs.use_hub('poll') eventlet.patcher.monkey_patch(all=False, socket=True) monkey_patch_mimetools() + app = loadapp('config:%s' % conf_file, + global_conf={'log_name': log_name}) pool = GreenPool(size=1024) try: wsgi.server(sock, app, NullLogger(), custom_pool=pool) diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 34fde2dc42..35bbed44cd 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -30,7 +30,7 @@ from eventlet.support.greenlets import GreenletExit from swift.common.ring import Ring from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ - renamer, compute_eta, get_logger + compute_eta, get_logger, write_pickle from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon @@ -105,9 +105,7 @@ def invalidate_hash(suffix_dir): except Exception: return hashes[suffix] = None - with open(hashes_file + '.tmp', 'wb') as fp: - pickle.dump(hashes, fp, PICKLE_PROTOCOL) - renamer(hashes_file + '.tmp', hashes_file) + write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) def get_hashes(partition_dir, recalculate=[], do_listdir=False, @@ -157,12 +155,20 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False, modified = True sleep() if modified: - with open(hashes_file + '.tmp', 'wb') as fp: - pickle.dump(hashes, fp, PICKLE_PROTOCOL) - renamer(hashes_file + '.tmp', hashes_file) + write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) return hashed, hashes +# Hack to work around Eventlet's tpool not catching and reraising Timeouts. We +# return the Timeout, Timeout if it's raised, the caller looks for it and +# reraises it if found. 
+def tpooled_get_hashes(*args, **kwargs): + try: + return get_hashes(*args, **kwargs) + except Timeout, err: + return err, err + + class ObjectReplicator(Daemon): """ Replicate objects. @@ -336,9 +342,12 @@ class ObjectReplicator(Daemon): self.replication_count += 1 begin = time.time() try: - hashed, local_hash = tpool.execute(get_hashes, job['path'], + hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'], do_listdir=(self.replication_count % 10) == 0, reclaim_age=self.reclaim_age) + # See tpooled_get_hashes "Hack". + if isinstance(hashed, BaseException): + raise hashed self.suffix_hash += hashed attempts_left = self.object_ring.replica_count - 1 nodes = itertools.chain(job['nodes'], @@ -368,8 +377,12 @@ class ObjectReplicator(Daemon): local_hash[suffix] != remote_hash.get(suffix, -1)] if not suffixes: continue - hashed, local_hash = tpool.execute(get_hashes, job['path'], - recalculate=suffixes, reclaim_age=self.reclaim_age) + hashed, local_hash = tpool.execute(tpooled_get_hashes, + job['path'], recalculate=suffixes, + reclaim_age=self.reclaim_age) + # See tpooled_get_hashes "Hack". + if isinstance(hashed, BaseException): + raise hashed suffixes = [suffix for suffix in local_hash if local_hash[suffix] != remote_hash.get(suffix, -1)] self.rsync(node, job, suffixes) @@ -496,6 +509,7 @@ class ObjectReplicator(Daemon): self.partition_times = [] stats = eventlet.spawn(self.heartbeat) lockup_detector = eventlet.spawn(self.detect_lockups) + eventlet.sleep() # Give spawns a cycle try: self.run_pool = GreenPool(size=self.concurrency) jobs = self.collect_jobs() diff --git a/swift/obj/server.py b/swift/obj/server.py index 74c6d2bded..6e67dc0ff2 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -44,7 +44,7 @@ from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ DiskFileNotExist -from swift.obj.replicator import get_hashes, invalidate_hash +from swift.obj.replicator import tpooled_get_hashes, invalidate_hash DATADIR = 'objects' @@ -649,8 +649,8 @@ class ObjectController(object): response = Response(content_type=file.metadata['Content-Type'], request=request, conditional_response=True) for key, value in file.metadata.iteritems(): - if key == 'X-Object-Manifest' or \ - key.lower().startswith('x-object-meta-'): + if key.lower().startswith('x-object-meta-') or \ + key.lower() in self.allowed_headers: response.headers[key] = value response.etag = file.metadata['ETag'] response.last_modified = float(file.metadata['X-Timestamp']) @@ -708,7 +708,11 @@ class ObjectController(object): if not os.path.exists(path): mkdirs(path) suffixes = suffix.split('-') if suffix else [] - _junk, hashes = tpool.execute(get_hashes, path, recalculate=suffixes) + _junk, hashes = tpool.execute(tpooled_get_hashes, path, + recalculate=suffixes) + # See tpooled_get_hashes "Hack". 
+ if isinstance(hashes, BaseException): + raise hashes return Response(body=pickle.dumps(hashes)) def __call__(self, env, start_response): diff --git a/swift/obj/updater.py b/swift/obj/updater.py index a7715f5a72..ed7e398309 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -132,11 +132,23 @@ class ObjectUpdater(Daemon): prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): continue - for update in os.listdir(prefix_path): + last_obj_hash = None + for update in sorted(os.listdir(prefix_path), reverse=True): update_path = os.path.join(prefix_path, update) if not os.path.isfile(update_path): continue - self.process_object_update(update_path, device) + try: + obj_hash, timestamp = update.split('-') + except ValueError: + self.logger.error( + _('ERROR async pending file with unexpected name %s') + % (update_path)) + continue + if obj_hash == last_obj_hash: + os.unlink(update_path) + else: + self.process_object_update(update_path, device) + last_obj_hash = obj_hash time.sleep(self.slowdown) try: os.rmdir(prefix_path) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index a0cc1443da..99b7201ab4 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -507,8 +507,7 @@ class Controller(object): return resp.status, resp.reason, resp.read() elif resp.status == 507: self.error_limit(node) - except Exception: - self.error_limit(node) + except (Exception, Timeout): self.exception_occurred(node, self.server_type, _('Trying to %(method)s %(path)s') % {'method': method, 'path': path}) @@ -646,6 +645,7 @@ class Controller(object): raise res.app_iter = file_iter() update_headers(res, source.getheaders()) + update_headers(res, {'accept-ranges': 'bytes'}) res.status = source.status res.content_length = source.getheader('Content-Length') if source.getheader('Content-Type'): @@ -655,6 +655,7 @@ class Controller(object): elif 200 <= source.status <= 399: res = status_map[source.status](request=req) update_headers(res, source.getheaders()) + update_headers(res, {'accept-ranges': 'bytes'}) if req.method == 'HEAD': res.content_length = source.getheader('Content-Length') if source.getheader('Content-Type'): @@ -829,6 +830,7 @@ class ObjectController(Controller): resp) resp.content_length = content_length resp.last_modified = last_modified + resp.headers['accept-ranges'] = 'bytes' return resp @@ -1326,8 +1328,8 @@ class AccountController(Controller): if value[0].lower().startswith('x-account-meta-')) if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.make_requests(req, self.app.account_ring, account_partition, - 'PUT', req.path_info, [headers] * len(accounts)) + return self.make_requests(req, self.app.account_ring, + account_partition, 'PUT', req.path_info, [headers] * len(accounts)) @public def POST(self, req): @@ -1343,8 +1345,9 @@ class AccountController(Controller): if value[0].lower().startswith('x-account-meta-')) if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.make_requests(req, self.app.account_ring, account_partition, - 'POST', req.path_info, [headers] * len(accounts)) + return self.make_requests(req, self.app.account_ring, + account_partition, 'POST', req.path_info, + [headers] * len(accounts)) @public def DELETE(self, req): @@ -1357,8 +1360,9 @@ class AccountController(Controller): 'X-CF-Trans-Id': self.trans_id} if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.make_requests(req, 
self.app.account_ring, account_partition, - 'DELETE', req.path_info, [headers] * len(accounts)) + return self.make_requests(req, self.app.account_ring, + account_partition, 'DELETE', req.path_info, + [headers] * len(accounts)) class BaseApplication(object): @@ -1551,6 +1555,8 @@ class Application(BaseApplication): if not client and 'x-forwarded-for' in req.headers: # remote user for other lbs client = req.headers['x-forwarded-for'].split(',')[0].strip() + if not client: + client = req.remote_addr logged_headers = None if self.log_headers: logged_headers = '\n'.join('%s: %s' % (k, v) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 29f8a75245..22a9679ca7 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -30,6 +30,8 @@ from swift.common.exceptions import ChunkReadTimeout from swift.common.utils import get_logger, readconf from swift.common.daemon import Daemon +now = datetime.datetime.now + class BadFileDownload(Exception): def __init__(self, status_code=None): @@ -234,31 +236,46 @@ class LogProcessorDaemon(Daemon): self.log_processor_container = c.get('container_name', 'log_processing_data') self.worker_count = int(c.get('worker_count', '1')) + self._keylist_mapping = None + self.processed_files_filename = 'processed_files.pickle.gz' - def run_once(self, *args, **kwargs): - for k in 'lookback_hours lookback_window'.split(): - if kwargs[k] is not None: - setattr(self, k, kwargs[k]) + def get_lookback_interval(self): + """ + :returns: lookback_start, lookback_end. + + Both or just lookback_end can be None. Otherwise, returns strings + of the form 'YYYYMMDDHH'. The interval returned is used as bounds + when looking for logs to processes. + + A returned None means don't limit the log files examined on that + side of the interval. + """ - self.logger.info(_("Beginning log processing")) - start = time.time() if self.lookback_hours == 0: lookback_start = None lookback_end = None else: delta_hours = datetime.timedelta(hours=self.lookback_hours) - lookback_start = datetime.datetime.now() - delta_hours + lookback_start = now() - delta_hours lookback_start = lookback_start.strftime('%Y%m%d%H') if self.lookback_window == 0: lookback_end = None else: delta_window = datetime.timedelta(hours=self.lookback_window) - lookback_end = datetime.datetime.now() - \ + lookback_end = now() - \ delta_hours + \ delta_window lookback_end = lookback_end.strftime('%Y%m%d%H') - self.logger.debug('lookback_start: %s' % lookback_start) - self.logger.debug('lookback_end: %s' % lookback_end) + return lookback_start, lookback_end + + def get_processed_files_list(self): + """ + :returns: a set of files that have already been processed or returns + None on error. + + Downloads the set from the stats account. Creates an empty set if + the an existing file cannot be found. + """ try: # Note: this file (or data set) will grow without bound. # In practice, if it becomes a problem (say, after many months of @@ -266,44 +283,52 @@ class LogProcessorDaemon(Daemon): # entries. Automatically pruning on each run could be dangerous. 
# There is not a good way to determine when an old entry should be # pruned (lookback_hours could be set to anything and could change) - processed_files_stream = self.log_processor.get_object_data( - self.log_processor_account, - self.log_processor_container, - 'processed_files.pickle.gz', - compressed=True) - buf = '\n'.join(x for x in processed_files_stream) + stream = self.log_processor.get_object_data( + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename, + compressed=True) + buf = '\n'.join(x for x in stream) if buf: - already_processed_files = cPickle.loads(buf) + files = cPickle.loads(buf) else: - already_processed_files = set() + return None except BadFileDownload, err: if err.status_code == 404: - already_processed_files = set() + files = set() else: - self.logger.error(_('Log processing unable to load list of ' - 'already processed log files')) - return - self.logger.debug(_('found %d processed files') % \ - len(already_processed_files)) - logs_to_process = self.log_processor.get_data_list(lookback_start, - lookback_end, - already_processed_files) - self.logger.info(_('loaded %d files to process') % - len(logs_to_process)) - if not logs_to_process: - self.logger.info(_("Log processing done (%0.2f minutes)") % - ((time.time() - start) / 60)) - return + return None + return files - # map - processor_args = (self.total_conf, self.logger) - results = multiprocess_collate(processor_args, logs_to_process, - self.worker_count) + def get_aggregate_data(self, processed_files, input_data): + """ + Aggregates stats data by account/hour, summing as needed. + + :param processed_files: set of processed files + :param input_data: is the output from multiprocess_collate/the plugins. + + :returns: A dict containing data aggregated from the input_data + passed in. + + The dict returned has tuple keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + key:field_value + - key corresponds to something in one of the plugin's keylist + mapping, something like the tuple (source, level, verb, code) + - field_value is the sum of the field_values for the + corresponding values in the input + + Both input_data and the dict returned are hourly aggregations of + stats. + + Multiple values for the same (account, hour, tuple key) found in + input_data are summed in the dict returned. + """ - #reduce aggr_data = {} - processed_files = already_processed_files - for item, data in results: + for item, data in input_data: # since item contains the plugin and the log name, new plugins will # "reprocess" the file and the results will be in the final csv. processed_files.add(item) @@ -315,14 +340,30 @@ class LogProcessorDaemon(Daemon): # processing plugins need to realize this existing_data[i] = current + j aggr_data[k] = existing_data + return aggr_data + + def get_final_info(self, aggr_data): + """ + Aggregates data from aggr_data based on the keylist mapping. + + :param aggr_data: The results of the get_aggregate_data function. + :returns: a dict of further aggregated data + + The dict returned has keys of the form: + (account, year, month, day, hour) + The dict returned has values that are dicts with items of this + form: + 'field_name': field_value (int) + + Data is aggregated as specified by the keylist mapping. The + keylist mapping specifies which keys to combine in aggr_data + and the final field_names for these combined keys in the dict + returned. Fields combined are summed. 
+ """ - # group - # reduce a large number of keys in aggr_data[k] to a small number of - # output keys - keylist_mapping = self.log_processor.generate_keylist_mapping() final_info = collections.defaultdict(dict) for account, data in aggr_data.items(): - for key, mapping in keylist_mapping.items(): + for key, mapping in self.keylist_mapping.items(): if isinstance(mapping, (list, set)): value = 0 for k in mapping: @@ -336,37 +377,154 @@ class LogProcessorDaemon(Daemon): except KeyError: value = 0 final_info[account][key] = value + return final_info - # output - sorted_keylist_mapping = sorted(keylist_mapping) - columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping) - out_buf = [columns] + def store_processed_files_list(self, processed_files): + """ + Stores the proccessed files list in the stats account. + + :param processed_files: set of processed files + """ + + s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) + f = cStringIO.StringIO(s) + self.log_processor.internal_proxy.upload_file(f, + self.log_processor_account, + self.log_processor_container, + self.processed_files_filename) + + def get_output(self, final_info): + """ + :returns: a list of rows to appear in the csv file. + + The first row contains the column headers for the rest of the + rows in the returned list. + + Each row after the first row corresponds to an account's data + for that hour. + """ + + sorted_keylist_mapping = sorted(self.keylist_mapping) + columns = ['data_ts', 'account'] + sorted_keylist_mapping + output = [columns] for (account, year, month, day, hour), d in final_info.items(): - data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour) - row = [data_ts] - row.append('%s' % account) + data_ts = '%04d/%02d/%02d %02d:00:00' % \ + (int(year), int(month), int(day), int(hour)) + row = [data_ts, '%s' % (account)] for k in sorted_keylist_mapping: - row.append('%s' % d[k]) - out_buf.append(','.join(row)) - out_buf = '\n'.join(out_buf) + row.append(str(d[k])) + output.append(row) + return output + + def store_output(self, output): + """ + Takes the a list of rows and stores a csv file of the values in the + stats account. + + :param output: list of rows to appear in the csv file + + This csv file is final product of this script. + """ + + out_buf = '\n'.join([','.join(row) for row in output]) h = hashlib.md5(out_buf).hexdigest() upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h f = cStringIO.StringIO(out_buf) self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - upload_name) + self.log_processor_account, + self.log_processor_container, + upload_name) - # cleanup - s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL) - f = cStringIO.StringIO(s) - self.log_processor.internal_proxy.upload_file(f, - self.log_processor_account, - self.log_processor_container, - 'processed_files.pickle.gz') + @property + def keylist_mapping(self): + """ + :returns: the keylist mapping. + + The keylist mapping determines how the stats fields are aggregated + in the final aggregation step. + """ + + if self._keylist_mapping == None: + self._keylist_mapping = \ + self.log_processor.generate_keylist_mapping() + return self._keylist_mapping + + def process_logs(self, logs_to_process, processed_files): + """ + :param logs_to_process: list of logs to process + :param processed_files: set of processed files + + :returns: returns a list of rows of processed data. + + The first row is the column headers. 
The rest of the rows contain + hourly aggregate data for the account specified in the row. + + Files processed are added to the processed_files set. + + When a large data structure is no longer needed, it is deleted in + an effort to conserve memory. + """ + + # map + processor_args = (self.total_conf, self.logger) + results = multiprocess_collate(processor_args, logs_to_process, + self.worker_count) + + # reduce + aggr_data = self.get_aggregate_data(processed_files, results) + del results + + # group + # reduce a large number of keys in aggr_data[k] to a small + # number of output keys + final_info = self.get_final_info(aggr_data) + del aggr_data + + # output + return self.get_output(final_info) + + def run_once(self, *args, **kwargs): + """ + Process log files that fall within the lookback interval. + + Upload resulting csv file to stats account. + + Update processed files list and upload to stats account. + """ + + for k in 'lookback_hours lookback_window'.split(): + if k in kwargs and kwargs[k] is not None: + setattr(self, k, kwargs[k]) + + start = time.time() + self.logger.info(_("Beginning log processing")) + + lookback_start, lookback_end = self.get_lookback_interval() + self.logger.debug('lookback_start: %s' % lookback_start) + self.logger.debug('lookback_end: %s' % lookback_end) + + processed_files = self.get_processed_files_list() + if processed_files == None: + self.logger.error(_('Log processing unable to load list of ' + 'already processed log files')) + return + self.logger.debug(_('found %d processed files') % + len(processed_files)) + + logs_to_process = self.log_processor.get_data_list(lookback_start, + lookback_end, processed_files) + self.logger.info(_('loaded %d files to process') % + len(logs_to_process)) + + if logs_to_process: + output = self.process_logs(logs_to_process, processed_files) + self.store_output(output) + del output + + self.store_processed_files_list(processed_files) self.logger.info(_("Log processing done (%0.2f minutes)") % - ((time.time() - start) / 60)) + ((time.time() - start) / 60)) def multiprocess_collate(processor_args, logs_to_process, worker_count): @@ -395,7 +553,7 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count): except Queue.Empty: time.sleep(.01) else: - if not isinstance(data, BadFileDownload): + if not isinstance(data, Exception): yield item, data if not any(r.is_alive() for r in results) and out_queue.empty(): # all the workers are done and nothing is in the queue @@ -412,6 +570,8 @@ def collate_worker(processor_args, in_queue, out_queue): break try: ret = p.process_one_file(*item) - except BadFileDownload, err: + except Exception, err: + item_string = '/'.join(item[1:]) + p.logger.exception("Unable to process file '%s'" % (item_string)) ret = err out_queue.put((item, ret)) diff --git a/swift/stats/log_uploader.py b/swift/stats/log_uploader.py index a9248c7c17..4256e64692 100644 --- a/swift/stats/log_uploader.py +++ b/swift/stats/log_uploader.py @@ -150,7 +150,8 @@ class LogUploader(Daemon): def upload_all_logs(self): """ - Match files under log_dir to source_filename_pattern and upload to swift + Match files under log_dir to source_filename_pattern and upload to + swift """ pattern = self.validate_filename_pattern() if not pattern: diff --git a/test/probe/test_container_failures.py b/test/probe/test_container_failures.py index 585835d2a8..a493bffc27 100755 --- a/test/probe/test_container_failures.py +++ b/test/probe/test_container_failures.py @@ -15,13 +15,17 @@ # limitations under the License. 
import unittest +import os from os import kill from signal import SIGTERM from subprocess import Popen from time import sleep from uuid import uuid4 +import eventlet +import sqlite3 from swift.common import client +from swift.common.utils import hash_path, readconf from test.probe.common import get_to_final_state, kill_pids, reset_environment @@ -316,6 +320,61 @@ class TestContainerFailures(unittest.TestCase): self.assert_(object2 in [o['name'] for o in client.get_container(self.url, self.token, container)[1]]) + def _get_db_file_path(self, obj_dir): + files = sorted(os.listdir(obj_dir), reverse=True) + for file in files: + if file.endswith('db'): + return os.path.join(obj_dir, file) + + def _get_container_db_files(self, container): + opart, onodes = self.container_ring.get_nodes(self.account, container) + onode = onodes[0] + db_files = [] + for onode in onodes: + node_id = (onode['port'] - 6000) / 10 + device = onode['device'] + hash_str = hash_path(self.account, container) + server_conf = readconf('/etc/swift/container-server/%s.conf' % + node_id) + devices = server_conf['app:container-server']['devices'] + obj_dir = '%s/%s/containers/%s/%s/%s/' % (devices, + device, opart, + hash_str[-3:], hash_str) + db_files.append(self._get_db_file_path(obj_dir)) + + return db_files + + def test_locked_container_dbs(self): + + def run_test(num_locks, catch_503): + container = 'container-%s' % uuid4() + client.put_container(self.url, self.token, container) + db_files = self._get_container_db_files(container) + db_conns = [] + for i in range(num_locks): + db_conn = sqlite3.connect(db_files[i]) + db_conn.execute('begin exclusive transaction') + db_conns.append(db_conn) + if catch_503: + try: + client.delete_container(self.url, self.token, container) + except client.ClientException, e: + self.assertEquals(e.http_status, 503) + else: + client.delete_container(self.url, self.token, container) + + pool = eventlet.GreenPool() + try: + with eventlet.Timeout(15): + p = pool.spawn(run_test, 1, False) + r = pool.spawn(run_test, 2, True) + q = pool.spawn(run_test, 3, True) + pool.waitall() + except eventlet.Timeout, e: + raise Exception( + "The server did not return a 503 on container db locks, " + "it just hangs: %s" % e) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/middleware/test_swauth.py b/test/unit/common/middleware/test_swauth.py index ea78347d7a..19d07fc8fa 100644 --- a/test/unit/common/middleware/test_swauth.py +++ b/test/unit/common/middleware/test_swauth.py @@ -2354,8 +2354,7 @@ class TestAuth(unittest.TestCase): "auth": "plaintext:key"})), # GET of requested user object ('200 Ok', {}, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], + {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"}))])) resp = Request.blank('/auth/v2/act/usr', headers={'X-Auth-Admin-User': 'act:adm', @@ -2363,11 +2362,86 @@ class TestAuth(unittest.TestCase): ).get_response(self.test_auth) self.assertEquals(resp.status_int, 200) self.assertEquals(resp.body, json.dumps( - {"groups": [{"name": "act:usr"}, {"name": "act"}, - {"name": ".admin"}], + {"groups": [{"name": "act:usr"}, {"name": "act"}], "auth": "plaintext:key"})) self.assertEquals(self.test_auth.app.calls, 2) + def test_get_user_account_admin_fail_getting_account_admin(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": 
"plaintext:key"})), + # GET of requested user object [who is an .admin as well] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of user object (reseller admin check [and fail here]) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={'X-Auth-Admin-User': 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 3) + + def test_get_user_account_admin_fail_getting_reseller_admin(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".admin"}], + "auth": "plaintext:key"})), + # GET of requested user object [who is a .reseller_admin] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={'X-Auth-Admin-User': 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_reseller_admin_fail_getting_reseller_admin(self): + self.test_auth.app = FakeApp(iter([ + # GET of user object (account admin check) + ('200 Ok', {}, json.dumps({"groups": [{"name": "act:adm"}, + {"name": "test"}, {"name": ".reseller_admin"}], + "auth": "plaintext:key"})), + # GET of requested user object [who also is a .reseller_admin] + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={'X-Auth-Admin-User': 'act:adm', + 'X-Auth-Admin-Key': 'key'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 403) + self.assertEquals(self.test_auth.app.calls, 2) + + def test_get_user_super_admin_succeed_getting_reseller_admin(self): + self.test_auth.app = FakeApp(iter([ + # GET of requested user object + ('200 Ok', {}, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"}))])) + resp = Request.blank('/auth/v2/act/usr', + headers={'X-Auth-Admin-User': '.super_admin', + 'X-Auth-Admin-Key': 'supertest'} + ).get_response(self.test_auth) + self.assertEquals(resp.status_int, 200) + self.assertEquals(resp.body, json.dumps( + {"groups": [{"name": "act:usr"}, {"name": "act"}, + {"name": ".reseller_admin"}], + "auth": "plaintext:key"})) + self.assertEquals(self.test_auth.app.calls, 1) + def test_get_user_groups_not_found(self): self.test_auth.app = FakeApp(iter([ # GET of account container (list objects) diff --git a/test/unit/common/middleware/test_swift3.py b/test/unit/common/middleware/test_swift3.py index 8d88da8332..bca9badacb 100644 --- a/test/unit/common/middleware/test_swift3.py +++ b/test/unit/common/middleware/test_swift3.py @@ -207,16 +207,6 @@ class TestSwift3(unittest.TestCase): code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue self.assertEquals(code, 'InvalidArgument') - def test_bad_path(self): - req = Request.blank('/bucket/object/bad', - environ={'REQUEST_METHOD': 'GET'}, - headers={'Authorization': 'AWS test:tester:hmac'}) - resp = self.app(req.environ, start_response) - 
dom = xml.dom.minidom.parseString("".join(resp)) - self.assertEquals(dom.firstChild.nodeName, 'Error') - code = dom.getElementsByTagName('Code')[0].childNodes[0].nodeValue - self.assertEquals(code, 'InvalidURI') - def test_bad_method(self): req = Request.blank('/', environ={'REQUEST_METHOD': 'PUT'}, @@ -594,5 +584,21 @@ class TestSwift3(unittest.TestCase): self.assertEquals(swift3.canonical_string(req2), swift3.canonical_string(req3)) + def test_signed_urls(self): + class FakeApp(object): + def __call__(self, env, start_response): + self.req = Request(env) + start_response('200 OK') + start_response([]) + app = FakeApp() + local_app = swift3.filter_factory({})(app) + req = Request.blank('/bucket/object?Signature=X&Expires=Y&' + 'AWSAccessKeyId=Z', environ={'REQUEST_METHOD': 'GET'}) + req.date = datetime.now() + req.content_type = 'text/plain' + resp = local_app(req.environ, lambda *args: None) + self.assertEquals(app.req.headers['Authorization'], 'AWS Z:X') + self.assertEquals(app.req.headers['Date'], 'Y') + if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_db_replicator.py b/test/unit/common/test_db_replicator.py index 9e77f2c92f..30db570868 100644 --- a/test/unit/common/test_db_replicator.py +++ b/test/unit/common/test_db_replicator.py @@ -17,8 +17,10 @@ import unittest from contextlib import contextmanager import os import logging +import errno from swift.common import db_replicator +from swift.common import utils from swift.common.utils import normalize_timestamp from swift.container import server as container_server @@ -86,6 +88,8 @@ class ChangingMtimesOs: class FakeBroker: db_file = __file__ + get_repl_missing_table = False + db_type = 'container' def __init__(self, *args, **kwargs): return None @contextmanager @@ -104,6 +108,8 @@ class FakeBroker: def merge_items(self, *args): self.args = args def get_replication_info(self): + if self.get_repl_missing_table: + raise Exception('no such table') return {'delete_timestamp': 0, 'put_timestamp': 1, 'count': 0} def reclaim(self, item_timestamp, sync_timestamp): pass @@ -202,6 +208,35 @@ class TestDBReplicator(unittest.TestCase): replicator = TestReplicator({}) replicator._replicate_object('0', 'file', 'node_id') + def test_replicate_object_quarantine(self): + replicator = TestReplicator({}) + was_db_file = replicator.brokerclass.db_file + try: + + def mock_renamer(was, new, cause_colision=False): + if cause_colision and '-' not in new: + raise OSError(errno.EEXIST, "File already exists") + self.assertEquals('/a/b/c/d/e', was) + if '-' in new: + self.assert_( + new.startswith('/a/quarantined/containers/e-')) + else: + self.assertEquals('/a/quarantined/containers/e', new) + + def mock_renamer_error(was, new): + return mock_renamer(was, new, cause_colision=True) + was_renamer = db_replicator.renamer + db_replicator.renamer = mock_renamer + db_replicator.lock_parent_directory = lock_parent_directory + replicator.brokerclass.get_repl_missing_table = True + replicator.brokerclass.db_file = '/a/b/c/d/e/hey' + replicator._replicate_object('0', 'file', 'node_id') + # try the double quarantine + db_replicator.renamer = mock_renamer_error + replicator._replicate_object('0', 'file', 'node_id') + finally: + replicator.brokerclass.db_file = was_db_file + db_replicator.renamer = was_renamer # def test_dispatch(self): # rpc = db_replicator.ReplicatorRpc('/', '/', FakeBroker, False) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 6628d7dd64..7463e6636c 100644 --- a/test/unit/obj/test_server.py +++ 
b/test/unit/obj/test_server.py @@ -343,6 +343,19 @@ class TestObjectController(unittest.TestCase): "Content-Encoding" in resp.headers) self.assertEquals(resp.headers['Content-Type'], 'application/x-test') + req = Request.blank('/sda1/p/a/c/o', + environ={'REQUEST_METHOD': 'HEAD'}) + resp = self.object_controller.HEAD(req) + self.assert_("X-Object-Meta-1" not in resp.headers and + "X-Object-Meta-Two" not in resp.headers and + "X-Object-Meta-3" in resp.headers and + "X-Object-Meta-4" in resp.headers and + "Foo" in resp.headers and + "Bar" in resp.headers and + "Baz" not in resp.headers and + "Content-Encoding" in resp.headers) + self.assertEquals(resp.headers['Content-Type'], 'application/x-test') + timestamp = normalize_timestamp(time()) req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'POST'}, diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 52e327d1b8..7f43f314d2 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -20,14 +20,17 @@ import unittest from gzip import GzipFile from shutil import rmtree from time import time +from distutils.dir_util import mkpath from eventlet import spawn, TimeoutError, listen from eventlet.timeout import Timeout from swift.obj import updater as object_updater, server as object_server +from swift.obj.server import ASYNCDIR from swift.common.ring import RingData from swift.common import utils -from swift.common.utils import hash_path, normalize_timestamp, mkdirs +from swift.common.utils import hash_path, normalize_timestamp, mkdirs, \ + write_pickle class TestObjectUpdater(unittest.TestCase): @@ -48,7 +51,7 @@ class TestObjectUpdater(unittest.TestCase): os.mkdir(self.devices_dir) self.sda1 = os.path.join(self.devices_dir, 'sda1') os.mkdir(self.sda1) - os.mkdir(os.path.join(self.sda1,'tmp')) + os.mkdir(os.path.join(self.sda1, 'tmp')) def tearDown(self): rmtree(self.testdir, ignore_errors=1) @@ -70,6 +73,45 @@ class TestObjectUpdater(unittest.TestCase): self.assertEquals(cu.node_timeout, 5) self.assert_(cu.get_container_ring() is not None) + def test_object_sweep(self): + prefix_dir = os.path.join(self.sda1, ASYNCDIR, 'abc') + mkpath(prefix_dir) + + objects = { + 'a': [1089.3, 18.37, 12.83, 1.3], + 'b': [49.4, 49.3, 49.2, 49.1], + 'c': [109984.123], + } + + expected = set() + for o, timestamps in objects.iteritems(): + ohash = hash_path('account', 'container', o) + for t in timestamps: + o_path = os.path.join(prefix_dir, ohash + '-' + + normalize_timestamp(t)) + if t == timestamps[0]: + expected.add(o_path) + write_pickle({}, o_path) + + seen = set() + + class MockObjectUpdater(object_updater.ObjectUpdater): + def process_object_update(self, update_path, device): + seen.add(update_path) + os.unlink(update_path) + + cu = MockObjectUpdater({ + 'devices': self.devices_dir, + 'mount_check': 'false', + 'swift_dir': self.testdir, + 'interval': '1', + 'concurrency': '1', + 'node_timeout': '5', + }) + cu.object_sweep(self.sda1) + self.assert_(not os.path.exists(prefix_dir)) + self.assertEqual(expected, seen) + def test_run_once(self): cu = object_updater.ObjectUpdater({ 'devices': self.devices_dir, @@ -103,6 +145,7 @@ class TestObjectUpdater(unittest.TestCase): self.assert_(os.path.exists(op_path)) bindsock = listen(('127.0.0.1', 0)) + def accepter(sock, return_code): try: with Timeout(3): @@ -123,6 +166,7 @@ class TestObjectUpdater(unittest.TestCase): except BaseException, err: return err return None + def accept(return_codes): codes = iter(return_codes) try: @@ -139,7 +183,7 @@ class 
TestObjectUpdater(unittest.TestCase): except BaseException, err: return err return None - event = spawn(accept, [201,500]) + event = spawn(accept, [201, 500]) for dev in cu.get_container_ring().devs: if dev is not None: dev['port'] = bindsock.getsockname()[1] diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 782a31d2c5..a254ac1ad6 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -161,8 +161,10 @@ def fake_http_connect(*code_iter, **kwargs): self.body = body def getresponse(self): - if 'raise_exc' in kwargs: + if kwargs.get('raise_exc'): raise Exception('test') + if kwargs.get('raise_timeout_exc'): + raise TimeoutError() return self def getexpect(self): @@ -341,6 +343,14 @@ class TestController(unittest.TestCase): self.assertEqual(p, partition) self.assertEqual(n, nodes) + def test_make_requests(self): + with save_globals(): + proxy_server.http_connect = fake_http_connect(200) + partition, nodes = self.controller.account_info(self.account) + proxy_server.http_connect = fake_http_connect(201, + raise_timeout_exc=True) + self.controller._make_request(nodes, partition, 'POST','/','','') + # tests if 200 is cached and used def test_account_info_200(self): with save_globals(): @@ -966,6 +976,9 @@ class TestObjectController(unittest.TestCase): if expected < 400: self.assert_('x-works' in res.headers) self.assertEquals(res.headers['x-works'], 'yes') + self.assert_('accept-ranges' in res.headers) + self.assertEquals(res.headers['accept-ranges'], 'bytes') + test_status_map((200, 404, 404), 200) test_status_map((200, 500, 404), 200) test_status_map((304, 500, 404), 304) @@ -1237,7 +1250,7 @@ class TestObjectController(unittest.TestCase): resp = controller.best_response(req, [200] * 3, ['OK'] * 3, [''] * 3, 'Object', etag='68b329da9893e34099c7d8ad5cb9c940') self.assertEquals(resp.etag, '68b329da9893e34099c7d8ad5cb9c940') - + def test_proxy_passes_content_type(self): with save_globals(): req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'}) @@ -1893,8 +1906,8 @@ class TestObjectController(unittest.TestCase): _test_sockets orig_update_request = prosrv.update_request - def broken_update_request(env, req): - raise Exception('fake') + def broken_update_request(*args, **kwargs): + raise Exception('fake: this should be printed') prosrv.update_request = broken_update_request sock = connect_tcp(('localhost', prolis.getsockname()[1])) @@ -1925,6 +1938,35 @@ class TestObjectController(unittest.TestCase): self.assertEquals(headers[:len(exp)], exp) self.assert_('\r\nContent-Length: 0\r\n' in headers) + def test_client_ip_logging(self): + # test that the client ip field in the log gets populated with the + # ip instead of being blank + (prosrv, acc1srv, acc2srv, con2srv, con2srv, obj1srv, obj2srv) = \ + _test_servers + (prolis, acc1lis, acc2lis, con2lis, con2lis, obj1lis, obj2lis) = \ + _test_sockets + + class Logger(object): + + def info(self, msg): + self.msg = msg + + orig_logger, orig_access_logger = prosrv.logger, prosrv.access_logger + prosrv.logger = prosrv.access_logger = Logger() + sock = connect_tcp(('localhost', prolis.getsockname()[1])) + fd = sock.makefile() + fd.write( + 'GET /v1/a?format=json HTTP/1.1\r\nHost: localhost\r\n' + 'Connection: close\r\nX-Auth-Token: t\r\n' + 'Content-Length: 0\r\n' + '\r\n') + fd.flush() + headers = readuntil2crlfs(fd) + exp = 'HTTP/1.1 200' + self.assertEquals(headers[:len(exp)], exp) + exp = '127.0.0.1 127.0.0.1' + self.assert_(exp in prosrv.logger.msg) + def test_chunked_put_logging(self): 
# GET account with a query string to test that # Application.log_request logs the query string. Also, throws @@ -2448,7 +2490,29 @@ class TestObjectController(unittest.TestCase): self.assert_(res.client_disconnect) finally: self.app.object_chunk_size = orig_object_chunk_size + + def test_response_get_accept_ranges_header(self): + with save_globals(): + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'GET'}) + self.app.update_request(req) + controller = proxy_server.ObjectController(self.app, 'account', + 'container', 'object') + proxy_server.http_connect = fake_http_connect(200, 200, 200) + resp = controller.GET(req) + self.assert_('accept-ranges' in resp.headers) + self.assertEquals(resp.headers['accept-ranges'], 'bytes') + def test_response_head_accept_ranges_header(self): + with save_globals(): + req = Request.blank('/a/c/o', environ={'REQUEST_METHOD': 'HEAD'}) + self.app.update_request(req) + controller = proxy_server.ObjectController(self.app, 'account', + 'container', 'object') + proxy_server.http_connect = fake_http_connect(200, 200, 200) + resp = controller.HEAD(req) + self.assert_('accept-ranges' in resp.headers) + self.assertEquals(resp.headers['accept-ranges'], 'bytes') + def test_GET_calls_authorize(self): called = [False] @@ -2788,6 +2852,28 @@ class TestContainerController(unittest.TestCase): finally: self.app.object_chunk_size = orig_object_chunk_size + def test_response_get_accept_ranges_header(self): + with save_globals(): + proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + controller = proxy_server.ContainerController(self.app, 'account', + 'container') + req = Request.blank('/a/c?format=json') + self.app.update_request(req) + res = controller.GET(req) + self.assert_('accept-ranges' in res.headers) + self.assertEqual(res.headers['accept-ranges'], 'bytes') + + def test_response_head_accept_ranges_header(self): + with save_globals(): + proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + controller = proxy_server.ContainerController(self.app, 'account', + 'container') + req = Request.blank('/a/c?format=json') + self.app.update_request(req) + res = controller.HEAD(req) + self.assert_('accept-ranges' in res.headers) + self.assertEqual(res.headers['accept-ranges'], 'bytes') + def test_PUT_metadata(self): self.metadata_helper('PUT') @@ -3093,7 +3179,28 @@ class TestAccountController(unittest.TestCase): res.body self.assert_(hasattr(res, 'bytes_transferred')) self.assertEquals(res.bytes_transferred, 2) - + + def test_response_get_accept_ranges_header(self): + with save_globals(): + proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + controller = proxy_server.AccountController(self.app, 'account') + req = Request.blank('/a?format=json') + self.app.update_request(req) + res = controller.GET(req) + self.assert_('accept-ranges' in res.headers) + self.assertEqual(res.headers['accept-ranges'], 'bytes') + + def test_response_head_accept_ranges_header(self): + with save_globals(): + proxy_server.http_connect = fake_http_connect(200, 200, body='{}') + controller = proxy_server.AccountController(self.app, 'account') + req = Request.blank('/a?format=json') + self.app.update_request(req) + res = controller.HEAD(req) + res.body + self.assert_('accept-ranges' in res.headers) + self.assertEqual(res.headers['accept-ranges'], 'bytes') + def test_response_client_disconnect_attr(self): with save_globals(): proxy_server.http_connect = fake_http_connect(200, 200, body='{}') diff --git a/test/unit/stats/test_log_processor.py 
b/test/unit/stats/test_log_processor.py index 80b4822560..c1b3b68b19 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -16,6 +16,10 @@ import unittest from test.unit import tmpfile import Queue +import datetime +import hashlib +import pickle +import time from swift.common import internal_proxy from swift.stats import log_processor @@ -26,7 +30,6 @@ class FakeUploadApp(object): def __init__(self, *args, **kwargs): pass - class DumbLogger(object): def __getattr__(self, n): return self.foo @@ -77,7 +80,7 @@ class DumbInternalProxy(object): return self.code, data() class TestLogProcessor(unittest.TestCase): - + access_test_line = 'Jul 9 04:14:30 saio proxy-server 1.2.3.4 4.5.6.7 '\ '09/Jul/2010/04/14/30 GET '\ '/v1/acct/foo/bar?format=json&foo HTTP/1.0 200 - '\ @@ -85,7 +88,7 @@ class TestLogProcessor(unittest.TestCase): '6 95 - txfa431231-7f07-42fd-8fc7-7da9d8cc1f90 - 0.0262' stats_test_line = 'account,1,2,3' proxy_config = {'log-processor': { - + } } @@ -339,7 +342,7 @@ use = egg:swift#proxy def test_collate_worker_error(self): def get_object_data(*a,**kw): - raise log_processor.BadFileDownload() + raise Exception() orig_get_object_data = log_processor.LogProcessor.get_object_data try: log_processor.LogProcessor.get_object_data = get_object_data @@ -361,8 +364,7 @@ use = egg:swift#proxy self.assertEquals(item, work_request) # these only work for Py2.7+ #self.assertIsInstance(ret, log_processor.BadFileDownload) - self.assertTrue(isinstance(ret, log_processor.BadFileDownload), - type(ret)) + self.assertTrue(isinstance(ret, Exception)) finally: log_processor.LogProcessor.get_object_data = orig_get_object_data @@ -426,3 +428,407 @@ use = egg:swift#proxy finally: log_processor.LogProcessor._internal_proxy = None log_processor.LogProcessor.get_object_data = orig_get_object_data + +class TestLogProcessorDaemon(unittest.TestCase): + + def test_get_lookback_interval(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, lookback_hours, lookback_window): + self.lookback_hours = lookback_hours + self.lookback_window = lookback_window + + try: + d = datetime.datetime + + for x in [ + [d(2011, 1, 1), 0, 0, None, None], + [d(2011, 1, 1), 120, 0, '2010122700', None], + [d(2011, 1, 1), 120, 24, '2010122700', '2010122800'], + [d(2010, 1, 2, 3, 4), 120, 48, '2009122803', '2009123003'], + [d(2009, 5, 6, 7, 8), 1200, 100, '2009031707', '2009032111'], + [d(2008, 9, 10, 11, 12), 3000, 1000, '2008050811', '2008061903'], + ]: + + log_processor.now = lambda: x[0] + + d = MockLogProcessorDaemon(x[1], x[2]) + self.assertEquals((x[3], x[4]), d.get_lookback_interval()) + finally: + log_processor.now = datetime.datetime.now + + def test_get_processed_files_list(self): + class MockLogProcessor(): + def __init__(self, stream): + self.stream = stream + + def get_object_data(self, *args, **kwargs): + return self.stream + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, stream): + self.log_processor = MockLogProcessor(stream) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + file_list = set(['a', 'b', 'c']) + + for s, l in [['', None], + [pickle.dumps(set()).split('\n'), set()], + [pickle.dumps(file_list).split('\n'), file_list], + ]: + + self.assertEquals(l, + MockLogProcessorDaemon(s).get_processed_files_list()) + + def test_get_processed_files_list_bad_file_downloads(self): + class MockLogProcessor(): + def 
__init__(self, status_code): + self.err = log_processor.BadFileDownload(status_code) + + def get_object_data(self, *a, **k): + raise self.err + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, status_code): + self.log_processor = MockLogProcessor(status_code) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + for c, l in [[404, set()], [503, None], [None, None]]: + self.assertEquals(l, + MockLogProcessorDaemon(c).get_processed_files_list()) + + def test_get_aggregate_data(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can use keys that are easier to work with + + processed_files = set() + + data_in = [ + ['file1', { + 'acct1_time1': {'field1': 1, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + ], + ['file2', {'acct1_time1': {'field1': 10}}], + ] + + expected_data_out = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + pass + + d = MockLogProcessorDaemon() + data_out = d.get_aggregate_data(processed_files, data_in) + + for k, v in expected_data_out.items(): + self.assertEquals(v, data_out[k]) + + self.assertEquals(set(['file1', 'file2']), processed_files) + + def test_get_final_info(self): + # when run "for real" + # the various keys/values in the input and output + # dictionaries are often not simple strings + # for testing we can use keys/values that are easier to work with + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = { + 'out_field1':['field1', 'field2', 'field3'], + 'out_field2':['field2', 'field3'], + 'out_field3':['field3'], + 'out_field4':'field4', + 'out_field5':['field6', 'field7', 'field8'], + 'out_field6':['field6'], + 'out_field7':'field7', + } + + data_in = { + 'acct1_time1': {'field1': 11, 'field2': 2, 'field3': 3, + 'field4': 8, 'field5': 11}, + 'acct1_time2': {'field1': 4, 'field2': 5}, + 'acct2_time1': {'field1': 6, 'field2': 7}, + 'acct3_time3': {'field1': 8, 'field2': 9}, + } + + expected_data_out = { + 'acct1_time1': {'out_field1': 16, 'out_field2': 5, + 'out_field3': 3, 'out_field4': 8, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct1_time2': {'out_field1': 9, 'out_field2': 5, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct2_time1': {'out_field1': 13, 'out_field2': 7, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + 'acct3_time3': {'out_field1': 17, 'out_field2': 9, + 'out_field3': 0, 'out_field4': 0, 'out_field5': 0, + 'out_field6': 0, 'out_field7': 0,}, + } + + self.assertEquals(expected_data_out, + MockLogProcessorDaemon().get_final_info(data_in)) + + def test_store_processed_files_list(self): + class MockInternalProxy: + def __init__(self, test, daemon, processed_files): + self.test = test + self.daemon = daemon + self.processed_files = processed_files + + def upload_file(self, f, account, container, filename): + self.test.assertEquals(self.processed_files, + pickle.loads(f.getvalue())) + 
self.test.assertEquals(self.daemon.log_processor_account, + account) + self.test.assertEquals(self.daemon.log_processor_container, + container) + self.test.assertEquals(self.daemon.processed_files_filename, + filename) + + class MockLogProcessor: + def __init__(self, test, daemon, processed_files): + self.internal_proxy = MockInternalProxy(test, daemon, + processed_files) + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test, processed_files): + self.log_processor = \ + MockLogProcessor(test, self, processed_files) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + self.processed_files_filename = 'filename' + + processed_files = set(['a', 'b', 'c']) + MockLogProcessorDaemon(self, processed_files).\ + store_processed_files_list(processed_files) + + def test_get_output(self): + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self._keylist_mapping = {'a':None, 'b':None, 'c':None} + + data_in = { + ('acct1', 2010, 1, 1, 0): {'a':1, 'b':2, 'c':3}, + ('acct1', 2010, 10, 10, 10): {'a':10, 'b':20, 'c':30}, + ('acct2', 2008, 3, 6, 9): {'a':8, 'b':9, 'c':12}, + ('acct3', 2005, 4, 8, 16): {'a':1, 'b':5, 'c':25}, + } + + expected_data_out = [ + ['data_ts', 'account', 'a', 'b', 'c'], + ['2010/01/01 00:00:00', 'acct1', '1', '2', '3'], + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], + ] + + data_out = MockLogProcessorDaemon().get_output(data_in) + self.assertEquals(expected_data_out[0], data_out[0]) + + for row in data_out[1:]: + self.assert_(row in expected_data_out) + + for row in expected_data_out[1:]: + self.assert_(row in data_out) + + def test_store_output(self): + try: + real_strftime = time.strftime + mock_strftime_return = '2010/03/02/01/' + def mock_strftime(format): + self.assertEquals('%Y/%m/%d/%H/', format) + return mock_strftime_return + log_processor.time.strftime = mock_strftime + + data_in = [ + ['data_ts', 'account', 'a', 'b', 'c'], + ['2010/10/10 10:00:00', 'acct1', '1', '2', '3'], + ['2010/10/10 10:00:00', 'acct1', '10', '20', '30'], + ['2008/03/06 09:00:00', 'acct2', '8', '9', '12'], + ['2005/04/08 16:00:00', 'acct3', '1', '5', '25'], + ] + + expected_output = '\n'.join([','.join(row) for row in data_in]) + h = hashlib.md5(expected_output).hexdigest() + expected_filename = '%s%s.csv.gz' % (mock_strftime_return, h) + + class MockInternalProxy: + def __init__(self, test, daemon, expected_filename, + expected_output): + self.test = test + self.daemon = daemon + self.expected_filename = expected_filename + self.expected_output = expected_output + + def upload_file(self, f, account, container, filename): + self.test.assertEquals(self.daemon.log_processor_account, + account) + self.test.assertEquals(self.daemon.log_processor_container, + container) + self.test.assertEquals(self.expected_filename, filename) + self.test.assertEquals(self.expected_output, f.getvalue()) + + class MockLogProcessor: + def __init__(self, test, daemon, expected_filename, + expected_output): + self.internal_proxy = MockInternalProxy(test, daemon, + expected_filename, expected_output) + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test, expected_filename, expected_output): + self.log_processor = MockLogProcessor(test, self, + expected_filename, expected_output) + self.log_processor_account = 'account' + self.log_processor_container = 'container' + 
self.processed_files_filename = 'filename' + + MockLogProcessorDaemon(self, expected_filename, expected_output).\ + store_output(data_in) + finally: + log_processor.time.strftime = real_strftime + + def test_keylist_mapping(self): + # Kind of lame test to see if the propery is both + # generated by a particular method and cached properly. + # The method that actually generates the mapping is + # tested elsewhere. + + value_return = 'keylist_mapping' + class MockLogProcessor: + def __init__(self): + self.call_count = 0 + + def generate_keylist_mapping(self): + self.call_count += 1 + return value_return + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.log_processor = MockLogProcessor() + self._keylist_mapping = None + + d = MockLogProcessorDaemon() + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(value_return, d.keylist_mapping) + self.assertEquals(1, d.log_processor.call_count) + + def test_process_logs(self): + try: + mock_logs_to_process = 'logs_to_process' + mock_processed_files = 'processed_files' + + real_multiprocess_collate = log_processor.multiprocess_collate + multiprocess_collate_return = 'multiprocess_collate_return' + + get_aggregate_data_return = 'get_aggregate_data_return' + get_final_info_return = 'get_final_info_return' + get_output_return = 'get_output_return' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self, test): + self.test = test + self.total_conf = 'total_conf' + self.logger = 'logger' + self.worker_count = 'worker_count' + + def get_aggregate_data(self, processed_files, results): + self.test.assertEquals(mock_processed_files, processed_files) + self.test.assertEquals(multiprocess_collate_return, results) + return get_aggregate_data_return + + def get_final_info(self, aggr_data): + self.test.assertEquals(get_aggregate_data_return, aggr_data) + return get_final_info_return + + def get_output(self, final_info): + self.test.assertEquals(get_final_info_return, final_info) + return get_output_return + + d = MockLogProcessorDaemon(self) + + def mock_multiprocess_collate(processor_args, logs_to_process, + worker_count): + self.assertEquals(d.total_conf, processor_args[0]) + self.assertEquals(d.logger, processor_args[1]) + + self.assertEquals(mock_logs_to_process, logs_to_process) + self.assertEquals(d.worker_count, worker_count) + + return multiprocess_collate_return + + log_processor.multiprocess_collate = mock_multiprocess_collate + + output = d.process_logs(mock_logs_to_process, mock_processed_files) + self.assertEquals(get_output_return, output) + finally: + log_processor.multiprocess_collate = real_multiprocess_collate + + def test_run_once_get_processed_files_list_returns_none(self): + class MockLogProcessor: + def get_data_list(self, lookback_start, lookback_end, + processed_files): + raise unittest.TestCase.failureException, \ + 'Method should not be called' + + class MockLogProcessorDaemon(log_processor.LogProcessorDaemon): + def __init__(self): + self.logger = DumbLogger() + self.log_processor = MockLogProcessor() + + def get_lookback_interval(self): + return None, None + + def get_processed_files_list(self): + return None + + MockLogProcessorDaemon().run_once() + + def test_run_once_no_logs_to_process(self): + class MockLogProcessor(): + def __init__(self, daemon, test): + self.daemon = daemon + self.test = test + + def get_data_list(self, lookback_start, lookback_end, + processed_files): + self.test.assertEquals(self.daemon.lookback_start, + 
                    lookback_start)
+                self.test.assertEquals(self.daemon.lookback_end,
+                    lookback_end)
+                self.test.assertEquals(self.daemon.processed_files,
+                    processed_files)
+                return []
+
+        class MockLogProcessorDaemon(log_processor.LogProcessorDaemon):
+            def __init__(self, test):
+                self.logger = DumbLogger()
+                self.log_processor = MockLogProcessor(self, test)
+                self.lookback_start = 'lookback_start'
+                self.lookback_end = 'lookback_end'
+                self.processed_files = ['a', 'b', 'c']
+
+            def get_lookback_interval(self):
+                return self.lookback_start, self.lookback_end
+
+            def get_processed_files_list(self):
+                return self.processed_files
+
+            def process_logs(logs_to_process, processed_files):
+                raise unittest.TestCase.failureException, \
+                    'Method should not be called'
+
+        MockLogProcessorDaemon(self).run_once()
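
A note on the new proxy test test_client_ip_logging above: it swaps the proxy server's logger and access_logger for a stub that remembers the last line handed to info(), drives one request through a local socket, and then asserts that the client-address fields of that line are populated. The stub pattern in isolation looks like the sketch below; CapturingLogger is a name used only for this illustration, and the asserted string comes straight from the test.

    class CapturingLogger(object):
        # Stand-in for the proxy's logger: remember the last access line
        # handed to info() so the test can inspect it afterwards.
        def info(self, msg):
            self.msg = msg

    # Usage, mirroring test_client_ip_logging:
    #     prosrv.logger = prosrv.access_logger = CapturingLogger()
    #     ... issue 'GET /v1/a?format=json' over a local socket ...
    #     assert '127.0.0.1 127.0.0.1' in prosrv.logger.msg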
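
The fixtures in test_get_lookback_interval encode the arithmetic the daemon is expected to perform: the interval starts lookback_hours before now, ends lookback_window hours after that start, each side collapses to None when its setting is 0, and both endpoints are rendered as %Y%m%d%H strings. The real get_lookback_interval lives in swift/stats/log_processor.py and is not part of this diff; the sketch below only restates the behaviour the fixtures pin down, using an illustrative standalone signature.

    import datetime

    def get_lookback_interval(now, lookback_hours, lookback_window):
        # Behaviour implied by the fixtures in test_get_lookback_interval.
        if lookback_hours == 0:
            return None, None
        start = now - datetime.timedelta(hours=lookback_hours)
        lookback_start = start.strftime('%Y%m%d%H')
        if lookback_window == 0:
            return lookback_start, None
        end = start + datetime.timedelta(hours=lookback_window)
        return lookback_start, end.strftime('%Y%m%d%H')

For example, now = datetime(2010, 1, 2, 3, 4) with 120 lookback hours and a 48 hour window yields ('2009122803', '2009123003'), which matches the fixture row for d(2010, 1, 2, 3, 4).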
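
test_get_aggregate_data pins down the merge semantics of the collation step: results is a list of (filename, per-key counters) pairs, counters for the same key are summed field by field across files, and every filename seen is added to the processed_files set. Below is a sketch consistent with that fixture (for instance, field1 for acct1_time1 becomes 1 + 10 = 11); the real method on LogProcessorDaemon may be structured differently.

    def get_aggregate_data(processed_files, results):
        # Sum per-key counters across all downloaded log files and record
        # which files contributed.
        aggr_data = {}
        for filename, data in results:
            for key, counts in data.items():
                merged = aggr_data.get(key, {})
                for field, value in counts.items():
                    merged[field] = merged.get(field, 0) + value
                aggr_data[key] = merged
            processed_files.add(filename)
        return aggr_data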
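
Likewise, test_get_final_info fixes how raw counters collapse into report columns: a keylist mapping value that is a list means "sum these raw fields", a plain string value means "copy that raw field", and raw fields that are absent count as zero, while raw fields not mentioned in the mapping (such as field5 in the fixture) are dropped. A minimal sketch of that reduction, again written against the fixture rather than the shipped implementation:

    def get_final_info(keylist_mapping, aggr_data):
        # Collapse raw counters into the report's output fields.
        final_info = {}
        for key, counts in aggr_data.items():
            row = {}
            for out_field, raw_fields in keylist_mapping.items():
                if isinstance(raw_fields, (list, tuple)):
                    row[out_field] = sum(counts.get(f, 0) for f in raw_fields)
                else:
                    row[out_field] = counts.get(raw_fields, 0)
            final_info[key] = row
        return final_info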
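
Finally, test_get_output and test_store_output together document the upload contract: the rows returned by get_output are joined with commas and newlines, and the object name is an hour-based prefix from time.strftime('%Y/%m/%d/%H/') plus the md5 hex digest of that CSV body with a .csv.gz suffix. The sketch below restates that contract with an illustrative standalone signature; compression is assumed to happen inside internal_proxy.upload_file, since the test compares the uncompressed text.

    import hashlib
    import time
    from cStringIO import StringIO  # the tree is Python 2 at this point

    def store_output(internal_proxy, account, container, rows):
        # rows is the list-of-lists that get_output returns, header row first.
        out = '\n'.join(','.join(row) for row in rows)
        filename = '%s%s.csv.gz' % (time.strftime('%Y/%m/%d/%H/'),
                                    hashlib.md5(out).hexdigest())
        internal_proxy.upload_file(StringIO(out), account, container, filename)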