Moved some code out of swift.obj.replicator
This will be needed in future replication work to avoid circular imports. I used swift.obj.base as the module name just because we seemed to avoid putting code in __init__.py files so far and I didn't want to buck the trend. I would love to see other obj things like *_metadata and DiskFile move into swift.obj.base as well and swift.obj.server just be the WSGI server logic, but I'll leave that for the future. I have changed the tests as little as possible (just the references to where they get the code to test) to show the refactor has not broken anything. I did add a test for tpool_reraise since there was none before. There will be a follow on patch for moving the tests to their new location(s). I figured I'd wait to put the bikes in the shed until everyone's done painting it. Change-Id: I32b4ac88be21eb76c877d3f4cc1e6ac33304835b
This commit is contained in:
parent
c3e6f3a1d6
commit
9fe15dd15a
@ -43,7 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult
|
|||||||
import itertools
|
import itertools
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from eventlet import GreenPool, sleep, Timeout
|
from eventlet import GreenPool, sleep, Timeout, tpool
|
||||||
from eventlet.green import socket, threading
|
from eventlet.green import socket, threading
|
||||||
import netifaces
|
import netifaces
|
||||||
import codecs
|
import codecs
|
||||||
@ -1716,3 +1716,18 @@ class InputProxy(object):
|
|||||||
raise
|
raise
|
||||||
self.bytes_received += len(line)
|
self.bytes_received += len(line)
|
||||||
return line
|
return line
|
||||||
|
|
||||||
|
|
||||||
|
def tpool_reraise(func, *args, **kwargs):
|
||||||
|
"""
|
||||||
|
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
|
||||||
|
"""
|
||||||
|
def inner():
|
||||||
|
try:
|
||||||
|
return func(*args, **kwargs)
|
||||||
|
except BaseException, err:
|
||||||
|
return err
|
||||||
|
resp = tpool.execute(inner)
|
||||||
|
if isinstance(resp, BaseException):
|
||||||
|
raise resp
|
||||||
|
return resp
|
||||||
|
207
swift/obj/base.py
Normal file
207
swift/obj/base.py
Normal file
@ -0,0 +1,207 @@
|
|||||||
|
# Copyright (c) 2010-2013 OpenStack, LLC.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||||
|
# implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""General object server functions."""
|
||||||
|
|
||||||
|
import cPickle as pickle
|
||||||
|
import errno
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
from os.path import basename, dirname, join
|
||||||
|
|
||||||
|
from swift.common.exceptions import PathNotDir
|
||||||
|
from swift.common.utils import lock_path, renamer, write_pickle
|
||||||
|
|
||||||
|
|
||||||
|
PICKLE_PROTOCOL = 2
|
||||||
|
ONE_WEEK = 604800
|
||||||
|
HASH_FILE = 'hashes.pkl'
|
||||||
|
|
||||||
|
|
||||||
|
def quarantine_renamer(device_path, corrupted_file_path):
|
||||||
|
"""
|
||||||
|
In the case that a file is corrupted, move it to a quarantined
|
||||||
|
area to allow replication to fix it.
|
||||||
|
|
||||||
|
:params device_path: The path to the device the corrupted file is on.
|
||||||
|
:params corrupted_file_path: The path to the file you want quarantined.
|
||||||
|
|
||||||
|
:returns: path (str) of directory the file was moved to
|
||||||
|
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
|
||||||
|
exceptions from rename
|
||||||
|
"""
|
||||||
|
from_dir = dirname(corrupted_file_path)
|
||||||
|
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
|
||||||
|
invalidate_hash(dirname(from_dir))
|
||||||
|
try:
|
||||||
|
renamer(from_dir, to_dir)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||||
|
raise
|
||||||
|
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
|
||||||
|
renamer(from_dir, to_dir)
|
||||||
|
return to_dir
|
||||||
|
|
||||||
|
|
||||||
|
def hash_suffix(path, reclaim_age):
|
||||||
|
"""
|
||||||
|
Performs reclamation and returns an md5 of all (remaining) files.
|
||||||
|
|
||||||
|
:param reclaim_age: age in seconds at which to remove tombstones
|
||||||
|
:raises PathNotDir: if given path is not a valid directory
|
||||||
|
:raises OSError: for non-ENOTDIR errors
|
||||||
|
"""
|
||||||
|
md5 = hashlib.md5()
|
||||||
|
try:
|
||||||
|
path_contents = sorted(os.listdir(path))
|
||||||
|
except OSError, err:
|
||||||
|
if err.errno in (errno.ENOTDIR, errno.ENOENT):
|
||||||
|
raise PathNotDir()
|
||||||
|
raise
|
||||||
|
for hsh in path_contents:
|
||||||
|
hsh_path = join(path, hsh)
|
||||||
|
try:
|
||||||
|
files = os.listdir(hsh_path)
|
||||||
|
except OSError, err:
|
||||||
|
if err.errno == errno.ENOTDIR:
|
||||||
|
partition_path = dirname(path)
|
||||||
|
objects_path = dirname(partition_path)
|
||||||
|
device_path = dirname(objects_path)
|
||||||
|
quar_path = quarantine_renamer(device_path, hsh_path)
|
||||||
|
logging.exception(
|
||||||
|
_('Quarantined %s to %s because it is not a directory') %
|
||||||
|
(hsh_path, quar_path))
|
||||||
|
continue
|
||||||
|
raise
|
||||||
|
if len(files) == 1:
|
||||||
|
if files[0].endswith('.ts'):
|
||||||
|
# remove tombstones older than reclaim_age
|
||||||
|
ts = files[0].rsplit('.', 1)[0]
|
||||||
|
if (time.time() - float(ts)) > reclaim_age:
|
||||||
|
os.unlink(join(hsh_path, files[0]))
|
||||||
|
files.remove(files[0])
|
||||||
|
elif files:
|
||||||
|
files.sort(reverse=True)
|
||||||
|
meta = data = tomb = None
|
||||||
|
for filename in list(files):
|
||||||
|
if not meta and filename.endswith('.meta'):
|
||||||
|
meta = filename
|
||||||
|
if not data and filename.endswith('.data'):
|
||||||
|
data = filename
|
||||||
|
if not tomb and filename.endswith('.ts'):
|
||||||
|
tomb = filename
|
||||||
|
if (filename < tomb or # any file older than tomb
|
||||||
|
filename < data or # any file older than data
|
||||||
|
(filename.endswith('.meta') and
|
||||||
|
filename < meta)): # old meta
|
||||||
|
os.unlink(join(hsh_path, filename))
|
||||||
|
files.remove(filename)
|
||||||
|
if not files:
|
||||||
|
os.rmdir(hsh_path)
|
||||||
|
for filename in files:
|
||||||
|
md5.update(filename)
|
||||||
|
try:
|
||||||
|
os.rmdir(path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return md5.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def invalidate_hash(suffix_dir):
|
||||||
|
"""
|
||||||
|
Invalidates the hash for a suffix_dir in the partition's hashes file.
|
||||||
|
|
||||||
|
:param suffix_dir: absolute path to suffix dir whose hash needs
|
||||||
|
invalidating
|
||||||
|
"""
|
||||||
|
|
||||||
|
suffix = os.path.basename(suffix_dir)
|
||||||
|
partition_dir = os.path.dirname(suffix_dir)
|
||||||
|
hashes_file = join(partition_dir, HASH_FILE)
|
||||||
|
with lock_path(partition_dir):
|
||||||
|
try:
|
||||||
|
with open(hashes_file, 'rb') as fp:
|
||||||
|
hashes = pickle.load(fp)
|
||||||
|
if suffix in hashes and not hashes[suffix]:
|
||||||
|
return
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
hashes[suffix] = None
|
||||||
|
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
||||||
|
|
||||||
|
|
||||||
|
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
|
||||||
|
reclaim_age=ONE_WEEK):
|
||||||
|
"""
|
||||||
|
Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
|
||||||
|
the hash cache for suffix existence at the (unexpectedly high) cost of a
|
||||||
|
listdir. reclaim_age is just passed on to hash_suffix.
|
||||||
|
|
||||||
|
:param partition_dir: absolute path of partition to get hashes for
|
||||||
|
:param recalculate: list of suffixes which should be recalculated when got
|
||||||
|
:param do_listdir: force existence check for all hashes in the partition
|
||||||
|
:param reclaim_age: age at which to remove tombstones
|
||||||
|
|
||||||
|
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
|
||||||
|
"""
|
||||||
|
|
||||||
|
hashed = 0
|
||||||
|
hashes_file = join(partition_dir, HASH_FILE)
|
||||||
|
modified = False
|
||||||
|
force_rewrite = False
|
||||||
|
hashes = {}
|
||||||
|
mtime = -1
|
||||||
|
|
||||||
|
if recalculate is None:
|
||||||
|
recalculate = []
|
||||||
|
|
||||||
|
try:
|
||||||
|
with open(hashes_file, 'rb') as fp:
|
||||||
|
hashes = pickle.load(fp)
|
||||||
|
mtime = os.path.getmtime(hashes_file)
|
||||||
|
except Exception:
|
||||||
|
do_listdir = True
|
||||||
|
force_rewrite = True
|
||||||
|
if do_listdir:
|
||||||
|
for suff in os.listdir(partition_dir):
|
||||||
|
if len(suff) == 3:
|
||||||
|
hashes.setdefault(suff, None)
|
||||||
|
modified = True
|
||||||
|
hashes.update((hash_, None) for hash_ in recalculate)
|
||||||
|
for suffix, hash_ in hashes.items():
|
||||||
|
if not hash_:
|
||||||
|
suffix_dir = join(partition_dir, suffix)
|
||||||
|
try:
|
||||||
|
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
|
||||||
|
hashed += 1
|
||||||
|
except PathNotDir:
|
||||||
|
del hashes[suffix]
|
||||||
|
except OSError:
|
||||||
|
logging.exception(_('Error hashing suffix'))
|
||||||
|
modified = True
|
||||||
|
if modified:
|
||||||
|
with lock_path(partition_dir):
|
||||||
|
if force_rewrite or not os.path.exists(hashes_file) or \
|
||||||
|
os.path.getmtime(hashes_file) == mtime:
|
||||||
|
write_pickle(
|
||||||
|
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
||||||
|
return hashed, hashes
|
||||||
|
return get_hashes(partition_dir, recalculate, do_listdir,
|
||||||
|
reclaim_age)
|
||||||
|
else:
|
||||||
|
return hashed, hashes
|
@ -14,16 +14,12 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from os.path import basename, dirname, isdir, isfile, join
|
from os.path import isdir, isfile, join
|
||||||
import random
|
import random
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
import time
|
||||||
import logging
|
|
||||||
import hashlib
|
|
||||||
import itertools
|
import itertools
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
import errno
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from eventlet import GreenPool, tpool, Timeout, sleep, hubs
|
from eventlet import GreenPool, tpool, Timeout, sleep, hubs
|
||||||
@ -31,209 +27,18 @@ from eventlet.green import subprocess
|
|||||||
from eventlet.support.greenlets import GreenletExit
|
from eventlet.support.greenlets import GreenletExit
|
||||||
|
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
|
from swift.common.utils import whataremyips, unlink_older_than, \
|
||||||
compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
|
compute_eta, get_logger, dump_recon_cache, \
|
||||||
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub
|
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
|
||||||
|
tpool_reraise
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||||
from swift.common.exceptions import PathNotDir
|
from swift.obj.base import get_hashes
|
||||||
|
|
||||||
|
|
||||||
hubs.use_hub(get_hub())
|
hubs.use_hub(get_hub())
|
||||||
|
|
||||||
PICKLE_PROTOCOL = 2
|
|
||||||
ONE_WEEK = 604800
|
|
||||||
HASH_FILE = 'hashes.pkl'
|
|
||||||
|
|
||||||
|
|
||||||
def quarantine_renamer(device_path, corrupted_file_path):
|
|
||||||
"""
|
|
||||||
In the case that a file is corrupted, move it to a quarantined
|
|
||||||
area to allow replication to fix it.
|
|
||||||
|
|
||||||
:params device_path: The path to the device the corrupted file is on.
|
|
||||||
:params corrupted_file_path: The path to the file you want quarantined.
|
|
||||||
|
|
||||||
:returns: path (str) of directory the file was moved to
|
|
||||||
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
|
|
||||||
exceptions from rename
|
|
||||||
"""
|
|
||||||
from_dir = dirname(corrupted_file_path)
|
|
||||||
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
|
|
||||||
invalidate_hash(dirname(from_dir))
|
|
||||||
try:
|
|
||||||
renamer(from_dir, to_dir)
|
|
||||||
except OSError, e:
|
|
||||||
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
|
||||||
raise
|
|
||||||
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
|
|
||||||
renamer(from_dir, to_dir)
|
|
||||||
return to_dir
|
|
||||||
|
|
||||||
|
|
||||||
def hash_suffix(path, reclaim_age):
|
|
||||||
"""
|
|
||||||
Performs reclamation and returns an md5 of all (remaining) files.
|
|
||||||
|
|
||||||
:param reclaim_age: age in seconds at which to remove tombstones
|
|
||||||
:raises PathNotDir: if given path is not a valid directory
|
|
||||||
:raises OSError: for non-ENOTDIR errors
|
|
||||||
"""
|
|
||||||
md5 = hashlib.md5()
|
|
||||||
try:
|
|
||||||
path_contents = sorted(os.listdir(path))
|
|
||||||
except OSError, err:
|
|
||||||
if err.errno in (errno.ENOTDIR, errno.ENOENT):
|
|
||||||
raise PathNotDir()
|
|
||||||
raise
|
|
||||||
for hsh in path_contents:
|
|
||||||
hsh_path = join(path, hsh)
|
|
||||||
try:
|
|
||||||
files = os.listdir(hsh_path)
|
|
||||||
except OSError, err:
|
|
||||||
if err.errno == errno.ENOTDIR:
|
|
||||||
partition_path = dirname(path)
|
|
||||||
objects_path = dirname(partition_path)
|
|
||||||
device_path = dirname(objects_path)
|
|
||||||
quar_path = quarantine_renamer(device_path, hsh_path)
|
|
||||||
logging.exception(
|
|
||||||
_('Quarantined %s to %s because it is not a directory') %
|
|
||||||
(hsh_path, quar_path))
|
|
||||||
continue
|
|
||||||
raise
|
|
||||||
if len(files) == 1:
|
|
||||||
if files[0].endswith('.ts'):
|
|
||||||
# remove tombstones older than reclaim_age
|
|
||||||
ts = files[0].rsplit('.', 1)[0]
|
|
||||||
if (time.time() - float(ts)) > reclaim_age:
|
|
||||||
os.unlink(join(hsh_path, files[0]))
|
|
||||||
files.remove(files[0])
|
|
||||||
elif files:
|
|
||||||
files.sort(reverse=True)
|
|
||||||
meta = data = tomb = None
|
|
||||||
for filename in list(files):
|
|
||||||
if not meta and filename.endswith('.meta'):
|
|
||||||
meta = filename
|
|
||||||
if not data and filename.endswith('.data'):
|
|
||||||
data = filename
|
|
||||||
if not tomb and filename.endswith('.ts'):
|
|
||||||
tomb = filename
|
|
||||||
if (filename < tomb or # any file older than tomb
|
|
||||||
filename < data or # any file older than data
|
|
||||||
(filename.endswith('.meta') and
|
|
||||||
filename < meta)): # old meta
|
|
||||||
os.unlink(join(hsh_path, filename))
|
|
||||||
files.remove(filename)
|
|
||||||
if not files:
|
|
||||||
os.rmdir(hsh_path)
|
|
||||||
for filename in files:
|
|
||||||
md5.update(filename)
|
|
||||||
try:
|
|
||||||
os.rmdir(path)
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
return md5.hexdigest()
|
|
||||||
|
|
||||||
|
|
||||||
def invalidate_hash(suffix_dir):
|
|
||||||
"""
|
|
||||||
Invalidates the hash for a suffix_dir in the partition's hashes file.
|
|
||||||
|
|
||||||
:param suffix_dir: absolute path to suffix dir whose hash needs
|
|
||||||
invalidating
|
|
||||||
"""
|
|
||||||
|
|
||||||
suffix = os.path.basename(suffix_dir)
|
|
||||||
partition_dir = os.path.dirname(suffix_dir)
|
|
||||||
hashes_file = join(partition_dir, HASH_FILE)
|
|
||||||
with lock_path(partition_dir):
|
|
||||||
try:
|
|
||||||
with open(hashes_file, 'rb') as fp:
|
|
||||||
hashes = pickle.load(fp)
|
|
||||||
if suffix in hashes and not hashes[suffix]:
|
|
||||||
return
|
|
||||||
except Exception:
|
|
||||||
return
|
|
||||||
hashes[suffix] = None
|
|
||||||
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
|
||||||
|
|
||||||
|
|
||||||
def get_hashes(partition_dir, recalculate=None, do_listdir=False,
|
|
||||||
reclaim_age=ONE_WEEK):
|
|
||||||
"""
|
|
||||||
Get a list of hashes for the suffix dir. do_listdir causes it to mistrust
|
|
||||||
the hash cache for suffix existence at the (unexpectedly high) cost of a
|
|
||||||
listdir. reclaim_age is just passed on to hash_suffix.
|
|
||||||
|
|
||||||
:param partition_dir: absolute path of partition to get hashes for
|
|
||||||
:param recalculate: list of suffixes which should be recalculated when got
|
|
||||||
:param do_listdir: force existence check for all hashes in the partition
|
|
||||||
:param reclaim_age: age at which to remove tombstones
|
|
||||||
|
|
||||||
:returns: tuple of (number of suffix dirs hashed, dictionary of hashes)
|
|
||||||
"""
|
|
||||||
|
|
||||||
hashed = 0
|
|
||||||
hashes_file = join(partition_dir, HASH_FILE)
|
|
||||||
modified = False
|
|
||||||
force_rewrite = False
|
|
||||||
hashes = {}
|
|
||||||
mtime = -1
|
|
||||||
|
|
||||||
if recalculate is None:
|
|
||||||
recalculate = []
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(hashes_file, 'rb') as fp:
|
|
||||||
hashes = pickle.load(fp)
|
|
||||||
mtime = os.path.getmtime(hashes_file)
|
|
||||||
except Exception:
|
|
||||||
do_listdir = True
|
|
||||||
force_rewrite = True
|
|
||||||
if do_listdir:
|
|
||||||
for suff in os.listdir(partition_dir):
|
|
||||||
if len(suff) == 3:
|
|
||||||
hashes.setdefault(suff, None)
|
|
||||||
modified = True
|
|
||||||
hashes.update((hash_, None) for hash_ in recalculate)
|
|
||||||
for suffix, hash_ in hashes.items():
|
|
||||||
if not hash_:
|
|
||||||
suffix_dir = join(partition_dir, suffix)
|
|
||||||
try:
|
|
||||||
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
|
|
||||||
hashed += 1
|
|
||||||
except PathNotDir:
|
|
||||||
del hashes[suffix]
|
|
||||||
except OSError:
|
|
||||||
logging.exception(_('Error hashing suffix'))
|
|
||||||
modified = True
|
|
||||||
if modified:
|
|
||||||
with lock_path(partition_dir):
|
|
||||||
if force_rewrite or not os.path.exists(hashes_file) or \
|
|
||||||
os.path.getmtime(hashes_file) == mtime:
|
|
||||||
write_pickle(
|
|
||||||
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
|
||||||
return hashed, hashes
|
|
||||||
return get_hashes(partition_dir, recalculate, do_listdir,
|
|
||||||
reclaim_age)
|
|
||||||
else:
|
|
||||||
return hashed, hashes
|
|
||||||
|
|
||||||
|
|
||||||
def tpool_reraise(func, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
|
|
||||||
"""
|
|
||||||
def inner():
|
|
||||||
try:
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
except BaseException, err:
|
|
||||||
return err
|
|
||||||
resp = tpool.execute(inner)
|
|
||||||
if isinstance(resp, BaseException):
|
|
||||||
raise resp
|
|
||||||
return resp
|
|
||||||
|
|
||||||
|
|
||||||
class ObjectReplicator(Daemon):
|
class ObjectReplicator(Daemon):
|
||||||
"""
|
"""
|
||||||
|
@ -33,13 +33,14 @@ from eventlet import sleep, Timeout, tpool
|
|||||||
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
from swift.common.utils import mkdirs, normalize_timestamp, public, \
|
||||||
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
|
||||||
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
split_path, drop_buffer_cache, get_logger, write_pickle, \
|
||||||
config_true_value, validate_device_partition, timing_stats
|
config_true_value, validate_device_partition, timing_stats, \
|
||||||
|
tpool_reraise
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.constraints import check_object_creation, check_mount, \
|
from swift.common.constraints import check_object_creation, check_mount, \
|
||||||
check_float, check_utf8
|
check_float, check_utf8
|
||||||
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
||||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
|
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace
|
||||||
from swift.obj.replicator import tpool_reraise, invalidate_hash, \
|
from swift.obj.base import invalidate_hash, \
|
||||||
quarantine_renamer, get_hashes
|
quarantine_renamer, get_hashes
|
||||||
from swift.common.http import is_success
|
from swift.common.http import is_success
|
||||||
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
|
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
|
||||||
|
@ -36,7 +36,7 @@ from StringIO import StringIO
|
|||||||
from functools import partial
|
from functools import partial
|
||||||
from tempfile import TemporaryFile, NamedTemporaryFile
|
from tempfile import TemporaryFile, NamedTemporaryFile
|
||||||
|
|
||||||
from mock import patch
|
from mock import MagicMock, patch
|
||||||
|
|
||||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||||
ConnectionTimeout)
|
ConnectionTimeout)
|
||||||
@ -1308,6 +1308,16 @@ log_name = %(yarr)s'''
|
|||||||
ts = utils.get_trans_id_time('tx1df4ff4f55ea45f7b2ec2-almostright')
|
ts = utils.get_trans_id_time('tx1df4ff4f55ea45f7b2ec2-almostright')
|
||||||
self.assertEquals(ts, None)
|
self.assertEquals(ts, None)
|
||||||
|
|
||||||
|
def test_tpool_reraise(self):
|
||||||
|
with patch.object(utils.tpool, 'execute', lambda f: f()):
|
||||||
|
self.assertTrue(
|
||||||
|
utils.tpool_reraise(MagicMock(return_value='test1')), 'test1')
|
||||||
|
self.assertRaises(Exception,
|
||||||
|
utils.tpool_reraise, MagicMock(side_effect=Exception('test2')))
|
||||||
|
self.assertRaises(BaseException,
|
||||||
|
utils.tpool_reraise,
|
||||||
|
MagicMock(side_effect=BaseException('test3')))
|
||||||
|
|
||||||
|
|
||||||
class TestStatsdLogging(unittest.TestCase):
|
class TestStatsdLogging(unittest.TestCase):
|
||||||
def test_get_logger_statsd_client_not_specified(self):
|
def test_get_logger_statsd_client_not_specified(self):
|
||||||
|
@ -26,7 +26,7 @@ from swift.obj import server as object_server
|
|||||||
from swift.obj.server import DiskFile, write_metadata, DATADIR
|
from swift.obj.server import DiskFile, write_metadata, DATADIR
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||||
storage_directory
|
storage_directory
|
||||||
from swift.obj.replicator import invalidate_hash
|
from swift.obj.base import invalidate_hash
|
||||||
|
|
||||||
|
|
||||||
class TestAuditor(unittest.TestCase):
|
class TestAuditor(unittest.TestCase):
|
||||||
|
@ -29,7 +29,7 @@ from test.unit import FakeLogger, mock
|
|||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
|
||||||
from swift.common import ring
|
from swift.common import ring
|
||||||
from swift.obj import replicator as object_replicator
|
from swift.obj import base as object_base, replicator as object_replicator
|
||||||
from swift.obj.server import DiskFile
|
from swift.obj.server import DiskFile
|
||||||
|
|
||||||
|
|
||||||
@ -241,7 +241,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
|
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
|
||||||
mkdirs(df.datadir)
|
mkdirs(df.datadir)
|
||||||
part = os.path.join(self.objects, '0')
|
part = os.path.join(self.objects, '0')
|
||||||
open(os.path.join(part, object_replicator.HASH_FILE), 'w')
|
open(os.path.join(part, object_base.HASH_FILE), 'w')
|
||||||
# Now the hash file is zero bytes.
|
# Now the hash file is zero bytes.
|
||||||
i = [0]
|
i = [0]
|
||||||
def getmtime(filename):
|
def getmtime(filename):
|
||||||
@ -282,7 +282,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
ohash = hash_path('a', 'c', 'o')
|
ohash = hash_path('a', 'c', 'o')
|
||||||
data_dir = ohash[-3:]
|
data_dir = ohash[-3:]
|
||||||
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
||||||
orig_quarantine_renamer = object_replicator.quarantine_renamer
|
orig_quarantine_renamer = object_base.quarantine_renamer
|
||||||
called = [False]
|
called = [False]
|
||||||
|
|
||||||
def wrapped(*args, **kwargs):
|
def wrapped(*args, **kwargs):
|
||||||
@ -290,10 +290,10 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
return orig_quarantine_renamer(*args, **kwargs)
|
return orig_quarantine_renamer(*args, **kwargs)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
object_replicator.quarantine_renamer = wrapped
|
object_base.quarantine_renamer = wrapped
|
||||||
object_replicator.hash_suffix(whole_path_from, 101)
|
object_base.hash_suffix(whole_path_from, 101)
|
||||||
finally:
|
finally:
|
||||||
object_replicator.quarantine_renamer = orig_quarantine_renamer
|
object_base.quarantine_renamer = orig_quarantine_renamer
|
||||||
self.assertTrue(called[0])
|
self.assertTrue(called[0])
|
||||||
|
|
||||||
def test_hash_suffix_one_file(self):
|
def test_hash_suffix_one_file(self):
|
||||||
@ -307,10 +307,10 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
ohash = hash_path('a', 'c', 'o')
|
ohash = hash_path('a', 'c', 'o')
|
||||||
data_dir = ohash[-3:]
|
data_dir = ohash[-3:]
|
||||||
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
||||||
object_replicator.hash_suffix(whole_path_from, 101)
|
object_base.hash_suffix(whole_path_from, 101)
|
||||||
self.assertEquals(len(os.listdir(self.parts['0'])), 1)
|
self.assertEquals(len(os.listdir(self.parts['0'])), 1)
|
||||||
|
|
||||||
object_replicator.hash_suffix(whole_path_from, 99)
|
object_base.hash_suffix(whole_path_from, 99)
|
||||||
self.assertEquals(len(os.listdir(self.parts['0'])), 0)
|
self.assertEquals(len(os.listdir(self.parts['0'])), 0)
|
||||||
|
|
||||||
def test_hash_suffix_multi_file_one(self):
|
def test_hash_suffix_multi_file_one(self):
|
||||||
@ -330,7 +330,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
hsh_path = os.listdir(whole_path_from)[0]
|
hsh_path = os.listdir(whole_path_from)[0]
|
||||||
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
|
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
|
||||||
|
|
||||||
object_replicator.hash_suffix(whole_path_from, 99)
|
object_base.hash_suffix(whole_path_from, 99)
|
||||||
# only the tombstone should be left
|
# only the tombstone should be left
|
||||||
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
|
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
|
||||||
|
|
||||||
@ -354,7 +354,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
hsh_path = os.listdir(whole_path_from)[0]
|
hsh_path = os.listdir(whole_path_from)[0]
|
||||||
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
|
whole_hsh_path = os.path.join(whole_path_from, hsh_path)
|
||||||
|
|
||||||
object_replicator.hash_suffix(whole_path_from, 99)
|
object_base.hash_suffix(whole_path_from, 99)
|
||||||
# only the meta and data should be left
|
# only the meta and data should be left
|
||||||
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
|
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
|
||||||
|
|
||||||
@ -371,17 +371,17 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
data_dir = ohash[-3:]
|
data_dir = ohash[-3:]
|
||||||
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
||||||
hashes_file = os.path.join(self.objects, '0',
|
hashes_file = os.path.join(self.objects, '0',
|
||||||
object_replicator.HASH_FILE)
|
object_base.HASH_FILE)
|
||||||
# test that non existent file except caught
|
# test that non existent file except caught
|
||||||
self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
|
self.assertEquals(object_base.invalidate_hash(whole_path_from),
|
||||||
None)
|
None)
|
||||||
# test that hashes get cleared
|
# test that hashes get cleared
|
||||||
check_pickle_data = pickle.dumps({data_dir: None},
|
check_pickle_data = pickle.dumps({data_dir: None},
|
||||||
object_replicator.PICKLE_PROTOCOL)
|
object_base.PICKLE_PROTOCOL)
|
||||||
for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
|
for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
|
||||||
with open(hashes_file, 'wb') as fp:
|
with open(hashes_file, 'wb') as fp:
|
||||||
pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
|
pickle.dump(data_hash, fp, object_base.PICKLE_PROTOCOL)
|
||||||
object_replicator.invalidate_hash(whole_path_from)
|
object_base.invalidate_hash(whole_path_from)
|
||||||
assertFileData(hashes_file, check_pickle_data)
|
assertFileData(hashes_file, check_pickle_data)
|
||||||
|
|
||||||
def test_check_ring(self):
|
def test_check_ring(self):
|
||||||
@ -538,7 +538,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
('2', True), ('3', True)]:
|
('2', True), ('3', True)]:
|
||||||
self.assertEquals(os.access(
|
self.assertEquals(os.access(
|
||||||
os.path.join(self.objects,
|
os.path.join(self.objects,
|
||||||
i, object_replicator.HASH_FILE),
|
i, object_base.HASH_FILE),
|
||||||
os.F_OK), result)
|
os.F_OK), result)
|
||||||
finally:
|
finally:
|
||||||
object_replicator.http_connect = was_connector
|
object_replicator.http_connect = was_connector
|
||||||
|
Loading…
x
Reference in New Issue
Block a user