#!/usr/bin/env python # Copyright (c) 2010-2012 OpenStack, LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import os from ConfigParser import ConfigParser from optparse import OptionParser from sys import exit, stdout, stderr from time import time try: import simplejson as json except ImportError: import json from eventlet import GreenPool, hubs, patcher, Timeout from eventlet.pools import Pool from swift.common import direct_client from swiftclient import ClientException, Connection, get_auth from swift.common.ring import Ring from swift.common.utils import compute_eta, get_time_units, config_true_value unmounted = [] notfound = [] json_output = False debug = False def get_error_log(prefix): def error_log(msg_or_exc): global debug, unmounted, notfound if hasattr(msg_or_exc, 'http_status'): identifier = '%s:%s/%s' % (msg_or_exc.http_host, msg_or_exc.http_port, msg_or_exc.http_device) if msg_or_exc.http_status == 507: if identifier not in unmounted: unmounted.append(identifier) print >>stderr, 'ERROR: %s is unmounted -- This will ' \ 'cause replicas designated for that device to be ' \ 'considered missing until resolved or the ring is ' \ 'updated.' % (identifier) stderr.flush() if debug and identifier not in notfound: notfound.append(identifier) print >>stderr, 'ERROR: %s returned a 404' % (identifier) stderr.flush() if not hasattr(msg_or_exc, 'http_status') or \ msg_or_exc.http_status not in (404, 507): print >>stderr, 'ERROR: %s: %s' % (prefix, msg_or_exc) stderr.flush() return error_log def container_dispersion_report(coropool, connpool, account, container_ring, retries): with connpool.item() as conn: containers = [c['name'] for c in conn.get_account( prefix='dispersion_', full_listing=True)[1]] containers_listed = len(containers) if not containers_listed: print >>stderr, 'No containers to query. Has ' \ 'swift-dispersion-populate been run?' stderr.flush() return retries_done = [0] containers_queried = [0] container_copies_found = [0] * (container_ring.replica_count + 1) begun = time() next_report = [time() + 2] def direct(container, part, nodes): found_count = 0 for node in nodes: error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node) try: attempts, _junk = direct_client.retry( direct_client.direct_head_container, node, part, account, container, error_log=error_log, retries=retries) retries_done[0] += attempts - 1 found_count += 1 except ClientException, err: if err.http_status not in (404, 507): error_log('Giving up on /%s/%s/%s: %s' % (part, account, container, err)) except (Exception, Timeout), err: error_log('Giving up on /%s/%s/%s: %s' % (part, account, container, err)) container_copies_found[found_count] += 1 containers_queried[0] += 1 if time() >= next_report[0]: next_report[0] = time() + 5 eta, eta_unit = compute_eta(begun, containers_queried[0], containers_listed) if not json_output: print '\r\x1B[KQuerying containers: %d of %d, %d%s left, %d ' \ 'retries' % (containers_queried[0], containers_listed, round(eta), eta_unit, retries_done[0]), stdout.flush() container_parts = {} for container in containers: part, nodes = container_ring.get_nodes(account, container) if part not in container_parts: container_parts[part] = part coropool.spawn(direct, container, part, nodes) coropool.waitall() distinct_partitions = len(container_parts) copies_expected = distinct_partitions * container_ring.replica_count copies_found = sum(a * b for a, b in enumerate(container_copies_found)) value = 100.0 * copies_found / copies_expected elapsed, elapsed_unit = get_time_units(time() - begun) if not json_output: print '\r\x1B[KQueried %d containers for dispersion reporting, ' \ '%d%s, %d retries' % (containers_listed, round(elapsed), elapsed_unit, retries_done[0]) if containers_listed - distinct_partitions: print 'There were %d overlapping partitions' % ( containers_listed - distinct_partitions) for copies in xrange(container_ring.replica_count - 1, -1, -1): missing_copies = container_ring.replica_count - copies if container_copies_found[copies]: print missing_string(container_copies_found[copies], missing_copies, container_ring.replica_count) print '%.02f%% of container copies found (%d of %d)' % ( value, copies_found, copies_expected) print 'Sample represents %.02f%% of the container partition space' % ( 100.0 * distinct_partitions / container_ring.partition_count) stdout.flush() return None else: results = {'retries': retries_done[0], 'overlapping': containers_listed - distinct_partitions, 'pct_found': value, 'copies_found': copies_found, 'copies_expected': copies_expected} for copies in xrange(container_ring.replica_count): missing_copies = container_ring.replica_count - copies results['missing_%d' % (missing_copies)] = \ container_copies_found[copies] return results def object_dispersion_report(coropool, connpool, account, object_ring, retries): container = 'dispersion_objects' with connpool.item() as conn: try: objects = [o['name'] for o in conn.get_container( container, prefix='dispersion_', full_listing=True)[1]] except ClientException, err: if err.http_status != 404: raise print >>stderr, 'No objects to query. Has ' \ 'swift-dispersion-populate been run?' stderr.flush() return objects_listed = len(objects) if not objects_listed: print >>stderr, 'No objects to query. Has swift-dispersion-populate ' \ 'been run?' stderr.flush() return retries_done = [0] objects_queried = [0] object_copies_found = [0] * (object_ring.replica_count + 1) begun = time() next_report = [time() + 2] def direct(obj, part, nodes): found_count = 0 for node in nodes: error_log = get_error_log('%(ip)s:%(port)s/%(device)s' % node) try: attempts, _junk = direct_client.retry( direct_client.direct_head_object, node, part, account, container, obj, error_log=error_log, retries=retries) retries_done[0] += attempts - 1 found_count += 1 except ClientException, err: if err.http_status not in (404, 507): error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account, container, obj, err)) except (Exception, Timeout), err: error_log('Giving up on /%s/%s/%s/%s: %s' % (part, account, container, obj, err)) object_copies_found[found_count] += 1 objects_queried[0] += 1 if time() >= next_report[0]: next_report[0] = time() + 5 eta, eta_unit = compute_eta(begun, objects_queried[0], objects_listed) if not json_output: print '\r\x1B[KQuerying objects: %d of %d, %d%s left, %d ' \ 'retries' % (objects_queried[0], objects_listed, round(eta), eta_unit, retries_done[0]), stdout.flush() object_parts = {} for obj in objects: part, nodes = object_ring.get_nodes(account, container, obj) if part not in object_parts: object_parts[part] = part coropool.spawn(direct, obj, part, nodes) coropool.waitall() distinct_partitions = len(object_parts) copies_expected = distinct_partitions * object_ring.replica_count copies_found = sum(a * b for a, b in enumerate(object_copies_found)) value = 100.0 * copies_found / copies_expected elapsed, elapsed_unit = get_time_units(time() - begun) if not json_output: print '\r\x1B[KQueried %d objects for dispersion reporting, ' \ '%d%s, %d retries' % (objects_listed, round(elapsed), elapsed_unit, retries_done[0]) if objects_listed - distinct_partitions: print 'There were %d overlapping partitions' % ( objects_listed - distinct_partitions) for copies in xrange(object_ring.replica_count - 1, -1, -1): missing_copies = object_ring.replica_count - copies if object_copies_found[copies]: print missing_string(object_copies_found[copies], missing_copies, object_ring.replica_count) print '%.02f%% of object copies found (%d of %d)' % \ (value, copies_found, copies_expected) print 'Sample represents %.02f%% of the object partition space' % ( 100.0 * distinct_partitions / object_ring.partition_count) stdout.flush() return None else: results = {'retries': retries_done[0], 'overlapping': objects_listed - distinct_partitions, 'pct_found': value, 'copies_found': copies_found, 'copies_expected': copies_expected} for copies in xrange(object_ring.replica_count): missing_copies = object_ring.replica_count - copies results['missing_%d' % (missing_copies)] = \ object_copies_found[copies] return results def missing_string(partition_count, missing_copies, copy_count): exclamations = '' missing_string = str(missing_copies) if missing_copies == copy_count: exclamations = '!!! ' missing_string = 'all' elif copy_count - missing_copies == 1: exclamations = '! ' verb_string = 'was' partition_string = 'partition' if partition_count > 1: verb_string = 'were' partition_string = 'partitions' copy_string = 'copy' if missing_copies > 1: copy_string = 'copies' return '%sThere %s %d %s missing %s %s.' % ( exclamations, verb_string, partition_count, partition_string, missing_string, copy_string ) if __name__ == '__main__': patcher.monkey_patch() hubs.get_hub().debug_exceptions = False parser = OptionParser(usage=''' Usage: %prog [options] [conf_file] [conf_file] defaults to /etc/swift/stats.conf'''.strip()) parser.add_option('-j', '--dump-json', action='store_true', default=False, help='dump dispersion report in json format') parser.add_option('-d', '--debug', action='store_true', default=False, help='print 404s to standard error') options, args = parser.parse_args() conffile = '/etc/swift/dispersion.conf' if args: conffile = args.pop(0) c = ConfigParser() if not c.read(conffile): exit('Unable to read config file: %s' % conffile) conf = dict(c.items('dispersion')) swift_dir = conf.get('swift_dir', '/etc/swift') dispersion_coverage = int(conf.get('dispersion_coverage', 1)) retries = int(conf.get('retries', 5)) concurrency = int(conf.get('concurrency', 25)) if options.dump_json or config_true_value(conf.get('dump_json', 'no')): json_output = True if options.debug: debug = True coropool = GreenPool(size=concurrency) url, token = get_auth(conf['auth_url'], conf['auth_user'], conf['auth_key'], auth_version=conf.get('auth_version', '1.0')) account = url.rsplit('/', 1)[1] connpool = Pool(max_size=concurrency) connpool.create = lambda: Connection( conf['auth_url'], conf['auth_user'], conf['auth_key'], retries=retries, preauthurl=url, preauthtoken=token) container_ring = Ring(swift_dir, ring_name='container') object_ring = Ring(swift_dir, ring_name='object') container_result = container_dispersion_report(coropool, connpool, account, container_ring, retries) object_result = object_dispersion_report(coropool, connpool, account, object_ring, retries) if json_output: print json.dumps({"container": container_result, "object": object_result})