Merge "make obj replicator locking more optimistic"

This commit is contained in:
Jenkins 2012-08-23 22:06:35 +00:00 committed by Gerrit Code Review
commit 3f01f889d5
5 changed files with 117 additions and 69 deletions

View File

@ -166,50 +166,59 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
hashed = 0
hashes_file = join(partition_dir, HASH_FILE)
with lock_path(partition_dir):
modified = False
hashes = {}
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
except Exception:
do_listdir = True
if do_listdir:
hashes = dict(((suff, hashes.get(suff, None))
for suff in os.listdir(partition_dir)
if len(suff) == 3 and isdir(join(partition_dir, suff))))
modified = False
hashes = {}
mtime = -1
try:
with open(hashes_file, 'rb') as fp:
hashes = pickle.load(fp)
mtime = os.path.getmtime(hashes_file)
except Exception:
do_listdir = True
if do_listdir:
for suff in os.listdir(partition_dir):
if len(suff) == 3 and isdir(join(partition_dir, suff)):
hashes.setdefault(suff, None)
modified = True
hashes.update((hash_, None) for hash_ in recalculate)
for suffix, hash_ in hashes.items():
if not hash_:
suffix_dir = join(partition_dir, suffix)
if isdir(suffix_dir):
try:
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
hashed += 1
except OSError:
logging.exception(_('Error hashing suffix'))
else:
del hashes[suffix]
modified = True
for hash_ in recalculate:
hashes[hash_] = None
for suffix, hash_ in hashes.items():
if not hash_:
suffix_dir = join(partition_dir, suffix)
if os.path.exists(suffix_dir):
try:
hashes[suffix] = hash_suffix(suffix_dir, reclaim_age)
hashed += 1
except OSError:
logging.exception(_('Error hashing suffix'))
hashes[suffix] = None
else:
del hashes[suffix]
modified = True
sleep()
if modified:
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
if modified:
with lock_path(partition_dir):
if not os.path.exists(hashes_file) or \
os.path.getmtime(hashes_file) == mtime:
write_pickle(
hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
return hashed, hashes
return get_hashes(partition_dir, recalculate, do_listdir,
reclaim_age)
else:
return hashed, hashes
def tpooled_get_hashes(*args, **kwargs):
def tpool_reraise(func, *args, **kwargs):
"""
Hack to work around Eventlet's tpool not catching and reraising Timeouts.
We return the Timeout, Timeout if it's raised, the caller looks for it
and reraises it if found.
"""
try:
return get_hashes(*args, **kwargs)
except Timeout, err:
return err, err
def inner():
try:
return func(*args, **kwargs)
except BaseException, err:
return err
resp = tpool.execute(inner)
if isinstance(resp, BaseException):
raise resp
return resp
class ObjectReplicator(Daemon):
@ -392,12 +401,9 @@ class ObjectReplicator(Daemon):
self.logger.increment('partition.update.count.%s' % (job['device'],))
begin = time.time()
try:
hashed, local_hash = tpool.execute(tpooled_get_hashes, job['path'],
hashed, local_hash = tpool_reraise(get_hashes, job['path'],
do_listdir=(self.replication_count % 10) == 0,
reclaim_age=self.reclaim_age)
# See tpooled_get_hashes "Hack".
if isinstance(hashed, BaseException):
raise hashed
self.suffix_hash += hashed
self.logger.update_stats('suffix.hashes', hashed)
attempts_left = len(job['nodes'])
@ -428,12 +434,9 @@ class ObjectReplicator(Daemon):
local_hash[suffix] != remote_hash.get(suffix, -1)]
if not suffixes:
continue
hashed, recalc_hash = tpool.execute(tpooled_get_hashes,
hashed, recalc_hash = tpool_reraise(get_hashes,
job['path'], recalculate=suffixes,
reclaim_age=self.reclaim_age)
# See tpooled_get_hashes "Hack".
if isinstance(hashed, BaseException):
raise hashed
self.logger.update_stats('suffix.hashes', hashed)
local_hash = recalc_hash
suffixes = [suffix for suffix in local_hash if

View File

@ -44,8 +44,8 @@ from swift.common.constraints import check_object_creation, check_mount, \
check_float, check_utf8
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
DiskFileNotExist
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
quarantine_renamer
from swift.obj.replicator import tpool_reraise, invalidate_hash, \
quarantine_renamer, get_hashes
from swift.common.http import is_success, HTTPInsufficientStorage, \
HTTPClientDisconnect
@ -865,12 +865,7 @@ class ObjectController(object):
if not os.path.exists(path):
mkdirs(path)
suffixes = suffix.split('-') if suffix else []
_junk, hashes = tpool.execute(tpooled_get_hashes, path,
recalculate=suffixes)
# See tpooled_get_hashes "Hack".
if isinstance(hashes, BaseException):
self.logger.increment('REPLICATE.errors')
raise hashes
_junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
self.logger.timing_since('REPLICATE.timing', start_time)
return Response(body=pickle.dumps(hashes))

View File

@ -234,3 +234,25 @@ class MockTrue(object):
def __ne__(self, other):
return other is not True
@contextmanager
def mock(update):
    """
    Temporarily replace module attributes named by dotted path.

    ``update`` maps dotted attribute names (e.g. ``'os.path.getmtime'``)
    to replacement values.  Each attribute is set for the duration of the
    ``with`` block and then restored — or deleted, if it did not exist
    before patching — even when the block raises.
    """
    returns = []   # (module, attr, original value) to restore on exit
    deletes = []   # (module, attr) that did not exist before patching
    for key, value in update.items():
        imports = key.split('.')
        attr = imports.pop(-1)
        # __import__('os', fromlist=['path']) returns the leaf module;
        # walk any remaining dotted components by attribute access.
        module = __import__(imports[0], fromlist=imports[1:])
        for modname in imports[1:]:
            module = getattr(module, modname)
        if hasattr(module, attr):
            returns.append((module, attr, getattr(module, attr)))
        else:
            deletes.append((module, attr))
        setattr(module, attr, value)
    try:
        yield True
    finally:
        # Restore originals even when the with-body raised, so a failing
        # test cannot leak patched attributes into later tests.
        for module, attr, value in returns:
            setattr(module, attr, value)
        for module, attr in deletes:
            delattr(module, attr)

View File

@ -27,7 +27,7 @@ import tempfile
from contextlib import contextmanager
from eventlet.green import subprocess
from eventlet import Timeout, tpool
from test.unit import FakeLogger
from test.unit import FakeLogger, mock
from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp
from swift.common import ring
@ -209,6 +209,41 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(hashed, 1)
self.assert_('a83' in hashes)
def test_get_hashes_unmodified(self):
    # Give the partition one tombstone so there is something to hash.
    disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
                         FakeLogger())
    mkdirs(disk_file.datadir)
    tombstone = os.path.join(
        disk_file.datadir, normalize_timestamp(time.time()) + '.ts')
    with open(tombstone, 'wb') as fd:
        fd.write('1234567890')
    part_path = os.path.join(self.objects, '0')
    hashed, hashes = object_replicator.get_hashes(part_path)
    calls = [0]

    def fake_getmtime(filename):
        calls[0] += 1
        return 1

    # A constant mtime means the hashes file looks unmodified, so
    # get_hashes should finish without retrying: getmtime is consulted
    # exactly twice.
    with mock({'os.path.getmtime': fake_getmtime}):
        hashed, hashes = object_replicator.get_hashes(
            part_path, recalculate=['a83'])
    self.assertEquals(calls[0], 2)
def test_get_hashes_modified(self):
    # Give the partition one tombstone so there is something to hash.
    disk_file = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
                         FakeLogger())
    mkdirs(disk_file.datadir)
    tombstone = os.path.join(
        disk_file.datadir, normalize_timestamp(time.time()) + '.ts')
    with open(tombstone, 'wb') as fd:
        fd.write('1234567890')
    part_path = os.path.join(self.objects, '0')
    hashed, hashes = object_replicator.get_hashes(part_path)
    calls = [0]

    def fake_getmtime(filename):
        # Report a different mtime on each of the first calls so the
        # hashes file appears modified, forcing get_hashes to retry
        # until the mtime stabilizes.
        if calls[0] < 3:
            calls[0] += 1
        return calls[0]

    with mock({'os.path.getmtime': fake_getmtime}):
        hashed, hashes = object_replicator.get_hashes(
            part_path, recalculate=['a83'])
    self.assertEquals(calls[0], 3)
def test_hash_suffix_hash_dir_is_file_quarantine(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
mkdirs(os.path.dirname(df.datadir))

View File

@ -17,8 +17,6 @@
import cPickle as pickle
import os
import sys
import shutil
import unittest
from shutil import rmtree
from StringIO import StringIO
@ -37,7 +35,6 @@ from swift.common import utils
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
NullLogger, storage_directory
from swift.common.exceptions import DiskFileNotExist
from swift.obj import replicator
from eventlet import tpool
@ -2124,13 +2121,11 @@ class TestObjectController(unittest.TestCase):
def fake_get_hashes(*args, **kwargs):
return 0, {1: 2}
def my_tpool_execute(*args, **kwargs):
func = args[0]
args = args[1:]
def my_tpool_execute(func, *args, **kwargs):
return func(*args, **kwargs)
was_get_hashes = replicator.get_hashes
replicator.get_hashes = fake_get_hashes
was_get_hashes = object_server.get_hashes
object_server.get_hashes = fake_get_hashes
was_tpool_exe = tpool.execute
tpool.execute = my_tpool_execute
try:
@ -2143,20 +2138,18 @@ class TestObjectController(unittest.TestCase):
self.assertEquals(p_data, {1: 2})
finally:
tpool.execute = was_tpool_exe
replicator.get_hashes = was_get_hashes
object_server.get_hashes = was_get_hashes
def test_REPLICATE_timeout(self):
def fake_get_hashes(*args, **kwargs):
raise Timeout()
def my_tpool_execute(*args, **kwargs):
func = args[0]
args = args[1:]
def my_tpool_execute(func, *args, **kwargs):
return func(*args, **kwargs)
was_get_hashes = replicator.get_hashes
replicator.get_hashes = fake_get_hashes
was_get_hashes = object_server.get_hashes
object_server.get_hashes = fake_get_hashes
was_tpool_exe = tpool.execute
tpool.execute = my_tpool_execute
try:
@ -2166,7 +2159,7 @@ class TestObjectController(unittest.TestCase):
self.assertRaises(Timeout, self.object_controller.REPLICATE, req)
finally:
tpool.execute = was_tpool_exe
replicator.get_hashes = was_get_hashes
object_server.get_hashes = was_get_hashes
if __name__ == '__main__':
unittest.main()