More quarantine catching code. This should catch and quarantine any databases that indicate malformation or corruption with the known raised exceptions and it should catch and quarantine the case we've seen where hash directories become files.
This commit is contained in:
commit
9ca33c8690
@ -22,6 +22,7 @@ import logging
|
|||||||
import operator
|
import operator
|
||||||
import os
|
import os
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
import sys
|
||||||
import time
|
import time
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
import errno
|
import errno
|
||||||
@ -256,12 +257,46 @@ class DatabaseBroker(object):
|
|||||||
self._delete_db(conn, timestamp)
|
self._delete_db(conn, timestamp)
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
|
def possibly_quarantine(self, exc_type, exc_value, exc_traceback):
|
||||||
|
"""
|
||||||
|
Checks the exception info to see if it indicates a quarantine situation
|
||||||
|
(malformed or corrupted database). If not, the original exception will
|
||||||
|
be reraised. If so, the database will be quarantined and a new
|
||||||
|
sqlite3.DatabaseError will be raised indicating the action taken.
|
||||||
|
"""
|
||||||
|
if 'database disk image is malformed' in str(exc_value):
|
||||||
|
exc_hint = 'malformed'
|
||||||
|
elif 'file is encrypted or is not a database' in str(exc_value):
|
||||||
|
exc_hint = 'corrupted'
|
||||||
|
else:
|
||||||
|
raise exc_type, exc_value, exc_traceback
|
||||||
|
prefix_path = os.path.dirname(self.db_dir)
|
||||||
|
partition_path = os.path.dirname(prefix_path)
|
||||||
|
dbs_path = os.path.dirname(partition_path)
|
||||||
|
device_path = os.path.dirname(dbs_path)
|
||||||
|
quar_path = os.path.join(device_path, 'quarantined', self.db_type,
|
||||||
|
os.path.basename(self.db_dir))
|
||||||
|
try:
|
||||||
|
renamer(self.db_dir, quar_path)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||||
|
raise
|
||||||
|
quar_path = "%s-%s" % (quar_path, uuid4().hex)
|
||||||
|
renamer(self.db_dir, quar_path)
|
||||||
|
detail = _('Quarantined %s to %s due to %s database') % \
|
||||||
|
(self.db_dir, quar_path, exc_hint)
|
||||||
|
self.logger.error(detail)
|
||||||
|
raise sqlite3.DatabaseError(detail)
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
def get(self):
|
def get(self):
|
||||||
"""Use with the "with" statement; returns a database connection."""
|
"""Use with the "with" statement; returns a database connection."""
|
||||||
if not self.conn:
|
if not self.conn:
|
||||||
if self.db_file != ':memory:' and os.path.exists(self.db_file):
|
if self.db_file != ':memory:' and os.path.exists(self.db_file):
|
||||||
self.conn = get_db_connection(self.db_file, self.timeout)
|
try:
|
||||||
|
self.conn = get_db_connection(self.db_file, self.timeout)
|
||||||
|
except (sqlite3.DatabaseError, DatabaseConnectionError):
|
||||||
|
self.possibly_quarantine(*sys.exc_info())
|
||||||
else:
|
else:
|
||||||
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
|
raise DatabaseConnectionError(self.db_file, "DB doesn't exist")
|
||||||
conn = self.conn
|
conn = self.conn
|
||||||
@ -270,6 +305,12 @@ class DatabaseBroker(object):
|
|||||||
yield conn
|
yield conn
|
||||||
conn.rollback()
|
conn.rollback()
|
||||||
self.conn = conn
|
self.conn = conn
|
||||||
|
except sqlite3.DatabaseError, err:
|
||||||
|
try:
|
||||||
|
conn.close()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
self.possibly_quarantine(*sys.exc_info())
|
||||||
except Exception:
|
except Exception:
|
||||||
conn.close()
|
conn.close()
|
||||||
raise
|
raise
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from os.path import isdir, join
|
from os.path import basename, dirname, isdir, join
|
||||||
import random
|
import random
|
||||||
import shutil
|
import shutil
|
||||||
import time
|
import time
|
||||||
@ -22,6 +22,8 @@ import logging
|
|||||||
import hashlib
|
import hashlib
|
||||||
import itertools
|
import itertools
|
||||||
import cPickle as pickle
|
import cPickle as pickle
|
||||||
|
import errno
|
||||||
|
import uuid
|
||||||
|
|
||||||
import eventlet
|
import eventlet
|
||||||
from eventlet import GreenPool, tpool, Timeout, sleep, hubs
|
from eventlet import GreenPool, tpool, Timeout, sleep, hubs
|
||||||
@ -30,7 +32,7 @@ from eventlet.support.greenlets import GreenletExit
|
|||||||
|
|
||||||
from swift.common.ring import Ring
|
from swift.common.ring import Ring
|
||||||
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
|
from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
|
||||||
compute_eta, get_logger, write_pickle
|
compute_eta, get_logger, write_pickle, renamer
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.daemon import Daemon
|
from swift.common.daemon import Daemon
|
||||||
|
|
||||||
@ -41,6 +43,31 @@ ONE_WEEK = 604800
|
|||||||
HASH_FILE = 'hashes.pkl'
|
HASH_FILE = 'hashes.pkl'
|
||||||
|
|
||||||
|
|
||||||
|
def quarantine_renamer(device_path, corrupted_file_path):
|
||||||
|
"""
|
||||||
|
In the case that a file is corrupted, move it to a quarantined
|
||||||
|
area to allow replication to fix it.
|
||||||
|
|
||||||
|
:params device_path: The path to the device the corrupted file is on.
|
||||||
|
:params corrupted_file_path: The path to the file you want quarantined.
|
||||||
|
|
||||||
|
:returns: path (str) of directory the file was moved to
|
||||||
|
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
|
||||||
|
exceptions from rename
|
||||||
|
"""
|
||||||
|
from_dir = dirname(corrupted_file_path)
|
||||||
|
to_dir = join(device_path, 'quarantined', 'objects', basename(from_dir))
|
||||||
|
invalidate_hash(dirname(from_dir))
|
||||||
|
try:
|
||||||
|
renamer(from_dir, to_dir)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
||||||
|
raise
|
||||||
|
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
|
||||||
|
renamer(from_dir, to_dir)
|
||||||
|
return to_dir
|
||||||
|
|
||||||
|
|
||||||
def hash_suffix(path, reclaim_age):
|
def hash_suffix(path, reclaim_age):
|
||||||
"""
|
"""
|
||||||
Performs reclamation and returns an md5 of all (remaining) files.
|
Performs reclamation and returns an md5 of all (remaining) files.
|
||||||
@ -50,7 +77,19 @@ def hash_suffix(path, reclaim_age):
|
|||||||
md5 = hashlib.md5()
|
md5 = hashlib.md5()
|
||||||
for hsh in sorted(os.listdir(path)):
|
for hsh in sorted(os.listdir(path)):
|
||||||
hsh_path = join(path, hsh)
|
hsh_path = join(path, hsh)
|
||||||
files = os.listdir(hsh_path)
|
try:
|
||||||
|
files = os.listdir(hsh_path)
|
||||||
|
except OSError, err:
|
||||||
|
if err.errno == errno.ENOTDIR:
|
||||||
|
partition_path = dirname(path)
|
||||||
|
objects_path = dirname(partition_path)
|
||||||
|
device_path = dirname(objects_path)
|
||||||
|
quar_path = quarantine_renamer(device_path, hsh_path)
|
||||||
|
logging.exception(
|
||||||
|
_('Quarantined %s to %s because it is not a directory') %
|
||||||
|
(hsh_path, quar_path))
|
||||||
|
continue
|
||||||
|
raise
|
||||||
if len(files) == 1:
|
if len(files) == 1:
|
||||||
if files[0].endswith('.ts'):
|
if files[0].endswith('.ts'):
|
||||||
# remove tombstones older than reclaim_age
|
# remove tombstones older than reclaim_age
|
||||||
|
@ -21,7 +21,6 @@ import errno
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
import uuid
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
from tempfile import mkstemp
|
from tempfile import mkstemp
|
||||||
@ -44,7 +43,8 @@ from swift.common.constraints import check_object_creation, check_mount, \
|
|||||||
check_float, check_utf8
|
check_float, check_utf8
|
||||||
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
|
||||||
DiskFileNotExist
|
DiskFileNotExist
|
||||||
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash
|
from swift.obj.replicator import tpooled_get_hashes, invalidate_hash, \
|
||||||
|
quarantine_renamer
|
||||||
|
|
||||||
|
|
||||||
DATADIR = 'objects'
|
DATADIR = 'objects'
|
||||||
@ -91,32 +91,6 @@ def write_metadata(fd, metadata):
|
|||||||
key += 1
|
key += 1
|
||||||
|
|
||||||
|
|
||||||
def quarantine_renamer(device_path, corrupted_file_path):
|
|
||||||
"""
|
|
||||||
In the case that a file is corrupted, move it to a quarantined
|
|
||||||
area to allow replication to fix it.
|
|
||||||
|
|
||||||
:params device_path: The path to the device the corrupted file is on.
|
|
||||||
:params corrupted_file_path: The path to the file you want quarantined.
|
|
||||||
|
|
||||||
:returns: path (str) of directory the file was moved to
|
|
||||||
:raises OSError: re-raises non errno.EEXIST / errno.ENOTEMPTY
|
|
||||||
exceptions from rename
|
|
||||||
"""
|
|
||||||
from_dir = os.path.dirname(corrupted_file_path)
|
|
||||||
to_dir = os.path.join(device_path, 'quarantined',
|
|
||||||
'objects', os.path.basename(from_dir))
|
|
||||||
invalidate_hash(os.path.dirname(from_dir))
|
|
||||||
try:
|
|
||||||
renamer(from_dir, to_dir)
|
|
||||||
except OSError, e:
|
|
||||||
if e.errno not in (errno.EEXIST, errno.ENOTEMPTY):
|
|
||||||
raise
|
|
||||||
to_dir = "%s-%s" % (to_dir, uuid.uuid4().hex)
|
|
||||||
renamer(from_dir, to_dir)
|
|
||||||
return to_dir
|
|
||||||
|
|
||||||
|
|
||||||
class DiskFile(object):
|
class DiskFile(object):
|
||||||
"""
|
"""
|
||||||
Manage object files on disk.
|
Manage object files on disk.
|
||||||
|
BIN
test/unit/common/corrupted_example.db
Normal file
BIN
test/unit/common/corrupted_example.db
Normal file
Binary file not shown.
BIN
test/unit/common/malformed_example.db
Normal file
BIN
test/unit/common/malformed_example.db
Normal file
Binary file not shown.
@ -19,7 +19,7 @@ from __future__ import with_statement
|
|||||||
import hashlib
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
from shutil import rmtree
|
from shutil import rmtree, copy
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
@ -27,6 +27,7 @@ from uuid import uuid4
|
|||||||
import simplejson
|
import simplejson
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
|
||||||
|
import swift.common.db
|
||||||
from swift.common.db import AccountBroker, chexor, ContainerBroker, \
|
from swift.common.db import AccountBroker, chexor, ContainerBroker, \
|
||||||
DatabaseBroker, DatabaseConnectionError, dict_factory, get_db_connection
|
DatabaseBroker, DatabaseConnectionError, dict_factory, get_db_connection
|
||||||
from swift.common.utils import normalize_timestamp
|
from swift.common.utils import normalize_timestamp
|
||||||
@ -199,6 +200,47 @@ class TestDatabaseBroker(unittest.TestCase):
|
|||||||
with broker.get() as conn:
|
with broker.get() as conn:
|
||||||
self.assertEquals(
|
self.assertEquals(
|
||||||
[r[0] for r in conn.execute('SELECT * FROM test')], ['1'])
|
[r[0] for r in conn.execute('SELECT * FROM test')], ['1'])
|
||||||
|
orig_renamer = swift.common.db.renamer
|
||||||
|
try:
|
||||||
|
swift.common.db.renamer = lambda a, b: b
|
||||||
|
qpath = os.path.dirname(os.path.dirname(os.path.dirname(
|
||||||
|
os.path.dirname(self.testdir))))
|
||||||
|
if qpath:
|
||||||
|
qpath += '/quarantined/test/db'
|
||||||
|
else:
|
||||||
|
qpath = 'quarantined/test/db'
|
||||||
|
# Test malformed database
|
||||||
|
copy(os.path.join(os.path.dirname(__file__),
|
||||||
|
'malformed_example.db'),
|
||||||
|
os.path.join(self.testdir, '1.db'))
|
||||||
|
broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
|
||||||
|
broker.db_type = 'test'
|
||||||
|
exc = None
|
||||||
|
try:
|
||||||
|
with broker.get() as conn:
|
||||||
|
conn.execute('SELECT * FROM test')
|
||||||
|
except Exception, err:
|
||||||
|
exc = err
|
||||||
|
self.assertEquals(str(exc),
|
||||||
|
'Quarantined %s to %s due to malformed database' %
|
||||||
|
(self.testdir, qpath))
|
||||||
|
# Test corrupted database
|
||||||
|
copy(os.path.join(os.path.dirname(__file__),
|
||||||
|
'corrupted_example.db'),
|
||||||
|
os.path.join(self.testdir, '1.db'))
|
||||||
|
broker = DatabaseBroker(os.path.join(self.testdir, '1.db'))
|
||||||
|
broker.db_type = 'test'
|
||||||
|
exc = None
|
||||||
|
try:
|
||||||
|
with broker.get() as conn:
|
||||||
|
conn.execute('SELECT * FROM test')
|
||||||
|
except Exception, err:
|
||||||
|
exc = err
|
||||||
|
self.assertEquals(str(exc),
|
||||||
|
'Quarantined %s to %s due to corrupted database' %
|
||||||
|
(self.testdir, qpath))
|
||||||
|
finally:
|
||||||
|
swift.common.db.renamer = orig_renamer
|
||||||
|
|
||||||
def test_lock(self):
|
def test_lock(self):
|
||||||
broker = DatabaseBroker(os.path.join(self.testdir, '1.db'), timeout=.1)
|
broker = DatabaseBroker(os.path.join(self.testdir, '1.db'), timeout=.1)
|
||||||
|
@ -205,6 +205,27 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
self.assertEquals(hashed, 1)
|
self.assertEquals(hashed, 1)
|
||||||
self.assert_('a83' in hashes)
|
self.assert_('a83' in hashes)
|
||||||
|
|
||||||
|
def test_hash_suffix_hash_dir_is_file_quarantine(self):
|
||||||
|
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
|
||||||
|
mkdirs(os.path.dirname(df.datadir))
|
||||||
|
open(df.datadir, 'wb').close()
|
||||||
|
ohash = hash_path('a', 'c', 'o')
|
||||||
|
data_dir = ohash[-3:]
|
||||||
|
whole_path_from = os.path.join(self.objects, '0', data_dir)
|
||||||
|
orig_quarantine_renamer = object_replicator.quarantine_renamer
|
||||||
|
called = [False]
|
||||||
|
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
called[0] = True
|
||||||
|
return orig_quarantine_renamer(*args, **kwargs)
|
||||||
|
|
||||||
|
try:
|
||||||
|
object_replicator.quarantine_renamer = wrapped
|
||||||
|
object_replicator.hash_suffix(whole_path_from, 101)
|
||||||
|
finally:
|
||||||
|
object_replicator.quarantine_renamer = orig_quarantine_renamer
|
||||||
|
self.assertTrue(called[0])
|
||||||
|
|
||||||
def test_hash_suffix_one_file(self):
|
def test_hash_suffix_one_file(self):
|
||||||
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
|
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o', FakeLogger())
|
||||||
mkdirs(df.datadir)
|
mkdirs(df.datadir)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user