diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index 46cf37a020..2c1e52ab8e 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -166,50 +166,59 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False, hashed = 0 hashes_file = join(partition_dir, HASH_FILE) - with lock_path(partition_dir): - modified = False - hashes = {} - try: - with open(hashes_file, 'rb') as fp: - hashes = pickle.load(fp) - except Exception: - do_listdir = True - if do_listdir: - hashes = dict(((suff, hashes.get(suff, None)) - for suff in os.listdir(partition_dir) - if len(suff) == 3 and isdir(join(partition_dir, suff)))) + modified = False + hashes = {} + mtime = -1 + try: + with open(hashes_file, 'rb') as fp: + hashes = pickle.load(fp) + mtime = os.path.getmtime(hashes_file) + except Exception: + do_listdir = True + if do_listdir: + for suff in os.listdir(partition_dir): + if len(suff) == 3 and isdir(join(partition_dir, suff)): + hashes.setdefault(suff, None) + modified = True + hashes.update((hash_, None) for hash_ in recalculate) + for suffix, hash_ in hashes.items(): + if not hash_: + suffix_dir = join(partition_dir, suffix) + if isdir(suffix_dir): + try: + hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) + hashed += 1 + except OSError: + logging.exception(_('Error hashing suffix')) + else: + del hashes[suffix] modified = True - for hash_ in recalculate: - hashes[hash_] = None - for suffix, hash_ in hashes.items(): - if not hash_: - suffix_dir = join(partition_dir, suffix) - if os.path.exists(suffix_dir): - try: - hashes[suffix] = hash_suffix(suffix_dir, reclaim_age) - hashed += 1 - except OSError: - logging.exception(_('Error hashing suffix')) - hashes[suffix] = None - else: - del hashes[suffix] - modified = True - sleep() - if modified: - write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) + if modified: + with lock_path(partition_dir): + if not os.path.exists(hashes_file) or \ + os.path.getmtime(hashes_file) == mtime: + write_pickle( + hashes, hashes_file, partition_dir, PICKLE_PROTOCOL) + return hashed, hashes + return get_hashes(partition_dir, recalculate, do_listdir, + reclaim_age) + else: return hashed, hashes -def tpooled_get_hashes(*args, **kwargs): +def tpool_reraise(func, *args, **kwargs): """ Hack to work around Eventlet's tpool not catching and reraising Timeouts. - We return the Timeout, Timeout if it's raised, the caller looks for it - and reraises it if found. """ - try: - return get_hashes(*args, **kwargs) - except Timeout, err: - return err, err + def inner(): + try: + return func(*args, **kwargs) + except BaseException, err: + return err + resp = tpool.execute(inner) + if isinstance(resp, BaseException): + raise resp + return resp class ObjectReplicator(Daemon): @@ -392,12 +401,9 @@ class ObjectReplicator(Daemon): self.logger.increment('partition.update.count.%s' % (job['device'],)) begin = time.time() try: - hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'], + hashed, local_hash = tpool_reraise(get_hashes, job['path'], do_listdir=(self.replication_count % 10) == 0, reclaim_age=self.reclaim_age) - # See tpooled_get_hashes "Hack". - if isinstance(hashed, BaseException): - raise hashed self.suffix_hash += hashed self.logger.update_stats('suffix.hashes', hashed) attempts_left = len(job['nodes']) @@ -428,12 +434,9 @@ class ObjectReplicator(Daemon): local_hash[suffix] != remote_hash.get(suffix, -1)] if not suffixes: continue - hashed, recalc_hash = tpool.execute(tpooled_get_hashes, + hashed, recalc_hash = tpool_reraise(get_hashes, job['path'], recalculate=suffixes, reclaim_age=self.reclaim_age) - # See tpooled_get_hashes "Hack". - if isinstance(hashed, BaseException): - raise hashed self.logger.update_stats('suffix.hashes', hashed) local_hash = recalc_hash suffixes = [suffix for suffix in local_hash if diff --git a/swift/obj/server.py b/swift/obj/server.py index 4f5be86f9e..d9d96958cc 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -44,8 +44,8 @@ from swift.common.constraints import check_object_creation, check_mount, \ check_float, check_utf8 from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ DiskFileNotExist -from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \ - quarantine_renamer +from swift.obj.replicator import tpool_reraise, invalidate_hash, \ + quarantine_renamer, get_hashes from swift.common.http import is_success, HTTPInsufficientStorage, \ HTTPClientDisconnect @@ -865,12 +865,7 @@ class ObjectController(object): if not os.path.exists(path): mkdirs(path) suffixes = suffix.split('-') if suffix else [] - _junk, hashes = tpool.execute(tpooled_get_hashes, path, - recalculate=suffixes) - # See tpooled_get_hashes "Hack". - if isinstance(hashes, BaseException): - self.logger.increment('REPLICATE.errors') - raise hashes + _junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes) self.logger.timing_since('REPLICATE.timing', start_time) return Response(body=pickle.dumps(hashes)) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 640639a2b5..b6a98f2f84 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -234,3 +234,25 @@ class MockTrue(object): def __ne__(self, other): return other is not True + + +@contextmanager +def mock(update): + returns = [] + deletes = [] + for key, value in update.items(): + imports = key.split('.') + attr = imports.pop(-1) + module = __import__(imports[0], fromlist=imports[1:]) + for modname in imports[1:]: + module = getattr(module, modname) + if hasattr(module, attr): + returns.append((module, attr, getattr(module, attr))) + else: + deletes.append((module, attr)) + setattr(module, attr, value) + yield True + for module, attr, value in returns: + setattr(module, attr, value) + for module, attr in deletes: + delattr(module, attr) diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 2205f861de..823c321748 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -27,7 +27,7 @@ import tempfile from contextlib import contextmanager from eventlet.green import subprocess from eventlet import Timeout, tpool -from test.unit import FakeLogger +from test.unit import FakeLogger, mock from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp from swift.common import ring @@ -209,6 +209,41 @@ class TestObjectReplicator(unittest.TestCase): self.assertEquals(hashed, 1) self.assert_('a83' in hashes) + def test_get_hashes_unmodified(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) + mkdirs(df.datadir) + with open(os.path.join(df.datadir, normalize_timestamp( + time.time()) + '.ts'), 'wb') as f: + f.write('1234567890') + part = os.path.join(self.objects, '0') + hashed, hashes = object_replicator.get_hashes(part) + i = [0] + def getmtime(filename): + i[0] += 1 + return 1 + with mock({'os.path.getmtime': getmtime}): + hashed, hashes = object_replicator.get_hashes( + part, recalculate=['a83']) + self.assertEquals(i[0], 2) + + def test_get_hashes_modified(self): + df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) + mkdirs(df.datadir) + with open(os.path.join(df.datadir, normalize_timestamp( + time.time()) + '.ts'), 'wb') as f: + f.write('1234567890') + part = os.path.join(self.objects, '0') + hashed, hashes = object_replicator.get_hashes(part) + i = [0] + def getmtime(filename): + if i[0] < 3: + i[0] += 1 + return i[0] + with mock({'os.path.getmtime': getmtime}): + hashed, hashes = object_replicator.get_hashes( + part, recalculate=['a83']) + self.assertEquals(i[0], 3) + def test_hash_suffix_hash_dir_is_file_quarantine(self): df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger()) mkdirs(os.path.dirname(df.datadir)) diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index ecb88f5a0e..7c8358b880 100644 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -17,8 +17,6 @@ import cPickle as pickle import os -import sys -import shutil import unittest from shutil import rmtree from StringIO import StringIO @@ -37,7 +35,6 @@ from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ NullLogger, storage_directory from swift.common.exceptions import DiskFileNotExist -from swift.obj import replicator from eventlet import tpool @@ -2124,13 +2121,11 @@ class TestObjectController(unittest.TestCase): def fake_get_hashes(*args, **kwargs): return 0, {1: 2} - def my_tpool_execute(*args, **kwargs): - func = args[0] - args = args[1:] + def my_tpool_execute(func, *args, **kwargs): return func(*args, **kwargs) - was_get_hashes = replicator.get_hashes - replicator.get_hashes = fake_get_hashes + was_get_hashes = object_server.get_hashes + object_server.get_hashes = fake_get_hashes was_tpool_exe = tpool.execute tpool.execute = my_tpool_execute try: @@ -2143,20 +2138,18 @@ class TestObjectController(unittest.TestCase): self.assertEquals(p_data, {1: 2}) finally: tpool.execute = was_tpool_exe - replicator.get_hashes = was_get_hashes + object_server.get_hashes = was_get_hashes def test_REPLICATE_timeout(self): def fake_get_hashes(*args, **kwargs): raise Timeout() - def my_tpool_execute(*args, **kwargs): - func = args[0] - args = args[1:] + def my_tpool_execute(func, *args, **kwargs): return func(*args, **kwargs) - was_get_hashes = replicator.get_hashes - replicator.get_hashes = fake_get_hashes + was_get_hashes = object_server.get_hashes + object_server.get_hashes = fake_get_hashes was_tpool_exe = tpool.execute tpool.execute = my_tpool_execute try: @@ -2166,7 +2159,7 @@ class TestObjectController(unittest.TestCase): self.assertRaises(Timeout, self.object_controller.REPLICATE, req) finally: tpool.execute = was_tpool_exe - replicator.get_hashes = was_get_hashes + object_server.get_hashes = was_get_hashes if __name__ == '__main__': unittest.main()