Merge "pep8 db.py"

This commit is contained in:
Jenkins 2012-11-26 23:59:00 +00:00 committed by Gerrit Code Review
commit 2ad23a25e8

View File

@ -32,7 +32,7 @@ from eventlet import sleep, Timeout
import sqlite3 import sqlite3
from swift.common.utils import json, normalize_timestamp, renamer, \ from swift.common.utils import json, normalize_timestamp, renamer, \
mkdirs, lock_parent_directory, fallocate mkdirs, lock_parent_directory, fallocate
from swift.common.exceptions import LockTimeout from swift.common.exceptions import LockTimeout
@ -60,7 +60,7 @@ class DatabaseConnectionError(sqlite3.DatabaseError):
def __str__(self): def __str__(self):
return 'DB connection error (%s, %s):\n%s' % ( return 'DB connection error (%s, %s):\n%s' % (
self.path, self.timeout, self.msg) self.path, self.timeout, self.msg)
class GreenDBConnection(sqlite3.Connection): class GreenDBConnection(sqlite3.Connection):
@ -84,7 +84,7 @@ class GreenDBConnection(sqlite3.Connection):
def execute(self, *args, **kwargs): def execute(self, *args, **kwargs):
return self._timeout(lambda: sqlite3.Connection.execute( return self._timeout(lambda: sqlite3.Connection.execute(
self, *args, **kwargs)) self, *args, **kwargs))
def commit(self): def commit(self):
return self._timeout(lambda: sqlite3.Connection.commit(self)) return self._timeout(lambda: sqlite3.Connection.commit(self))
@ -131,14 +131,14 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
try: try:
connect_time = time.time() connect_time = time.time()
conn = sqlite3.connect(path, check_same_thread=False, conn = sqlite3.connect(path, check_same_thread=False,
factory=GreenDBConnection, timeout=timeout) factory=GreenDBConnection, timeout=timeout)
if path != ':memory:' and not okay_to_create: if path != ':memory:' and not okay_to_create:
# attempt to detect and fail when connect creates the db file # attempt to detect and fail when connect creates the db file
stat = os.stat(path) stat = os.stat(path)
if stat.st_size == 0 and stat.st_ctime >= connect_time: if stat.st_size == 0 and stat.st_ctime >= connect_time:
os.unlink(path) os.unlink(path)
raise DatabaseConnectionError(path, raise DatabaseConnectionError(path,
'DB file created by connect?') 'DB file created by connect?')
conn.row_factory = sqlite3.Row conn.row_factory = sqlite3.Row
conn.text_factory = str conn.text_factory = str
conn.execute('PRAGMA synchronous = NORMAL') conn.execute('PRAGMA synchronous = NORMAL')
@ -149,7 +149,7 @@ def get_db_connection(path, timeout=30, okay_to_create=False):
except sqlite3.DatabaseError: except sqlite3.DatabaseError:
import traceback import traceback
raise DatabaseConnectionError(path, traceback.format_exc(), raise DatabaseConnectionError(path, traceback.format_exc(),
timeout=timeout) timeout=timeout)
return conn return conn
@ -186,7 +186,7 @@ class DatabaseBroker(object):
fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir) fd, tmp_db_file = mkstemp(suffix='.tmp', dir=self.db_dir)
os.close(fd) os.close(fd)
conn = sqlite3.connect(tmp_db_file, check_same_thread=False, conn = sqlite3.connect(tmp_db_file, check_same_thread=False,
factory=GreenDBConnection, timeout=0) factory=GreenDBConnection, timeout=0)
# creating dbs implicitly does a lot of transactions, so we # creating dbs implicitly does a lot of transactions, so we
# pick fast, unsafe options here and do a big fsync at the end. # pick fast, unsafe options here and do a big fsync at the end.
conn.execute('PRAGMA synchronous = OFF') conn.execute('PRAGMA synchronous = OFF')
@ -243,8 +243,9 @@ class DatabaseBroker(object):
if os.path.exists(self.db_file): if os.path.exists(self.db_file):
# It's as if there was a "condition" where different parts # It's as if there was a "condition" where different parts
# of the system were "racing" each other. # of the system were "racing" each other.
raise DatabaseConnectionError(self.db_file, raise DatabaseConnectionError(
'DB created by someone else while working?') self.db_file,
'DB created by someone else while working?')
renamer(tmp_db_file, self.db_file) renamer(tmp_db_file, self.db_file)
self.conn = get_db_connection(self.db_file, self.timeout) self.conn = get_db_connection(self.db_file, self.timeout)
else: else:
@ -285,7 +286,8 @@ class DatabaseBroker(object):
dbs_path = os.path.dirname(partition_path) dbs_path = os.path.dirname(partition_path)
device_path = os.path.dirname(dbs_path) device_path = os.path.dirname(dbs_path)
quar_path = os.path.join(device_path, 'quarantined', quar_path = os.path.join(device_path, 'quarantined',
self.db_type + 's', os.path.basename(self.db_dir)) self.db_type + 's',
os.path.basename(self.db_dir))
try: try:
renamer(self.db_dir, quar_path) renamer(self.db_dir, quar_path)
except OSError, e: except OSError, e:
@ -294,7 +296,7 @@ class DatabaseBroker(object):
quar_path = "%s-%s" % (quar_path, uuid4().hex) quar_path = "%s-%s" % (quar_path, uuid4().hex)
renamer(self.db_dir, quar_path) renamer(self.db_dir, quar_path)
detail = _('Quarantined %s to %s due to %s database') % \ detail = _('Quarantined %s to %s due to %s database') % \
(self.db_dir, quar_path, exc_hint) (self.db_dir, quar_path, exc_hint)
self.logger.error(detail) self.logger.error(detail)
raise sqlite3.DatabaseError(detail) raise sqlite3.DatabaseError(detail)
@ -498,13 +500,13 @@ class DatabaseBroker(object):
INSERT INTO %s_sync (sync_point, remote_id) INSERT INTO %s_sync (sync_point, remote_id)
VALUES (?, ?) VALUES (?, ?)
''' % ('incoming' if incoming else 'outgoing'), ''' % ('incoming' if incoming else 'outgoing'),
(rec['sync_point'], rec['remote_id'])) (rec['sync_point'], rec['remote_id']))
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
conn.execute(''' conn.execute('''
UPDATE %s_sync SET sync_point=max(?, sync_point) UPDATE %s_sync SET sync_point=max(?, sync_point)
WHERE remote_id=? WHERE remote_id=?
''' % ('incoming' if incoming else 'outgoing'), ''' % ('incoming' if incoming else 'outgoing'),
(rec['sync_point'], rec['remote_id'])) (rec['sync_point'], rec['remote_id']))
conn.commit() conn.commit()
def _preallocate(self): def _preallocate(self):
@ -787,7 +789,7 @@ class ContainerBroker(DatabaseBroker):
raise raise
with self.get() as conn: with self.get() as conn:
row = conn.execute( row = conn.execute(
'SELECT object_count from container_stat').fetchone() 'SELECT object_count from container_stat').fetchone()
return (row[0] == 0) return (row[0] == 0)
def _commit_puts(self, item_list=None): def _commit_puts(self, item_list=None):
@ -808,10 +810,12 @@ class ContainerBroker(DatabaseBroker):
try: try:
(name, timestamp, size, content_type, etag, (name, timestamp, size, content_type, etag,
deleted) = pickle.loads(entry.decode('base64')) deleted) = pickle.loads(entry.decode('base64'))
item_list.append({'name': name, 'created_at': item_list.append({'name': name,
timestamp, 'size': size, 'content_type': 'created_at': timestamp,
content_type, 'etag': etag, 'size': size,
'deleted': deleted}) 'content_type': content_type,
'etag': etag,
'deleted': deleted})
except Exception: except Exception:
self.logger.exception( self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'), _('Invalid pending entry %(file)s: %(entry)s'),
@ -1277,7 +1281,7 @@ class AccountBroker(DatabaseBroker):
UPDATE account_stat SET account = ?, created_at = ?, id = ?, UPDATE account_stat SET account = ?, created_at = ?, id = ?,
put_timestamp = ? put_timestamp = ?
''', (self.account, normalize_timestamp(time.time()), str(uuid4()), ''', (self.account, normalize_timestamp(time.time()), str(uuid4()),
put_timestamp)) put_timestamp))
def get_db_version(self, conn): def get_db_version(self, conn):
if self._db_version == -1: if self._db_version == -1:
@ -1332,14 +1336,15 @@ class AccountBroker(DatabaseBroker):
if entry: if entry:
try: try:
(name, put_timestamp, delete_timestamp, (name, put_timestamp, delete_timestamp,
object_count, bytes_used, deleted) = \ object_count, bytes_used, deleted) = \
pickle.loads(entry.decode('base64')) pickle.loads(entry.decode('base64'))
item_list.append({'name': name, item_list.append(
'put_timestamp': put_timestamp, {'name': name,
'delete_timestamp': delete_timestamp, 'put_timestamp': put_timestamp,
'object_count': object_count, 'delete_timestamp': delete_timestamp,
'bytes_used': bytes_used, 'object_count': object_count,
'deleted': deleted}) 'bytes_used': bytes_used,
'deleted': deleted})
except Exception: except Exception:
self.logger.exception( self.logger.exception(
_('Invalid pending entry %(file)s: %(entry)s'), _('Invalid pending entry %(file)s: %(entry)s'),
@ -1365,7 +1370,7 @@ class AccountBroker(DatabaseBroker):
raise raise
with self.get() as conn: with self.get() as conn:
row = conn.execute( row = conn.execute(
'SELECT container_count from account_stat').fetchone() 'SELECT container_count from account_stat').fetchone()
return (row[0] == 0) return (row[0] == 0)
def reclaim(self, container_timestamp, sync_timestamp): def reclaim(self, container_timestamp, sync_timestamp):
@ -1419,7 +1424,7 @@ class AccountBroker(DatabaseBroker):
ret = conn.execute(''' ret = conn.execute('''
SELECT put_timestamp FROM container SELECT put_timestamp FROM container
WHERE name = ? AND deleted != 1''', WHERE name = ? AND deleted != 1''',
(container_name,)).fetchone() (container_name,)).fetchone()
if ret: if ret:
ret = ret[0] ret = ret[0]
return ret return ret
@ -1513,8 +1518,8 @@ class AccountBroker(DatabaseBroker):
SELECT put_timestamp, delete_timestamp, container_count, status SELECT put_timestamp, delete_timestamp, container_count, status
FROM account_stat''').fetchone() FROM account_stat''').fetchone()
return row['status'] == 'DELETED' or ( return row['status'] == 'DELETED' or (
row['container_count'] in (None, '', 0, '0') and row['container_count'] in (None, '', 0, '0') and
row['delete_timestamp'] > row['put_timestamp']) row['delete_timestamp'] > row['put_timestamp'])
def is_status_deleted(self): def is_status_deleted(self):
"""Only returns true if the status field is set to DELETED.""" """Only returns true if the status field is set to DELETED."""