From 9fe15dd15a4ba25b2bb4d34bd48b3ef53c7d1cbf Mon Sep 17 00:00:00 2001 From: gholt Date: Sat, 11 May 2013 19:51:03 +0000 Subject: [PATCH] Moved some code out of swift.obj.replicator This will be needed in future replication work to avoid circular imports. I used swift.obj.base as the module name just because we seemed to avoid putting code in __init__.py files so far and I didn't want to buck the trend. I would love to see other obj things like *_metadata and DiskFile move into swift.obj.base as well and swift.obj.server just be the WSGI server logic, but I'll leave that for the future. I have changed the tests as little as possible (just the references to where they get the code to test) to show the refactor has not broken anything. I did add a test for tpool_reraise since there was none before. There will be a follow on patch for moving the tests to their new location(s). I figured I'd wait to put the bikes in the shed until everyone's done painting it. Change-Id: I32b4ac88be21eb76c877d3f4cc1e6ac33304835b --- swift/common/utils.py | 17 ++- swift/obj/base.py | 207 ++++++++++++++++++++++++++++++ swift/obj/replicator.py | 209 ++----------------------------- swift/obj/server.py | 5 +- test/unit/common/test_utils.py | 12 +- test/unit/obj/test_auditor.py | 2 +- test/unit/obj/test_replicator.py | 32 ++--- 7 files changed, 261 insertions(+), 223 deletions(-) create mode 100644 swift/obj/base.py diff --git a/swift/common/utils.py b/swift/common/utils.py index 12ab6f447c..8ea9ea4392 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -43,7 +43,7 @@ from urlparse import urlparse as stdlib_urlparse, ParseResult import itertools import eventlet -from eventlet import GreenPool, sleep, Timeout +from eventlet import GreenPool, sleep, Timeout, tpool from eventlet.green import socket, threading import netifaces import codecs @@ -1716,3 +1716,18 @@ class InputProxy(object): raise self.bytes_received += len(line) return line + + +def tpool_reraise(func, *args, **kwargs): + """ + Hack to work around Eventlet's tpool not catching and reraising Timeouts. + """ + def inner(): + try: + return func(*args, **kwargs) + except BaseException, err: + return err + resp = tpool.execute(inner) + if isinstance(resp, BaseException): + raise resp + return resp diff --git a/swift/obj/base.py b/swift/obj/base.py new file mode 100644 index 0000000000..b41511c9b5 --- /dev/null +++ b/swift/obj/base.py @@ -0,0 +1,207 @@ +# Copyright (c) 2010-2013 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""General object server functions.""" + +import cPickle as pickle +import errno +import hashlib +import logging +import os +import time +import uuid +from os.path import basename, dirname, join + +from swift.common.exceptions import PathNotDir +from swift.common.utils import lock_path, renamer, write_pickle + + +PICKLE_PROTOCOL = 2 +ONE_WEEK = 604800 +HASH_FILE = 'hashes.pkl' + + +def quarantine_renamer(device_path, corrupted_file_path): + """ + In the case that a file is corrupted, move it to a quarantined + area to allow replication to fix it. + + :params device_path: The path to the device the corrupted file is on. + :params corrupted_file_path: The path to the file you want quarantined. + + :returns: path (str) of directory the file was moved to + :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY + exceptions from rename + """ + from_dir = dirname(corrupted_file_path) + to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir)) + invalidate_hash(dirname(from_dir)) + try: + renamer(from_dir, to_dir) + except OSError, e: + if e.errno not in (errno.EEXIST, errno.ENOTEMPTY): + raise + to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex) + renamer(from_dir, to_dir) + return to_dir + + +def hash_suffix(path, reclaim_age): + """ + Performs reclamation and returns an md5 of all (remaining) files. + + :param reclaim_age: age in seconds at which to remove tombstones + :raises PathNotDir: if given path is not a valid directory + :raises OSError: for non-ENOTDIR errors + """ + md5 = hashlib.md5() + try: + path_contents = sorted(os.listdir(path)) + except OSError, err: + if err.errno in (errno.ENOTDIR, errno.ENOENT): + raise PathNotDir() + raise + for hsh in path_contents: + hsh_path = join(path, hsh) + try: + files = os.listdir(hsh_path) + except OSError, err: + if err.errno == errno.ENOTDIR: + partition_path = dirname(path) + objects_path = dirname(partition_path) + device_path = dirname(objects_path) + quar_path = quarantine_renamer(device_path, hsh_path) + logging.exception( + _('Quarantined %s to %s because it is not a directory') % + (hsh_path, quar_path)) + continue + raise + if len(files) == 1: + if files[0].endswith('.ts'): + # remove tombstones older than reclaim_age + ts = files[0].rsplit('.', 1)[0] + if (time.time() - float(ts)) > reclaim_age: + os.unlink(join(hsh_path, files[0])) + files.remove(files[0]) + elif files: + files.sort(reverse=True) + meta = data = tomb = None + for filename in list(files): + if not meta and filename.endswith('.meta'): + meta = filename + if not data and filename.endswith('.data'): + data = filename + if not tomb and filename.endswith('.ts'): + tomb = filename + if (filename < tomb or # any file older than tomb + filename < data or # any file older than data + (filename.endswith('.meta') and + filename < meta)): # old meta + os.unlink(join(hsh_path, filename)) + files.remove(filename) + if not files: + os.rmdir(hsh_path) + for filename in files: + md5.update(filename) + try: + os.rmdir(path) + except OSError: + pass + return md5.hexdigest() + + +def invalidate_hash(suffix_dir): + """ + Invalidates the hash for a suffix_dir in the partition's hashes file. 
+ + :param suffix_dir: absolute path to suffix dir whose hash needs + invalidating + """ + + suffix = os.path.basename(suffix_dir) + partition_dir = os.path.dirname(suffix_dir) + hashes_file = join(partition_dir, HASH_FILE) + with lock_path(partition_dir): + try: + with open(hashes_file, 'rb') as fp: + hashes = pickle.load(fp) + if suffix in hashes and not hashes[suffix]: + return + except Exception: + return + hashes[suffix] = None + write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) + + +def get_hashes(partition_dir, recalculate=None, do_listdir=False, + reclaim_age=ONE_WEEK): + """ + Get a list of hashes for the suffix dir. do_listdir causes it to mistrust + the hash cache for suffix existence at the (unexpectedly high) cost of a + listdir. reclaim_age is just passed on to hash_suffix. + + :param partition_dir: absolute path of partition to get hashes for + :param recalculate: list of suffixes which should be recalculated when got + :param do_listdir: force existence check for all hashes in the partition + :param reclaim_age: age at which to remove tombstones + + :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) + """ + + hashed = 0 + hashes_file = join(partition_dir, HASH_FILE) + modified = False + force_rewrite = False + hashes = {} + mtime = -1 + + if recalculate is None: + recalculate = [] + + try: + with open(hashes_file, 'rb') as fp: + hashes = pickle.load(fp) + mtime = os.path.getmtime(hashes_file) + except Exception: + do_listdir = True + force_rewrite = True + if do_listdir: + for suff in os.listdir(partition_dir): + if len(suff) == 3: + hashes.setdefault(suff, None) + modified = True + hashes.update((hash_, None) for hash_ in recalculate) + for suffix, hash_ in hashes.items(): + if not hash_: + suffix_dir = join(partition_dir, suffix) + try: + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) + hashed += 1 + except PathNotDir: + del hashes[suffix] + except OSError: + logging.exception(_('Error hashing suffix')) + modified = True + if modified: + with lock_path(partition_dir): + if force_rewrite or not os.path.exists(hashes_file) or \ + os.path.getmtime(hashes_file) == mtime: + write_pickle( + hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) + return hashed, hashes + return get_hashes(partition_dir, recalculate, do_listdir, + reclaim_age) + else: + return hashed, hashes diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 00fb29a8fe..17c9f6e9a8 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -14,16 +14,12 @@ # limitations under the License. 
import os -from os.path import basename, dirname, isdir, isfile, join +from os.path import isdir, isfile, join import random import shutil import time -import logging -import hashlib import itertools import cPickle as pickle -import errno -import uuid import eventlet from eventlet import GreenPool, tpool, Timeout, sleep, hubs @@ -31,209 +27,18 @@ from eventlet.green import subprocess from eventlet.support.greenlets import GreenletExit from swift.common.ring import Ring -from swift.common.utils import whataremyips, unlink_older_than, lock_path, \ - compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \ - rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub +from swift.common.utils import whataremyips, unlink_older_than, \ + compute_eta, get_logger, dump_recon_cache, \ + rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \ + tpool_reraise from swift.common.bufferedhttp import http_connect from swift.common.daemon import Daemon from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE -from swift.common.exceptions import PathNotDir +from swift.obj.base import get_hashes + hubs.use_hub(get_hub()) -PICKLE_PROTOCOL = 2 -ONE_WEEK = 604800 -HASH_FILE = 'hashes.pkl' - - -def quarantine_renamer(device_path, corrupted_file_path): - """ - In the case that a file is corrupted, move it to a quarantined - area to allow replication to fix it. - - :params device_path: The path to the device the corrupted file is on. - :params corrupted_file_path: The path to the file you want quarantined. - - :returns: path (str) of directory the file was moved to - :raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY - exceptions from rename - """ - from_dir = dirname(corrupted_file_path) - to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir)) - invalidate_hash(dirname(from_dir)) - try: - renamer(from_dir, to_dir) - except OSError, e: - if e.errno not in (errno.EEXIST, errno.ENOTEMPTY): - raise - to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex) - renamer(from_dir, to_dir) - return to_dir - - -def hash_suffix(path, reclaim_age): - """ - Performs reclamation and returns an md5 of all (remaining) files. 
- - :param reclaim_age: age in seconds at which to remove tombstones - :raises PathNotDir: if given path is not a valid directory - :raises OSError: for non-ENOTDIR errors - """ - md5 = hashlib.md5() - try: - path_contents = sorted(os.listdir(path)) - except OSError, err: - if err.errno in (errno.ENOTDIR, errno.ENOENT): - raise PathNotDir() - raise - for hsh in path_contents: - hsh_path = join(path, hsh) - try: - files = os.listdir(hsh_path) - except OSError, err: - if err.errno == errno.ENOTDIR: - partition_path = dirname(path) - objects_path = dirname(partition_path) - device_path = dirname(objects_path) - quar_path = quarantine_renamer(device_path, hsh_path) - logging.exception( - _('Quarantined %s to %s because it is not a directory') % - (hsh_path, quar_path)) - continue - raise - if len(files) == 1: - if files[0].endswith('.ts'): - # remove tombstones older than reclaim_age - ts = files[0].rsplit('.', 1)[0] - if (time.time() - float(ts)) > reclaim_age: - os.unlink(join(hsh_path, files[0])) - files.remove(files[0]) - elif files: - files.sort(reverse=True) - meta = data = tomb = None - for filename in list(files): - if not meta and filename.endswith('.meta'): - meta = filename - if not data and filename.endswith('.data'): - data = filename - if not tomb and filename.endswith('.ts'): - tomb = filename - if (filename < tomb or # any file older than tomb - filename < data or # any file older than data - (filename.endswith('.meta') and - filename < meta)): # old meta - os.unlink(join(hsh_path, filename)) - files.remove(filename) - if not files: - os.rmdir(hsh_path) - for filename in files: - md5.update(filename) - try: - os.rmdir(path) - except OSError: - pass - return md5.hexdigest() - - -def invalidate_hash(suffix_dir): - """ - Invalidates the hash for a suffix_dir in the partition's hashes file. - - :param suffix_dir: absolute path to suffix dir whose hash needs - invalidating - """ - - suffix = os.path.basename(suffix_dir) - partition_dir = os.path.dirname(suffix_dir) - hashes_file = join(partition_dir, HASH_FILE) - with lock_path(partition_dir): - try: - with open(hashes_file, 'rb') as fp: - hashes = pickle.load(fp) - if suffix in hashes and not hashes[suffix]: - return - except Exception: - return - hashes[suffix] = None - write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) - - -def get_hashes(partition_dir, recalculate=None, do_listdir=False, - reclaim_age=ONE_WEEK): - """ - Get a list of hashes for the suffix dir. do_listdir causes it to mistrust - the hash cache for suffix existence at the (unexpectedly high) cost of a - listdir. reclaim_age is just passed on to hash_suffix. 
- - :param partition_dir: absolute path of partition to get hashes for - :param recalculate: list of suffixes which should be recalculated when got - :param do_listdir: force existence check for all hashes in the partition - :param reclaim_age: age at which to remove tombstones - - :returns: tuple of (number of suffix dirs hashed, dictionary of hashes) - """ - - hashed = 0 - hashes_file = join(partition_dir, HASH_FILE) - modified = False - force_rewrite = False - hashes = {} - mtime = -1 - - if recalculate is None: - recalculate = [] - - try: - with open(hashes_file, 'rb') as fp: - hashes = pickle.load(fp) - mtime = os.path.getmtime(hashes_file) - except Exception: - do_listdir = True - force_rewrite = True - if do_listdir: - for suff in os.listdir(partition_dir): - if len(suff) == 3: - hashes.setdefault(suff, None) - modified = True - hashes.update((hash_, None) for hash_ in recalculate) - for suffix, hash_ in hashes.items(): - if not hash_: - suffix_dir = join(partition_dir, suffix) - try: - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) - hashed += 1 - except PathNotDir: - del hashes[suffix] - except OSError: - logging.exception(_('Error hashing suffix')) - modified = True - if modified: - with lock_path(partition_dir): - if force_rewrite or not os.path.exists(hashes_file) or \ - os.path.getmtime(hashes_file) == mtime: - write_pickle( - hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) - return hashed, hashes - return get_hashes(partition_dir, recalculate, do_listdir, - reclaim_age) - else: - return hashed, hashes - - -def tpool_reraise(func, *args, **kwargs): - """ - Hack to work around Eventlet's tpool not catching and reraising Timeouts. - """ - def inner(): - try: - return func(*args, **kwargs) - except BaseException, err: - return err - resp = tpool.execute(inner) - if isinstance(resp, BaseException): - raise resp - return resp - class ObjectReplicator(Daemon): """ diff --git a/swift/obj/server.py b/swift/obj/server.py index 946c8bb484..76ad8b5370 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -33,13 +33,14 @@ from eventlet import sleep, Timeout, tpool from swift.common.utils import mkdirs, normalize_timestamp, public, \ storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \ split_path, drop_buffer_cache, get_logger, write_pickle, \ - config_true_value, validate_device_partition, timing_stats + config_true_value, validate_device_partition, timing_stats, \ + tpool_reraise from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ DiskFileNotExist, DiskFileCollision, DiskFileNoSpace -from swift.obj.replicator import tpool_reraise, invalidate_hash, \ +from swift.obj.base import invalidate_hash, \ quarantine_renamer, get_hashes from swift.common.http import is_success from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \ diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 0f5a0eb1bf..3db2c7e4bf 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -36,7 +36,7 @@ from StringIO import StringIO from functools import partial from tempfile import TemporaryFile, NamedTemporaryFile -from mock import patch +from mock import MagicMock, patch from swift.common.exceptions import (Timeout, MessageTimeout, ConnectionTimeout) @@ -1308,6 +1308,16 @@ log_name = %(yarr)s''' ts = 
utils.get_trans_id_time('tx1df4ff4f55ea45f7b2ec2-almostright') self.assertEquals(ts, None) + def test_tpool_reraise(self): + with patch.object(utils.tpool, 'execute', lambda f: f()): + self.assertTrue( + utils.tpool_reraise(MagicMock(return_value='test1')), 'test1') + self.assertRaises(Exception, + utils.tpool_reraise, MagicMock(side_effect=Exception('test2'))) + self.assertRaises(BaseException, + utils.tpool_reraise, + MagicMock(side_effect=BaseException('test3'))) + class TestStatsdLogging(unittest.TestCase): def test_get_logger_statsd_client_not_specified(self): diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py index db80063007..1a9277377b 100644 --- a/test/unit/obj/test_auditor.py +++ b/test/unit/obj/test_auditor.py @@ -26,7 +26,7 @@ from swift.obj import server as object_server from swift.obj.server import DiskFile, write_metadata, DATADIR from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ storage_directory -from swift.obj.replicator import invalidate_hash +from swift.obj.base import invalidate_hash class TestAuditor(unittest.TestCase): diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index d0ad1030a3..709355f303 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -29,7 +29,7 @@ from test.unit import FakeLogger, mock from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring -from swift.obj import replicator as object_replicator +from swift.obj import base as object_base, replicator as object_replicator from swift.obj.server import DiskFile @@ -241,7 +241,7 @@ class TestObjectReplicator(unittest.TestCase): df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(df.datadir) part = os.path.join(self.objects, '0') - open(os.path.join(part, object_replicator.HASH_FILE), 'w') + open(os.path.join(part, object_base.HASH_FILE), 'w') # Now the hash file is zero bytes. 
i = [0] def getmtime(filename): @@ -282,7 +282,7 @@ class TestObjectReplicator(unittest.TestCase): ohash = hash_path('a', 'c', 'o') data_dir = ohash[-3:] whole_path_from = os.path.join(self.objects, '0', data_dir) - orig_quarantine_renamer = object_replicator.quarantine_renamer + orig_quarantine_renamer = object_base.quarantine_renamer called = [False] def wrapped(*args, **kwargs): @@ -290,10 +290,10 @@ class TestObjectReplicator(unittest.TestCase): return orig_quarantine_renamer(*args, **kwargs) try: - object_replicator.quarantine_renamer = wrapped - object_replicator.hash_suffix(whole_path_from, 101) + object_base.quarantine_renamer = wrapped + object_base.hash_suffix(whole_path_from, 101) finally: - object_replicator.quarantine_renamer = orig_quarantine_renamer + object_base.quarantine_renamer = orig_quarantine_renamer self.assertTrue(called[0]) def test_hash_suffix_one_file(self): @@ -307,10 +307,10 @@ class TestObjectReplicator(unittest.TestCase): ohash = hash_path('a', 'c', 'o') data_dir = ohash[-3:] whole_path_from = os.path.join(self.objects, '0', data_dir) - object_replicator.hash_suffix(whole_path_from, 101) + object_base.hash_suffix(whole_path_from, 101) self.assertEquals(len(os.listdir(self.parts['0'])), 1) - object_replicator.hash_suffix(whole_path_from, 99) + object_base.hash_suffix(whole_path_from, 99) self.assertEquals(len(os.listdir(self.parts['0'])), 0) def test_hash_suffix_multi_file_one(self): @@ -330,7 +330,7 @@ class TestObjectReplicator(unittest.TestCase): hsh_path = os.listdir(whole_path_from)[0] whole_hsh_path = os.path.join(whole_path_from, hsh_path) - object_replicator.hash_suffix(whole_path_from, 99) + object_base.hash_suffix(whole_path_from, 99) # only the tombstone should be left self.assertEquals(len(os.listdir(whole_hsh_path)), 1) @@ -354,7 +354,7 @@ class TestObjectReplicator(unittest.TestCase): hsh_path = os.listdir(whole_path_from)[0] whole_hsh_path = os.path.join(whole_path_from, hsh_path) - object_replicator.hash_suffix(whole_path_from, 99) + object_base.hash_suffix(whole_path_from, 99) # only the meta and data should be left self.assertEquals(len(os.listdir(whole_hsh_path)), 2) @@ -371,17 +371,17 @@ class TestObjectReplicator(unittest.TestCase): data_dir = ohash[-3:] whole_path_from = os.path.join(self.objects, '0', data_dir) hashes_file = os.path.join(self.objects, '0', - object_replicator.HASH_FILE) + object_base.HASH_FILE) # test that non existent file except caught - self.assertEquals(object_replicator.invalidate_hash(whole_path_from), + self.assertEquals(object_base.invalidate_hash(whole_path_from), None) # test that hashes get cleared check_pickle_data = pickle.dumps({data_dir: None}, - object_replicator.PICKLE_PROTOCOL) + object_base.PICKLE_PROTOCOL) for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]: with open(hashes_file, 'wb') as fp: - pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL) - object_replicator.invalidate_hash(whole_path_from) + pickle.dump(data_hash, fp, object_base.PICKLE_PROTOCOL) + object_base.invalidate_hash(whole_path_from) assertFileData(hashes_file, check_pickle_data) def test_check_ring(self): @@ -538,7 +538,7 @@ class TestObjectReplicator(unittest.TestCase): ('2', True), ('3', True)]: self.assertEquals(os.access( os.path.join(self.objects, - i, object_replicator.HASH_FILE), + i, object_base.HASH_FILE), os.F_OK), result) finally: object_replicator.http_connect = was_connector
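
Not part of the patch itself: a minimal usage sketch of where callers import these helpers after this move. The partition path, suffix value, and error handling below are illustrative assumptions, not taken from the change.

    # Sketch only: new import locations after this refactor.
    from eventlet import Timeout

    from swift.common.utils import tpool_reraise            # now in common utils
    from swift.obj.base import get_hashes, invalidate_hash   # moved out of replicator

    partition_dir = '/srv/node/sda/objects/1234'   # hypothetical partition path
    suffix_dir = partition_dir + '/a83'            # hypothetical suffix dir

    # The replicator runs get_hashes in a native thread via eventlet's tpool;
    # tpool_reraise makes sure a Timeout raised inside that thread is re-raised
    # in the calling greenthread instead of being swallowed.
    try:
        hashed, hashes = tpool_reraise(get_hashes, partition_dir,
                                       recalculate=['a83'])
    except (Timeout, OSError):
        hashed, hashes = 0, {}   # error handling here is caller-specific

    # The object server invalidates a suffix's cached hash after a write:
    invalidate_hash(suffix_dir)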