From bc3a59bdd36fbd90554e73d419d8767a3845f9af Mon Sep 17 00:00:00 2001
From: Shreeya Deshpande
Date: Tue, 2 Apr 2024 14:33:29 -0700
Subject: [PATCH] Refactor utils

- Move statsd client into its own module
- Move all logging functions into their own module
- Move all config functions into their own module
- Move all helper functions into their own module

Partial-Bug: #2015274
Change-Id: Ic4b5005e3efffa8dba17d91a41e46d5c68533f9a
---
 swift/common/statsd_client.py                 |  167 ++
 swift/common/utils/__init__.py                | 1687 +----------------
 swift/common/utils/base.py                    |  136 ++
 swift/common/utils/config.py                  |  440 +++++
 swift/common/utils/logs.py                    |  995 ++++++++++
 test/debug_logger.py                          |    8 +-
 test/unit/__init__.py                         |    3 +-
 .../common/middleware/s3api/test_s3api.py     |    3 +-
 .../common/middleware/test_proxy_logging.py   |    3 +-
 test/unit/common/middleware/test_tempauth.py  |    3 +-
 test/unit/common/test_utils.py                |  105 +-
 test/unit/helpers.py                          |   10 +-
 test/unit/proxy/test_server.py                |   12 +-
 13 files changed, 1869 insertions(+), 1703 deletions(-)
 create mode 100644 swift/common/statsd_client.py
 create mode 100644 swift/common/utils/base.py
 create mode 100644 swift/common/utils/config.py
 create mode 100644 swift/common/utils/logs.py

diff --git a/swift/common/statsd_client.py b/swift/common/statsd_client.py
new file mode 100644
index 0000000000..c34679c497
--- /dev/null
+++ b/swift/common/statsd_client.py
@@ -0,0 +1,167 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+""" Statsd Client """
+
+import socket
+import time
+import warnings
+from contextlib import closing
+from random import random
+
+import six
+
+
+class StatsdClient(object):
+    def __init__(self, host, port, base_prefix='', tail_prefix='',
+                 default_sample_rate=1, sample_rate_factor=1, logger=None):
+        self._host = host
+        self._port = port
+        self._base_prefix = base_prefix
+        self._set_prefix(tail_prefix)
+        self._default_sample_rate = default_sample_rate
+        self._sample_rate_factor = sample_rate_factor
+        self.random = random
+        self.logger = logger
+
+        # Determine if host is IPv4 or IPv6
+        addr_info, self._sock_family = self._determine_sock_family(host, port)
+
+        # NOTE: we use the original host value, not the DNS-resolved one
+        # because if host is a hostname, we don't want to cache the DNS
+        # resolution for the entire lifetime of this process. Let standard
+        # name resolution caching take effect. This should help operators use
+        # DNS trickery if they want.
+        if addr_info is not None:
+            # addr_info is a list of 5-tuples with the following structure:
+            #     (family, socktype, proto, canonname, sockaddr)
+            # where sockaddr is the only thing of interest to us, and we only
+            # use the first result. We want to use the originally supplied
+            # host (see note above) and the remainder of the variable-length
+            # sockaddr: IPv4 has (address, port) while IPv6 has (address,
+            # port, flow info, scope id).
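+            # Illustrative values (not from the patch): for IPv6,
+            # getaddrinfo() might return sockaddr ('::1', 8125, 0, 0), so
+            # self._target becomes (host, 8125, 0, 0); for IPv4, sockaddr
+            # is just (address, port) and self._target stays (host, port).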
+            sockaddr = addr_info[0][-1]
+            self._target = (host,) + (sockaddr[1:])
+        else:
+            self._target = (host, port)
+
+    def _determine_sock_family(self, host, port):
+        addr_info = sock_family = None
+        try:
+            addr_info = socket.getaddrinfo(host, port, socket.AF_INET)
+            sock_family = socket.AF_INET
+        except socket.gaierror:
+            try:
+                addr_info = socket.getaddrinfo(host, port, socket.AF_INET6)
+                sock_family = socket.AF_INET6
+            except socket.gaierror:
+                # Don't keep the server from starting from what could be a
+                # transient DNS failure. Any hostname will get re-resolved as
+                # necessary in the .sendto() calls.
+                # However, we don't know if we're IPv4 or IPv6 in this case, so
+                # we assume legacy IPv4.
+                sock_family = socket.AF_INET
+        return addr_info, sock_family
+
+    def _set_prefix(self, tail_prefix):
+        """
+        Modifies the prefix that is added to metric names. The resulting
+        prefix is the concatenation of the component parts `base_prefix` and
+        `tail_prefix`. Only truthy components are included. Each included
+        component is followed by a period, e.g.::
+
+            <base_prefix>.<tail_prefix>.
+            <base_prefix>.
+            <tail_prefix>.
+            <the empty string>
+
+        Note: this method is expected to be called from the constructor only,
+        but exists to provide backwards compatible functionality for the
+        deprecated set_prefix() method.
+
+        :param tail_prefix: The new value of tail_prefix
+        """
+        if tail_prefix and self._base_prefix:
+            self._prefix = '.'.join([self._base_prefix, tail_prefix, ''])
+        elif tail_prefix:
+            self._prefix = tail_prefix + '.'
+        elif self._base_prefix:
+            self._prefix = self._base_prefix + '.'
+        else:
+            self._prefix = ''
+
+    def set_prefix(self, tail_prefix):
+        """
+        This method is deprecated; use the ``tail_prefix`` argument of the
+        constructor when instantiating the class instead.
+        """
+        warnings.warn(
+            'set_prefix() is deprecated; use the ``tail_prefix`` argument of '
+            'the constructor when instantiating the class instead.',
+            DeprecationWarning, stacklevel=2
+        )
+        self._set_prefix(tail_prefix)
+
+    def _send(self, m_name, m_value, m_type, sample_rate):
+        if sample_rate is None:
+            sample_rate = self._default_sample_rate
+        sample_rate = sample_rate * self._sample_rate_factor
+        parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type]
+        if sample_rate < 1:
+            if self.random() < sample_rate:
+                parts.append('@%s' % (sample_rate,))
+            else:
+                return
+        if six.PY3:
+            parts = [part.encode('utf-8') for part in parts]
+        # Ideally, we'd cache a sending socket in self, but that
+        # results in a socket getting shared by multiple green threads.
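+        # A fresh UDP socket per send is cheap (connectionless, no
+        # handshake) and sidesteps cross-greenthread locking; sendto()
+        # fires a single best-effort datagram at self._target.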
+ with closing(self._open_socket()) as sock: + try: + return sock.sendto(b'|'.join(parts), self._target) + except IOError as err: + if self.logger: + self.logger.warning( + 'Error sending UDP message to %(target)r: %(err)s', + {'target': self._target, 'err': err}) + + def _open_socket(self): + return socket.socket(self._sock_family, socket.SOCK_DGRAM) + + def update_stats(self, m_name, m_value, sample_rate=None): + return self._send(m_name, m_value, 'c', sample_rate) + + def increment(self, metric, sample_rate=None): + return self.update_stats(metric, 1, sample_rate) + + def decrement(self, metric, sample_rate=None): + return self.update_stats(metric, -1, sample_rate) + + def _timing(self, metric, timing_ms, sample_rate): + # This method was added to disagregate timing metrics when testing + return self._send(metric, round(timing_ms, 4), 'ms', sample_rate) + + def timing(self, metric, timing_ms, sample_rate=None): + return self._timing(metric, timing_ms, sample_rate) + + def timing_since(self, metric, orig_time, sample_rate=None): + return self._timing(metric, (time.time() - orig_time) * 1000, + sample_rate) + + def transfer_rate(self, metric, elapsed_time, byte_xfer, sample_rate=None): + if byte_xfer: + return self.timing(metric, + elapsed_time * 1000 / byte_xfer * 1000, + sample_rate) diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index a735e85a63..f1ac1ed83e 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -24,9 +24,7 @@ import collections import errno import fcntl import grp -import hashlib import json -import operator import os import pwd import re @@ -37,7 +35,7 @@ import time import uuid import functools import email.parser -from random import random, shuffle +from random import shuffle from contextlib import contextmanager, closing import ctypes import ctypes.util @@ -49,7 +47,6 @@ from tempfile import gettempdir, mkstemp, NamedTemporaryFile import glob import itertools import stat -import datetime import eventlet import eventlet.debug @@ -64,25 +61,16 @@ except ImportError: import pkg_resources from eventlet import GreenPool, sleep, Timeout from eventlet.event import Event -from eventlet.green import socket, threading +from eventlet.green import socket import eventlet.hubs import eventlet.queue -import codecs -utf8_decoder = codecs.getdecoder('utf-8') -utf8_encoder = codecs.getencoder('utf-8') import six -if six.PY2: - from eventlet.green import httplib as green_http_client -else: - from eventlet.green.http import client as green_http_client - utf16_decoder = codecs.getdecoder('utf-16') - utf16_encoder = codecs.getencoder('utf-16') + from six.moves import cPickle as pickle -from six.moves import configparser from six.moves.configparser import (ConfigParser, NoSectionError, - NoOptionError, RawConfigParser) -from six.moves import range, http_client -from six.moves.urllib.parse import quote as _quote, unquote + NoOptionError) +from six.moves import range +from six.moves.urllib.parse import unquote from six.moves.urllib.parse import urlparse from six.moves import UserList @@ -93,6 +81,53 @@ from swift.common.linkat import linkat # For backwards compatability with 3rd party middlewares from swift.common.registry import register_swift_info, get_swift_info # noqa + +from .base import ( # noqa + md5, get_valid_utf8_str, quote, split_path) +from swift.common.utils.logs import ( # noqa + SysLogHandler, # t.u.helpers.setup_servers monkey patch is sketch + logging_monkey_patch, + get_logger, + PrefixLoggerAdapter, + 
LogLevelFilter, + NullLogger, + capture_stdio, + SwiftLogFormatter, + SwiftLoggerAdapter, + LogAdapter, + LoggerFileObject, + PipeMutex, + NoopMutex, + ThreadSafeSysLogHandler, + StrAnonymizer, + get_log_line, + StrFormatTime, + LogStringFormatter, + get_policy_index, + LOG_LINE_DEFAULT_FORMAT, + NOTICE, +) +from swift.common.utils.config import ( # noqa + TRUE_VALUES, + NicerInterpolation, + config_true_value, + append_underscore, + non_negative_float, + non_negative_int, + config_positive_int_value, + config_float_value, + config_auto_int_value, + config_percent_value, + config_request_node_count_value, + config_fallocate_value, + config_read_prefixed_options, + config_read_reseller_options, + parse_prefixed_conf, + affinity_locality_predicate, + affinity_key_function, + readconf, + read_conf_dir, +) from swift.common.utils.libc import ( # noqa F_SETPIPE_SZ, load_libc_function, @@ -124,11 +159,9 @@ from swift.common.utils.ipaddrs import ( # noqa parse_socket_string, whataremyips, ) -from logging.handlers import SysLogHandler +from swift.common.statsd_client import StatsdClient # noqa import logging -NOTICE = 25 - # These are lazily pulled from libc elsewhere _sys_fallocate = None @@ -164,12 +197,6 @@ RESERVED_STR = u'\x00' RESERVED = '\x00' -LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \ - ':{time.H}:{time.M}:{time.S} +0000] ' \ - '"{method} {path}" {status} {content_length} ' \ - '"{referer}" "{txn_id}" "{user_agent}" ' \ - '{trans_time:.4f} "{additional_info}" {pid} ' \ - '{policy_index}' DEFAULT_LOCK_TIMEOUT = 10 # this is coupled with object-server.conf's network_chunk_size; if someone is # running that unreasonably small they may find this number inefficient, but in @@ -280,199 +307,6 @@ def backward(f, blocksize=4096): yield last_row -# Used when reading config values -TRUE_VALUES = {'true', '1', 'yes', 'on', 't', 'y'} - - -def non_negative_float(value): - """ - Check that the value casts to a float and is non-negative. - - :param value: value to check - :raises ValueError: if the value cannot be cast to a float or is negative. - :return: a float - """ - try: - value = float(value) - if value < 0: - raise ValueError - except (TypeError, ValueError): - raise ValueError('Value must be a non-negative float number, not "%s".' - % value) - return value - - -def non_negative_int(value): - """ - Check that the value casts to an int and is a whole number. - - :param value: value to check - :raises ValueError: if the value cannot be cast to an int or does not - represent a whole number. - :return: an int - """ - int_value = int(value) - if int_value != non_negative_float(value): - raise ValueError - return int_value - - -def config_true_value(value): - """ - Returns True if the value is either True or a string in TRUE_VALUES. - Returns False otherwise. - """ - return value is True or \ - (isinstance(value, six.string_types) and value.lower() in TRUE_VALUES) - - -def config_positive_int_value(value): - """ - Returns positive int value if it can be cast by int() and it's an - integer > 0. (not including zero) Raises ValueError otherwise. - """ - try: - result = int(value) - if result < 1: - raise ValueError() - except (TypeError, ValueError): - raise ValueError( - 'Config option must be an positive int number, not "%s".' 
% value) - return result - - -def config_float_value(value, minimum=None, maximum=None): - try: - val = float(value) - if minimum is not None and val < minimum: - raise ValueError() - if maximum is not None and val > maximum: - raise ValueError() - return val - except (TypeError, ValueError): - min_ = ', greater than %s' % minimum if minimum is not None else '' - max_ = ', less than %s' % maximum if maximum is not None else '' - raise ValueError('Config option must be a number%s%s, not "%s".' % - (min_, max_, value)) - - -def config_auto_int_value(value, default): - """ - Returns default if value is None or 'auto'. - Returns value as an int or raises ValueError otherwise. - """ - if value is None or \ - (isinstance(value, six.string_types) and value.lower() == 'auto'): - return default - try: - value = int(value) - except (TypeError, ValueError): - raise ValueError('Config option must be an integer or the ' - 'string "auto", not "%s".' % value) - return value - - -def config_percent_value(value): - try: - return config_float_value(value, 0, 100) / 100.0 - except ValueError as err: - raise ValueError("%s: %s" % (str(err), value)) - - -def config_request_node_count_value(value): - try: - value_parts = value.lower().split() - rnc_value = int(value_parts[0]) - except (ValueError, AttributeError): - pass - else: - if len(value_parts) == 1: - return lambda replicas: rnc_value - elif (len(value_parts) == 3 and - value_parts[1] == '*' and - value_parts[2] == 'replicas'): - return lambda replicas: rnc_value * replicas - raise ValueError( - 'Invalid request_node_count value: %r' % value) - - -def append_underscore(prefix): - if prefix and not prefix.endswith('_'): - prefix += '_' - return prefix - - -def config_read_reseller_options(conf, defaults): - """ - Read reseller_prefix option and associated options from configuration - - Reads the reseller_prefix option, then reads options that may be - associated with a specific reseller prefix. Reads options such that an - option without a prefix applies to all reseller prefixes unless an option - has an explicit prefix. - - :param conf: the configuration - :param defaults: a dict of default values. The key is the option - name. The value is either an array of strings or a string - :return: tuple of an array of reseller prefixes and a dict of option values - """ - reseller_prefix_opt = conf.get('reseller_prefix', 'AUTH').split(',') - reseller_prefixes = [] - for prefix in [pre.strip() for pre in reseller_prefix_opt if pre.strip()]: - if prefix == "''": - prefix = '' - prefix = append_underscore(prefix) - if prefix not in reseller_prefixes: - reseller_prefixes.append(prefix) - if len(reseller_prefixes) == 0: - reseller_prefixes.append('') - - # Get prefix-using config options - associated_options = {} - for prefix in reseller_prefixes: - associated_options[prefix] = dict(defaults) - associated_options[prefix].update( - config_read_prefixed_options(conf, '', defaults)) - prefix_name = prefix if prefix != '' else "''" - associated_options[prefix].update( - config_read_prefixed_options(conf, prefix_name, defaults)) - return reseller_prefixes, associated_options - - -def config_read_prefixed_options(conf, prefix_name, defaults): - """ - Read prefixed options from configuration - - :param conf: the configuration - :param prefix_name: the prefix (including, if needed, an underscore) - :param defaults: a dict of default values. 
The dict supplies the - option name and type (string or comma separated string) - :return: a dict containing the options - """ - params = {} - for option_name in defaults.keys(): - value = conf.get('%s%s' % (prefix_name, option_name)) - if value: - if isinstance(defaults.get(option_name), list): - params[option_name] = [] - for role in value.lower().split(','): - params[option_name].append(role.strip()) - else: - params[option_name] = value.strip() - return params - - -def logging_monkey_patch(): - # explicitly patch the logging lock - logging._lock = logging.threading.RLock() - # setup notice level logging - logging.addLevelName(NOTICE, 'NOTICE') - SysLogHandler.priority_map['NOTICE'] = 'notice' - # Trying to log threads while monkey-patched can lead to deadlocks; see - # https://bugs.launchpad.net/swift/+bug/1895739 - logging.logThreads = 0 - - def eventlet_monkey_patch(): """ Install the appropriate Eventlet monkey patches. @@ -505,171 +339,6 @@ def generate_trans_id(trans_id_suffix): uuid.uuid4().hex[:21], int(time.time()), quote(trans_id_suffix)) -def get_policy_index(req_headers, res_headers): - """ - Returns the appropriate index of the storage policy for the request from - a proxy server - - :param req_headers: dict of the request headers. - :param res_headers: dict of the response headers. - - :returns: string index of storage policy, or None - """ - header = 'X-Backend-Storage-Policy-Index' - policy_index = res_headers.get(header, req_headers.get(header)) - if isinstance(policy_index, six.binary_type) and not six.PY2: - policy_index = policy_index.decode('ascii') - return str(policy_index) if policy_index is not None else None - - -class LogStringFormatter(string.Formatter): - def __init__(self, default='', quote=False): - super(LogStringFormatter, self).__init__() - self.default = default - self.quote = quote - - def format_field(self, value, spec): - if not value: - return self.default - else: - log = super(LogStringFormatter, self).format_field(value, spec) - if self.quote: - return quote(log, ':/{}') - else: - return log - - -class StrAnonymizer(str): - """ - Class that permits to get a string anonymized or simply quoted. - """ - - def __new__(cls, data, method, salt): - method = method.lower() - if method not in (hashlib.algorithms if six.PY2 else - hashlib.algorithms_guaranteed): - raise ValueError('Unsupported hashing method: %r' % method) - s = str.__new__(cls, data or '') - s.method = method - s.salt = salt - return s - - @property - def anonymized(self): - if not self: - return self - else: - if self.method == 'md5': - h = md5(usedforsecurity=False) - else: - h = getattr(hashlib, self.method)() - if self.salt: - h.update(six.b(self.salt)) - h.update(six.b(self)) - return '{%s%s}%s' % ('S' if self.salt else '', self.method.upper(), - h.hexdigest()) - - -class StrFormatTime(object): - """ - Class that permits to get formats or parts of a time. 
- """ - - def __init__(self, ts): - self.time = ts - self.time_struct = time.gmtime(ts) - - def __str__(self): - return "%.9f" % self.time - - def __getattr__(self, attr): - if attr not in ['a', 'A', 'b', 'B', 'c', 'd', 'H', - 'I', 'j', 'm', 'M', 'p', 'S', 'U', - 'w', 'W', 'x', 'X', 'y', 'Y', 'Z']: - raise ValueError(("The attribute %s is not a correct directive " - "for time.strftime formater.") % attr) - return datetime.datetime(*self.time_struct[:-2], - tzinfo=UTC).strftime('%' + attr) - - @property - def asctime(self): - return time.asctime(self.time_struct) - - @property - def datetime(self): - return time.strftime('%d/%b/%Y/%H/%M/%S', self.time_struct) - - @property - def iso8601(self): - return time.strftime('%Y-%m-%dT%H:%M:%S', self.time_struct) - - @property - def ms(self): - return self.__str__().split('.')[1][:3] - - @property - def us(self): - return self.__str__().split('.')[1][:6] - - @property - def ns(self): - return self.__str__().split('.')[1] - - @property - def s(self): - return self.__str__().split('.')[0] - - -def get_log_line(req, res, trans_time, additional_info, fmt, - anonymization_method, anonymization_salt): - """ - Make a line for logging that matches the documented log line format - for backend servers. - - :param req: the request. - :param res: the response. - :param trans_time: the time the request took to complete, a float. - :param additional_info: a string to log at the end of the line - - :returns: a properly formatted line for logging. - """ - - policy_index = get_policy_index(req.headers, res.headers) - if req.path.startswith('/'): - disk, partition, account, container, obj = split_path(req.path, 0, 5, - True) - else: - disk, partition, account, container, obj = (None, ) * 5 - replacements = { - 'remote_addr': StrAnonymizer(req.remote_addr, anonymization_method, - anonymization_salt), - 'time': StrFormatTime(time.time()), - 'method': req.method, - 'path': StrAnonymizer(req.path, anonymization_method, - anonymization_salt), - 'disk': disk, - 'partition': partition, - 'account': StrAnonymizer(account, anonymization_method, - anonymization_salt), - 'container': StrAnonymizer(container, anonymization_method, - anonymization_salt), - 'object': StrAnonymizer(obj, anonymization_method, - anonymization_salt), - 'status': res.status.split()[0], - 'content_length': res.content_length, - 'referer': StrAnonymizer(req.referer, anonymization_method, - anonymization_salt), - 'txn_id': req.headers.get('x-trans-id'), - 'user_agent': StrAnonymizer(req.user_agent, anonymization_method, - anonymization_salt), - 'trans_time': trans_time, - 'additional_info': additional_info, - 'pid': os.getpid(), - 'policy_index': policy_index, - } - return LogStringFormatter(default='-').format(fmt, **replacements) - - def get_trans_id_time(trans_id): if len(trans_id) >= 34 and \ trans_id.startswith('tx') and trans_id[23] == '-': @@ -680,25 +349,6 @@ def get_trans_id_time(trans_id): return None -def config_fallocate_value(reserve_value): - """ - Returns fallocate reserve_value as an int or float. - Returns is_percent as a boolean. - Returns a ValueError on invalid fallocate value. - """ - try: - if str(reserve_value[-1:]) == '%': - reserve_value = float(reserve_value[:-1]) - is_percent = True - else: - reserve_value = int(reserve_value) - is_percent = False - except ValueError: - raise ValueError('Error: %s is an invalid value for fallocate' - '_reserve.' 
% reserve_value) - return reserve_value, is_percent - - class FileLikeIter(object): def __init__(self, iterable): @@ -1130,53 +780,6 @@ def link_fd_to_path(fd, target_path, dirs_created=0, retries=2, fsync=True): dirpath = os.path.dirname(dirpath) -def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False): - """ - Validate and split the given HTTP request path. - - **Examples**:: - - ['a'] = split_path('/a') - ['a', None] = split_path('/a', 1, 2) - ['a', 'c'] = split_path('/a/c', 1, 2) - ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True) - - :param path: HTTP Request path to be split - :param minsegs: Minimum number of segments to be extracted - :param maxsegs: Maximum number of segments to be extracted - :param rest_with_last: If True, trailing data will be returned as part - of last segment. If False, and there is - trailing data, raises ValueError. - :returns: list of segments with a length of maxsegs (non-existent - segments will return as None) - :raises ValueError: if given an invalid path - """ - if not maxsegs: - maxsegs = minsegs - if minsegs > maxsegs: - raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs)) - if rest_with_last: - segs = path.split('/', maxsegs) - minsegs += 1 - maxsegs += 1 - count = len(segs) - if (segs[0] or count < minsegs or count > maxsegs or - '' in segs[1:minsegs]): - raise ValueError('Invalid path: %s' % quote(path)) - else: - minsegs += 1 - maxsegs += 1 - segs = path.split('/', maxsegs) - count = len(segs) - if (segs[0] or count < minsegs or count > maxsegs + 1 or - '' in segs[1:minsegs] or - (count == maxsegs + 1 and segs[maxsegs])): - raise ValueError('Invalid path: %s' % quote(path)) - segs = segs[1:maxsegs] - segs.extend([None] * (maxsegs - 1 - len(segs))) - return segs - - def validate_device_partition(device, partition): """ Validate that a device and a partition are valid and won't lead to @@ -1251,260 +854,6 @@ class GreenthreadSafeIterator(object): __next__ = next -class NullLogger(object): - """A no-op logger for eventlet wsgi.""" - - def write(self, *args): - # "Logs" the args to nowhere - pass - - def exception(self, *args): - pass - - def critical(self, *args): - pass - - def error(self, *args): - pass - - def warning(self, *args): - pass - - def info(self, *args): - pass - - def debug(self, *args): - pass - - def log(self, *args): - pass - - -class LoggerFileObject(object): - - # Note: this is greenthread-local storage - _cls_thread_local = threading.local() - - def __init__(self, logger, log_type='STDOUT'): - self.logger = logger - self.log_type = log_type - - def write(self, value): - # We can get into a nasty situation when logs are going to syslog - # and syslog dies. - # - # It's something like this: - # - # (A) someone logs something - # - # (B) there's an exception in sending to /dev/log since syslog is - # not working - # - # (C) logging takes that exception and writes it to stderr (see - # logging.Handler.handleError) - # - # (D) stderr was replaced with a LoggerFileObject at process start, - # so the LoggerFileObject takes the provided string and tells - # its logger to log it (to syslog, naturally). - # - # Then, steps B through D repeat until we run out of stack. 
- if getattr(self._cls_thread_local, 'already_called_write', False): - return - - self._cls_thread_local.already_called_write = True - try: - value = value.strip() - if value: - if 'Connection reset by peer' in value: - self.logger.error( - '%s: Connection reset by peer', self.log_type) - else: - self.logger.error('%(type)s: %(value)s', - {'type': self.log_type, 'value': value}) - finally: - self._cls_thread_local.already_called_write = False - - def writelines(self, values): - if getattr(self._cls_thread_local, 'already_called_writelines', False): - return - - self._cls_thread_local.already_called_writelines = True - try: - self.logger.error('%(type)s: %(value)s', - {'type': self.log_type, - 'value': '#012'.join(values)}) - finally: - self._cls_thread_local.already_called_writelines = False - - def close(self): - pass - - def flush(self): - pass - - def __iter__(self): - return self - - def next(self): - raise IOError(errno.EBADF, 'Bad file descriptor') - __next__ = next - - def read(self, size=-1): - raise IOError(errno.EBADF, 'Bad file descriptor') - - def readline(self, size=-1): - raise IOError(errno.EBADF, 'Bad file descriptor') - - def tell(self): - return 0 - - def xreadlines(self): - return self - - -class StatsdClient(object): - def __init__(self, host, port, base_prefix='', tail_prefix='', - default_sample_rate=1, sample_rate_factor=1, logger=None): - self._host = host - self._port = port - self._base_prefix = base_prefix - self._set_prefix(tail_prefix) - self._default_sample_rate = default_sample_rate - self._sample_rate_factor = sample_rate_factor - self.random = random - self.logger = logger - - # Determine if host is IPv4 or IPv6 - addr_info, self._sock_family = self._determine_sock_family(host, port) - - # NOTE: we use the original host value, not the DNS-resolved one - # because if host is a hostname, we don't want to cache the DNS - # resolution for the entire lifetime of this process. Let standard - # name resolution caching take effect. This should help operators use - # DNS trickery if they want. - if addr_info is not None: - # addr_info is a list of 5-tuples with the following structure: - # (family, socktype, proto, canonname, sockaddr) - # where sockaddr is the only thing of interest to us, and we only - # use the first result. We want to use the originally supplied - # host (see note above) and the remainder of the variable-length - # sockaddr: IPv4 has (address, port) while IPv6 has (address, - # port, flow info, scope id). - sockaddr = addr_info[0][-1] - self._target = (host,) + (sockaddr[1:]) - else: - self._target = (host, port) - - def _determine_sock_family(self, host, port): - addr_info = sock_family = None - try: - addr_info = socket.getaddrinfo(host, port, socket.AF_INET) - sock_family = socket.AF_INET - except socket.gaierror: - try: - addr_info = socket.getaddrinfo(host, port, socket.AF_INET6) - sock_family = socket.AF_INET6 - except socket.gaierror: - # Don't keep the server from starting from what could be a - # transient DNS failure. Any hostname will get re-resolved as - # necessary in the .sendto() calls. - # However, we don't know if we're IPv4 or IPv6 in this case, so - # we assume legacy IPv4. - sock_family = socket.AF_INET - return addr_info, sock_family - - def _set_prefix(self, tail_prefix): - """ - Modifies the prefix that is added to metric names. The resulting prefix - is the concatenation of the component parts `base_prefix` and - `tail_prefix`. Only truthy components are included. 
Each included - component is followed by a period, e.g.:: - - .. - . - . - - - Note: this method is expected to be called from the constructor only, - but exists to provide backwards compatible functionality for the - deprecated set_prefix() method. - - :param tail_prefix: The new value of tail_prefix - """ - if tail_prefix and self._base_prefix: - self._prefix = '.'.join([self._base_prefix, tail_prefix, '']) - elif tail_prefix: - self._prefix = tail_prefix + '.' - elif self._base_prefix: - self._prefix = self._base_prefix + '.' - else: - self._prefix = '' - - def set_prefix(self, tail_prefix): - """ - This method is deprecated; use the ``tail_prefix`` argument of the - constructor when instantiating the class instead. - """ - warnings.warn( - 'set_prefix() is deprecated; use the ``tail_prefix`` argument of ' - 'the constructor when instantiating the class instead.', - DeprecationWarning, stacklevel=2 - ) - self._set_prefix(tail_prefix) - - def _send(self, m_name, m_value, m_type, sample_rate): - if sample_rate is None: - sample_rate = self._default_sample_rate - sample_rate = sample_rate * self._sample_rate_factor - parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type] - if sample_rate < 1: - if self.random() < sample_rate: - parts.append('@%s' % (sample_rate,)) - else: - return - if six.PY3: - parts = [part.encode('utf-8') for part in parts] - # Ideally, we'd cache a sending socket in self, but that - # results in a socket getting shared by multiple green threads. - with closing(self._open_socket()) as sock: - try: - return sock.sendto(b'|'.join(parts), self._target) - except IOError as err: - if self.logger: - self.logger.warning( - 'Error sending UDP message to %(target)r: %(err)s', - {'target': self._target, 'err': err}) - - def _open_socket(self): - return socket.socket(self._sock_family, socket.SOCK_DGRAM) - - def update_stats(self, m_name, m_value, sample_rate=None): - return self._send(m_name, m_value, 'c', sample_rate) - - def increment(self, metric, sample_rate=None): - return self.update_stats(metric, 1, sample_rate) - - def decrement(self, metric, sample_rate=None): - return self.update_stats(metric, -1, sample_rate) - - def _timing(self, metric, timing_ms, sample_rate): - # This method was added to disagregate timing metrics when testing - return self._send(metric, round(timing_ms, 4), 'ms', sample_rate) - - def timing(self, metric, timing_ms, sample_rate=None): - return self._timing(metric, timing_ms, sample_rate) - - def timing_since(self, metric, orig_time, sample_rate=None): - return self._timing(metric, (time.time() - orig_time) * 1000, - sample_rate) - - def transfer_rate(self, metric, elapsed_time, byte_xfer, sample_rate=None): - if byte_xfer: - return self.timing(metric, - elapsed_time * 1000 / byte_xfer * 1000, - sample_rate) - - def timing_stats(**dec_kwargs): """ Returns a decorator that logs timing events or errors for public methods in @@ -1557,467 +906,6 @@ def memcached_timing_stats(**dec_kwargs): return decorating_func -class SwiftLoggerAdapter(logging.LoggerAdapter): - """ - A logging.LoggerAdapter subclass that also passes through StatsD method - calls. - - Like logging.LoggerAdapter, you have to subclass this and override the - process() method to accomplish anything useful. 
- """ - - @property - def name(self): - # py3 does this for us already; add it for py2 - return self.logger.name - - def update_stats(self, *a, **kw): - return self.logger.update_stats(*a, **kw) - - def increment(self, *a, **kw): - return self.logger.increment(*a, **kw) - - def decrement(self, *a, **kw): - return self.logger.decrement(*a, **kw) - - def timing(self, *a, **kw): - return self.logger.timing(*a, **kw) - - def timing_since(self, *a, **kw): - return self.logger.timing_since(*a, **kw) - - def transfer_rate(self, *a, **kw): - return self.logger.transfer_rate(*a, **kw) - - @property - def thread_locals(self): - return self.logger.thread_locals - - @thread_locals.setter - def thread_locals(self, thread_locals): - self.logger.thread_locals = thread_locals - - def exception(self, msg, *a, **kw): - # We up-call to exception() where stdlib uses error() so we can get - # some of the traceback suppression from LogAdapter, below - self.logger.exception(msg, *a, **kw) - - -class PrefixLoggerAdapter(SwiftLoggerAdapter): - """ - Adds an optional prefix to all its log messages. When the prefix has not - been set, messages are unchanged. - """ - - def set_prefix(self, prefix): - self.extra['prefix'] = prefix - - def exception(self, msg, *a, **kw): - if 'prefix' in self.extra: - msg = self.extra['prefix'] + msg - super(PrefixLoggerAdapter, self).exception(msg, *a, **kw) - - def process(self, msg, kwargs): - msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs) - if 'prefix' in self.extra: - msg = self.extra['prefix'] + msg - return (msg, kwargs) - - -# double inheritance to support property with setter -class LogAdapter(logging.LoggerAdapter, object): - """ - A Logger like object which performs some reformatting on calls to - :meth:`exception`. Can be used to store a threadlocal transaction id and - client ip. - """ - - _cls_thread_local = threading.local() - - def __init__(self, logger, server): - logging.LoggerAdapter.__init__(self, logger, {}) - self.server = server - self.warn = self.warning - - # There are a few properties needed for py35; see - # - https://bugs.python.org/issue31457 - # - https://github.com/python/cpython/commit/1bbd482 - # - https://github.com/python/cpython/commit/0b6a118 - # - https://github.com/python/cpython/commit/ce9e625 - def _log(self, level, msg, args, exc_info=None, extra=None, - stack_info=False): - """ - Low-level log implementation, proxied to allow nested logger adapters. 
- """ - return self.logger._log( - level, - msg, - args, - exc_info=exc_info, - extra=extra, - stack_info=stack_info, - ) - - @property - def manager(self): - return self.logger.manager - - @manager.setter - def manager(self, value): - self.logger.manager = value - - @property - def name(self): - return self.logger.name - - @property - def txn_id(self): - if hasattr(self._cls_thread_local, 'txn_id'): - return self._cls_thread_local.txn_id - - @txn_id.setter - def txn_id(self, value): - self._cls_thread_local.txn_id = value - - @property - def client_ip(self): - if hasattr(self._cls_thread_local, 'client_ip'): - return self._cls_thread_local.client_ip - - @client_ip.setter - def client_ip(self, value): - self._cls_thread_local.client_ip = value - - @property - def thread_locals(self): - return (self.txn_id, self.client_ip) - - @thread_locals.setter - def thread_locals(self, value): - self.txn_id, self.client_ip = value - - def getEffectiveLevel(self): - return self.logger.getEffectiveLevel() - - def process(self, msg, kwargs): - """ - Add extra info to message - """ - kwargs['extra'] = {'server': self.server, 'txn_id': self.txn_id, - 'client_ip': self.client_ip} - return msg, kwargs - - def notice(self, msg, *args, **kwargs): - """ - Convenience function for syslog priority LOG_NOTICE. The python - logging lvl is set to 25, just above info. SysLogHandler is - monkey patched to map this log lvl to the LOG_NOTICE syslog - priority. - """ - self.log(NOTICE, msg, *args, **kwargs) - - def _exception(self, msg, *args, **kwargs): - logging.LoggerAdapter.exception(self, msg, *args, **kwargs) - - def exception(self, msg, *args, **kwargs): - _junk, exc, _junk = sys.exc_info() - call = self.error - emsg = '' - if isinstance(exc, (http_client.BadStatusLine, - green_http_client.BadStatusLine)): - # Use error(); not really exceptional - emsg = repr(exc) - # Note that on py3, we've seen a RemoteDisconnected error getting - # raised, which inherits from *both* BadStatusLine and OSError; - # we want it getting caught here - elif isinstance(exc, (OSError, socket.error)): - if exc.errno in (errno.EIO, errno.ENOSPC): - emsg = str(exc) - elif exc.errno == errno.ECONNREFUSED: - emsg = 'Connection refused' - elif exc.errno == errno.ECONNRESET: - emsg = 'Connection reset' - elif exc.errno == errno.EHOSTUNREACH: - emsg = 'Host unreachable' - elif exc.errno == errno.ENETUNREACH: - emsg = 'Network unreachable' - elif exc.errno == errno.ETIMEDOUT: - emsg = 'Connection timeout' - elif exc.errno == errno.EPIPE: - emsg = 'Broken pipe' - else: - call = self._exception - elif isinstance(exc, eventlet.Timeout): - emsg = exc.__class__.__name__ - detail = '%ss' % exc.seconds - if hasattr(exc, 'created_at'): - detail += ' after %0.2fs' % (time.time() - exc.created_at) - emsg += ' (%s)' % detail - if isinstance(exc, swift.common.exceptions.MessageTimeout): - if exc.msg: - emsg += ' %s' % exc.msg - else: - call = self._exception - call('%s: %s' % (msg, emsg), *args, **kwargs) - - def set_statsd_prefix(self, prefix): - """ - This method is deprecated. Callers should use the - ``statsd_tail_prefix`` argument of ``get_logger`` when instantiating a - logger. - - The StatsD client prefix defaults to the "name" of the logger. This - method may override that default with a specific value. Currently used - in the proxy-server to differentiate the Account, Container, and Object - controllers. 
- """ - warnings.warn( - 'set_statsd_prefix() is deprecated; use the ' - '``statsd_tail_prefix`` argument to ``get_logger`` instead.', - DeprecationWarning, stacklevel=2 - ) - if self.logger.statsd_client: - self.logger.statsd_client._set_prefix(prefix) - - def statsd_delegate(statsd_func_name): - """ - Factory to create methods which delegate to methods on - self.logger.statsd_client (an instance of StatsdClient). The - created methods conditionally delegate to a method whose name is given - in 'statsd_func_name'. The created delegate methods are a no-op when - StatsD logging is not configured. - - :param statsd_func_name: the name of a method on StatsdClient. - """ - func = getattr(StatsdClient, statsd_func_name) - - @functools.wraps(func) - def wrapped(self, *a, **kw): - if getattr(self.logger, 'statsd_client'): - func = getattr(self.logger.statsd_client, statsd_func_name) - return func(*a, **kw) - return wrapped - - update_stats = statsd_delegate('update_stats') - increment = statsd_delegate('increment') - decrement = statsd_delegate('decrement') - timing = statsd_delegate('timing') - timing_since = statsd_delegate('timing_since') - transfer_rate = statsd_delegate('transfer_rate') - - -class SwiftLogFormatter(logging.Formatter): - """ - Custom logging.Formatter will append txn_id to a log message if the - record has one and the message does not. Optionally it can shorten - overly long log lines. - """ - - def __init__(self, fmt=None, datefmt=None, max_line_length=0): - logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt) - self.max_line_length = max_line_length - - def format(self, record): - if not hasattr(record, 'server'): - # Catch log messages that were not initiated by swift - # (for example, the keystone auth middleware) - record.server = record.name - - # Included from Python's logging.Formatter and then altered slightly to - # replace \n with #012 - record.message = record.getMessage() - if self._fmt.find('%(asctime)') >= 0: - record.asctime = self.formatTime(record, self.datefmt) - msg = (self._fmt % record.__dict__).replace('\n', '#012') - if record.exc_info: - # Cache the traceback text to avoid converting it multiple times - # (it's constant anyway) - if not record.exc_text: - record.exc_text = self.formatException( - record.exc_info).replace('\n', '#012') - if record.exc_text: - if not msg.endswith('#012'): - msg = msg + '#012' - msg = msg + record.exc_text - - if (hasattr(record, 'txn_id') and record.txn_id and - record.txn_id not in msg): - msg = "%s (txn: %s)" % (msg, record.txn_id) - if (hasattr(record, 'client_ip') and record.client_ip and - record.levelno != logging.INFO and - record.client_ip not in msg): - msg = "%s (client_ip: %s)" % (msg, record.client_ip) - if self.max_line_length > 0 and len(msg) > self.max_line_length: - if self.max_line_length < 7: - msg = msg[:self.max_line_length] - else: - approxhalf = (self.max_line_length - 5) // 2 - msg = msg[:approxhalf] + " ... " + msg[-approxhalf:] - return msg - - -class LogLevelFilter(object): - """ - Drop messages for the logger based on level. - - This is useful when dependencies log too much information. 
- - :param level: All messages at or below this level are dropped - (DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL) - Default: DEBUG - """ - - def __init__(self, level=logging.DEBUG): - self.level = level - - def filter(self, record): - if record.levelno <= self.level: - return 0 - return 1 - - -def get_logger(conf, name=None, log_to_console=False, log_route=None, - fmt="%(server)s: %(message)s", statsd_tail_prefix=None): - """ - Get the current system logger using config settings. - - **Log config and defaults**:: - - log_facility = LOG_LOCAL0 - log_level = INFO - log_name = swift - log_max_line_length = 0 - log_udp_host = (disabled) - log_udp_port = logging.handlers.SYSLOG_UDP_PORT - log_address = /dev/log - log_statsd_host = (disabled) - log_statsd_port = 8125 - log_statsd_default_sample_rate = 1.0 - log_statsd_sample_rate_factor = 1.0 - log_statsd_metric_prefix = (empty-string) - - :param conf: Configuration dict to read settings from - :param name: This value is used to populate the ``server`` field in the log - format, as the prefix for statsd messages, and as the default - value for ``log_route``; defaults to the ``log_name`` value in - ``conf``, if it exists, or to 'swift'. - :param log_to_console: Add handler which writes to console on stderr - :param log_route: Route for the logging, not emitted to the log, just used - to separate logging configurations; defaults to the value - of ``name`` or whatever ``name`` defaults to. This value - is used as the name attribute of the - ``logging.LogAdapter`` that is returned. - :param fmt: Override log format - :param statsd_tail_prefix: tail prefix to pass to statsd client; if None - then the tail prefix defaults to the value of ``name``. - :return: an instance of ``LogAdapter`` - """ - # note: log_name is typically specified in conf (i.e. defined by - # operators), whereas log_route is typically hard-coded in callers of - # get_logger (i.e. 
defined by developers) - if not conf: - conf = {} - if name is None: - name = conf.get('log_name', 'swift') - if not log_route: - log_route = name - logger = logging.getLogger(log_route) - logger.propagate = False - # all new handlers will get the same formatter - formatter = SwiftLogFormatter( - fmt=fmt, max_line_length=int(conf.get('log_max_line_length', 0))) - - # get_logger will only ever add one SysLog Handler to a logger - if not hasattr(get_logger, 'handler4logger'): - get_logger.handler4logger = {} - if logger in get_logger.handler4logger: - logger.removeHandler(get_logger.handler4logger[logger]) - - # facility for this logger will be set by last call wins - facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'), - SysLogHandler.LOG_LOCAL0) - udp_host = conf.get('log_udp_host') - if udp_host: - udp_port = int(conf.get('log_udp_port', - logging.handlers.SYSLOG_UDP_PORT)) - handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port), - facility=facility) - else: - log_address = conf.get('log_address', '/dev/log') - handler = None - try: - mode = os.stat(log_address).st_mode - if stat.S_ISSOCK(mode): - handler = ThreadSafeSysLogHandler(address=log_address, - facility=facility) - except (OSError, socket.error) as e: - # If either /dev/log isn't a UNIX socket or it does not exist at - # all then py2 would raise an error - if e.errno not in [errno.ENOTSOCK, errno.ENOENT]: - raise - if handler is None: - # fallback to default UDP - handler = ThreadSafeSysLogHandler(facility=facility) - handler.setFormatter(formatter) - logger.addHandler(handler) - get_logger.handler4logger[logger] = handler - - # setup console logging - if log_to_console or hasattr(get_logger, 'console_handler4logger'): - # remove pre-existing console handler for this logger - if not hasattr(get_logger, 'console_handler4logger'): - get_logger.console_handler4logger = {} - if logger in get_logger.console_handler4logger: - logger.removeHandler(get_logger.console_handler4logger[logger]) - - console_handler = logging.StreamHandler(sys.__stderr__) - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) - get_logger.console_handler4logger[logger] = console_handler - - # set the level for the logger - logger.setLevel( - getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO)) - - # Setup logger with a StatsD client if so configured - statsd_host = conf.get('log_statsd_host') - if statsd_host: - statsd_port = int(conf.get('log_statsd_port', 8125)) - base_prefix = conf.get('log_statsd_metric_prefix', '') - default_sample_rate = float(conf.get( - 'log_statsd_default_sample_rate', 1)) - sample_rate_factor = float(conf.get( - 'log_statsd_sample_rate_factor', 1)) - if statsd_tail_prefix is None: - statsd_tail_prefix = name - statsd_client = StatsdClient(statsd_host, statsd_port, base_prefix, - statsd_tail_prefix, default_sample_rate, - sample_rate_factor, logger=logger) - logger.statsd_client = statsd_client - else: - logger.statsd_client = None - - adapted_logger = LogAdapter(logger, name) - other_handlers = conf.get('log_custom_handlers', None) - if other_handlers: - log_custom_handlers = [s.strip() for s in other_handlers.split(',') - if s.strip()] - for hook in log_custom_handlers: - try: - mod, fnc = hook.rsplit('.', 1) - logger_hook = getattr(__import__(mod, fromlist=[fnc]), fnc) - logger_hook(conf, name, log_to_console, log_route, fmt, - logger, adapted_logger) - except (AttributeError, ImportError): - print('Error calling custom handler [%s]' % hook, - file=sys.stderr) 
- except ValueError: - print('Invalid custom handler format [%s]' % hook, - file=sys.stderr) - - return adapted_logger - - def get_hub(): """ Checks whether poll is available and falls back @@ -2095,43 +983,6 @@ def clean_up_daemon_hygiene(): os.umask(0o22) # ensure files are created with the correct privileges -def capture_stdio(logger, **kwargs): - """ - Log unhandled exceptions, close stdio, capture stdout and stderr. - - param logger: Logger object to use - """ - # log uncaught exceptions - sys.excepthook = lambda * exc_info: \ - logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info) - - # collect stdio file desc not in use for logging - stdio_files = [sys.stdin, sys.stdout, sys.stderr] - console_fds = [h.stream.fileno() for _junk, h in getattr( - get_logger, 'console_handler4logger', {}).items()] - stdio_files = [f for f in stdio_files if f.fileno() not in console_fds] - - with open(os.devnull, 'r+b') as nullfile: - # close stdio (excludes fds open for logging) - for f in stdio_files: - # some platforms throw an error when attempting an stdin flush - try: - f.flush() - except IOError: - pass - - try: - os.dup2(nullfile.fileno(), f.fileno()) - except OSError: - pass - - # redirect stdio - if kwargs.pop('capture_stdout', True): - sys.stdout = LoggerFileObject(logger) - if kwargs.pop('capture_stderr', True): - sys.stderr = LoggerFileObject(logger, 'STDERR') - - def parse_options(parser=None, once=False, test_config=False, test_args=None): """Parse standard swift server/daemon options with optparse.OptionParser. @@ -2518,119 +1369,6 @@ def cache_from_env(env, allow_none=False): return item_from_env(env, 'swift.cache', allow_none) -def read_conf_dir(parser, conf_dir): - conf_files = [] - for f in os.listdir(conf_dir): - if f.endswith('.conf') and not f.startswith('.'): - conf_files.append(os.path.join(conf_dir, f)) - return parser.read(sorted(conf_files)) - - -if six.PY2: - NicerInterpolation = None # just don't cause ImportErrors over in wsgi.py -else: - class NicerInterpolation(configparser.BasicInterpolation): - def before_get(self, parser, section, option, value, defaults): - if '%(' not in value: - return value - return super(NicerInterpolation, self).before_get( - parser, section, option, value, defaults) - - -def readconf(conf_path, section_name=None, log_name=None, defaults=None, - raw=False): - """ - Read config file(s) and return config items as a dict - - :param conf_path: path to config file/directory, or a file-like object - (hasattr readline) - :param section_name: config section to read (will return all sections if - not defined) - :param log_name: name to be used with logging (will use section_name if - not defined) - :param defaults: dict of default values to pre-populate the config with - :returns: dict of config items - :raises ValueError: if section_name does not exist - :raises IOError: if reading the file failed - """ - if defaults is None: - defaults = {} - if raw: - c = RawConfigParser(defaults) - else: - if six.PY2: - c = ConfigParser(defaults) - else: - # In general, we haven't really thought much about interpolation - # in configs. Python's default ConfigParser has always supported - # it, though, so *we* got it "for free". Unfortunatley, since we - # "supported" interpolation, we have to assume there are - # deployments in the wild that use it, and try not to break them. - # So, do what we can to mimic the py2 behavior of passing through - # values like "1%" (which we want to support for - # fallocate_reserve). 
- c = ConfigParser(defaults, interpolation=NicerInterpolation()) - c.optionxform = str # Don't lower-case keys - - if hasattr(conf_path, 'readline'): - if hasattr(conf_path, 'seek'): - conf_path.seek(0) - if six.PY2: - c.readfp(conf_path) - else: - c.read_file(conf_path) - else: - if os.path.isdir(conf_path): - # read all configs in directory - success = read_conf_dir(c, conf_path) - else: - success = c.read(conf_path) - if not success: - raise IOError("Unable to read config from %s" % - conf_path) - if section_name: - if c.has_section(section_name): - conf = dict(c.items(section_name)) - else: - raise ValueError( - "Unable to find %(section)s config section in %(conf)s" % - {'section': section_name, 'conf': conf_path}) - if "log_name" not in conf: - if log_name is not None: - conf['log_name'] = log_name - else: - conf['log_name'] = section_name - else: - conf = {} - for s in c.sections(): - conf.update({s: dict(c.items(s))}) - if 'log_name' not in conf: - conf['log_name'] = log_name - conf['__file__'] = conf_path - return conf - - -def parse_prefixed_conf(conf_file, prefix): - """ - Search the config file for any common-prefix sections and load those - sections to a dict mapping the after-prefix reference to options. - - :param conf_file: the file name of the config to parse - :param prefix: the common prefix of the sections - :return: a dict mapping policy reference -> dict of policy options - :raises ValueError: if a policy config section has an invalid name - """ - - ret_config = {} - all_conf = readconf(conf_file) - for section, options in all_conf.items(): - if not section.startswith(prefix): - continue - target_ref = section[len(prefix):] - ret_config[target_ref] = options - return ret_config - - def write_pickle(obj, dest, tmp=None, pickle_protocol=0): """ Ensure that a pickle file gets written to disk. The file @@ -3255,112 +1993,6 @@ def validate_sync_to(value, allowed_sync_hosts, realms_conf): return (None, value, None, None) -def affinity_key_function(affinity_str): - """Turns an affinity config value into a function suitable for passing to - sort(). After doing so, the array will be sorted with respect to the given - ordering. - - For example, if affinity_str is "r1=1, r2z7=2, r2z8=2", then the array - will be sorted with all nodes from region 1 (r1=1) first, then all the - nodes from region 2 zones 7 and 8 (r2z7=2 and r2z8=2), then everything - else. - - Note that the order of the pieces of affinity_str is irrelevant; the - priority values are what comes after the equals sign. - - If affinity_str is empty or all whitespace, then the resulting function - will not alter the ordering of the nodes. - - :param affinity_str: affinity config value, e.g. 
"r1z2=3" - or "r1=1, r2z1=2, r2z2=2" - :returns: single-argument function - :raises ValueError: if argument invalid - """ - affinity_str = affinity_str.strip() - - if not affinity_str: - return lambda x: 0 - - priority_matchers = [] - pieces = [s.strip() for s in affinity_str.split(',')] - for piece in pieces: - # matches r= or rz= - match = re.match(r"r(\d+)(?:z(\d+))?=(\d+)$", piece) - if match: - region, zone, priority = match.groups() - region = int(region) - priority = int(priority) - zone = int(zone) if zone else None - - matcher = {'region': region, 'priority': priority} - if zone is not None: - matcher['zone'] = zone - priority_matchers.append(matcher) - else: - raise ValueError("Invalid affinity value: %r" % affinity_str) - - priority_matchers.sort(key=operator.itemgetter('priority')) - - def keyfn(ring_node): - for matcher in priority_matchers: - if (matcher['region'] == ring_node['region'] - and ('zone' not in matcher - or matcher['zone'] == ring_node['zone'])): - return matcher['priority'] - return 4294967296 # 2^32, i.e. "a big number" - return keyfn - - -def affinity_locality_predicate(write_affinity_str): - """ - Turns a write-affinity config value into a predicate function for nodes. - The returned value will be a 1-arg function that takes a node dictionary - and returns a true value if it is "local" and a false value otherwise. The - definition of "local" comes from the affinity_str argument passed in here. - - For example, if affinity_str is "r1, r2z2", then only nodes where region=1 - or where (region=2 and zone=2) are considered local. - - If affinity_str is empty or all whitespace, then the resulting function - will consider everything local - - :param write_affinity_str: affinity config value, e.g. "r1z2" - or "r1, r2z1, r2z2" - :returns: single-argument function, or None if affinity_str is empty - :raises ValueError: if argument invalid - """ - affinity_str = write_affinity_str.strip() - - if not affinity_str: - return None - - matchers = [] - pieces = [s.strip() for s in affinity_str.split(',')] - for piece in pieces: - # matches r or rz - match = re.match(r"r(\d+)(?:z(\d+))?$", piece) - if match: - region, zone = match.groups() - region = int(region) - zone = int(zone) if zone else None - - matcher = {'region': region} - if zone is not None: - matcher['zone'] = zone - matchers.append(matcher) - else: - raise ValueError("Invalid write-affinity value: %r" % affinity_str) - - def is_local(ring_node): - for matcher in matchers: - if (matcher['region'] == ring_node['region'] - and ('zone' not in matcher - or matcher['zone'] == ring_node['zone'])): - return True - return False - return is_local - - def get_remote_client(req): # remote host for zeus client = req.headers.get('x-cluster-client-ip') @@ -3616,30 +2248,6 @@ def rsync_module_interpolation(template, device): return module -def get_valid_utf8_str(str_or_unicode): - """ - Get valid parts of utf-8 str from str, unicode and even invalid utf-8 str - - :param str_or_unicode: a string or an unicode which can be invalid utf-8 - """ - if six.PY2: - if isinstance(str_or_unicode, six.text_type): - (str_or_unicode, _len) = utf8_encoder(str_or_unicode, 'replace') - (valid_unicode_str, _len) = utf8_decoder(str_or_unicode, 'replace') - else: - # Apparently under py3 we need to go to utf-16 to collapse surrogates? 
- if isinstance(str_or_unicode, six.binary_type): - try: - (str_or_unicode, _len) = utf8_decoder(str_or_unicode, - 'surrogatepass') - except UnicodeDecodeError: - (str_or_unicode, _len) = utf8_decoder(str_or_unicode, - 'replace') - (str_or_unicode, _len) = utf16_encoder(str_or_unicode, 'surrogatepass') - (valid_unicode_str, _len) = utf16_decoder(str_or_unicode, 'replace') - return valid_unicode_str.encode('utf-8') - - class Everything(object): """ A container that contains everything. If "e" is an instance of @@ -4172,16 +2780,6 @@ def clean_content_type(value): return value -def quote(value, safe='/'): - """ - Patched version of urllib.quote that encodes utf-8 strings before quoting - """ - quoted = _quote(get_valid_utf8_str(value), safe) - if isinstance(value, six.binary_type): - quoted = quoted.encode('utf-8') - return quoted - - def get_expirer_container(x_delete_at, expirer_divisor, acc, cont, obj): """ Returns an expiring object container name for given X-Delete-At and @@ -4564,27 +3162,6 @@ def parse_content_disposition(header): return header, attributes -try: - _test_md5 = hashlib.md5(usedforsecurity=False) # nosec - - def md5(string=b'', usedforsecurity=True): - """Return an md5 hashlib object using usedforsecurity parameter - - For python distributions that support the usedforsecurity keyword - parameter, this passes the parameter through as expected. - See https://bugs.python.org/issue9216 - """ - return hashlib.md5(string, usedforsecurity=usedforsecurity) # nosec -except TypeError: - def md5(string=b'', usedforsecurity=True): - """Return an md5 hashlib object without usedforsecurity parameter - - For python distributions that do not yet support this keyword - parameter, we drop the parameter - """ - return hashlib.md5(string) # nosec - - class NamespaceOuterBound(object): """ A custom singleton type to be subclassed for the outer bounds of @@ -5975,162 +4552,6 @@ def load_pkg_resource(group, uri): return entry_points[0].load() -class PipeMutex(object): - """ - Mutex using a pipe. Works across both greenlets and real threads, even - at the same time. - """ - - def __init__(self): - self.rfd, self.wfd = os.pipe() - - # You can't create a pipe in non-blocking mode; you must set it - # later. - rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) - fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) - os.write(self.wfd, b'-') # start unlocked - - self.owner = None - self.recursion_depth = 0 - - # Usually, it's an error to have multiple greenthreads all waiting - # to read the same file descriptor. It's often a sign of inadequate - # concurrency control; for example, if you have two greenthreads - # trying to use the same memcache connection, they'll end up writing - # interleaved garbage to the socket or stealing part of each others' - # responses. - # - # In this case, we have multiple greenthreads waiting on the same - # file descriptor by design. This lets greenthreads in real thread A - # wait with greenthreads in real thread B for the same mutex. - # Therefore, we must turn off eventlet's multiple-reader detection. - # - # It would be better to turn off multiple-reader detection for only - # our calls to trampoline(), but eventlet does not support that. - eventlet.debug.hub_prevent_multiple_readers(False) - - def acquire(self, blocking=True): - """ - Acquire the mutex. - - If called with blocking=False, returns True if the mutex was - acquired and False if it wasn't. Otherwise, blocks until the mutex - is acquired and returns True. 
- - This lock is recursive; the same greenthread may acquire it as many - times as it wants to, though it must then release it that many times - too. - """ - current_greenthread_id = id(eventlet.greenthread.getcurrent()) - if self.owner == current_greenthread_id: - self.recursion_depth += 1 - return True - - while True: - try: - # If there is a byte available, this will read it and remove - # it from the pipe. If not, this will raise OSError with - # errno=EAGAIN. - os.read(self.rfd, 1) - self.owner = current_greenthread_id - return True - except OSError as err: - if err.errno != errno.EAGAIN: - raise - - if not blocking: - return False - - # Tell eventlet to suspend the current greenthread until - # self.rfd becomes readable. This will happen when someone - # else writes to self.wfd. - eventlet.hubs.trampoline(self.rfd, read=True) - - def release(self): - """ - Release the mutex. - """ - current_greenthread_id = id(eventlet.greenthread.getcurrent()) - if self.owner != current_greenthread_id: - raise RuntimeError("cannot release un-acquired lock") - - if self.recursion_depth > 0: - self.recursion_depth -= 1 - return - - self.owner = None - os.write(self.wfd, b'X') - - def close(self): - """ - Close the mutex. This releases its file descriptors. - - You can't use a mutex after it's been closed. - """ - if self.wfd is not None: - os.close(self.rfd) - self.rfd = None - os.close(self.wfd) - self.wfd = None - self.owner = None - self.recursion_depth = 0 - - def __del__(self): - # We need this so we don't leak file descriptors. Otherwise, if you - # call get_logger() and don't explicitly dispose of it by calling - # logger.logger.handlers[0].lock.close() [1], the pipe file - # descriptors are leaked. - # - # This only really comes up in tests. Swift processes tend to call - # get_logger() once and then hang on to it until they exit, but the - # test suite calls get_logger() a lot. - # - # [1] and that's a completely ridiculous thing to expect callers to - # do, so nobody does it and that's okay. - self.close() - - -class NoopMutex(object): - """ - "Mutex" that doesn't lock anything. - - We only allow our syslog logging to be configured via UDS or UDP, neither - of which have the message-interleaving trouble you'd expect from TCP or - file handlers. - """ - - def __init__(self): - # Usually, it's an error to have multiple greenthreads all waiting - # to write to the same file descriptor. It's often a sign of inadequate - # concurrency control; for example, if you have two greenthreads - # trying to use the same memcache connection, they'll end up writing - # interleaved garbage to the socket or stealing part of each others' - # responses. - # - # In this case, we have multiple greenthreads waiting on the same - # (logging) file descriptor by design. So, similar to the PipeMutex, - # we must turn off eventlet's multiple-waiter detection. - # - # It would be better to turn off multiple-reader detection for only - # the logging socket fd, but eventlet does not support that. 
- eventlet.debug.hub_prevent_multiple_readers(False) - - def acquire(self, blocking=True): - pass - - def release(self): - pass - - -class ThreadSafeSysLogHandler(SysLogHandler): - def createLock(self): - if config_true_value(os.environ.get( - 'SWIFT_NOOP_LOGGING_MUTEX') or 'true'): - self.lock = NoopMutex() - else: - self.lock = PipeMutex() - - def round_robin_iter(its): """ Takes a list of iterators, yield an element from each in a round-robin diff --git a/swift/common/utils/base.py b/swift/common/utils/base.py new file mode 100644 index 0000000000..50cbac1ab0 --- /dev/null +++ b/swift/common/utils/base.py @@ -0,0 +1,136 @@ +# Copyright (c) 2010-2024 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Miscellaneous utility functions that may be used in other utils modules. + +This module is imported by other utils modules. +This module should not import from other utils modules. +""" + +import codecs +import hashlib + +import six +from six.moves.urllib.parse import quote as _quote + + +try: + _test_md5 = hashlib.md5(usedforsecurity=False) # nosec + + def md5(string=b'', usedforsecurity=True): + """Return an md5 hashlib object using usedforsecurity parameter + + For python distributions that support the usedforsecurity keyword + parameter, this passes the parameter through as expected. + See https://bugs.python.org/issue9216 + """ + return hashlib.md5(string, usedforsecurity=usedforsecurity) # nosec +except TypeError: + def md5(string=b'', usedforsecurity=True): + """Return an md5 hashlib object without usedforsecurity parameter + + For python distributions that do not yet support this keyword + parameter, we drop the parameter + """ + return hashlib.md5(string) # nosec + + +utf8_decoder = codecs.getdecoder('utf-8') +utf8_encoder = codecs.getencoder('utf-8') +if not six.PY2: + # Apparently under py3 we need to go to utf-16 to collapse surrogates? 
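+    # ('surrogatepass' lets unpaired surrogates survive a utf-8 round trip,
+    # but they are caught and replaced when bounced through utf-16; see
+    # get_valid_utf8_str() below.)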
+ utf16_decoder = codecs.getdecoder('utf-16') + utf16_encoder = codecs.getencoder('utf-16') + + +def get_valid_utf8_str(str_or_unicode): + """ + Get valid parts of utf-8 str from str, unicode and even invalid utf-8 str + + :param str_or_unicode: a string or an unicode which can be invalid utf-8 + """ + if six.PY2: + if isinstance(str_or_unicode, six.text_type): + (str_or_unicode, _len) = utf8_encoder(str_or_unicode, 'replace') + (valid_unicode_str, _len) = utf8_decoder(str_or_unicode, 'replace') + else: + if isinstance(str_or_unicode, six.binary_type): + try: + (str_or_unicode, _len) = utf8_decoder(str_or_unicode, + 'surrogatepass') + except UnicodeDecodeError: + (str_or_unicode, _len) = utf8_decoder(str_or_unicode, + 'replace') + (str_or_unicode, _len) = utf16_encoder(str_or_unicode, 'surrogatepass') + (valid_unicode_str, _len) = utf16_decoder(str_or_unicode, 'replace') + return valid_unicode_str.encode('utf-8') + + +def quote(value, safe='/'): + """ + Patched version of urllib.quote that encodes utf-8 strings before quoting + """ + quoted = _quote(get_valid_utf8_str(value), safe) + if isinstance(value, six.binary_type): + quoted = quoted.encode('utf-8') + return quoted + + +def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False): + """ + Validate and split the given HTTP request path. + + **Examples**:: + + ['a'] = split_path('/a') + ['a', None] = split_path('/a', 1, 2) + ['a', 'c'] = split_path('/a/c', 1, 2) + ['a', 'c', 'o/r'] = split_path('/a/c/o/r', 1, 3, True) + + :param path: HTTP Request path to be split + :param minsegs: Minimum number of segments to be extracted + :param maxsegs: Maximum number of segments to be extracted + :param rest_with_last: If True, trailing data will be returned as part + of last segment. If False, and there is + trailing data, raises ValueError. + :returns: list of segments with a length of maxsegs (non-existent + segments will return as None) + :raises ValueError: if given an invalid path + """ + if not maxsegs: + maxsegs = minsegs + if minsegs > maxsegs: + raise ValueError('minsegs > maxsegs: %d > %d' % (minsegs, maxsegs)) + if rest_with_last: + segs = path.split('/', maxsegs) + minsegs += 1 + maxsegs += 1 + count = len(segs) + if (segs[0] or count < minsegs or count > maxsegs or + '' in segs[1:minsegs]): + raise ValueError('Invalid path: %s' % quote(path)) + else: + minsegs += 1 + maxsegs += 1 + segs = path.split('/', maxsegs) + count = len(segs) + if (segs[0] or count < minsegs or count > maxsegs + 1 or + '' in segs[1:minsegs] or + (count == maxsegs + 1 and segs[maxsegs])): + raise ValueError('Invalid path: %s' % quote(path)) + segs = segs[1:maxsegs] + segs.extend([None] * (maxsegs - 1 - len(segs))) + return segs diff --git a/swift/common/utils/config.py b/swift/common/utils/config.py new file mode 100644 index 0000000000..057efc3133 --- /dev/null +++ b/swift/common/utils/config.py @@ -0,0 +1,440 @@ +# Copyright (c) 2010-2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
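+
+"""
+Helpers for reading and validating values from Swift configuration files.
+"""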
+
+import six
+import os
+import operator
+import re
+from six.moves import configparser
+from six.moves.configparser import (ConfigParser, RawConfigParser)
+
+# Used when reading config values
+TRUE_VALUES = {'true', '1', 'yes', 'on', 't', 'y'}
+
+
+def config_true_value(value):
+    """
+    Returns True if the value is either True or a string in TRUE_VALUES.
+    Returns False otherwise.
+    """
+    return value is True or \
+        (isinstance(value, six.string_types) and value.lower() in TRUE_VALUES)
+
+
+def non_negative_float(value):
+    """
+    Check that the value casts to a float and is non-negative.
+
+    :param value: value to check
+    :raises ValueError: if the value cannot be cast to a float or is negative.
+    :return: a float
+    """
+    try:
+        value = float(value)
+        if value < 0:
+            raise ValueError
+    except (TypeError, ValueError):
+        raise ValueError('Value must be a non-negative float number, not '
+                         '"%s".' % value)
+    return value
+
+
+def non_negative_int(value):
+    """
+    Check that the value casts to an int and is a whole number.
+
+    :param value: value to check
+    :raises ValueError: if the value cannot be cast to an int or does not
+        represent a whole number.
+    :return: an int
+    """
+    int_value = int(value)
+    if int_value != non_negative_float(value):
+        raise ValueError
+    return int_value
+
+
+def config_positive_int_value(value):
+    """
+    Returns a positive int value if it can be cast by int() and it's an
+    integer > 0 (i.e. not including zero). Raises ValueError otherwise.
+    """
+    try:
+        result = int(value)
+        if result < 1:
+            raise ValueError()
+    except (TypeError, ValueError):
+        raise ValueError(
+            'Config option must be a positive int number, not "%s".' % value)
+    return result
+
+
+def config_float_value(value, minimum=None, maximum=None):
+    try:
+        val = float(value)
+        if minimum is not None and val < minimum:
+            raise ValueError()
+        if maximum is not None and val > maximum:
+            raise ValueError()
+        return val
+    except (TypeError, ValueError):
+        min_ = ', greater than %s' % minimum if minimum is not None else ''
+        max_ = ', less than %s' % maximum if maximum is not None else ''
+        raise ValueError('Config option must be a number%s%s, not "%s".' %
+                         (min_, max_, value))
+
+
+def config_auto_int_value(value, default):
+    """
+    Returns default if value is None or 'auto'.
+    Returns value as an int or raises ValueError otherwise.
+    """
+    if value is None or \
+       (isinstance(value, six.string_types) and value.lower() == 'auto'):
+        return default
+    try:
+        value = int(value)
+    except (TypeError, ValueError):
+        raise ValueError('Config option must be an integer or the '
+                         'string "auto", not "%s".' % value)
+    return value
+
+
+def config_percent_value(value):
+    try:
+        return config_float_value(value, 0, 100) / 100.0
+    except ValueError as err:
+        raise ValueError("%s: %s" % (str(err), value))
+
+
+def config_request_node_count_value(value):
+    try:
+        value_parts = value.lower().split()
+        rnc_value = int(value_parts[0])
+    except (ValueError, AttributeError):
+        pass
+    else:
+        if len(value_parts) == 1:
+            return lambda replicas: rnc_value
+        elif (len(value_parts) == 3 and
+                value_parts[1] == '*' and
+                value_parts[2] == 'replicas'):
+            return lambda replicas: rnc_value * replicas
+    raise ValueError(
+        'Invalid request_node_count value: %r' % value)
+
+
+def config_fallocate_value(reserve_value):
+    """
+    Returns fallocate reserve_value as an int or float.
+    Returns is_percent as a boolean.
+    Raises ValueError on an invalid fallocate value.
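+
+    For example, '2%' yields (2.0, True) while '10240' yields
+    (10240, False).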
+ """ + try: + if str(reserve_value[-1:]) == '%': + reserve_value = float(reserve_value[:-1]) + is_percent = True + else: + reserve_value = int(reserve_value) + is_percent = False + except ValueError: + raise ValueError('Error: %s is an invalid value for fallocate' + '_reserve.' % reserve_value) + return reserve_value, is_percent + + +def config_read_prefixed_options(conf, prefix_name, defaults): + """ + Read prefixed options from configuration + + :param conf: the configuration + :param prefix_name: the prefix (including, if needed, an underscore) + :param defaults: a dict of default values. The dict supplies the + option name and type (string or comma separated string) + :return: a dict containing the options + """ + params = {} + for option_name in defaults.keys(): + value = conf.get('%s%s' % (prefix_name, option_name)) + if value: + if isinstance(defaults.get(option_name), list): + params[option_name] = [] + for role in value.lower().split(','): + params[option_name].append(role.strip()) + else: + params[option_name] = value.strip() + return params + + +def append_underscore(prefix): + if prefix and not prefix.endswith('_'): + prefix += '_' + return prefix + + +def config_read_reseller_options(conf, defaults): + """ + Read reseller_prefix option and associated options from configuration + + Reads the reseller_prefix option, then reads options that may be + associated with a specific reseller prefix. Reads options such that an + option without a prefix applies to all reseller prefixes unless an option + has an explicit prefix. + + :param conf: the configuration + :param defaults: a dict of default values. The key is the option + name. The value is either an array of strings or a string + :return: tuple of an array of reseller prefixes and a dict of option values + """ + reseller_prefix_opt = conf.get('reseller_prefix', 'AUTH').split(',') + reseller_prefixes = [] + for prefix in [pre.strip() for pre in reseller_prefix_opt if pre.strip()]: + if prefix == "''": + prefix = '' + prefix = append_underscore(prefix) + if prefix not in reseller_prefixes: + reseller_prefixes.append(prefix) + if len(reseller_prefixes) == 0: + reseller_prefixes.append('') + + # Get prefix-using config options + associated_options = {} + for prefix in reseller_prefixes: + associated_options[prefix] = dict(defaults) + associated_options[prefix].update( + config_read_prefixed_options(conf, '', defaults)) + prefix_name = prefix if prefix != '' else "''" + associated_options[prefix].update( + config_read_prefixed_options(conf, prefix_name, defaults)) + return reseller_prefixes, associated_options + + +def affinity_key_function(affinity_str): + """Turns an affinity config value into a function suitable for passing to + sort(). After doing so, the array will be sorted with respect to the given + ordering. + + For example, if affinity_str is "r1=1, r2z7=2, r2z8=2", then the array + will be sorted with all nodes from region 1 (r1=1) first, then all the + nodes from region 2 zones 7 and 8 (r2z7=2 and r2z8=2), then everything + else. + + Note that the order of the pieces of affinity_str is irrelevant; the + priority values are what comes after the equals sign. + + If affinity_str is empty or all whitespace, then the resulting function + will not alter the ordering of the nodes. + + :param affinity_str: affinity config value, e.g. 
"r1z2=3" + or "r1=1, r2z1=2, r2z2=2" + :returns: single-argument function + :raises ValueError: if argument invalid + """ + affinity_str = affinity_str.strip() + + if not affinity_str: + return lambda x: 0 + + priority_matchers = [] + pieces = [s.strip() for s in affinity_str.split(',')] + for piece in pieces: + # matches r= or rz= + match = re.match(r"r(\d+)(?:z(\d+))?=(\d+)$", piece) + if match: + region, zone, priority = match.groups() + region = int(region) + priority = int(priority) + zone = int(zone) if zone else None + + matcher = {'region': region, 'priority': priority} + if zone is not None: + matcher['zone'] = zone + priority_matchers.append(matcher) + else: + raise ValueError("Invalid affinity value: %r" % affinity_str) + + priority_matchers.sort(key=operator.itemgetter('priority')) + + def keyfn(ring_node): + for matcher in priority_matchers: + if (matcher['region'] == ring_node['region'] + and ('zone' not in matcher + or matcher['zone'] == ring_node['zone'])): + return matcher['priority'] + return 4294967296 # 2^32, i.e. "a big number" + return keyfn + + +def affinity_locality_predicate(write_affinity_str): + """ + Turns a write-affinity config value into a predicate function for nodes. + The returned value will be a 1-arg function that takes a node dictionary + and returns a true value if it is "local" and a false value otherwise. The + definition of "local" comes from the affinity_str argument passed in here. + + For example, if affinity_str is "r1, r2z2", then only nodes where region=1 + or where (region=2 and zone=2) are considered local. + + If affinity_str is empty or all whitespace, then the resulting function + will consider everything local + + :param write_affinity_str: affinity config value, e.g. "r1z2" + or "r1, r2z1, r2z2" + :returns: single-argument function, or None if affinity_str is empty + :raises ValueError: if argument invalid + """ + affinity_str = write_affinity_str.strip() + + if not affinity_str: + return None + + matchers = [] + pieces = [s.strip() for s in affinity_str.split(',')] + for piece in pieces: + # matches r or rz + match = re.match(r"r(\d+)(?:z(\d+))?$", piece) + if match: + region, zone = match.groups() + region = int(region) + zone = int(zone) if zone else None + + matcher = {'region': region} + if zone is not None: + matcher['zone'] = zone + matchers.append(matcher) + else: + raise ValueError("Invalid write-affinity value: %r" % affinity_str) + + def is_local(ring_node): + for matcher in matchers: + if (matcher['region'] == ring_node['region'] + and ('zone' not in matcher + or matcher['zone'] == ring_node['zone'])): + return True + return False + return is_local + + +def read_conf_dir(parser, conf_dir): + conf_files = [] + for f in os.listdir(conf_dir): + if f.endswith('.conf') and not f.startswith('.'): + conf_files.append(os.path.join(conf_dir, f)) + return parser.read(sorted(conf_files)) + + +if six.PY2: + NicerInterpolation = None # just don't cause ImportErrors over in wsgi.py +else: + class NicerInterpolation(configparser.BasicInterpolation): + def before_get(self, parser, section, option, value, defaults): + if '%(' not in value: + return value + return super(NicerInterpolation, self).before_get( + parser, section, option, value, defaults) + + +def readconf(conf_path, section_name=None, log_name=None, defaults=None, + raw=False): + """ + Read config file(s) and return config items as a dict + + :param conf_path: path to config file/directory, or a file-like object + (hasattr readline) + :param section_name: config section to read 
(will return all sections if
+                         not defined)
+    :param log_name: name to be used with logging (will use section_name if
+                     not defined)
+    :param defaults: dict of default values to pre-populate the config with
+    :param raw: if True, use a RawConfigParser so that option values are
+                read without interpolation
+    :returns: dict of config items
+    :raises ValueError: if section_name does not exist
+    :raises IOError: if reading the file failed
+    """
+    if defaults is None:
+        defaults = {}
+    if raw:
+        c = RawConfigParser(defaults)
+    else:
+        if six.PY2:
+            c = ConfigParser(defaults)
+        else:
+            # In general, we haven't really thought much about interpolation
+            # in configs. Python's default ConfigParser has always supported
+            # it, though, so *we* got it "for free". Unfortunately, since we
+            # "supported" interpolation, we have to assume there are
+            # deployments in the wild that use it, and try not to break them.
+            # So, do what we can to mimic the py2 behavior of passing through
+            # values like "1%" (which we want to support for
+            # fallocate_reserve).
+            c = ConfigParser(defaults, interpolation=NicerInterpolation())
+    c.optionxform = str  # Don't lower-case keys
+
+    if hasattr(conf_path, 'readline'):
+        if hasattr(conf_path, 'seek'):
+            conf_path.seek(0)
+        if six.PY2:
+            c.readfp(conf_path)
+        else:
+            c.read_file(conf_path)
+    else:
+        if os.path.isdir(conf_path):
+            # read all configs in directory
+            success = read_conf_dir(c, conf_path)
+        else:
+            success = c.read(conf_path)
+        if not success:
+            raise IOError("Unable to read config from %s" %
+                          conf_path)
+    if section_name:
+        if c.has_section(section_name):
+            conf = dict(c.items(section_name))
+        else:
+            raise ValueError(
+                "Unable to find %(section)s config section in %(conf)s" %
+                {'section': section_name, 'conf': conf_path})
+        if "log_name" not in conf:
+            if log_name is not None:
+                conf['log_name'] = log_name
+            else:
+                conf['log_name'] = section_name
+    else:
+        conf = {}
+        for s in c.sections():
+            conf.update({s: dict(c.items(s))})
+        if 'log_name' not in conf:
+            conf['log_name'] = log_name
+    conf['__file__'] = conf_path
+    return conf
+
+
+def parse_prefixed_conf(conf_file, prefix):
+    """
+    Search the config file for any common-prefix sections and load those
+    sections to a dict mapping the after-prefix reference to options.
+
+    :param conf_file: the file name of the config to parse
+    :param prefix: the common prefix of the sections
+    :return: a dict mapping the after-prefix reference -> dict of options
+    :raises ValueError: if a config section has an invalid name
+    """
+
+    ret_config = {}
+    all_conf = readconf(conf_file)
+    for section, options in all_conf.items():
+        if not section.startswith(prefix):
+            continue
+        target_ref = section[len(prefix):]
+        ret_config[target_ref] = options
+    return ret_config
diff --git a/swift/common/utils/logs.py b/swift/common/utils/logs.py
new file mode 100644
index 0000000000..3a69f4b720
--- /dev/null
+++ b/swift/common/utils/logs.py
@@ -0,0 +1,995 @@
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
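+
+"""
+Logging helpers for Swift: log adapters, formatters, syslog handlers and
+``get_logger``.
+"""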
+ +from __future__ import print_function + +import errno +import hashlib +import logging +from logging.handlers import SysLogHandler +import os +import socket +import stat +import string +import sys +import functools +import time +import warnings +import fcntl +import eventlet +import six +import datetime + +from swift.common.utils.base import md5, quote, split_path +from swift.common.utils.timestamp import UTC +from swift.common.utils.config import config_true_value +from swift.common import statsd_client, exceptions + +if six.PY2: + from eventlet.green import httplib as green_http_client +else: + from eventlet.green.http import client as green_http_client +from six.moves import http_client +from eventlet.green import threading + + +NOTICE = 25 + +LOG_LINE_DEFAULT_FORMAT = '{remote_addr} - - [{time.d}/{time.b}/{time.Y}' \ + ':{time.H}:{time.M}:{time.S} +0000] ' \ + '"{method} {path}" {status} {content_length} ' \ + '"{referer}" "{txn_id}" "{user_agent}" ' \ + '{trans_time:.4f} "{additional_info}" {pid} ' \ + '{policy_index}' + + +def logging_monkey_patch(): + # explicitly patch the logging lock + logging._lock = logging.threading.RLock() + # setup notice level logging + logging.addLevelName(NOTICE, 'NOTICE') + SysLogHandler.priority_map['NOTICE'] = 'notice' + # Trying to log threads while monkey-patched can lead to deadlocks; see + # https://bugs.launchpad.net/swift/+bug/1895739 + logging.logThreads = 0 + + +class PipeMutex(object): + """ + Mutex using a pipe. Works across both greenlets and real threads, even + at the same time. + """ + + def __init__(self): + self.rfd, self.wfd = os.pipe() + + # You can't create a pipe in non-blocking mode; you must set it + # later. + rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) + fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) + os.write(self.wfd, b'-') # start unlocked + + self.owner = None + self.recursion_depth = 0 + + # Usually, it's an error to have multiple greenthreads all waiting + # to read the same file descriptor. It's often a sign of inadequate + # concurrency control; for example, if you have two greenthreads + # trying to use the same memcache connection, they'll end up writing + # interleaved garbage to the socket or stealing part of each others' + # responses. + # + # In this case, we have multiple greenthreads waiting on the same + # file descriptor by design. This lets greenthreads in real thread A + # wait with greenthreads in real thread B for the same mutex. + # Therefore, we must turn off eventlet's multiple-reader detection. + # + # It would be better to turn off multiple-reader detection for only + # our calls to trampoline(), but eventlet does not support that. + eventlet.debug.hub_prevent_multiple_readers(False) + + def acquire(self, blocking=True): + """ + Acquire the mutex. + + If called with blocking=False, returns True if the mutex was + acquired and False if it wasn't. Otherwise, blocks until the mutex + is acquired and returns True. + + This lock is recursive; the same greenthread may acquire it as many + times as it wants to, though it must then release it that many times + too. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner == current_greenthread_id: + self.recursion_depth += 1 + return True + + while True: + try: + # If there is a byte available, this will read it and remove + # it from the pipe. If not, this will raise OSError with + # errno=EAGAIN. 
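+                # (At most one byte is ever in the pipe: it is written once
+                # at init time and once per release(), so a successful read
+                # means we now hold the mutex.)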
+ os.read(self.rfd, 1) + self.owner = current_greenthread_id + return True + except OSError as err: + if err.errno != errno.EAGAIN: + raise + + if not blocking: + return False + + # Tell eventlet to suspend the current greenthread until + # self.rfd becomes readable. This will happen when someone + # else writes to self.wfd. + eventlet.hubs.trampoline(self.rfd, read=True) + + def release(self): + """ + Release the mutex. + """ + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner != current_greenthread_id: + raise RuntimeError("cannot release un-acquired lock") + + if self.recursion_depth > 0: + self.recursion_depth -= 1 + return + + self.owner = None + os.write(self.wfd, b'X') + + def close(self): + """ + Close the mutex. This releases its file descriptors. + + You can't use a mutex after it's been closed. + """ + if self.wfd is not None: + os.close(self.rfd) + self.rfd = None + os.close(self.wfd) + self.wfd = None + self.owner = None + self.recursion_depth = 0 + + def __del__(self): + # We need this so we don't leak file descriptors. Otherwise, if you + # call get_logger() and don't explicitly dispose of it by calling + # logger.logger.handlers[0].lock.close() [1], the pipe file + # descriptors are leaked. + # + # This only really comes up in tests. Swift processes tend to call + # get_logger() once and then hang on to it until they exit, but the + # test suite calls get_logger() a lot. + # + # [1] and that's a completely ridiculous thing to expect callers to + # do, so nobody does it and that's okay. + self.close() + + +class NoopMutex(object): + """ + "Mutex" that doesn't lock anything. + + We only allow our syslog logging to be configured via UDS or UDP, neither + of which have the message-interleaving trouble you'd expect from TCP or + file handlers. + """ + + def __init__(self): + # Usually, it's an error to have multiple greenthreads all waiting + # to write to the same file descriptor. It's often a sign of inadequate + # concurrency control; for example, if you have two greenthreads + # trying to use the same memcache connection, they'll end up writing + # interleaved garbage to the socket or stealing part of each others' + # responses. + # + # In this case, we have multiple greenthreads waiting on the same + # (logging) file descriptor by design. So, similar to the PipeMutex, + # we must turn off eventlet's multiple-waiter detection. + # + # It would be better to turn off multiple-reader detection for only + # the logging socket fd, but eventlet does not support that. + eventlet.debug.hub_prevent_multiple_readers(False) + + def acquire(self, blocking=True): + pass + + def release(self): + pass + + +class ThreadSafeSysLogHandler(SysLogHandler): + def createLock(self): + if config_true_value(os.environ.get( + 'SWIFT_NOOP_LOGGING_MUTEX') or 'true'): + self.lock = NoopMutex() + else: + self.lock = PipeMutex() + + +# double inheritance to support property with setter +class LogAdapter(logging.LoggerAdapter, object): + """ + A Logger like object which performs some reformatting on calls to + :meth:`exception`. Can be used to store a threadlocal transaction id and + client ip. 
+ """ + + _cls_thread_local = threading.local() + + def __init__(self, logger, server): + logging.LoggerAdapter.__init__(self, logger, {}) + self.server = server + self.warn = self.warning + + # There are a few properties needed for py35; see + # - https://bugs.python.org/issue31457 + # - https://github.com/python/cpython/commit/1bbd482 + # - https://github.com/python/cpython/commit/0b6a118 + # - https://github.com/python/cpython/commit/ce9e625 + def _log(self, level, msg, args, exc_info=None, extra=None, + stack_info=False): + """ + Low-level log implementation, proxied to allow nested logger adapters. + """ + return self.logger._log( + level, + msg, + args, + exc_info=exc_info, + extra=extra, + stack_info=stack_info, + ) + + @property + def manager(self): + return self.logger.manager + + @manager.setter + def manager(self, value): + self.logger.manager = value + + @property + def name(self): + return self.logger.name + + @property + def txn_id(self): + if hasattr(self._cls_thread_local, 'txn_id'): + return self._cls_thread_local.txn_id + + @txn_id.setter + def txn_id(self, value): + self._cls_thread_local.txn_id = value + + @property + def client_ip(self): + if hasattr(self._cls_thread_local, 'client_ip'): + return self._cls_thread_local.client_ip + + @client_ip.setter + def client_ip(self, value): + self._cls_thread_local.client_ip = value + + @property + def thread_locals(self): + return (self.txn_id, self.client_ip) + + @thread_locals.setter + def thread_locals(self, value): + self.txn_id, self.client_ip = value + + def getEffectiveLevel(self): + return self.logger.getEffectiveLevel() + + def process(self, msg, kwargs): + """ + Add extra info to message + """ + kwargs['extra'] = {'server': self.server, 'txn_id': self.txn_id, + 'client_ip': self.client_ip} + return msg, kwargs + + def notice(self, msg, *args, **kwargs): + """ + Convenience function for syslog priority LOG_NOTICE. The python + logging lvl is set to 25, just above info. SysLogHandler is + monkey patched to map this log lvl to the LOG_NOTICE syslog + priority. 
+ """ + self.log(NOTICE, msg, *args, **kwargs) + + def _exception(self, msg, *args, **kwargs): + logging.LoggerAdapter.exception(self, msg, *args, **kwargs) + + def exception(self, msg, *args, **kwargs): + _junk, exc, _junk = sys.exc_info() + call = self.error + emsg = '' + if isinstance(exc, (http_client.BadStatusLine, + green_http_client.BadStatusLine)): + # Use error(); not really exceptional + emsg = repr(exc) + # Note that on py3, we've seen a RemoteDisconnected error getting + # raised, which inherits from *both* BadStatusLine and OSError; + # we want it getting caught here + elif isinstance(exc, (OSError, socket.error)): + if exc.errno in (errno.EIO, errno.ENOSPC): + emsg = str(exc) + elif exc.errno == errno.ECONNREFUSED: + emsg = 'Connection refused' + elif exc.errno == errno.ECONNRESET: + emsg = 'Connection reset' + elif exc.errno == errno.EHOSTUNREACH: + emsg = 'Host unreachable' + elif exc.errno == errno.ENETUNREACH: + emsg = 'Network unreachable' + elif exc.errno == errno.ETIMEDOUT: + emsg = 'Connection timeout' + elif exc.errno == errno.EPIPE: + emsg = 'Broken pipe' + else: + call = self._exception + elif isinstance(exc, eventlet.Timeout): + emsg = exc.__class__.__name__ + detail = '%ss' % exc.seconds + if hasattr(exc, 'created_at'): + detail += ' after %0.2fs' % (time.time() - exc.created_at) + emsg += ' (%s)' % detail + if isinstance(exc, exceptions.MessageTimeout): + if exc.msg: + emsg += ' %s' % exc.msg + else: + call = self._exception + call('%s: %s' % (msg, emsg), *args, **kwargs) + + def set_statsd_prefix(self, prefix): + """ + This method is deprecated. Callers should use the + ``statsd_tail_prefix`` argument of ``get_logger`` when instantiating a + logger. + + The StatsD client prefix defaults to the "name" of the logger. This + method may override that default with a specific value. Currently used + in the proxy-server to differentiate the Account, Container, and Object + controllers. + """ + warnings.warn( + 'set_statsd_prefix() is deprecated; use the ' + '``statsd_tail_prefix`` argument to ``get_logger`` instead.', + DeprecationWarning, stacklevel=2 + ) + if self.logger.statsd_client: + self.logger.statsd_client._set_prefix(prefix) + + def statsd_delegate(statsd_func_name): + """ + Factory to create methods which delegate to methods on + self.logger.statsd_client (an instance of StatsdClient). The + created methods conditionally delegate to a method whose name is given + in 'statsd_func_name'. The created delegate methods are a no-op when + StatsD logging is not configured. + + :param statsd_func_name: the name of a method on StatsdClient. + """ + func = getattr(statsd_client.StatsdClient, statsd_func_name) + + @functools.wraps(func) + def wrapped(self, *a, **kw): + if getattr(self.logger, 'statsd_client'): + func = getattr(self.logger.statsd_client, statsd_func_name) + return func(*a, **kw) + return wrapped + + update_stats = statsd_delegate('update_stats') + increment = statsd_delegate('increment') + decrement = statsd_delegate('decrement') + timing = statsd_delegate('timing') + timing_since = statsd_delegate('timing_since') + transfer_rate = statsd_delegate('transfer_rate') + + +class SwiftLogFormatter(logging.Formatter): + """ + Custom logging.Formatter will append txn_id to a log message if the + record has one and the message does not. Optionally it can shorten + overly long log lines. 
+ """ + + def __init__(self, fmt=None, datefmt=None, max_line_length=0): + logging.Formatter.__init__(self, fmt=fmt, datefmt=datefmt) + self.max_line_length = max_line_length + + def format(self, record): + if not hasattr(record, 'server'): + # Catch log messages that were not initiated by swift + # (for example, the keystone auth middleware) + record.server = record.name + + # Included from Python's logging.Formatter and then altered slightly to + # replace \n with #012 + record.message = record.getMessage() + if self._fmt.find('%(asctime)') >= 0: + record.asctime = self.formatTime(record, self.datefmt) + msg = (self._fmt % record.__dict__).replace('\n', '#012') + if record.exc_info: + # Cache the traceback text to avoid converting it multiple times + # (it's constant anyway) + if not record.exc_text: + record.exc_text = self.formatException( + record.exc_info).replace('\n', '#012') + if record.exc_text: + if not msg.endswith('#012'): + msg = msg + '#012' + msg = msg + record.exc_text + + if (hasattr(record, 'txn_id') and record.txn_id and + record.txn_id not in msg): + msg = "%s (txn: %s)" % (msg, record.txn_id) + if (hasattr(record, 'client_ip') and record.client_ip and + record.levelno != logging.INFO and + record.client_ip not in msg): + msg = "%s (client_ip: %s)" % (msg, record.client_ip) + if self.max_line_length > 0 and len(msg) > self.max_line_length: + if self.max_line_length < 7: + msg = msg[:self.max_line_length] + else: + approxhalf = (self.max_line_length - 5) // 2 + msg = msg[:approxhalf] + " ... " + msg[-approxhalf:] + return msg + + +class LoggerFileObject(object): + + # Note: this is greenthread-local storage + _cls_thread_local = threading.local() + + def __init__(self, logger, log_type='STDOUT'): + self.logger = logger + self.log_type = log_type + + def write(self, value): + # We can get into a nasty situation when logs are going to syslog + # and syslog dies. + # + # It's something like this: + # + # (A) someone logs something + # + # (B) there's an exception in sending to /dev/log since syslog is + # not working + # + # (C) logging takes that exception and writes it to stderr (see + # logging.Handler.handleError) + # + # (D) stderr was replaced with a LoggerFileObject at process start, + # so the LoggerFileObject takes the provided string and tells + # its logger to log it (to syslog, naturally). + # + # Then, steps B through D repeat until we run out of stack. 
+ if getattr(self._cls_thread_local, 'already_called_write', False): + return + + self._cls_thread_local.already_called_write = True + try: + value = value.strip() + if value: + if 'Connection reset by peer' in value: + self.logger.error( + '%s: Connection reset by peer', self.log_type) + else: + self.logger.error('%(type)s: %(value)s', + {'type': self.log_type, 'value': value}) + finally: + self._cls_thread_local.already_called_write = False + + def writelines(self, values): + if getattr(self._cls_thread_local, 'already_called_writelines', False): + return + + self._cls_thread_local.already_called_writelines = True + try: + self.logger.error('%(type)s: %(value)s', + {'type': self.log_type, + 'value': '#012'.join(values)}) + finally: + self._cls_thread_local.already_called_writelines = False + + def close(self): + pass + + def flush(self): + pass + + def __iter__(self): + return self + + def next(self): + raise IOError(errno.EBADF, 'Bad file descriptor') + __next__ = next + + def read(self, size=-1): + raise IOError(errno.EBADF, 'Bad file descriptor') + + def readline(self, size=-1): + raise IOError(errno.EBADF, 'Bad file descriptor') + + def tell(self): + return 0 + + def xreadlines(self): + return self + + +class SwiftLoggerAdapter(logging.LoggerAdapter): + """ + A logging.LoggerAdapter subclass that also passes through StatsD method + calls. + + Like logging.LoggerAdapter, you have to subclass this and override the + process() method to accomplish anything useful. + """ + + @property + def name(self): + # py3 does this for us already; add it for py2 + return self.logger.name + + def update_stats(self, *a, **kw): + return self.logger.update_stats(*a, **kw) + + def increment(self, *a, **kw): + return self.logger.increment(*a, **kw) + + def decrement(self, *a, **kw): + return self.logger.decrement(*a, **kw) + + def timing(self, *a, **kw): + return self.logger.timing(*a, **kw) + + def timing_since(self, *a, **kw): + return self.logger.timing_since(*a, **kw) + + def transfer_rate(self, *a, **kw): + return self.logger.transfer_rate(*a, **kw) + + @property + def thread_locals(self): + return self.logger.thread_locals + + @thread_locals.setter + def thread_locals(self, thread_locals): + self.logger.thread_locals = thread_locals + + def exception(self, msg, *a, **kw): + # We up-call to exception() where stdlib uses error() so we can get + # some of the traceback suppression from LogAdapter, below + self.logger.exception(msg, *a, **kw) + + +class PrefixLoggerAdapter(SwiftLoggerAdapter): + """ + Adds an optional prefix to all its log messages. When the prefix has not + been set, messages are unchanged. + """ + + def set_prefix(self, prefix): + self.extra['prefix'] = prefix + + def exception(self, msg, *a, **kw): + if 'prefix' in self.extra: + msg = self.extra['prefix'] + msg + super(PrefixLoggerAdapter, self).exception(msg, *a, **kw) + + def process(self, msg, kwargs): + msg, kwargs = super(PrefixLoggerAdapter, self).process(msg, kwargs) + if 'prefix' in self.extra: + msg = self.extra['prefix'] + msg + return (msg, kwargs) + + +class LogLevelFilter(object): + """ + Drop messages for the logger based on level. + + This is useful when dependencies log too much information. 
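+
+    For example, installing ``LogLevelFilter(logging.INFO)`` on a handler
+    drops DEBUG and INFO records while WARNING and above pass through.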
+ + :param level: All messages at or below this level are dropped + (DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL) + Default: DEBUG + """ + + def __init__(self, level=logging.DEBUG): + self.level = level + + def filter(self, record): + if record.levelno <= self.level: + return 0 + return 1 + + +def get_logger(conf, name=None, log_to_console=False, log_route=None, + fmt="%(server)s: %(message)s", statsd_tail_prefix=None): + """ + Get the current system logger using config settings. + + **Log config and defaults**:: + + log_facility = LOG_LOCAL0 + log_level = INFO + log_name = swift + log_max_line_length = 0 + log_udp_host = (disabled) + log_udp_port = logging.handlers.SYSLOG_UDP_PORT + log_address = /dev/log + log_statsd_host = (disabled) + log_statsd_port = 8125 + log_statsd_default_sample_rate = 1.0 + log_statsd_sample_rate_factor = 1.0 + log_statsd_metric_prefix = (empty-string) + + :param conf: Configuration dict to read settings from + :param name: This value is used to populate the ``server`` field in the log + format, as the prefix for statsd messages, and as the default + value for ``log_route``; defaults to the ``log_name`` value in + ``conf``, if it exists, or to 'swift'. + :param log_to_console: Add handler which writes to console on stderr + :param log_route: Route for the logging, not emitted to the log, just used + to separate logging configurations; defaults to the value + of ``name`` or whatever ``name`` defaults to. This value + is used as the name attribute of the + ``logging.LogAdapter`` that is returned. + :param fmt: Override log format + :param statsd_tail_prefix: tail prefix to pass to statsd client; if None + then the tail prefix defaults to the value of ``name``. + :return: an instance of ``LogAdapter`` + """ + # note: log_name is typically specified in conf (i.e. defined by + # operators), whereas log_route is typically hard-coded in callers of + # get_logger (i.e. 
defined by developers) + if not conf: + conf = {} + if name is None: + name = conf.get('log_name', 'swift') + if not log_route: + log_route = name + logger = logging.getLogger(log_route) + logger.propagate = False + # all new handlers will get the same formatter + formatter = SwiftLogFormatter( + fmt=fmt, max_line_length=int(conf.get('log_max_line_length', 0))) + + # get_logger will only ever add one SysLog Handler to a logger + if not hasattr(get_logger, 'handler4logger'): + get_logger.handler4logger = {} + if logger in get_logger.handler4logger: + logger.removeHandler(get_logger.handler4logger[logger]) + + # facility for this logger will be set by last call wins + facility = getattr(SysLogHandler, conf.get('log_facility', 'LOG_LOCAL0'), + SysLogHandler.LOG_LOCAL0) + udp_host = conf.get('log_udp_host') + if udp_host: + udp_port = int(conf.get('log_udp_port', + logging.handlers.SYSLOG_UDP_PORT)) + handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port), + facility=facility) + else: + log_address = conf.get('log_address', '/dev/log') + handler = None + try: + mode = os.stat(log_address).st_mode + if stat.S_ISSOCK(mode): + handler = ThreadSafeSysLogHandler(address=log_address, + facility=facility) + except (OSError, socket.error) as e: + # If either /dev/log isn't a UNIX socket or it does not exist at + # all then py2 would raise an error + if e.errno not in [errno.ENOTSOCK, errno.ENOENT]: + raise + if handler is None: + # fallback to default UDP + handler = ThreadSafeSysLogHandler(facility=facility) + handler.setFormatter(formatter) + logger.addHandler(handler) + get_logger.handler4logger[logger] = handler + + # setup console logging + if log_to_console or hasattr(get_logger, 'console_handler4logger'): + # remove pre-existing console handler for this logger + if not hasattr(get_logger, 'console_handler4logger'): + get_logger.console_handler4logger = {} + if logger in get_logger.console_handler4logger: + logger.removeHandler(get_logger.console_handler4logger[logger]) + + console_handler = logging.StreamHandler(sys.__stderr__) + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + get_logger.console_handler4logger[logger] = console_handler + + # set the level for the logger + logger.setLevel( + getattr(logging, conf.get('log_level', 'INFO').upper(), logging.INFO)) + + # Setup logger with a StatsD client if so configured + statsd_host = conf.get('log_statsd_host') + if statsd_host: + statsd_port = int(conf.get('log_statsd_port', 8125)) + base_prefix = conf.get('log_statsd_metric_prefix', '') + default_sample_rate = float(conf.get( + 'log_statsd_default_sample_rate', 1)) + sample_rate_factor = float(conf.get( + 'log_statsd_sample_rate_factor', 1)) + if statsd_tail_prefix is None: + statsd_tail_prefix = name + logger.statsd_client = statsd_client.StatsdClient( + statsd_host, statsd_port, base_prefix, statsd_tail_prefix, + default_sample_rate, sample_rate_factor, logger=logger) + else: + logger.statsd_client = None + + adapted_logger = LogAdapter(logger, name) + other_handlers = conf.get('log_custom_handlers', None) + if other_handlers: + log_custom_handlers = [s.strip() for s in other_handlers.split(',') + if s.strip()] + for hook in log_custom_handlers: + try: + mod, fnc = hook.rsplit('.', 1) + logger_hook = getattr(__import__(mod, fromlist=[fnc]), fnc) + logger_hook(conf, name, log_to_console, log_route, fmt, + logger, adapted_logger) + except (AttributeError, ImportError): + print('Error calling custom handler [%s]' % hook, + file=sys.stderr) + except 
ValueError:
+                print('Invalid custom handler format [%s]' % hook,
+                      file=sys.stderr)
+
+    return adapted_logger
+
+
+class NullLogger(object):
+    """A no-op logger for eventlet wsgi."""
+
+    def write(self, *args):
+        # "Logs" the args to nowhere
+        pass
+
+    def exception(self, *args):
+        pass
+
+    def critical(self, *args):
+        pass
+
+    def error(self, *args):
+        pass
+
+    def warning(self, *args):
+        pass
+
+    def info(self, *args):
+        pass
+
+    def debug(self, *args):
+        pass
+
+    def log(self, *args):
+        pass
+
+
+def capture_stdio(logger, **kwargs):
+    """
+    Log unhandled exceptions, close stdio, capture stdout and stderr.
+
+    :param logger: Logger object to use
+    """
+    # log uncaught exceptions
+    sys.excepthook = lambda *exc_info: \
+        logger.critical('UNCAUGHT EXCEPTION', exc_info=exc_info)
+
+    # collect stdio file desc not in use for logging
+    stdio_files = [sys.stdin, sys.stdout, sys.stderr]
+    console_fds = [h.stream.fileno() for _junk, h in getattr(
+        get_logger, 'console_handler4logger', {}).items()]
+    stdio_files = [f for f in stdio_files if f.fileno() not in console_fds]
+
+    with open(os.devnull, 'r+b') as nullfile:
+        # close stdio (excludes fds open for logging)
+        for f in stdio_files:
+            # some platforms throw an error when attempting a stdin flush
+            try:
+                f.flush()
+            except IOError:
+                pass
+
+            try:
+                os.dup2(nullfile.fileno(), f.fileno())
+            except OSError:
+                pass
+
+    # redirect stdio
+    if kwargs.pop('capture_stdout', True):
+        sys.stdout = LoggerFileObject(logger)
+    if kwargs.pop('capture_stderr', True):
+        sys.stderr = LoggerFileObject(logger, 'STDERR')
+
+
+class StrAnonymizer(str):
+    """
+    Class that provides an anonymized or simply quoted version of a string.
+    """
+
+    def __new__(cls, data, method, salt):
+        method = method.lower()
+        if method not in (hashlib.algorithms if six.PY2 else
+                          hashlib.algorithms_guaranteed):
+            raise ValueError('Unsupported hashing method: %r' % method)
+        s = str.__new__(cls, data or '')
+        s.method = method
+        s.salt = salt
+        return s
+
+    @property
+    def anonymized(self):
+        if not self:
+            return self
+        else:
+            if self.method == 'md5':
+                h = md5(usedforsecurity=False)
+            else:
+                h = getattr(hashlib, self.method)()
+            if self.salt:
+                h.update(six.b(self.salt))
+            h.update(six.b(self))
+            return '{%s%s}%s' % ('S' if self.salt else '', self.method.upper(),
+                                 h.hexdigest())
+
+
+class StrFormatTime(object):
+    """
+    Class that provides formats or parts of a time.
+    """
+
+    def __init__(self, ts):
+        self.time = ts
+        self.time_struct = time.gmtime(ts)
+
+    def __str__(self):
+        return "%.9f" % self.time
+
+    def __getattr__(self, attr):
+        if attr not in ['a', 'A', 'b', 'B', 'c', 'd', 'H',
+                        'I', 'j', 'm', 'M', 'p', 'S', 'U',
+                        'w', 'W', 'x', 'X', 'y', 'Y', 'Z']:
+            raise ValueError(("The attribute %s is not a correct directive "
+                              "for the time.strftime formatter.") % attr)
+        return datetime.datetime(*self.time_struct[:-2],
+                                 tzinfo=UTC).strftime('%' + attr)
+
+    @property
+    def asctime(self):
+        return time.asctime(self.time_struct)
+
+    @property
+    def datetime(self):
+        return time.strftime('%d/%b/%Y/%H/%M/%S', self.time_struct)
+
+    @property
+    def iso8601(self):
+        return time.strftime('%Y-%m-%dT%H:%M:%S', self.time_struct)
+
+    @property
+    def ms(self):
+        return self.__str__().split('.')[1][:3]
+
+    @property
+    def us(self):
+        return self.__str__().split('.')[1][:6]
+
+    @property
+    def ns(self):
+        return self.__str__().split('.')[1]
+
+    @property
+    def s(self):
+        return self.__str__().split('.')[0]
+
+
+def get_log_line(req, res, trans_time, additional_info, fmt,
+                 anonymization_method, anonymization_salt):
+    """
+    Make a line for logging that matches the documented log line format
+    for backend servers.
+
+    :param req: the request.
+    :param res: the response.
+    :param trans_time: the time the request took to complete, a float.
+    :param additional_info: a string to log at the end of the line
+    :param fmt: a {}-style format string for the log line
+    :param anonymization_method: the hashing method used to anonymize
+                                 sensitive fields
+    :param anonymization_salt: a salt applied before hashing
+
+    :returns: a properly formatted line for logging.
+    """
+
+    policy_index = get_policy_index(req.headers, res.headers)
+    if req.path.startswith('/'):
+        disk, partition, account, container, obj = split_path(req.path, 0, 5,
+                                                              True)
+    else:
+        disk, partition, account, container, obj = (None, ) * 5
+    replacements = {
+        'remote_addr': StrAnonymizer(req.remote_addr, anonymization_method,
+                                     anonymization_salt),
+        'time': StrFormatTime(time.time()),
+        'method': req.method,
+        'path': StrAnonymizer(req.path, anonymization_method,
+                              anonymization_salt),
+        'disk': disk,
+        'partition': partition,
+        'account': StrAnonymizer(account, anonymization_method,
+                                 anonymization_salt),
+        'container': StrAnonymizer(container, anonymization_method,
+                                   anonymization_salt),
+        'object': StrAnonymizer(obj, anonymization_method,
+                                anonymization_salt),
+        'status': res.status.split()[0],
+        'content_length': res.content_length,
+        'referer': StrAnonymizer(req.referer, anonymization_method,
+                                 anonymization_salt),
+        'txn_id': req.headers.get('x-trans-id'),
+        'user_agent': StrAnonymizer(req.user_agent, anonymization_method,
+                                    anonymization_salt),
+        'trans_time': trans_time,
+        'additional_info': additional_info,
+        'pid': os.getpid(),
+        'policy_index': policy_index,
+    }
+    return LogStringFormatter(default='-').format(fmt, **replacements)
+
+
+def get_policy_index(req_headers, res_headers):
+    """
+    Returns the appropriate index of the storage policy for the request from
+    a proxy server.
+
+    :param req_headers: dict of the request headers.
+    :param res_headers: dict of the response headers.
+ + :returns: string index of storage policy, or None + """ + header = 'X-Backend-Storage-Policy-Index' + policy_index = res_headers.get(header, req_headers.get(header)) + if isinstance(policy_index, six.binary_type) and not six.PY2: + policy_index = policy_index.decode('ascii') + return str(policy_index) if policy_index is not None else None + + +class LogStringFormatter(string.Formatter): + def __init__(self, default='', quote=False): + super(LogStringFormatter, self).__init__() + self.default = default + self.quote = quote + + def format_field(self, value, spec): + if not value: + return self.default + else: + log = super(LogStringFormatter, self).format_field(value, spec) + if self.quote: + return quote(log, ':/{}') + else: + return log diff --git a/test/debug_logger.py b/test/debug_logger.py index 071885c605..52e39777ca 100644 --- a/test/debug_logger.py +++ b/test/debug_logger.py @@ -19,7 +19,7 @@ import sys from collections import defaultdict -from swift.common import utils +from swift.common import utils, statsd_client from swift.common.utils import NOTICE @@ -29,7 +29,7 @@ class WARN_DEPRECATED(Exception): print(self.msg) -class FakeStatsdClient(utils.StatsdClient): +class FakeStatsdClient(statsd_client.StatsdClient): def __init__(self, host, port, base_prefix='', tail_prefix='', default_sample_rate=1, sample_rate_factor=1, logger=None): super(FakeStatsdClient, self).__init__( @@ -313,8 +313,8 @@ def capture_logger(conf, *args, **kwargs): accessor methods (e.g. get_lines_for_level) directly on the logger instance. """ - with mock.patch('swift.common.utils.LogAdapter', CaptureLogAdapter): - log_adapter = utils.get_logger(conf, *args, **kwargs) + with mock.patch('swift.common.utils.logs.LogAdapter', CaptureLogAdapter): + log_adapter = utils.logs.get_logger(conf, *args, **kwargs) log_adapter.start_capture() try: yield log_adapter diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 6eb1db2c45..5114787d96 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -48,6 +48,7 @@ import six import six.moves.cPickle as pickle from six.moves import range from six.moves.http_client import HTTPException +from six.moves import configparser from swift.common import storage_policy, swob, utils, exceptions from swift.common.memcached import MemcacheConnectionError @@ -1448,7 +1449,7 @@ class ConfigAssertMixin(object): def assertDuplicateOptionError(self, app_config, option_name): with self.assertRaises( - utils.configparser.DuplicateOptionError) as ctx: + configparser.DuplicateOptionError) as ctx: app_config() msg = str(ctx.exception) self.assertIn(option_name, msg) diff --git a/test/unit/common/middleware/s3api/test_s3api.py b/test/unit/common/middleware/s3api/test_s3api.py index d2d8edd6df..b5f715d990 100644 --- a/test/unit/common/middleware/s3api/test_s3api.py +++ b/test/unit/common/middleware/s3api/test_s3api.py @@ -225,7 +225,8 @@ class TestS3ApiMiddleware(S3ApiTestCase): self.assertEqual('swift', s3api.logger.server) self.assertIsNone(s3api.logger.logger.statsd_client) - with mock.patch('swift.common.utils.StatsdClient', FakeStatsdClient): + with mock.patch('swift.common.statsd_client.StatsdClient', + FakeStatsdClient): s3api = S3ApiMiddleware(None, {'log_name': 'proxy-server', 'log_statsd_host': '1.2.3.4'}) s3api.logger.increment('test-metric') diff --git a/test/unit/common/middleware/test_proxy_logging.py b/test/unit/common/middleware/test_proxy_logging.py index 4d34254f0d..526f93aea5 100644 --- a/test/unit/common/middleware/test_proxy_logging.py +++ 
b/test/unit/common/middleware/test_proxy_logging.py @@ -21,7 +21,8 @@ from logging.handlers import SysLogHandler import six from six.moves.urllib.parse import unquote -from swift.common.utils import get_logger, split_path, StatsdClient +from swift.common.utils import get_logger, split_path +from swift.common.statsd_client import StatsdClient from swift.common.middleware import proxy_logging from swift.common.registry import register_sensitive_header, \ register_sensitive_param, get_sensitive_headers diff --git a/test/unit/common/middleware/test_tempauth.py b/test/unit/common/middleware/test_tempauth.py index 9d2cb38abc..3cdb1b5ffd 100644 --- a/test/unit/common/middleware/test_tempauth.py +++ b/test/unit/common/middleware/test_tempauth.py @@ -24,7 +24,8 @@ from six.moves.urllib.parse import quote, urlparse from swift.common.middleware import tempauth as auth from swift.common.middleware.acl import format_acl from swift.common.swob import Request, Response, bytes_to_wsgi -from swift.common.utils import split_path, StatsdClient +from swift.common.statsd_client import StatsdClient +from swift.common.utils import split_path from test.unit import FakeMemcache NO_CONTENT_RESP = (('204 No Content', {}, ''),) # mock server response diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 5742f550e4..726e2674cf 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -73,6 +73,7 @@ from swift.common.exceptions import Timeout, MessageTimeout, \ ConnectionTimeout, LockTimeout, ReplicationLockTimeout, \ MimeInvalid from swift.common import utils +from swift.common.statsd_client import StatsdClient from swift.common.utils import set_swift_dir, md5, ShardRangeList, \ SwiftLogFormatter from swift.common.container_sync_realms import ContainerSyncRealms @@ -1190,7 +1191,7 @@ class TestUtils(unittest.TestCase): @with_tempdir def test_get_logger_sysloghandler_plumbing(self, tempdir): - orig_sysloghandler = utils.ThreadSafeSysLogHandler + orig_sysloghandler = utils.logs.ThreadSafeSysLogHandler syslog_handler_args = [] def syslog_handler_catcher(*args, **kwargs): @@ -1207,7 +1208,7 @@ class TestUtils(unittest.TestCase): def fake_getaddrinfo(host, *args): return orig_getaddrinfo('localhost', *args) - with mock.patch.object(utils, 'ThreadSafeSysLogHandler', + with mock.patch.object(utils.logs, 'ThreadSafeSysLogHandler', syslog_handler_catcher), \ mock.patch.object(socket, 'getaddrinfo', fake_getaddrinfo): # default log_address @@ -1289,7 +1290,7 @@ class TestUtils(unittest.TestCase): 'facility': orig_sysloghandler.LOG_LOCAL0})], syslog_handler_args) - with mock.patch.object(utils, 'ThreadSafeSysLogHandler', + with mock.patch.object(utils.logs, 'ThreadSafeSysLogHandler', side_effect=OSError(errno.EPERM, 'oops')): with self.assertRaises(OSError) as cm: utils.get_logger({ @@ -2011,54 +2012,57 @@ log_name = %(yarr)s''' @reset_logger_state def test_capture_stdio(self): # stubs - logger = utils.get_logger(None, 'dummy') + logger = utils.logs.get_logger(None, 'dummy') # mock utils system modules - _orig_sys = utils.sys - _orig_os = utils.os - try: - utils.sys = MockSys() - utils.os = MockOs() - + mock_os = MockOs() + mock_sys = MockSys() + with mock.patch.object(utils.logs, 'os', mock_os), \ + mock.patch.object(utils.logs, 'sys', mock_sys): # basic test - utils.capture_stdio(logger) - self.assertTrue(utils.sys.excepthook is not None) - self.assertEqual(utils.os.closed_fds, utils.sys.stdio_fds) - self.assertIsInstance(utils.sys.stdout, utils.LoggerFileObject) - 
self.assertIsInstance(utils.sys.stderr, utils.LoggerFileObject) - - # reset; test same args, but exc when trying to close stdio - utils.os = MockOs(raise_funcs=('dup2',)) - utils.sys = MockSys() + utils.logs.capture_stdio(logger) + self.assertTrue(mock_sys.excepthook is not None) + self.assertEqual(mock_os.closed_fds, mock_sys.stdio_fds) + self.assertIsInstance(mock_sys.stdout, + utils.logs.LoggerFileObject) + self.assertIsInstance(mock_sys.stderr, + utils.logs.LoggerFileObject) + # reset; test same args, but exc when trying to close stdio + mock_os = MockOs(raise_funcs=('dup2',)) + mock_sys = MockSys() + with mock.patch.object(utils.logs, 'os', mock_os), \ + mock.patch.object(utils.logs, 'sys', mock_sys): # test unable to close stdio - utils.capture_stdio(logger) - self.assertTrue(utils.sys.excepthook is not None) - self.assertEqual(utils.os.closed_fds, []) - self.assertIsInstance(utils.sys.stdout, utils.LoggerFileObject) - self.assertIsInstance(utils.sys.stderr, utils.LoggerFileObject) + utils.logs.capture_stdio(logger) + self.assertTrue(utils.logs.sys.excepthook is not None) + self.assertEqual(utils.logs.os.closed_fds, []) + self.assertIsInstance(mock_sys.stdout, + utils.logs.LoggerFileObject) + self.assertIsInstance(mock_sys.stderr, + utils.logs.LoggerFileObject) - # reset; test some other args - utils.os = MockOs() - utils.sys = MockSys() + # reset; test some other args + mock_os = MockOs() + mock_sys = MockSys() + with mock.patch.object(utils.logs, 'os', mock_os), \ + mock.patch.object(utils.logs, 'sys', mock_sys): logger = utils.get_logger(None, log_to_console=True) # test console log - utils.capture_stdio(logger, capture_stdout=False, - capture_stderr=False) - self.assertTrue(utils.sys.excepthook is not None) + utils.logs.capture_stdio(logger, capture_stdout=False, + capture_stderr=False) + self.assertTrue(utils.logs.sys.excepthook is not None) # when logging to console, stderr remains open - self.assertEqual(utils.os.closed_fds, utils.sys.stdio_fds[:2]) + self.assertEqual(mock_os.closed_fds, + mock_sys.stdio_fds[:2]) reset_loggers() # stdio not captured - self.assertFalse(isinstance(utils.sys.stdout, - utils.LoggerFileObject)) - self.assertFalse(isinstance(utils.sys.stderr, - utils.LoggerFileObject)) - finally: - utils.sys = _orig_sys - utils.os = _orig_os + self.assertFalse(isinstance(mock_sys.stdout, + utils.logs.LoggerFileObject)) + self.assertFalse(isinstance(mock_sys.stderr, + utils.logs.LoggerFileObject)) @reset_logger_state def test_get_logger_console(self): @@ -2464,18 +2468,14 @@ cluster_dfw1 = http://dfw1.host/v1/ for v in utils.TRUE_VALUES: self.assertEqual(v, v.lower()) + @mock.patch.object(utils.config, 'TRUE_VALUES', 'hello world'.split()) def test_config_true_value(self): - orig_trues = utils.TRUE_VALUES - try: - utils.TRUE_VALUES = 'hello world'.split() - for val in 'hello world HELLO WORLD'.split(): - self.assertTrue(utils.config_true_value(val) is True) - self.assertTrue(utils.config_true_value(True) is True) - self.assertTrue(utils.config_true_value('foo') is False) - self.assertTrue(utils.config_true_value(False) is False) - self.assertTrue(utils.config_true_value(None) is False) - finally: - utils.TRUE_VALUES = orig_trues + for val in 'hello world HELLO WORLD'.split(): + self.assertTrue(utils.config_true_value(val) is True) + self.assertTrue(utils.config_true_value(True) is True) + self.assertTrue(utils.config_true_value('foo') is False) + self.assertTrue(utils.config_true_value(False) is False) + self.assertTrue(utils.config_true_value(None) is False) def 
test_non_negative_float(self): self.assertEqual(0, utils.non_negative_float('0.0')) @@ -3361,7 +3361,7 @@ cluster_dfw1 = http://dfw1.host/v1/ 'Swift is great!', 'sha257', '') def test_str_anonymizer_python_maddness(self): - with mock.patch('swift.common.utils.hashlib') as mocklib: + with mock.patch('swift.common.utils.base.hashlib') as mocklib: if six.PY2: # python <2.7.9 doesn't have this algorithms_guaranteed, but # our if block short-circuts before we explode @@ -4724,7 +4724,7 @@ class TestStatsdLogging(unittest.TestCase): logger = utils.get_logger({'log_statsd_host': 'some.host.com'}, 'some-name', log_route='some-route') # white-box construction validation - self.assertIsInstance(logger.logger.statsd_client, utils.StatsdClient) + self.assertIsInstance(logger.logger.statsd_client, StatsdClient) self.assertEqual(logger.logger.statsd_client._host, 'some.host.com') self.assertEqual(logger.logger.statsd_client._port, 8125) self.assertEqual(logger.logger.statsd_client._prefix, 'some-name.') @@ -4890,7 +4890,8 @@ class TestStatsdLogging(unittest.TestCase): '', ('::1', port, 0, 0))] - with mock.patch.object(utils.socket, 'getaddrinfo', fake_getaddrinfo): + with mock.patch.object(utils.logs.socket, + 'getaddrinfo', fake_getaddrinfo): logger = utils.get_logger({ 'log_statsd_host': '::1', 'log_statsd_port': '9876', diff --git a/test/unit/helpers.py b/test/unit/helpers.py index ab50e7200b..2d22884cca 100644 --- a/test/unit/helpers.py +++ b/test/unit/helpers.py @@ -64,8 +64,8 @@ def setup_servers(the_object_server=object_server, extra_conf=None): :returns: A dict containing the following entries: orig_POLICIES: the value of storage_policy.POLICIES prior to it being patched with fake policies - orig_SysLogHandler: the value of utils.SysLogHandler prior to - it being patched + orig_SysLogHandler: the value of utils.logs.SysLogHandler + prior to it being patched testdir: root directory used for test files test_POLICIES: a StoragePolicyCollection of fake policies test_servers: a tuple of test server instances @@ -75,10 +75,10 @@ def setup_servers(the_object_server=object_server, extra_conf=None): """ context = { "orig_POLICIES": storage_policy._POLICIES, - "orig_SysLogHandler": utils.SysLogHandler} + "orig_SysLogHandler": utils.logs.SysLogHandler} utils.HASH_PATH_SUFFIX = b'endcap' - utils.SysLogHandler = mock.MagicMock() + utils.logs.SysLogHandler = mock.MagicMock() # Since we're starting up a lot here, we're going to test more than # just chunked puts; we're also going to test parts of # proxy_server.Application we couldn't get to easily otherwise. 
@@ -336,5 +336,5 @@ def teardown_servers(context):
     for server in context["test_coros"]:
         server.kill()
     rmtree(os.path.dirname(context["testdir"]))
-    utils.SysLogHandler = context["orig_SysLogHandler"]
+    utils.logs.SysLogHandler = context["orig_SysLogHandler"]
     storage_policy._POLICIES = context["orig_POLICIES"]
diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py
index d3fcb961fb..10a4d77848 100644
--- a/test/unit/proxy/test_server.py
+++ b/test/unit/proxy/test_server.py
@@ -60,6 +60,7 @@ from test.unit import (
     DEFAULT_TEST_EC_TYPE, make_timestamp_iter, skip_if_no_xattrs,
     FakeHTTPResponse, node_error_count, node_last_error, set_node_errors)
 from test.unit.helpers import setup_servers, teardown_servers
+from swift.common.statsd_client import StatsdClient
 from swift.proxy import server as proxy_server
 from swift.proxy.controllers.obj import ReplicatedObjectController
 from swift.obj import server as object_server
@@ -71,9 +72,8 @@ from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist, \
     APIVersionError, ChunkReadError
 from swift.common import utils, constraints, registry
 from swift.common.utils import hash_path, storage_directory, \
-    parse_content_type, parse_mime_headers, StatsdClient, \
-    iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
-    node_to_string, NamespaceBoundList
+    parse_content_type, parse_mime_headers, iter_multipart_mime_documents, \
+    public, mkdirs, NullLogger, md5, node_to_string, NamespaceBoundList
 from swift.common.wsgi import loadapp, ConfigString
 from swift.common.http_protocol import SwiftHttpProtocol
 from swift.proxy.controllers import base as proxy_base
@@ -2293,7 +2293,8 @@ class TestProxyServerConfigLoading(unittest.TestCase):
         """ % self.tempdir
         conf_path = self._write_conf(dedent(conf_sections))
 
-        with mock.patch('swift.common.utils.StatsdClient') as mock_statsd:
+        with mock.patch('swift.common.statsd_client.StatsdClient') \
+                as mock_statsd:
             app = loadapp(conf_path, allow_modify_pipeline=False)
             # logger name is hard-wired 'proxy-server'
             self.assertEqual('proxy-server', app.logger.name)
@@ -2316,7 +2317,8 @@ class TestProxyServerConfigLoading(unittest.TestCase):
         """ % self.tempdir
         conf_path = self._write_conf(dedent(conf_sections))
 
-        with mock.patch('swift.common.utils.StatsdClient') as mock_statsd:
+        with mock.patch('swift.common.statsd_client.StatsdClient') \
+                as mock_statsd:
             app = loadapp(conf_path, allow_modify_pipeline=False)
             # logger name is hard-wired 'proxy-server'
             self.assertEqual('proxy-server', app.logger.name)
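
Note on the new import path: once StatsdClient moves out of swift.common.utils
and into swift.common.statsd_client, anything that patches or subclasses it
must target the new module, as the test changes above do. A minimal sketch of
the resulting pattern, not part of this patch -- FakeClient and
TestStatsdPatchPath are hypothetical names used only for illustration; the
tree's real tests use FakeStatsdClient from test/debug_logger.py:

    import mock  # on py3-only trees: from unittest import mock
    import unittest

    from swift.common import utils


    class FakeClient(object):
        """Hypothetical stand-in; records metric names instead of UDP."""

        def __init__(self, *args, **kwargs):
            # accept whatever get_logger() passes to the real client
            self.calls = []

        def increment(self, metric, sample_rate=None):
            self.calls.append(metric)


    class TestStatsdPatchPath(unittest.TestCase):
        def test_increment_hits_fake(self):
            # Patch the class at its new home; patching
            # swift.common.utils.StatsdClient no longer intercepts the
            # constructor that get_logger() calls.
            with mock.patch('swift.common.statsd_client.StatsdClient',
                            FakeClient):
                logger = utils.get_logger(
                    {'log_statsd_host': '1.2.3.4'}, 'some-name')
                logger.increment('test-metric')
            self.assertEqual(['test-metric'],
                             logger.logger.statsd_client.calls)

The same holds for the other relocated names: ThreadSafeSysLogHandler,
LogAdapter, capture_stdio and friends are patched via utils.logs, TRUE_VALUES
via utils.config, and hashlib (as used by the anonymizers) via
utils.base, exactly as the hunks above demonstrate.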