""" Swift tests """ import sys import os import copy import logging from sys import exc_info from contextlib import contextmanager from collections import defaultdict from tempfile import NamedTemporaryFile from eventlet.green import socket from tempfile import mkdtemp from shutil import rmtree from test import get_config from ConfigParser import MissingSectionHeaderError from StringIO import StringIO from swift.common.utils import readconf, config_true_value from logging import Handler import logging.handlers def readuntil2crlfs(fd): rv = '' lc = '' crlfs = 0 while crlfs < 2: c = fd.read(1) rv = rv + c if c == '\r' and lc != '\n': crlfs = 0 if lc == '\r' and c == '\n': crlfs += 1 lc = c return rv def connect_tcp(hostport): rv = socket.socket() rv.connect(hostport) return rv @contextmanager def tmpfile(content): with NamedTemporaryFile('w', delete=False) as f: file_name = f.name f.write(str(content)) try: yield file_name finally: os.unlink(file_name) xattr_data = {} def _get_inode(fd): if not isinstance(fd, int): try: fd = fd.fileno() except AttributeError: return os.stat(fd).st_ino return os.fstat(fd).st_ino def _setxattr(fd, k, v): inode = _get_inode(fd) data = xattr_data.get(inode, {}) data[k] = v xattr_data[inode] = data def _getxattr(fd, k): inode = _get_inode(fd) data = xattr_data.get(inode, {}).get(k) if not data: raise IOError return data import xattr xattr.setxattr = _setxattr xattr.getxattr = _getxattr @contextmanager def temptree(files, contents=''): # generate enough contents to fill the files c = len(files) contents = (list(contents) + [''] * c)[:c] tempdir = mkdtemp() for path, content in zip(files, contents): if os.path.isabs(path): path = '.' + path new_path = os.path.join(tempdir, path) subdir = os.path.dirname(new_path) if not os.path.exists(subdir): os.makedirs(subdir) with open(new_path, 'w') as f: f.write(str(content)) try: yield tempdir finally: rmtree(tempdir) class NullLoggingHandler(logging.Handler): def emit(self, record): pass class FakeLogger(object): # a thread safe logger def __init__(self, *args, **kwargs): self._clear() self.level = logging.NOTSET if 'facility' in kwargs: self.facility = kwargs['facility'] def _clear(self): self.log_dict = defaultdict(list) def _store_in(store_name): def stub_fn(self, *args, **kwargs): self.log_dict[store_name].append((args, kwargs)) return stub_fn error = _store_in('error') info = _store_in('info') warning = _store_in('warning') debug = _store_in('debug') def exception(self, *args, **kwargs): self.log_dict['exception'].append((args, kwargs, str(exc_info()[1]))) # mock out the StatsD logging methods: increment = _store_in('increment') decrement = _store_in('decrement') timing = _store_in('timing') timing_since = _store_in('timing_since') update_stats = _store_in('update_stats') set_statsd_prefix = _store_in('set_statsd_prefix') def get_increments(self): return [call[0][0] for call in self.log_dict['increment']] def get_increment_counts(self): counts = {} for metric in self.get_increments(): if metric not in counts: counts[metric] = 0 counts[metric] += 1 return counts def setFormatter(self, obj): self.formatter = obj def close(self): self._clear() def set_name(self, name): # don't touch _handlers self._name = name def acquire(self): pass def release(self): pass def createLock(self): pass def emit(self, record): pass def handle(self, record): pass def flush(self): pass def handleError(self, record): pass original_syslog_handler = logging.handlers.SysLogHandler def fake_syslog_handler(): for attr in dir(original_syslog_handler): if attr.startswith('LOG'): setattr(FakeLogger, attr, copy.copy(getattr(logging.handlers.SysLogHandler, attr))) FakeLogger.priority_map = \ copy.deepcopy(logging.handlers.SysLogHandler.priority_map) logging.handlers.SysLogHandler = FakeLogger if config_true_value(get_config('unit_test').get('fake_syslog', 'False')): fake_syslog_handler() class MockTrue(object): """ Instances of MockTrue evaluate like True Any attr accessed on an instance of MockTrue will return a MockTrue instance. Any method called on an instance of MockTrue will return a MockTrue instance. >>> thing = MockTrue() >>> thing True >>> thing == True # True == True True >>> thing == False # True == False False >>> thing != True # True != True False >>> thing != False # True != False True >>> thing.attribute True >>> thing.method() True >>> thing.attribute.method() True >>> thing.method().attribute True """ def __getattribute__(self, *args, **kwargs): return self def __call__(self, *args, **kwargs): return self def __repr__(*args, **kwargs): return repr(True) def __eq__(self, other): return other is True def __ne__(self, other): return other is not True @contextmanager def mock(update): returns = [] deletes = [] for key, value in update.items(): imports = key.split('.') attr = imports.pop(-1) module = __import__(imports[0], fromlist=imports[1:]) for modname in imports[1:]: module = getattr(module, modname) if hasattr(module, attr): returns.append((module, attr, getattr(module, attr))) else: deletes.append((module, attr)) setattr(module, attr, value) yield True for module, attr, value in returns: setattr(module, attr, value) for module, attr in deletes: delattr(module, attr)