Sync common db code from Oslo
This sync contains commit ``ce69e7f Don't store engine instances in oslo.db `` which removed global engine from oslo.db code. So, added code for work with sqla engines and sessions to Glance - get_engine() and get_session() functions in glance.db.sqlalchemy.api module. Also this remove database slave connection and tpool features, because they was removed from oslo code and seeps to be unused in Glance. Full list of changes: 7959826 db: move all options into database group dda24eb Introduce mysql_sql_mode option, remove old warning 0b5af67 Introduce a method to set any MySQL session SQL mode 8dccc7b Handle ibm_db_sa DBDuplicateEntry integrity errors 0f24d82 Fix migration.db_version when no tables ac84a40 Update log translation domains c0d357b Add model_query() to db.sqlalchemy.utils module 84254fc Fix a small typo in api.py b8a676c Remove CONF.database.connection default value 86707cd Remove None for dict.get() 0545121 Fix duplicating of SQL queries in logs fcf517d Update oslo log messages with translation domains fa05b7c Restore the ability to load the DB backend lazily 630d395 Don't use cfg.CONF in oslo.db ce69e7f Don't store engine instances in oslo.db 35dc1d7 py3kcompat: remove b4f72b2 Don't raise MySQL 2013 'Lost connection' errors 271adfb Format sql in db.sqlalchemy.session docstring 0334cb3 Handle exception messages with six.text_type eff69ce Drop dependency on log from oslo db code 7a11a04 Automatic retry db.api query if db connection lost 11f2add Clean up docstring in db.sqlalchemy.session 1b5147f Only enable MySQL TRADITIONAL mode if we're running against MySQL 39e1c5c Move db tests base.py to common code 986dafd Fix parsing of UC errors in sqlite 3.7.16+/3.8.2+ 9a203e6 Use dialect rather than a particular DB API driver 1779029 Move helper DB functions to db.sqlalchemy.utils bcf6d5e Small edits on help strings ae01e9a Transition from migrate to alembic 70ebb19 Fix mocking of utcnow() for model datetime cols 7aa94df Add a db check for CHARSET=utf8 aff0171 Remove "vim: tabstop=4 shiftwidth=4 softtabstop=4" from headers fa0f36f Fix database connection string is secret 8575d87 Removed copyright from empty files d08d27f Fix the obsolete exception message 8b2b0b7 Use hacking import_exceptions for gettextutils._ 9bc593e Add docstring for exception handlers of session 855644a Removal of _REPOSITORY global variable. ea6caf9 Remove string.lowercase usage a33989e Remove eventlet tpool from common db.api e40903b Database hook enabling traditional mode at MySQL f2115a0 Replace xrange in for loop with range c802fa6 SQLAlchemy error patterns improved 1c1f199 Remove unused import 97d8cf4 Remove lazy loading of database backend Co-authored-by: Zhi Yan Liu <zhiyanl@cn.ibm.com> Related to blueprint db-use-oslo-common-code Change-Id: Ia73abba8309ccc2ad10a0f636b410984c6b6e5d8
This commit is contained in:
parent
01903933db
commit
99f4ad8172
@ -43,12 +43,13 @@ from oslo.config import cfg
|
||||
from glance.common import config
|
||||
from glance.common import exception
|
||||
from glance.db import migration as db_migration
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
from glance.openstack.common.db.sqlalchemy import migration
|
||||
from glance.openstack.common import log
|
||||
from glance.openstack.common import strutils
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_group("database", "glance.openstack.common.db.sqlalchemy.session")
|
||||
CONF.import_group("database", "glance.openstack.common.db.options")
|
||||
|
||||
|
||||
# Decorators for actions
|
||||
@ -67,23 +68,30 @@ class DbCommands(object):
|
||||
|
||||
def version(self):
|
||||
"""Print database's current migration level"""
|
||||
print(migration.db_version(db_migration.MIGRATE_REPO_PATH,
|
||||
print(migration.db_version(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
db_migration.INIT_VERSION))
|
||||
|
||||
@args('--version', metavar='<version>', help='Database version')
|
||||
def upgrade(self, version=None):
|
||||
"""Upgrade the database's migration level"""
|
||||
migration.db_sync(db_migration.MIGRATE_REPO_PATH, version)
|
||||
migration.db_sync(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
version)
|
||||
|
||||
@args('--version', metavar='<version>', help='Database version')
|
||||
def downgrade(self, version=None):
|
||||
"""Downgrade the database's migration level"""
|
||||
migration.db_sync(db_migration.MIGRATE_REPO_PATH, version)
|
||||
migration.db_sync(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
version)
|
||||
|
||||
@args('--version', metavar='<version>', help='Database version')
|
||||
def version_control(self, version=None):
|
||||
"""Place a database under migration control"""
|
||||
migration.db_version_control(db_migration.MIGRATE_REPO_PATH, version)
|
||||
migration.db_version_control(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
version)
|
||||
|
||||
@args('--version', metavar='<version>', help='Database version')
|
||||
@args('--current_version', metavar='<version>',
|
||||
@ -94,9 +102,12 @@ class DbCommands(object):
|
||||
creating first if necessary.
|
||||
"""
|
||||
if current_version is not None:
|
||||
migration.db_version_control(db_migration.MIGRATE_REPO_PATH,
|
||||
migration.db_version_control(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
current_version)
|
||||
migration.db_sync(db_migration.MIGRATE_REPO_PATH, version)
|
||||
migration.db_sync(db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH,
|
||||
version)
|
||||
|
||||
|
||||
class DbLegacyCommands(object):
|
||||
|
@ -22,14 +22,14 @@
|
||||
import os
|
||||
|
||||
from glance.common import utils
|
||||
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
|
||||
IMPL = utils.LazyPluggable(
|
||||
'backend',
|
||||
config_group='database',
|
||||
sqlalchemy='glance.openstack.common.db.sqlalchemy.migration')
|
||||
|
||||
INIT_VERSION = 000
|
||||
INIT_VERSION = 0
|
||||
|
||||
MIGRATE_REPO_PATH = os.path.join(
|
||||
os.path.abspath(os.path.dirname(__file__)),
|
||||
@ -38,12 +38,9 @@ MIGRATE_REPO_PATH = os.path.join(
|
||||
)
|
||||
|
||||
|
||||
def db_sync(version=None):
|
||||
def db_sync(version=None, init_version=0):
|
||||
"""Migrate the database to `version` or the most recent version."""
|
||||
return IMPL.db_sync(abs_path=MIGRATE_REPO_PATH, version=version)
|
||||
|
||||
|
||||
def db_version():
|
||||
"""Display the current database version."""
|
||||
return IMPL.db_version(abs_path=MIGRATE_REPO_PATH,
|
||||
init_version=INIT_VERSION)
|
||||
return IMPL.db_sync(engine=db_api.get_engine(),
|
||||
abs_path=MIGRATE_REPO_PATH,
|
||||
version=version,
|
||||
init_version=init_version)
|
||||
|
@ -44,13 +44,39 @@ STATUSES = ['active', 'saving', 'queued', 'killed', 'pending_delete',
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('debug', 'glance.openstack.common.log')
|
||||
CONF.import_opt('connection', 'glance.openstack.common.db.options',
|
||||
group='database')
|
||||
|
||||
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _create_facade_lazily():
|
||||
global _FACADE
|
||||
if _FACADE is None:
|
||||
_FACADE = session.EngineFacade(
|
||||
CONF.database.connection,
|
||||
**dict(CONF.database.iteritems()))
|
||||
return _FACADE
|
||||
|
||||
|
||||
def get_engine():
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_engine()
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False):
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_session(autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit)
|
||||
|
||||
|
||||
def clear_db_env():
|
||||
"""
|
||||
Unset global configuration variables for database.
|
||||
"""
|
||||
session.cleanup()
|
||||
global _FACADE
|
||||
_FACADE = None
|
||||
|
||||
|
||||
def _check_mutate_authorization(context, image_ref):
|
||||
@ -65,10 +91,6 @@ def _check_mutate_authorization(context, image_ref):
|
||||
raise exc_class(msg)
|
||||
|
||||
|
||||
_get_session = session.get_session
|
||||
get_engine = session.get_engine
|
||||
|
||||
|
||||
def image_create(context, values):
|
||||
"""Create an image from the values dictionary."""
|
||||
return _image_update(context, values, None, purge_props=False)
|
||||
@ -87,7 +109,7 @@ def image_update(context, image_id, values, purge_props=False,
|
||||
|
||||
def image_destroy(context, image_id):
|
||||
"""Destroy the image or raise if it does not exist."""
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
image_ref = _image_get(context, image_id, session=session)
|
||||
|
||||
@ -141,7 +163,7 @@ def _check_image_id(image_id):
|
||||
def _image_get(context, image_id, session=None, force_show_deleted=False):
|
||||
"""Get an image or raise if it does not exist."""
|
||||
_check_image_id(image_id)
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
|
||||
try:
|
||||
query = session.query(models.Image)\
|
||||
@ -411,7 +433,7 @@ def _make_image_property_condition(key, value):
|
||||
|
||||
def _select_images_query(context, image_conditions, admin_as_user,
|
||||
member_status, visibility):
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
|
||||
img_conditional_clause = sa_sql.and_(*image_conditions)
|
||||
|
||||
@ -593,7 +615,7 @@ def _image_update(context, values, image_id, purge_props=False,
|
||||
#NOTE(jbresnah) values is altered in this so a copy is needed
|
||||
values = values.copy()
|
||||
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
|
||||
# Remove the properties passed in the values mapping. We
|
||||
@ -767,7 +789,7 @@ def _image_child_entry_delete_all(child_model_cls, image_id, delete_time=None,
|
||||
:rtype: int
|
||||
:return: The number of child entries got soft-deleted.
|
||||
"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
|
||||
query = session.query(child_model_cls) \
|
||||
.filter_by(image_id=image_id) \
|
||||
@ -801,7 +823,7 @@ def image_property_delete(context, prop_ref, image_ref, session=None):
|
||||
"""
|
||||
Used internally by image_property_create and image_property_update.
|
||||
"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
prop = session.query(models.ImageProperty).filter_by(image_id=image_ref,
|
||||
name=prop_ref).one()
|
||||
prop.delete(session=session)
|
||||
@ -840,7 +862,7 @@ def _image_member_format(member_ref):
|
||||
|
||||
def image_member_update(context, memb_id, values):
|
||||
"""Update an ImageMember object."""
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
memb_ref = _image_member_get(context, memb_id, session)
|
||||
_image_member_update(context, memb_ref, values, session)
|
||||
return _image_member_format(memb_ref)
|
||||
@ -858,7 +880,7 @@ def _image_member_update(context, memb_ref, values, session=None):
|
||||
|
||||
def image_member_delete(context, memb_id, session=None):
|
||||
"""Delete an ImageMember object."""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
member_ref = _image_member_get(context, memb_id, session)
|
||||
_image_member_delete(context, member_ref, session)
|
||||
|
||||
@ -890,7 +912,7 @@ def image_member_find(context, image_id=None, member=None, status=None):
|
||||
:param image_id: identifier of image entity
|
||||
:param member: tenant to which membership has been granted
|
||||
"""
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
members = _image_member_find(context, session, image_id, member, status)
|
||||
return [_image_member_format(m) for m in members]
|
||||
|
||||
@ -923,7 +945,7 @@ def image_member_count(context, image_id):
|
||||
|
||||
:param image_id: identifier of image entity
|
||||
"""
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
|
||||
if not image_id:
|
||||
msg = _("Image id is required.")
|
||||
@ -950,7 +972,7 @@ def _can_show_deleted(context):
|
||||
|
||||
|
||||
def image_tag_set_all(context, image_id, tags):
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
existing_tags = set(image_tag_get_all(context, image_id, session))
|
||||
tags = set(tags)
|
||||
|
||||
@ -968,7 +990,7 @@ def image_tag_set_all(context, image_id, tags):
|
||||
|
||||
def image_tag_create(context, image_id, value, session=None):
|
||||
"""Create an image tag."""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
tag_ref = models.ImageTag(image_id=image_id, value=value)
|
||||
tag_ref.save(session=session)
|
||||
return tag_ref['value']
|
||||
@ -977,7 +999,7 @@ def image_tag_create(context, image_id, value, session=None):
|
||||
def image_tag_delete(context, image_id, value, session=None):
|
||||
"""Delete an image tag."""
|
||||
_check_image_id(image_id)
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
query = session.query(models.ImageTag)\
|
||||
.filter_by(image_id=image_id)\
|
||||
.filter_by(value=value)\
|
||||
@ -1002,7 +1024,7 @@ def _image_tag_delete_all(context, image_id, delete_time=None, session=None):
|
||||
def image_tag_get_all(context, image_id, session=None):
|
||||
"""Get a list of tags for a specific image."""
|
||||
_check_image_id(image_id)
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
tags = session.query(models.ImageTag)\
|
||||
.filter_by(image_id=image_id)\
|
||||
.filter_by(deleted=False)\
|
||||
@ -1013,7 +1035,7 @@ def image_tag_get_all(context, image_id, session=None):
|
||||
|
||||
def user_get_storage_usage(context, owner_id, image_id=None, session=None):
|
||||
_check_image_id(image_id)
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
total_size = _image_get_disk_usage_by_owner(
|
||||
owner_id, session, image_id=image_id)
|
||||
return total_size
|
||||
@ -1033,7 +1055,7 @@ def _task_info_format(task_info_ref):
|
||||
|
||||
def _task_info_create(context, task_id, values, session=None):
|
||||
"""Create an TaskInfo object"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
task_info_ref = models.TaskInfo()
|
||||
task_info_ref.task_id = task_id
|
||||
task_info_ref.update(values)
|
||||
@ -1043,7 +1065,7 @@ def _task_info_create(context, task_id, values, session=None):
|
||||
|
||||
def _task_info_update(context, task_id, values, session=None):
|
||||
"""Update an TaskInfo object"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
task_info_ref = _task_info_get(context, task_id, session=session)
|
||||
if task_info_ref:
|
||||
task_info_ref.update(values)
|
||||
@ -1053,7 +1075,7 @@ def _task_info_update(context, task_id, values, session=None):
|
||||
|
||||
def _task_info_get(context, task_id, session=None):
|
||||
"""Fetch an TaskInfo entity by task_id"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
query = session.query(models.TaskInfo)
|
||||
query = query.filter_by(task_id=task_id)
|
||||
try:
|
||||
@ -1071,7 +1093,7 @@ def task_create(context, values, session=None):
|
||||
"""Create a task object"""
|
||||
|
||||
values = values.copy()
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
with session.begin():
|
||||
task_info_values = _pop_task_info_values(values)
|
||||
|
||||
@ -1099,7 +1121,7 @@ def _pop_task_info_values(values):
|
||||
def task_update(context, task_id, values, session=None):
|
||||
"""Update a task object"""
|
||||
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
|
||||
with session.begin():
|
||||
task_info_values = _pop_task_info_values(values)
|
||||
@ -1129,7 +1151,7 @@ def task_get(context, task_id, session=None, force_show_deleted=False):
|
||||
|
||||
def task_delete(context, task_id, session=None):
|
||||
"""Delete a task"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
task_ref = _task_get(context, task_id, session=session)
|
||||
task_ref.delete(session=session)
|
||||
return _task_format(task_ref, task_ref.info)
|
||||
@ -1152,7 +1174,7 @@ def task_get_all(context, filters=None, marker=None, limit=None,
|
||||
"""
|
||||
filters = filters or {}
|
||||
|
||||
session = _get_session()
|
||||
session = get_session()
|
||||
query = session.query(models.Task)
|
||||
|
||||
if not (context.is_admin or admin_as_user == True) and \
|
||||
@ -1215,7 +1237,7 @@ def _is_task_visible(context, task):
|
||||
|
||||
def _task_get(context, task_id, session=None, force_show_deleted=False):
|
||||
"""Fetch a task entity by id"""
|
||||
session = session or _get_session()
|
||||
session = session or get_session()
|
||||
query = session.query(models.Task).options(
|
||||
sa_orm.joinedload(models.Task.info)
|
||||
).filter_by(id=task_id)
|
||||
|
@ -72,6 +72,10 @@ class GlanceBase(models.ModelBase, models.TimestampMixin):
|
||||
__protected_attributes__ = set([
|
||||
"created_at", "updated_at", "deleted_at", "deleted"])
|
||||
|
||||
def save(self, session=None):
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
super(GlanceBase, self).save(session or db_api.get_session())
|
||||
|
||||
created_at = Column(DateTime, default=timeutils.utcnow,
|
||||
nullable=False)
|
||||
# TODO(vsergeyev): Column `updated_at` have no default value in
|
||||
|
@ -0,0 +1,2 @@
|
||||
import six
|
||||
six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2013 Rackspace Hosting
|
||||
# All Rights Reserved.
|
||||
#
|
||||
@ -17,90 +15,148 @@
|
||||
|
||||
"""Multiple DB API backend support.
|
||||
|
||||
Supported configuration options:
|
||||
|
||||
The following two parameters are in the 'database' group:
|
||||
`backend`: DB backend name or full module path to DB backend module.
|
||||
`use_tpool`: Enable thread pooling of DB API calls.
|
||||
|
||||
A DB backend module should implement a method named 'get_backend' which
|
||||
takes no arguments. The method can return any object that implements DB
|
||||
API methods.
|
||||
|
||||
*NOTE*: There are bugs in eventlet when using tpool combined with
|
||||
threading locks. The python logging module happens to use such locks. To
|
||||
work around this issue, be sure to specify thread=False with
|
||||
eventlet.monkey_patch().
|
||||
|
||||
A bug for eventlet has been filed here:
|
||||
|
||||
https://bitbucket.org/eventlet/eventlet/issue/137/
|
||||
"""
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from glance.openstack.common.db import exception
|
||||
from glance.openstack.common.gettextutils import _LE
|
||||
from glance.openstack.common import importutils
|
||||
from glance.openstack.common import lockutils
|
||||
|
||||
|
||||
db_opts = [
|
||||
cfg.StrOpt('backend',
|
||||
default='sqlalchemy',
|
||||
deprecated_name='db_backend',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The backend to use for db'),
|
||||
cfg.BoolOpt('use_tpool',
|
||||
default=False,
|
||||
deprecated_name='dbapi_use_tpool',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Enable the experimental use of thread pooling for '
|
||||
'all DB API calls')
|
||||
]
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(db_opts, 'database')
|
||||
|
||||
def safe_for_db_retry(f):
|
||||
"""Enable db-retry for decorated function, if config option enabled."""
|
||||
f.__dict__['enable_retry'] = True
|
||||
return f
|
||||
|
||||
|
||||
class wrap_db_retry(object):
|
||||
"""Retry db.api methods, if DBConnectionError() raised
|
||||
|
||||
Retry decorated db.api methods. If we enabled `use_db_reconnect`
|
||||
in config, this decorator will be applied to all db.api functions,
|
||||
marked with @safe_for_db_retry decorator.
|
||||
Decorator catchs DBConnectionError() and retries function in a
|
||||
loop until it succeeds, or until maximum retries count will be reached.
|
||||
"""
|
||||
|
||||
def __init__(self, retry_interval, max_retries, inc_retry_interval,
|
||||
max_retry_interval):
|
||||
super(wrap_db_retry, self).__init__()
|
||||
|
||||
self.retry_interval = retry_interval
|
||||
self.max_retries = max_retries
|
||||
self.inc_retry_interval = inc_retry_interval
|
||||
self.max_retry_interval = max_retry_interval
|
||||
|
||||
def __call__(self, f):
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
next_interval = self.retry_interval
|
||||
remaining = self.max_retries
|
||||
|
||||
while True:
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
except exception.DBConnectionError as e:
|
||||
if remaining == 0:
|
||||
LOG.exception(_LE('DB exceeded retry limit.'))
|
||||
raise exception.DBError(e)
|
||||
if remaining != -1:
|
||||
remaining -= 1
|
||||
LOG.exception(_LE('DB connection error.'))
|
||||
# NOTE(vsergeyev): We are using patched time module, so
|
||||
# this effectively yields the execution
|
||||
# context to another green thread.
|
||||
time.sleep(next_interval)
|
||||
if self.inc_retry_interval:
|
||||
next_interval = min(
|
||||
next_interval * 2,
|
||||
self.max_retry_interval
|
||||
)
|
||||
return wrapper
|
||||
|
||||
|
||||
class DBAPI(object):
|
||||
def __init__(self, backend_mapping=None):
|
||||
if backend_mapping is None:
|
||||
backend_mapping = {}
|
||||
self.__backend = None
|
||||
self.__backend_mapping = backend_mapping
|
||||
def __init__(self, backend_name, backend_mapping=None, lazy=False,
|
||||
**kwargs):
|
||||
"""Initialize the chosen DB API backend.
|
||||
|
||||
:param backend_name: name of the backend to load
|
||||
:type backend_name: str
|
||||
|
||||
:param backend_mapping: backend name -> module/class to load mapping
|
||||
:type backend_mapping: dict
|
||||
|
||||
:param lazy: load the DB backend lazily on the first DB API method call
|
||||
:type lazy: bool
|
||||
|
||||
Keyword arguments:
|
||||
|
||||
:keyword use_db_reconnect: retry DB transactions on disconnect or not
|
||||
:type use_db_reconnect: bool
|
||||
|
||||
:keyword retry_interval: seconds between transaction retries
|
||||
:type retry_interval: int
|
||||
|
||||
:keyword inc_retry_interval: increase retry interval or not
|
||||
:type inc_retry_interval: bool
|
||||
|
||||
:keyword max_retry_interval: max interval value between retries
|
||||
:type max_retry_interval: int
|
||||
|
||||
:keyword max_retries: max number of retries before an error is raised
|
||||
:type max_retries: int
|
||||
|
||||
@lockutils.synchronized('dbapi_backend', 'glance-')
|
||||
def __get_backend(self):
|
||||
"""Get the actual backend. May be a module or an instance of
|
||||
a class. Doesn't matter to us. We do this synchronized as it's
|
||||
possible multiple greenthreads started very quickly trying to do
|
||||
DB calls and eventlet can switch threads before self.__backend gets
|
||||
assigned.
|
||||
"""
|
||||
if self.__backend:
|
||||
# Another thread assigned it
|
||||
return self.__backend
|
||||
backend_name = CONF.database.backend
|
||||
self.__use_tpool = CONF.database.use_tpool
|
||||
if self.__use_tpool:
|
||||
from eventlet import tpool
|
||||
self.__tpool = tpool
|
||||
# Import the untranslated name if we don't have a
|
||||
# mapping.
|
||||
backend_path = self.__backend_mapping.get(backend_name,
|
||||
backend_name)
|
||||
backend_mod = importutils.import_module(backend_path)
|
||||
self.__backend = backend_mod.get_backend()
|
||||
return self.__backend
|
||||
|
||||
self._backend = None
|
||||
self._backend_name = backend_name
|
||||
self._backend_mapping = backend_mapping or {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
if not lazy:
|
||||
self._load_backend()
|
||||
|
||||
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
|
||||
self.retry_interval = kwargs.get('retry_interval', 1)
|
||||
self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
|
||||
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
|
||||
self.max_retries = kwargs.get('max_retries', 20)
|
||||
|
||||
def _load_backend(self):
|
||||
with self._lock:
|
||||
if not self._backend:
|
||||
# Import the untranslated name if we don't have a mapping
|
||||
backend_path = self._backend_mapping.get(self._backend_name,
|
||||
self._backend_name)
|
||||
backend_mod = importutils.import_module(backend_path)
|
||||
self._backend = backend_mod.get_backend()
|
||||
|
||||
def __getattr__(self, key):
|
||||
backend = self.__backend or self.__get_backend()
|
||||
attr = getattr(backend, key)
|
||||
if not self.__use_tpool or not hasattr(attr, '__call__'):
|
||||
if not self._backend:
|
||||
self._load_backend()
|
||||
|
||||
attr = getattr(self._backend, key)
|
||||
if not hasattr(attr, '__call__'):
|
||||
return attr
|
||||
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
|
||||
# DB API methods, decorated with @safe_for_db_retry
|
||||
# on disconnect.
|
||||
if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
|
||||
attr = wrap_db_retry(
|
||||
retry_interval=self.retry_interval,
|
||||
max_retries=self.max_retries,
|
||||
inc_retry_interval=self.inc_retry_interval,
|
||||
max_retry_interval=self.max_retry_interval)(attr)
|
||||
|
||||
def tpool_wrapper(*args, **kwargs):
|
||||
return self.__tpool.execute(attr, *args, **kwargs)
|
||||
|
||||
functools.update_wrapper(tpool_wrapper, attr)
|
||||
return tpool_wrapper
|
||||
return attr
|
||||
|
@ -1,5 +1,3 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
@ -18,14 +16,16 @@
|
||||
|
||||
"""DB related custom exceptions."""
|
||||
|
||||
from glance.openstack.common.gettextutils import _ # noqa
|
||||
import six
|
||||
|
||||
from glance.openstack.common.gettextutils import _
|
||||
|
||||
|
||||
class DBError(Exception):
|
||||
"""Wraps an implementation specific exception."""
|
||||
def __init__(self, inner_exception=None):
|
||||
self.inner_exception = inner_exception
|
||||
super(DBError, self).__init__(str(inner_exception))
|
||||
super(DBError, self).__init__(six.text_type(inner_exception))
|
||||
|
||||
|
||||
class DBDuplicateEntry(DBError):
|
||||
@ -48,4 +48,9 @@ class DBInvalidUnicodeParameter(Exception):
|
||||
class DbMigrationError(DBError):
|
||||
"""Wraps migration specific exception."""
|
||||
def __init__(self, message=None):
|
||||
super(DbMigrationError, self).__init__(str(message))
|
||||
super(DbMigrationError, self).__init__(message)
|
||||
|
||||
|
||||
class DBConnectionError(DBError):
|
||||
"""Wraps connection specific exception."""
|
||||
pass
|
||||
|
168
glance/openstack/common/db/options.py
Normal file
168
glance/openstack/common/db/options.py
Normal file
@ -0,0 +1,168 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
|
||||
database_opts = [
|
||||
cfg.StrOpt('sqlite_db',
|
||||
deprecated_group='DEFAULT',
|
||||
default='glance.sqlite',
|
||||
help='The file name to use with SQLite'),
|
||||
cfg.BoolOpt('sqlite_synchronous',
|
||||
deprecated_group='DEFAULT',
|
||||
default=True,
|
||||
help='If True, SQLite uses synchronous mode'),
|
||||
cfg.StrOpt('backend',
|
||||
default='sqlalchemy',
|
||||
deprecated_name='db_backend',
|
||||
deprecated_group='DEFAULT',
|
||||
help='The backend to use for db'),
|
||||
cfg.StrOpt('connection',
|
||||
help='The SQLAlchemy connection string used to connect to the '
|
||||
'database',
|
||||
secret=True,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_connection',
|
||||
group='DATABASE'),
|
||||
cfg.DeprecatedOpt('connection',
|
||||
group='sql'), ]),
|
||||
cfg.StrOpt('mysql_sql_mode',
|
||||
help='The SQL mode to be used for MySQL sessions '
|
||||
'(default is empty, meaning do not override '
|
||||
'any server-side SQL mode setting)'),
|
||||
cfg.IntOpt('idle_timeout',
|
||||
default=3600,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_idle_timeout',
|
||||
group='DATABASE'),
|
||||
cfg.DeprecatedOpt('idle_timeout',
|
||||
group='sql')],
|
||||
help='Timeout before idle sql connections are reaped'),
|
||||
cfg.IntOpt('min_pool_size',
|
||||
default=1,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_min_pool_size',
|
||||
group='DATABASE')],
|
||||
help='Minimum number of SQL connections to keep open in a '
|
||||
'pool'),
|
||||
cfg.IntOpt('max_pool_size',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_max_pool_size',
|
||||
group='DATABASE')],
|
||||
help='Maximum number of SQL connections to keep open in a '
|
||||
'pool'),
|
||||
cfg.IntOpt('max_retries',
|
||||
default=10,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_max_retries',
|
||||
group='DATABASE')],
|
||||
help='Maximum db connection retries during startup. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
cfg.IntOpt('retry_interval',
|
||||
default=10,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('reconnect_interval',
|
||||
group='DATABASE')],
|
||||
help='Interval between retries of opening a sql connection'),
|
||||
cfg.IntOpt('max_overflow',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
|
||||
group='DATABASE')],
|
||||
help='If set, use this value for max_overflow with sqlalchemy'),
|
||||
cfg.IntOpt('connection_debug',
|
||||
default=0,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
|
||||
group='DEFAULT')],
|
||||
help='Verbosity of SQL debugging information. 0=None, '
|
||||
'100=Everything'),
|
||||
cfg.BoolOpt('connection_trace',
|
||||
default=False,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
|
||||
group='DEFAULT')],
|
||||
help='Add python stack traces to SQL as comment strings'),
|
||||
cfg.IntOpt('pool_timeout',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
|
||||
group='DATABASE')],
|
||||
help='If set, use this value for pool_timeout with sqlalchemy'),
|
||||
cfg.BoolOpt('use_db_reconnect',
|
||||
default=False,
|
||||
help='Enable the experimental use of database reconnect '
|
||||
'on connection lost'),
|
||||
cfg.IntOpt('db_retry_interval',
|
||||
default=1,
|
||||
help='seconds between db connection retries'),
|
||||
cfg.BoolOpt('db_inc_retry_interval',
|
||||
default=True,
|
||||
help='Whether to increase interval between db connection '
|
||||
'retries, up to db_max_retry_interval'),
|
||||
cfg.IntOpt('db_max_retry_interval',
|
||||
default=10,
|
||||
help='max seconds between db connection retries, if '
|
||||
'db_inc_retry_interval is enabled'),
|
||||
cfg.IntOpt('db_max_retries',
|
||||
default=20,
|
||||
help='maximum db connection retries before error is raised. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(database_opts, 'database')
|
||||
|
||||
|
||||
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
|
||||
max_overflow=None, pool_timeout=None):
|
||||
"""Set defaults for configuration variables."""
|
||||
cfg.set_defaults(database_opts,
|
||||
connection=sql_connection,
|
||||
sqlite_db=sqlite_db)
|
||||
# Update the QueuePool defaults
|
||||
if max_pool_size is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
max_pool_size=max_pool_size)
|
||||
if max_overflow is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
max_overflow=max_overflow)
|
||||
if pool_timeout is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
pool_timeout=pool_timeout)
|
||||
|
||||
|
||||
def list_opts():
|
||||
"""Returns a list of oslo.config options available in the library.
|
||||
|
||||
The returned list includes all oslo.config options which may be registered
|
||||
at runtime by the library.
|
||||
|
||||
Each element of the list is a tuple. The first element is the name of the
|
||||
group under which the list of elements in the second element will be
|
||||
registered. A group name of None corresponds to the [DEFAULT] group in
|
||||
config files.
|
||||
|
||||
The purpose of this is to allow tools like the Oslo sample config file
|
||||
generator to discover the options exposed to users by this library.
|
||||
|
||||
:returns: a list of (group_name, opts) tuples
|
||||
"""
|
||||
return [('database', copy.deepcopy(database_opts))]
|
@ -51,13 +51,9 @@ import sqlalchemy
|
||||
from sqlalchemy.schema import UniqueConstraint
|
||||
|
||||
from glance.openstack.common.db import exception
|
||||
from glance.openstack.common.db.sqlalchemy import session as db_session
|
||||
from glance.openstack.common.gettextutils import _
|
||||
|
||||
|
||||
get_engine = db_session.get_engine
|
||||
|
||||
|
||||
def _get_unique_constraints(self, table):
|
||||
"""Retrieve information about existing unique constraints of the table
|
||||
|
||||
@ -172,11 +168,12 @@ def patch_migrate():
|
||||
sqlite.SQLiteConstraintGenerator)
|
||||
|
||||
|
||||
def db_sync(abs_path, version=None, init_version=0):
|
||||
def db_sync(engine, abs_path, version=None, init_version=0):
|
||||
"""Upgrade or downgrade a database.
|
||||
|
||||
Function runs the upgrade() or downgrade() functions in change scripts.
|
||||
|
||||
:param engine: SQLAlchemy engine instance for a given database
|
||||
:param abs_path: Absolute path to migrate repository.
|
||||
:param version: Database will upgrade/downgrade until this version.
|
||||
If None - database will update to the latest
|
||||
@ -190,18 +187,23 @@ def db_sync(abs_path, version=None, init_version=0):
|
||||
raise exception.DbMigrationError(
|
||||
message=_("version should be an integer"))
|
||||
|
||||
current_version = db_version(abs_path, init_version)
|
||||
current_version = db_version(engine, abs_path, init_version)
|
||||
repository = _find_migrate_repo(abs_path)
|
||||
_db_schema_sanity_check()
|
||||
_db_schema_sanity_check(engine)
|
||||
if version is None or version > current_version:
|
||||
return versioning_api.upgrade(get_engine(), repository, version)
|
||||
return versioning_api.upgrade(engine, repository, version)
|
||||
else:
|
||||
return versioning_api.downgrade(get_engine(), repository,
|
||||
return versioning_api.downgrade(engine, repository,
|
||||
version)
|
||||
|
||||
|
||||
def _db_schema_sanity_check():
|
||||
engine = get_engine()
|
||||
def _db_schema_sanity_check(engine):
|
||||
"""Ensure all database tables were created with required parameters.
|
||||
|
||||
:param engine: SQLAlchemy engine instance for a given database
|
||||
|
||||
"""
|
||||
|
||||
if engine.name == 'mysql':
|
||||
onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
|
||||
'from information_schema.TABLES '
|
||||
@ -216,23 +218,23 @@ def _db_schema_sanity_check():
|
||||
) % ','.join(table_names))
|
||||
|
||||
|
||||
def db_version(abs_path, init_version):
|
||||
def db_version(engine, abs_path, init_version):
|
||||
"""Show the current version of the repository.
|
||||
|
||||
:param engine: SQLAlchemy engine instance for a given database
|
||||
:param abs_path: Absolute path to migrate repository
|
||||
:param version: Initial database version
|
||||
"""
|
||||
repository = _find_migrate_repo(abs_path)
|
||||
try:
|
||||
return versioning_api.db_version(get_engine(), repository)
|
||||
return versioning_api.db_version(engine, repository)
|
||||
except versioning_exceptions.DatabaseNotControlledError:
|
||||
meta = sqlalchemy.MetaData()
|
||||
engine = get_engine()
|
||||
meta.reflect(bind=engine)
|
||||
tables = meta.tables
|
||||
if len(tables) == 0 or 'alembic_version' in tables:
|
||||
db_version_control(abs_path, init_version)
|
||||
return versioning_api.db_version(get_engine(), repository)
|
||||
db_version_control(engine, abs_path, version=init_version)
|
||||
return versioning_api.db_version(engine, repository)
|
||||
else:
|
||||
raise exception.DbMigrationError(
|
||||
message=_(
|
||||
@ -241,17 +243,18 @@ def db_version(abs_path, init_version):
|
||||
"manually."))
|
||||
|
||||
|
||||
def db_version_control(abs_path, version=None):
|
||||
def db_version_control(engine, abs_path, version=None):
|
||||
"""Mark a database as under this repository's version control.
|
||||
|
||||
Once a database is under version control, schema changes should
|
||||
only be done via change scripts in this repository.
|
||||
|
||||
:param engine: SQLAlchemy engine instance for a given database
|
||||
:param abs_path: Absolute path to migrate repository
|
||||
:param version: Initial database version
|
||||
"""
|
||||
repository = _find_migrate_repo(abs_path)
|
||||
versioning_api.version_control(get_engine(), repository, version)
|
||||
versioning_api.version_control(engine, repository, version)
|
||||
return version
|
||||
|
||||
|
||||
|
@ -26,7 +26,6 @@ from sqlalchemy import Column, Integer
|
||||
from sqlalchemy import DateTime
|
||||
from sqlalchemy.orm import object_mapper
|
||||
|
||||
from glance.openstack.common.db.sqlalchemy import session as sa
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
@ -34,10 +33,9 @@ class ModelBase(object):
|
||||
"""Base class for models."""
|
||||
__table_initialized__ = False
|
||||
|
||||
def save(self, session=None):
|
||||
def save(self, session):
|
||||
"""Save this object."""
|
||||
if not session:
|
||||
session = sa.get_session()
|
||||
|
||||
# NOTE(boris-42): This part of code should be look like:
|
||||
# session.add(self)
|
||||
# session.flush()
|
||||
@ -110,7 +108,7 @@ class SoftDeleteMixin(object):
|
||||
deleted_at = Column(DateTime)
|
||||
deleted = Column(Integer, default=0)
|
||||
|
||||
def soft_delete(self, session=None):
|
||||
def soft_delete(self, session):
|
||||
"""Mark this object as deleted."""
|
||||
self.deleted = self.id
|
||||
self.deleted_at = timeutils.utcnow()
|
||||
|
@ -16,19 +16,6 @@
|
||||
|
||||
"""Session Handling for SQLAlchemy backend.
|
||||
|
||||
Initializing:
|
||||
|
||||
* Call `set_defaults()` with the minimal of the following kwargs:
|
||||
``sql_connection``, ``sqlite_db``
|
||||
|
||||
Example:
|
||||
|
||||
.. code:: python
|
||||
|
||||
session.set_defaults(
|
||||
sql_connection="sqlite:///var/lib/glance/sqlite.db",
|
||||
sqlite_db="/var/lib/glance/sqlite.db")
|
||||
|
||||
Recommended ways to use sessions within this framework:
|
||||
|
||||
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
|
||||
@ -87,7 +74,7 @@ Recommended ways to use sessions within this framework:
|
||||
.. code:: python
|
||||
|
||||
def create_many_foo(context, foos):
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin():
|
||||
for foo in foos:
|
||||
foo_ref = models.Foo()
|
||||
@ -95,7 +82,7 @@ Recommended ways to use sessions within this framework:
|
||||
session.add(foo_ref)
|
||||
|
||||
def update_bar(context, foo_id, newbar):
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin():
|
||||
foo_ref = (model_query(context, models.Foo, session).
|
||||
filter_by(id=foo_id).
|
||||
@ -142,7 +129,7 @@ Recommended ways to use sessions within this framework:
|
||||
foo1 = models.Foo()
|
||||
foo2 = models.Foo()
|
||||
foo1.id = foo2.id = 1
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
try:
|
||||
with session.begin():
|
||||
session.add(foo1)
|
||||
@ -168,7 +155,7 @@ Recommended ways to use sessions within this framework:
|
||||
.. code:: python
|
||||
|
||||
def myfunc(foo):
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin():
|
||||
# do some database things
|
||||
bar = _private_func(foo, session)
|
||||
@ -176,7 +163,7 @@ Recommended ways to use sessions within this framework:
|
||||
|
||||
def _private_func(foo, session=None):
|
||||
if not session:
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin(subtransaction=True):
|
||||
# do some other database things
|
||||
return bar
|
||||
@ -240,7 +227,7 @@ Efficient use of soft deletes:
|
||||
|
||||
def complex_soft_delete_with_synchronization_bar(session=None):
|
||||
if session is None:
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin(subtransactions=True):
|
||||
count = (model_query(BarModel).
|
||||
find(some_condition).
|
||||
@ -257,7 +244,7 @@ Efficient use of soft deletes:
|
||||
.. code:: python
|
||||
|
||||
def soft_delete_bar_model():
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin():
|
||||
bar_ref = model_query(BarModel).find(some_condition).first()
|
||||
# Work with bar_ref
|
||||
@ -269,7 +256,7 @@ Efficient use of soft deletes:
|
||||
.. code:: python
|
||||
|
||||
def soft_delete_multi_models():
|
||||
session = get_session()
|
||||
session = sessionmaker()
|
||||
with session.begin():
|
||||
query = (model_query(BarModel, session=session).
|
||||
find(some_condition))
|
||||
@ -293,11 +280,9 @@ Efficient use of soft deletes:
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import os.path
|
||||
import re
|
||||
import time
|
||||
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
from sqlalchemy import exc as sqla_exc
|
||||
from sqlalchemy.interfaces import PoolListener
|
||||
@ -306,150 +291,12 @@ from sqlalchemy.pool import NullPool, StaticPool
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
|
||||
from glance.openstack.common.db import exception
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common.gettextutils import _LE, _LW, _LI
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
sqlite_db_opts = [
|
||||
cfg.StrOpt('sqlite_db',
|
||||
default='glance.sqlite',
|
||||
help='The file name to use with SQLite'),
|
||||
cfg.BoolOpt('sqlite_synchronous',
|
||||
default=True,
|
||||
help='If True, SQLite uses synchronous mode'),
|
||||
]
|
||||
|
||||
database_opts = [
|
||||
cfg.StrOpt('connection',
|
||||
default='sqlite:///' +
|
||||
os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
'../', '$sqlite_db')),
|
||||
help='The SQLAlchemy connection string used to connect to the '
|
||||
'database',
|
||||
secret=True,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_connection',
|
||||
group='DATABASE'),
|
||||
cfg.DeprecatedOpt('connection',
|
||||
group='sql'), ]),
|
||||
cfg.StrOpt('slave_connection',
|
||||
default='',
|
||||
secret=True,
|
||||
help='The SQLAlchemy connection string used to connect to the '
|
||||
'slave database'),
|
||||
cfg.IntOpt('idle_timeout',
|
||||
default=3600,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_idle_timeout',
|
||||
group='DATABASE'),
|
||||
cfg.DeprecatedOpt('idle_timeout',
|
||||
group='sql')],
|
||||
help='Timeout before idle sql connections are reaped'),
|
||||
cfg.IntOpt('min_pool_size',
|
||||
default=1,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_min_pool_size',
|
||||
group='DATABASE')],
|
||||
help='Minimum number of SQL connections to keep open in a '
|
||||
'pool'),
|
||||
cfg.IntOpt('max_pool_size',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_max_pool_size',
|
||||
group='DATABASE')],
|
||||
help='Maximum number of SQL connections to keep open in a '
|
||||
'pool'),
|
||||
cfg.IntOpt('max_retries',
|
||||
default=10,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sql_max_retries',
|
||||
group='DATABASE')],
|
||||
help='Maximum db connection retries during startup. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
cfg.IntOpt('retry_interval',
|
||||
default=10,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('reconnect_interval',
|
||||
group='DATABASE')],
|
||||
help='Interval between retries of opening a sql connection'),
|
||||
cfg.IntOpt('max_overflow',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
|
||||
group='DEFAULT'),
|
||||
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
|
||||
group='DATABASE')],
|
||||
help='If set, use this value for max_overflow with sqlalchemy'),
|
||||
cfg.IntOpt('connection_debug',
|
||||
default=0,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
|
||||
group='DEFAULT')],
|
||||
help='Verbosity of SQL debugging information. 0=None, '
|
||||
'100=Everything'),
|
||||
cfg.BoolOpt('connection_trace',
|
||||
default=False,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
|
||||
group='DEFAULT')],
|
||||
help='Add python stack traces to SQL as comment strings'),
|
||||
cfg.IntOpt('pool_timeout',
|
||||
default=None,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
|
||||
group='DATABASE')],
|
||||
help='If set, use this value for pool_timeout with sqlalchemy'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(sqlite_db_opts)
|
||||
CONF.register_opts(database_opts, 'database')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_ENGINE = None
|
||||
_MAKER = None
|
||||
_SLAVE_ENGINE = None
|
||||
_SLAVE_MAKER = None
|
||||
|
||||
|
||||
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
|
||||
max_overflow=None, pool_timeout=None):
|
||||
"""Set defaults for configuration variables."""
|
||||
cfg.set_defaults(database_opts,
|
||||
connection=sql_connection)
|
||||
cfg.set_defaults(sqlite_db_opts,
|
||||
sqlite_db=sqlite_db)
|
||||
# Update the QueuePool defaults
|
||||
if max_pool_size is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
max_pool_size=max_pool_size)
|
||||
if max_overflow is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
max_overflow=max_overflow)
|
||||
if pool_timeout is not None:
|
||||
cfg.set_defaults(database_opts,
|
||||
pool_timeout=pool_timeout)
|
||||
|
||||
|
||||
def cleanup():
|
||||
global _ENGINE, _MAKER
|
||||
global _SLAVE_ENGINE, _SLAVE_MAKER
|
||||
|
||||
if _MAKER:
|
||||
_MAKER.close_all()
|
||||
_MAKER = None
|
||||
if _ENGINE:
|
||||
_ENGINE.dispose()
|
||||
_ENGINE = None
|
||||
if _SLAVE_MAKER:
|
||||
_SLAVE_MAKER.close_all()
|
||||
_SLAVE_MAKER = None
|
||||
if _SLAVE_ENGINE:
|
||||
_SLAVE_ENGINE.dispose()
|
||||
_SLAVE_ENGINE = None
|
||||
|
||||
|
||||
class SqliteForeignKeysListener(PoolListener):
|
||||
"""Ensures that the foreign key constraints are enforced in SQLite.
|
||||
@ -462,30 +309,6 @@ class SqliteForeignKeysListener(PoolListener):
|
||||
dbapi_con.execute('pragma foreign_keys=ON')
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
|
||||
slave_session=False, mysql_traditional_mode=False):
|
||||
"""Return a SQLAlchemy session."""
|
||||
global _MAKER
|
||||
global _SLAVE_MAKER
|
||||
maker = _MAKER
|
||||
|
||||
if slave_session:
|
||||
maker = _SLAVE_MAKER
|
||||
|
||||
if maker is None:
|
||||
engine = get_engine(sqlite_fk=sqlite_fk, slave_engine=slave_session,
|
||||
mysql_traditional_mode=mysql_traditional_mode)
|
||||
maker = get_maker(engine, autocommit, expire_on_commit)
|
||||
|
||||
if slave_session:
|
||||
_SLAVE_MAKER = maker
|
||||
else:
|
||||
_MAKER = maker
|
||||
|
||||
session = maker()
|
||||
return session
|
||||
|
||||
|
||||
# note(boris-42): In current versions of DB backends unique constraint
|
||||
# violation messages follow the structure:
|
||||
#
|
||||
@ -509,11 +332,20 @@ def get_session(autocommit=True, expire_on_commit=False, sqlite_fk=False,
|
||||
# 'c1'")
|
||||
# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined
|
||||
# with -' for key 'name_of_our_constraint'")
|
||||
#
|
||||
# ibm_db_sa:
|
||||
# N columns - (IntegrityError) SQL0803N One or more values in the INSERT
|
||||
# statement, UPDATE statement, or foreign key update caused by a
|
||||
# DELETE statement are not valid because the primary key, unique
|
||||
# constraint or unique index identified by "2" constrains table
|
||||
# "NOVA.KEY_PAIRS" from having duplicate values for the index
|
||||
# key.
|
||||
_DUP_KEY_RE_DB = {
|
||||
"sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"),
|
||||
re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")),
|
||||
"postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),),
|
||||
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),)
|
||||
"mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),),
|
||||
"ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),),
|
||||
}
|
||||
|
||||
|
||||
@ -535,7 +367,7 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
|
||||
return [columns]
|
||||
return columns[len(uniqbase):].split("0")[1:]
|
||||
|
||||
if engine_name not in ["mysql", "sqlite", "postgresql"]:
|
||||
if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]:
|
||||
return
|
||||
|
||||
# FIXME(johannes): The usage of the .message attribute has been
|
||||
@ -550,7 +382,12 @@ def _raise_if_duplicate_entry_error(integrity_error, engine_name):
|
||||
else:
|
||||
return
|
||||
|
||||
columns = match.group(1)
|
||||
# NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the
|
||||
# columns so we have to omit that from the DBDuplicateEntry error.
|
||||
columns = ''
|
||||
|
||||
if engine_name != 'ibm_db_sa':
|
||||
columns = match.group(1)
|
||||
|
||||
if engine_name == "sqlite":
|
||||
columns = [c.split('.')[-1] for c in columns.strip().split(", ")]
|
||||
@ -591,57 +428,39 @@ def _raise_if_deadlock_error(operational_error, engine_name):
|
||||
|
||||
|
||||
def _wrap_db_error(f):
|
||||
#TODO(rpodolyaka): in a subsequent commit make this a class decorator to
|
||||
# ensure it can only applied to Session subclasses instances (as we use
|
||||
# Session instance bind attribute below)
|
||||
|
||||
@functools.wraps(f)
|
||||
def _wrap(*args, **kwargs):
|
||||
def _wrap(self, *args, **kwargs):
|
||||
try:
|
||||
return f(*args, **kwargs)
|
||||
return f(self, *args, **kwargs)
|
||||
except UnicodeEncodeError:
|
||||
raise exception.DBInvalidUnicodeParameter()
|
||||
# note(boris-42): We should catch unique constraint violation and
|
||||
# wrap it by our own DBDuplicateEntry exception. Unique constraint
|
||||
# violation is wrapped by IntegrityError.
|
||||
except sqla_exc.OperationalError as e:
|
||||
_raise_if_deadlock_error(e, get_engine().name)
|
||||
_raise_if_db_connection_lost(e, self.bind)
|
||||
_raise_if_deadlock_error(e, self.bind.dialect.name)
|
||||
# NOTE(comstud): A lot of code is checking for OperationalError
|
||||
# so let's not wrap it for now.
|
||||
raise
|
||||
# note(boris-42): We should catch unique constraint violation and
|
||||
# wrap it by our own DBDuplicateEntry exception. Unique constraint
|
||||
# violation is wrapped by IntegrityError.
|
||||
except sqla_exc.IntegrityError as e:
|
||||
# note(boris-42): SqlAlchemy doesn't unify errors from different
|
||||
# DBs so we must do this. Also in some tables (for example
|
||||
# instance_types) there are more than one unique constraint. This
|
||||
# means we should get names of columns, which values violate
|
||||
# unique constraint, from error message.
|
||||
_raise_if_duplicate_entry_error(e, get_engine().name)
|
||||
_raise_if_duplicate_entry_error(e, self.bind.dialect.name)
|
||||
raise exception.DBError(e)
|
||||
except Exception as e:
|
||||
LOG.exception(_('DB exception wrapped.'))
|
||||
LOG.exception(_LE('DB exception wrapped.'))
|
||||
raise exception.DBError(e)
|
||||
return _wrap
|
||||
|
||||
|
||||
def get_engine(sqlite_fk=False, slave_engine=False,
|
||||
mysql_traditional_mode=False):
|
||||
"""Return a SQLAlchemy engine."""
|
||||
global _ENGINE
|
||||
global _SLAVE_ENGINE
|
||||
engine = _ENGINE
|
||||
db_uri = CONF.database.connection
|
||||
|
||||
if slave_engine:
|
||||
engine = _SLAVE_ENGINE
|
||||
db_uri = CONF.database.slave_connection
|
||||
|
||||
if engine is None:
|
||||
engine = create_engine(db_uri, sqlite_fk=sqlite_fk,
|
||||
mysql_traditional_mode=mysql_traditional_mode)
|
||||
if slave_engine:
|
||||
_SLAVE_ENGINE = engine
|
||||
else:
|
||||
_ENGINE = engine
|
||||
|
||||
return engine
|
||||
|
||||
|
||||
def _synchronous_switch_listener(dbapi_conn, connection_rec):
|
||||
"""Switch sqlite connections to non-synchronous mode."""
|
||||
dbapi_conn.execute("PRAGMA synchronous = OFF")
|
||||
@ -683,7 +502,7 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
|
||||
cursor.execute(ping_sql)
|
||||
except Exception as ex:
|
||||
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
|
||||
msg = _('Database server has gone away: %s') % ex
|
||||
msg = _LW('Database server has gone away: %s') % ex
|
||||
LOG.warning(msg)
|
||||
raise sqla_exc.DisconnectionError(msg)
|
||||
else:
|
||||
@ -698,7 +517,44 @@ def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
|
||||
than a declared field just with warning. That is fraught with data
|
||||
corruption.
|
||||
"""
|
||||
dbapi_con.cursor().execute("SET SESSION sql_mode = TRADITIONAL;")
|
||||
_set_session_sql_mode(dbapi_con, connection_rec,
|
||||
connection_proxy, 'TRADITIONAL')
|
||||
|
||||
|
||||
def _set_session_sql_mode(dbapi_con, connection_rec,
|
||||
connection_proxy, sql_mode=None):
|
||||
"""Set the sql_mode session variable.
|
||||
|
||||
MySQL supports several server modes. The default is None, but sessions
|
||||
may choose to enable server modes like TRADITIONAL, ANSI,
|
||||
several STRICT_* modes and others.
|
||||
|
||||
Note: passing in '' (empty string) for sql_mode clears
|
||||
the SQL mode for the session, overriding a potentially set
|
||||
server default. Passing in None (the default) makes this
|
||||
a no-op, meaning if a server-side SQL mode is set, it still applies.
|
||||
"""
|
||||
cursor = dbapi_con.cursor()
|
||||
if sql_mode is not None:
|
||||
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
|
||||
|
||||
# Check against the real effective SQL mode. Even when unset by
|
||||
# our own config, the server may still be operating in a specific
|
||||
# SQL mode as set by the server configuration
|
||||
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
|
||||
row = cursor.fetchone()
|
||||
if row is None:
|
||||
LOG.warning(_LW('Unable to detect effective SQL mode'))
|
||||
return
|
||||
realmode = row[1]
|
||||
LOG.info(_LI('MySQL server mode set to %s') % realmode)
|
||||
# 'TRADITIONAL' mode enables several other modes, so
|
||||
# we need a substring match here
|
||||
if not ('TRADITIONAL' in realmode.upper() or
|
||||
'STRICT_ALL_TABLES' in realmode.upper()):
|
||||
LOG.warning(_LW("MySQL SQL mode is '%s', "
|
||||
"consider enabling TRADITIONAL or STRICT_ALL_TABLES")
|
||||
% realmode)
|
||||
|
||||
|
||||
def _is_db_connection_error(args):
|
||||
@ -713,69 +569,79 @@ def _is_db_connection_error(args):
|
||||
return False
|
||||
|
||||
|
||||
def create_engine(sql_connection, sqlite_fk=False,
|
||||
mysql_traditional_mode=False):
|
||||
def _raise_if_db_connection_lost(error, engine):
|
||||
# NOTE(vsergeyev): Function is_disconnect(e, connection, cursor)
|
||||
# requires connection and cursor in incoming parameters,
|
||||
# but we have no possibility to create connection if DB
|
||||
# is not available, so in such case reconnect fails.
|
||||
# But is_disconnect() ignores these parameters, so it
|
||||
# makes sense to pass to function None as placeholder
|
||||
# instead of connection and cursor.
|
||||
if engine.dialect.is_disconnect(error, None, None):
|
||||
raise exception.DBConnectionError(error)
|
||||
|
||||
|
||||
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
|
||||
mysql_traditional_mode=False, idle_timeout=3600,
|
||||
connection_debug=0, max_pool_size=None, max_overflow=None,
|
||||
pool_timeout=None, sqlite_synchronous=True,
|
||||
connection_trace=False, max_retries=10, retry_interval=10):
|
||||
"""Return a new SQLAlchemy engine."""
|
||||
# NOTE(geekinutah): At this point we could be connecting to the normal
|
||||
# db handle or the slave db handle. Things like
|
||||
# _wrap_db_error aren't going to work well if their
|
||||
# backends don't match. Let's check.
|
||||
_assert_matching_drivers()
|
||||
|
||||
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
|
||||
|
||||
engine_args = {
|
||||
"pool_recycle": CONF.database.idle_timeout,
|
||||
"echo": False,
|
||||
"pool_recycle": idle_timeout,
|
||||
'convert_unicode': True,
|
||||
}
|
||||
|
||||
# Map our SQL debug level to SQLAlchemy's options
|
||||
if CONF.database.connection_debug >= 100:
|
||||
engine_args['echo'] = 'debug'
|
||||
elif CONF.database.connection_debug >= 50:
|
||||
engine_args['echo'] = True
|
||||
logger = logging.getLogger('sqlalchemy.engine')
|
||||
|
||||
# Map SQL debug level to Python log level
|
||||
if connection_debug >= 100:
|
||||
logger.setLevel(logging.DEBUG)
|
||||
elif connection_debug >= 50:
|
||||
logger.setLevel(logging.INFO)
|
||||
else:
|
||||
logger.setLevel(logging.WARNING)
|
||||
|
||||
if "sqlite" in connection_dict.drivername:
|
||||
if sqlite_fk:
|
||||
engine_args["listeners"] = [SqliteForeignKeysListener()]
|
||||
engine_args["poolclass"] = NullPool
|
||||
|
||||
if CONF.database.connection == "sqlite://":
|
||||
if sql_connection == "sqlite://":
|
||||
engine_args["poolclass"] = StaticPool
|
||||
engine_args["connect_args"] = {'check_same_thread': False}
|
||||
else:
|
||||
if CONF.database.max_pool_size is not None:
|
||||
engine_args['pool_size'] = CONF.database.max_pool_size
|
||||
if CONF.database.max_overflow is not None:
|
||||
engine_args['max_overflow'] = CONF.database.max_overflow
|
||||
if CONF.database.pool_timeout is not None:
|
||||
engine_args['pool_timeout'] = CONF.database.pool_timeout
|
||||
if max_pool_size is not None:
|
||||
engine_args['pool_size'] = max_pool_size
|
||||
if max_overflow is not None:
|
||||
engine_args['max_overflow'] = max_overflow
|
||||
if pool_timeout is not None:
|
||||
engine_args['pool_timeout'] = pool_timeout
|
||||
|
||||
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
|
||||
|
||||
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
|
||||
|
||||
if engine.name in ['mysql', 'ibm_db_sa']:
|
||||
callback = functools.partial(_ping_listener, engine)
|
||||
sqlalchemy.event.listen(engine, 'checkout', callback)
|
||||
ping_callback = functools.partial(_ping_listener, engine)
|
||||
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
|
||||
if engine.name == 'mysql':
|
||||
if mysql_traditional_mode:
|
||||
sqlalchemy.event.listen(engine, 'checkout',
|
||||
_set_mode_traditional)
|
||||
else:
|
||||
LOG.warning(_("This application has not enabled MySQL "
|
||||
"traditional mode, which means silent "
|
||||
"data corruption may occur. "
|
||||
"Please encourage the application "
|
||||
"developers to enable this mode."))
|
||||
mysql_sql_mode = 'TRADITIONAL'
|
||||
if mysql_sql_mode:
|
||||
mode_callback = functools.partial(_set_session_sql_mode,
|
||||
sql_mode=mysql_sql_mode)
|
||||
sqlalchemy.event.listen(engine, 'checkout', mode_callback)
|
||||
elif 'sqlite' in connection_dict.drivername:
|
||||
if not CONF.sqlite_synchronous:
|
||||
if not sqlite_synchronous:
|
||||
sqlalchemy.event.listen(engine, 'connect',
|
||||
_synchronous_switch_listener)
|
||||
sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener)
|
||||
|
||||
if (CONF.database.connection_trace and
|
||||
engine.dialect.dbapi.__name__ == 'MySQLdb'):
|
||||
if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb':
|
||||
_patch_mysqldb_with_stacktrace_comments()
|
||||
|
||||
try:
|
||||
@ -784,15 +650,15 @@ def create_engine(sql_connection, sqlite_fk=False,
|
||||
if not _is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
|
||||
remaining = CONF.database.max_retries
|
||||
remaining = max_retries
|
||||
if remaining == -1:
|
||||
remaining = 'infinite'
|
||||
while True:
|
||||
msg = _('SQL connection failed. %s attempts left.')
|
||||
msg = _LW('SQL connection failed. %s attempts left.')
|
||||
LOG.warning(msg % remaining)
|
||||
if remaining != 'infinite':
|
||||
remaining -= 1
|
||||
time.sleep(CONF.database.retry_interval)
|
||||
time.sleep(retry_interval)
|
||||
try:
|
||||
engine.connect()
|
||||
break
|
||||
@ -879,13 +745,116 @@ def _patch_mysqldb_with_stacktrace_comments():
|
||||
setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query)
|
||||
|
||||
|
||||
def _assert_matching_drivers():
|
||||
"""Make sure slave handle and normal handle have the same driver."""
|
||||
# NOTE(geekinutah): There's no use case for writing to one backend and
|
||||
# reading from another. Who knows what the future holds?
|
||||
if CONF.database.slave_connection == '':
|
||||
return
|
||||
class EngineFacade(object):
|
||||
"""A helper class for removing of global engine instances from glance.db.
|
||||
|
||||
normal = sqlalchemy.engine.url.make_url(CONF.database.connection)
|
||||
slave = sqlalchemy.engine.url.make_url(CONF.database.slave_connection)
|
||||
assert normal.drivername == slave.drivername
|
||||
As a library, glance.db can't decide where to store/when to create engine
|
||||
and sessionmaker instances, so this must be left for a target application.
|
||||
|
||||
On the other hand, in order to simplify the adoption of glance.db changes,
|
||||
we'll provide a helper class, which creates engine and sessionmaker
|
||||
on its instantiation and provides get_engine()/get_session() methods
|
||||
that are compatible with corresponding utility functions that currently
|
||||
exist in target projects, e.g. in Nova.
|
||||
|
||||
engine/sessionmaker instances will still be global (and they are meant to
|
||||
be global), but they will be stored in the app context, rather that in the
|
||||
glance.db context.
|
||||
|
||||
Note: using of this helper is completely optional and you are encouraged to
|
||||
integrate engine/sessionmaker instances into your apps any way you like
|
||||
(e.g. one might want to bind a session to a request context). Two important
|
||||
things to remember:
|
||||
1. An Engine instance is effectively a pool of DB connections, so it's
|
||||
meant to be shared (and it's thread-safe).
|
||||
2. A Session instance is not meant to be shared and represents a DB
|
||||
transactional context (i.e. it's not thread-safe). sessionmaker is
|
||||
a factory of sessions.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, sql_connection,
|
||||
sqlite_fk=False, mysql_sql_mode=None,
|
||||
autocommit=True, expire_on_commit=False, **kwargs):
|
||||
"""Initialize engine and sessionmaker instances.
|
||||
|
||||
:param sqlite_fk: enable foreign keys in SQLite
|
||||
:type sqlite_fk: bool
|
||||
|
||||
:param mysql_sql_mode: set SQL mode in MySQL
|
||||
:type mysql_sql_mode: string
|
||||
|
||||
:param autocommit: use autocommit mode for created Session instances
|
||||
:type autocommit: bool
|
||||
|
||||
:param expire_on_commit: expire session objects on commit
|
||||
:type expire_on_commit: bool
|
||||
|
||||
Keyword arguments:
|
||||
|
||||
:keyword idle_timeout: timeout before idle sql connections are reaped
|
||||
(defaults to 3600)
|
||||
:keyword connection_debug: verbosity of SQL debugging information.
|
||||
0=None, 100=Everything (defaults to 0)
|
||||
:keyword max_pool_size: maximum number of SQL connections to keep open
|
||||
in a pool (defaults to SQLAlchemy settings)
|
||||
:keyword max_overflow: if set, use this value for max_overflow with
|
||||
sqlalchemy (defaults to SQLAlchemy settings)
|
||||
:keyword pool_timeout: if set, use this value for pool_timeout with
|
||||
sqlalchemy (defaults to SQLAlchemy settings)
|
||||
:keyword sqlite_synchronous: if True, SQLite uses synchronous mode
|
||||
(defaults to True)
|
||||
:keyword connection_trace: add python stack traces to SQL as comment
|
||||
strings (defaults to False)
|
||||
:keyword max_retries: maximum db connection retries during startup.
|
||||
(setting -1 implies an infinite retry count)
|
||||
(defaults to 10)
|
||||
:keyword retry_interval: interval between retries of opening a sql
|
||||
connection (defaults to 10)
|
||||
|
||||
"""
|
||||
|
||||
super(EngineFacade, self).__init__()
|
||||
|
||||
self._engine = create_engine(
|
||||
sql_connection=sql_connection,
|
||||
sqlite_fk=sqlite_fk,
|
||||
mysql_sql_mode=mysql_sql_mode,
|
||||
idle_timeout=kwargs.get('idle_timeout', 3600),
|
||||
connection_debug=kwargs.get('connection_debug', 0),
|
||||
max_pool_size=kwargs.get('max_pool_size'),
|
||||
max_overflow=kwargs.get('max_overflow'),
|
||||
pool_timeout=kwargs.get('pool_timeout'),
|
||||
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
|
||||
connection_trace=kwargs.get('connection_trace', False),
|
||||
max_retries=kwargs.get('max_retries', 10),
|
||||
retry_interval=kwargs.get('retry_interval', 10))
|
||||
self._session_maker = get_maker(
|
||||
engine=self._engine,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit)
|
||||
|
||||
def get_engine(self):
|
||||
"""Get the engine instance (note, that it's shared)."""
|
||||
|
||||
return self._engine
|
||||
|
||||
def get_session(self, **kwargs):
|
||||
"""Get a Session instance.
|
||||
|
||||
If passed, keyword arguments values override the ones used when the
|
||||
sessionmaker instance was created.
|
||||
|
||||
:keyword autocommit: use autocommit mode for created Session instances
|
||||
:type autocommit: bool
|
||||
|
||||
:keyword expire_on_commit: expire session objects on commit
|
||||
:type expire_on_commit: bool
|
||||
|
||||
"""
|
||||
|
||||
for arg in kwargs:
|
||||
if arg not in ('autocommit', 'expire_on_commit'):
|
||||
del kwargs[arg]
|
||||
|
||||
return self._session_maker(**kwargs)
|
||||
|
@ -18,7 +18,6 @@ import functools
|
||||
import os
|
||||
|
||||
import fixtures
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
from glance.openstack.common.db.sqlalchemy import session
|
||||
@ -38,18 +37,17 @@ class DbFixture(fixtures.Fixture):
|
||||
def _get_uri(self):
|
||||
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, test):
|
||||
super(DbFixture, self).__init__()
|
||||
self.conf = cfg.CONF
|
||||
self.conf.import_opt('connection',
|
||||
'glance.openstack.common.db.sqlalchemy.session',
|
||||
group='database')
|
||||
|
||||
self.test = test
|
||||
|
||||
def setUp(self):
|
||||
super(DbFixture, self).setUp()
|
||||
|
||||
self.conf.set_default('connection', self._get_uri(), group='database')
|
||||
self.addCleanup(self.conf.reset)
|
||||
self.test.engine = session.create_engine(self._get_uri())
|
||||
self.test.sessionmaker = session.get_maker(self.test.engine)
|
||||
self.addCleanup(self.test.engine.dispose)
|
||||
|
||||
|
||||
class DbTestCase(test.BaseTestCase):
|
||||
@ -64,9 +62,7 @@ class DbTestCase(test.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(DbTestCase, self).setUp()
|
||||
self.useFixture(self.FIXTURE())
|
||||
|
||||
self.addCleanup(session.cleanup)
|
||||
self.useFixture(self.FIXTURE(self))
|
||||
|
||||
|
||||
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
|
||||
@ -83,11 +79,10 @@ def backend_specific(*dialects):
|
||||
if not set(dialects).issubset(ALLOWED_DIALECTS):
|
||||
raise ValueError(
|
||||
"Please use allowed dialects: %s" % ALLOWED_DIALECTS)
|
||||
engine = session.get_engine()
|
||||
if engine.name not in dialects:
|
||||
if self.engine.name not in dialects:
|
||||
msg = ('The test "%s" can be run '
|
||||
'only on %s. Current engine is %s.')
|
||||
args = (f.__name__, ' '.join(dialects), engine.name)
|
||||
args = (f.__name__, ' '.join(dialects), self.engine.name)
|
||||
self.skip(msg % args)
|
||||
else:
|
||||
return f(self)
|
||||
|
@ -21,12 +21,12 @@ import subprocess
|
||||
|
||||
import lockfile
|
||||
from six import moves
|
||||
from six.moves.urllib import parse
|
||||
import sqlalchemy
|
||||
import sqlalchemy.exc
|
||||
|
||||
from glance.openstack.common.db.sqlalchemy import utils
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common.py3kcompat import urlutils
|
||||
from glance.openstack.common.gettextutils import _LE
|
||||
from glance.openstack.common import test
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -60,10 +60,10 @@ def _set_db_lock(lock_path=None, lock_prefix=None):
|
||||
path = lock_path or os.environ.get("GLANCE_LOCK_PATH")
|
||||
lock = lockfile.FileLock(os.path.join(path, lock_prefix))
|
||||
with lock:
|
||||
LOG.debug(_('Got lock "%s"') % f.__name__)
|
||||
LOG.debug('Got lock "%s"' % f.__name__)
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
LOG.debug(_('Lock released "%s"') % f.__name__)
|
||||
LOG.debug('Lock released "%s"' % f.__name__)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
@ -153,7 +153,7 @@ class BaseMigrationTestCase(test.BaseTestCase):
|
||||
def _reset_databases(self):
|
||||
for key, engine in self.engines.items():
|
||||
conn_string = self.test_databases[key]
|
||||
conn_pieces = urlutils.urlparse(conn_string)
|
||||
conn_pieces = parse.urlparse(conn_string)
|
||||
engine.dispose()
|
||||
if conn_string.startswith('sqlite'):
|
||||
# We can just delete the SQLite database, which is
|
||||
@ -264,6 +264,6 @@ class WalkVersionsMixin(object):
|
||||
if check:
|
||||
check(engine, data)
|
||||
except Exception:
|
||||
LOG.error("Failed to migrate to version %s on engine %s" %
|
||||
LOG.error(_LE("Failed to migrate to version %s on engine %s") %
|
||||
(version, engine))
|
||||
raise
|
||||
|
@ -30,6 +30,7 @@ from sqlalchemy import func
|
||||
from sqlalchemy import Index
|
||||
from sqlalchemy import Integer
|
||||
from sqlalchemy import MetaData
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
from sqlalchemy.sql.expression import UpdateBase
|
||||
from sqlalchemy.sql import select
|
||||
@ -37,7 +38,9 @@ from sqlalchemy import String
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy.types import NullType
|
||||
|
||||
from glance.openstack.common.gettextutils import _
|
||||
from glance.openstack.common import context as request_context
|
||||
from glance.openstack.common.db.sqlalchemy import models
|
||||
from glance.openstack.common.gettextutils import _, _LI, _LW
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
@ -93,7 +96,7 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
|
||||
if 'id' not in sort_keys:
|
||||
# TODO(justinsb): If this ever gives a false-positive, check
|
||||
# the actual primary key, rather than assuming its id
|
||||
LOG.warning(_('Id not in sort_keys; is sort_keys unique?'))
|
||||
LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?'))
|
||||
|
||||
assert(not (sort_dir and sort_dirs))
|
||||
|
||||
@ -156,6 +159,94 @@ def paginate_query(query, model, limit, sort_keys, marker=None,
|
||||
return query
|
||||
|
||||
|
||||
def _read_deleted_filter(query, db_model, read_deleted):
|
||||
if 'deleted' not in db_model.__table__.columns:
|
||||
raise ValueError(_("There is no `deleted` column in `%s` table. "
|
||||
"Project doesn't use soft-deleted feature.")
|
||||
% db_model.__name__)
|
||||
|
||||
default_deleted_value = db_model.__table__.c.deleted.default.arg
|
||||
if read_deleted == 'no':
|
||||
query = query.filter(db_model.deleted == default_deleted_value)
|
||||
elif read_deleted == 'yes':
|
||||
pass # omit the filter to include deleted and active
|
||||
elif read_deleted == 'only':
|
||||
query = query.filter(db_model.deleted != default_deleted_value)
|
||||
else:
|
||||
raise ValueError(_("Unrecognized read_deleted value '%s'")
|
||||
% read_deleted)
|
||||
return query
|
||||
|
||||
|
||||
def _project_filter(query, db_model, context, project_only):
|
||||
if project_only and 'project_id' not in db_model.__table__.columns:
|
||||
raise ValueError(_("There is no `project_id` column in `%s` table.")
|
||||
% db_model.__name__)
|
||||
|
||||
if request_context.is_user_context(context) and project_only:
|
||||
if project_only == 'allow_none':
|
||||
is_none = None
|
||||
query = query.filter(or_(db_model.project_id == context.project_id,
|
||||
db_model.project_id == is_none))
|
||||
else:
|
||||
query = query.filter(db_model.project_id == context.project_id)
|
||||
|
||||
return query
|
||||
|
||||
|
||||
def model_query(context, model, session, args=None, project_only=False,
|
||||
read_deleted=None):
|
||||
"""Query helper that accounts for context's `read_deleted` field.
|
||||
|
||||
:param context: context to query under
|
||||
|
||||
:param model: Model to query. Must be a subclass of ModelBase.
|
||||
:type model: models.ModelBase
|
||||
|
||||
:param session: The session to use.
|
||||
:type session: sqlalchemy.orm.session.Session
|
||||
|
||||
:param args: Arguments to query. If None - model is used.
|
||||
:type args: tuple
|
||||
|
||||
:param project_only: If present and context is user-type, then restrict
|
||||
query to match the context's project_id. If set to
|
||||
'allow_none', restriction includes project_id = None.
|
||||
:type project_only: bool
|
||||
|
||||
:param read_deleted: If present, overrides context's read_deleted field.
|
||||
:type read_deleted: bool
|
||||
|
||||
Usage:
|
||||
result = (utils.model_query(context, models.Instance, session=session)
|
||||
.filter_by(uuid=instance_uuid)
|
||||
.all())
|
||||
|
||||
query = utils.model_query(
|
||||
context, Node,
|
||||
session=session,
|
||||
args=(func.count(Node.id), func.sum(Node.ram))
|
||||
).filter_by(project_id=project_id)
|
||||
"""
|
||||
|
||||
if not read_deleted:
|
||||
if hasattr(context, 'read_deleted'):
|
||||
# NOTE(viktors): some projects use `read_deleted` attribute in
|
||||
# their contexts instead of `show_deleted`.
|
||||
read_deleted = context.read_deleted
|
||||
else:
|
||||
read_deleted = context.show_deleted
|
||||
|
||||
if not issubclass(model, models.ModelBase):
|
||||
raise TypeError(_("model should be a subclass of ModelBase"))
|
||||
|
||||
query = session.query(model) if not args else session.query(*args)
|
||||
query = _read_deleted_filter(query, model, read_deleted)
|
||||
query = _project_filter(query, model, context, project_only)
|
||||
|
||||
return query
|
||||
|
||||
|
||||
def get_table(engine, name):
|
||||
"""Returns an sqlalchemy table dynamically from db.
|
||||
|
||||
@ -276,8 +367,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
|
||||
|
||||
rows_to_delete_select = select([table.c.id]).where(delete_condition)
|
||||
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
|
||||
LOG.info(_("Deleting duplicated row with id: %(id)s from table: "
|
||||
"%(table)s") % dict(id=row[0], table=table_name))
|
||||
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
|
||||
"%(table)s") % dict(id=row[0], table=table_name))
|
||||
|
||||
if use_soft_delete:
|
||||
delete_statement = table.update().\
|
||||
|
@ -23,6 +23,7 @@ Usual usage in an openstack.common module:
|
||||
"""
|
||||
|
||||
import copy
|
||||
import functools
|
||||
import gettext
|
||||
import locale
|
||||
from logging import handlers
|
||||
@ -35,6 +36,17 @@ import six
|
||||
_localedir = os.environ.get('glance'.upper() + '_LOCALEDIR')
|
||||
_t = gettext.translation('glance', localedir=_localedir, fallback=True)
|
||||
|
||||
# We use separate translation catalogs for each log level, so set up a
|
||||
# mapping between the log level name and the translator. The domain
|
||||
# for the log level is project_name + "-log-" + log_level so messages
|
||||
# for each level end up in their own catalog.
|
||||
_t_log_levels = dict(
|
||||
(level, gettext.translation('glance' + '-log-' + level,
|
||||
localedir=_localedir,
|
||||
fallback=True))
|
||||
for level in ['info', 'warning', 'error', 'critical']
|
||||
)
|
||||
|
||||
_AVAILABLE_LANGUAGES = {}
|
||||
USE_LAZY = False
|
||||
|
||||
@ -60,6 +72,28 @@ def _(msg):
|
||||
return _t.ugettext(msg)
|
||||
|
||||
|
||||
def _log_translation(msg, level):
|
||||
"""Build a single translation of a log message
|
||||
"""
|
||||
if USE_LAZY:
|
||||
return Message(msg, domain='glance' + '-log-' + level)
|
||||
else:
|
||||
translator = _t_log_levels[level]
|
||||
if six.PY3:
|
||||
return translator.gettext(msg)
|
||||
return translator.ugettext(msg)
|
||||
|
||||
# Translators for log levels.
|
||||
#
|
||||
# The abbreviated names are meant to reflect the usual use of a short
|
||||
# name like '_'. The "L" is for "log" and the other letter comes from
|
||||
# the level.
|
||||
_LI = functools.partial(_log_translation, level='info')
|
||||
_LW = functools.partial(_log_translation, level='warning')
|
||||
_LE = functools.partial(_log_translation, level='error')
|
||||
_LC = functools.partial(_log_translation, level='critical')
|
||||
|
||||
|
||||
def install(domain, lazy=False):
|
||||
"""Install a _() function using the given translation domain.
|
||||
|
||||
|
@ -35,10 +35,10 @@ import time
|
||||
|
||||
import fixtures
|
||||
import six.moves.urllib.parse as urlparse
|
||||
from sqlalchemy import create_engine
|
||||
import testtools
|
||||
|
||||
from glance.common import utils
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
from glance.openstack.common import jsonutils
|
||||
from glance.openstack.common import units
|
||||
from glance import tests as glance_tests
|
||||
@ -915,8 +915,7 @@ class FunctionalTest(test_utils.BaseTestCase):
|
||||
DB verification within the functional tests.
|
||||
The raw result set is returned.
|
||||
"""
|
||||
engine = create_engine(self.registry_server.sql_connection,
|
||||
pool_recycle=30)
|
||||
engine = db_api.get_engine()
|
||||
return engine.execute(sql)
|
||||
|
||||
def copy_data_file(self, file_name, dst_dir):
|
||||
|
@ -20,29 +20,38 @@ import testtools
|
||||
import glance
|
||||
from glance.cmd import manage
|
||||
from glance.db import migration as db_migration
|
||||
from glance.db.sqlalchemy import api as db_api
|
||||
from glance.openstack.common.db.sqlalchemy import migration
|
||||
|
||||
|
||||
class TestLegacyManage(testtools.TestCase):
|
||||
class TestManageBase(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestLegacyManage, self).setUp()
|
||||
super(TestManageBase, self).setUp()
|
||||
|
||||
def clear_conf():
|
||||
manage.CONF.reset()
|
||||
manage.CONF.unregister_opt(manage.command_opt)
|
||||
self.addCleanup(clear_conf)
|
||||
|
||||
self.patcher = mock.patch('glance.db.sqlalchemy.api.get_engine')
|
||||
self.patcher.start()
|
||||
self.addCleanup(self.patcher.stop)
|
||||
|
||||
def _main_test_helper(self, argv, func_name=None, *exp_args):
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.argv', argv))
|
||||
manage.main()
|
||||
func_name.assert_called_once_with(*exp_args)
|
||||
|
||||
|
||||
class TestLegacyManage(TestManageBase):
|
||||
|
||||
def test_legacy_db_version(self):
|
||||
migration.db_version = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_version'],
|
||||
glance.openstack.common.db.sqlalchemy.
|
||||
migration.db_version,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, 0)
|
||||
|
||||
def test_legacy_db_sync(self):
|
||||
@ -50,92 +59,92 @@ class TestLegacyManage(testtools.TestCase):
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_sync'],
|
||||
glance.openstack.common.db.sqlalchemy.
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_legacy_db_upgrade(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_upgrade'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_legacy_db_version_control(self):
|
||||
migration.db_version_control = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_version_control'],
|
||||
migration.db_version_control,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_legacy_db_sync_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_sync', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, '20')
|
||||
|
||||
def test_legacy_db_upgrade_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_upgrade', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, '20')
|
||||
|
||||
def test_legacy_db_downgrade_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db_downgrade', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, '20')
|
||||
|
||||
|
||||
class TestManage(testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestManage, self).setUp()
|
||||
|
||||
def clear_conf():
|
||||
manage.CONF.reset()
|
||||
manage.CONF.unregister_opt(manage.command_opt)
|
||||
self.addCleanup(clear_conf)
|
||||
|
||||
def _main_test_helper(self, argv, func_name=None, *exp_args):
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.argv', argv))
|
||||
manage.main()
|
||||
func_name.assert_called_once_with(*exp_args)
|
||||
class TestManage(TestManageBase):
|
||||
|
||||
def test_db_version(self):
|
||||
migration.db_version = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'version'],
|
||||
migration.db_version,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, 0)
|
||||
|
||||
def test_db_sync(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'sync'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_db_upgrade(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'upgrade'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_db_version_control(self):
|
||||
migration.db_version_control = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'version_control'],
|
||||
migration.db_version_control,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, None)
|
||||
|
||||
def test_db_sync_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'sync', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, u'20')
|
||||
|
||||
def test_db_upgrade_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'upgrade', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, '20')
|
||||
|
||||
def test_db_downgrade_version(self):
|
||||
migration.db_sync = mock.Mock()
|
||||
self._main_test_helper(['glance.cmd.manage', 'db', 'downgrade', '20'],
|
||||
migration.db_sync,
|
||||
db_api.get_engine(),
|
||||
db_migration.MIGRATE_REPO_PATH, '20')
|
||||
|
Loading…
x
Reference in New Issue
Block a user