From 9d5d70e7f16d4ec344d5663a0e38d278cd543ef3 Mon Sep 17 00:00:00 2001
From: Endre Karlson
Date: Thu, 5 Jun 2014 15:22:19 +0200
Subject: [PATCH] Switch to oslo.db and fix cmd.manage

This change doesn't remove all of the d.sqla.* modules, for compat, so we
don't need to change more things than needed. Things like multiple engines
are supported via a _FACADE dict in d.sqla.session, along with the other
pieces we need. We remove the d.o.common.db code since it is no longer
needed and is effectively replaced by oslo.db.

Notably, the *biggest* change is renaming database_connection to
connection.

Also fixes problems with arguments in cmd.manage.

Change-Id: I948a42b7f3dbc1d484200e86b3f8d0b85769ad08
---
 contrib/devstack/lib/designate                |   3 +-
 .../lib/designate_plugins/backend-powerdns    |   3 +-
 designate/backend/impl_powerdns/__init__.py   |   8 +-
 designate/backend/impl_powerdns/models.py     |   4 +-
 designate/cmd/manage.py                       | 117 ++-
 designate/manage/database.py                  |  72 +-
 designate/manage/powerdns.py                  |  74 +-
 designate/openstack/common/db/__init__.py     |   0
 designate/openstack/common/db/api.py          | 162 ----
 designate/openstack/common/db/exception.py    |  56 --
 designate/openstack/common/db/options.py      | 171 ----
 .../common/db/sqlalchemy/__init__.py          |   0
 .../common/db/sqlalchemy/migration.py         | 278 ------
 .../openstack/common/db/sqlalchemy/models.py  | 119 ---
 .../common/db/sqlalchemy/provision.py         | 157 ---
 .../openstack/common/db/sqlalchemy/session.py | 904 ------------------
 .../common/db/sqlalchemy/test_base.py         | 153 ---
 .../common/db/sqlalchemy/test_migrations.py   | 269 ------
 .../openstack/common/db/sqlalchemy/utils.py   | 647 -------------
 designate/sqlalchemy/models.py                |  65 +-
 designate/sqlalchemy/session.py               | 225 +----
 designate/storage/impl_sqlalchemy/__init__.py |  20 +-
 designate/storage/impl_sqlalchemy/models.py   |  12 +-
 designate/tests/__init__.py                   |   4 +-
 designate/tests/test_backend/test_powerdns.py |   2 +-
 doc/source/backends/powerdns.rst              |   6 +-
 doc/source/configuration.rst                  |   2 +-
 doc/source/examples/basic-config-sample.conf  |   4 +-
 doc/source/examples/config-mysql-pdns.conf    |   4 +-
 doc/source/getting-started.rst                |   6 +-
 doc/source/install/ubuntu.rst                 |   6 +-
 etc/designate/designate.conf.sample           |   6 +-
 openstack-common.conf                         |   2 -
 requirements.txt                              |   1 +
 34 files changed, 189 insertions(+), 3373 deletions(-)
 delete mode 100644 designate/openstack/common/db/__init__.py
 delete mode 100644 designate/openstack/common/db/api.py
 delete mode 100644 designate/openstack/common/db/exception.py
 delete mode 100644 designate/openstack/common/db/options.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/__init__.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/migration.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/models.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/provision.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/session.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/test_base.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/test_migrations.py
 delete mode 100644 designate/openstack/common/db/sqlalchemy/utils.py

diff --git a/contrib/devstack/lib/designate b/contrib/devstack/lib/designate
index c87438e98..6721d09a9 100644
--- a/contrib/devstack/lib/designate
+++ b/contrib/devstack/lib/designate
@@ -82,7 +82,7 @@ function configure_designate() {
     iniset $DESIGNATE_CONF DEFAULT verbose True
     iniset $DESIGNATE_CONF DEFAULT state_path $DESIGNATE_STATE_PATH
     iniset $DESIGNATE_CONF DEFAULT root-helper sudo designate-rootwrap
$DESIGNATE_ROOTWRAP_CONF - iniset $DESIGNATE_CONF storage:sqlalchemy database_connection `database_connection_url designate` + iniset $DESIGNATE_CONF storage:sqlalchemy connection `database_connection_url designate` sudo cp $DESIGNATE_DIR/etc/designate/rootwrap.conf.sample $DESIGNATE_ROOTWRAP_CONF iniset $DESIGNATE_ROOTWRAP_CONF DEFAULT filters_path $DESIGNATE_DIR/etc/designate/rootwrap.d root-helper @@ -187,7 +187,6 @@ function init_designate() { recreate_database designate utf8 # Init and migrate designate database - designate-manage database init designate-manage database sync init_designate_backend diff --git a/contrib/devstack/lib/designate_plugins/backend-powerdns b/contrib/devstack/lib/designate_plugins/backend-powerdns index 72970d1dc..dada2f1c6 100644 --- a/contrib/devstack/lib/designate_plugins/backend-powerdns +++ b/contrib/devstack/lib/designate_plugins/backend-powerdns @@ -46,7 +46,7 @@ function install_designate_backend { # configure_designate_backend - make configuration changes, including those to other services function configure_designate_backend { - iniset $DESIGNATE_CONF backend:powerdns database_connection `database_connection_url designate_pdns` + iniset $DESIGNATE_CONF backend:powerdns connection `database_connection_url designate_pdns` sudo tee $POWERDNS_CFG_DIR/pdns.conf > /dev/null < module/class to load mapping - :type backend_mapping: dict - - :param lazy: load the DB backend lazily on the first DB API method call - :type lazy: bool - - Keyword arguments: - - :keyword use_db_reconnect: retry DB transactions on disconnect or not - :type use_db_reconnect: bool - - :keyword retry_interval: seconds between transaction retries - :type retry_interval: int - - :keyword inc_retry_interval: increase retry interval or not - :type inc_retry_interval: bool - - :keyword max_retry_interval: max interval value between retries - :type max_retry_interval: int - - :keyword max_retries: max number of retries before an error is raised - :type max_retries: int - - """ - - self._backend = None - self._backend_name = backend_name - self._backend_mapping = backend_mapping or {} - self._lock = threading.Lock() - - if not lazy: - self._load_backend() - - self.use_db_reconnect = kwargs.get('use_db_reconnect', False) - self.retry_interval = kwargs.get('retry_interval', 1) - self.inc_retry_interval = kwargs.get('inc_retry_interval', True) - self.max_retry_interval = kwargs.get('max_retry_interval', 10) - self.max_retries = kwargs.get('max_retries', 20) - - def _load_backend(self): - with self._lock: - if not self._backend: - # Import the untranslated name if we don't have a mapping - backend_path = self._backend_mapping.get(self._backend_name, - self._backend_name) - backend_mod = importutils.import_module(backend_path) - self._backend = backend_mod.get_backend() - - def __getattr__(self, key): - if not self._backend: - self._load_backend() - - attr = getattr(self._backend, key) - if not hasattr(attr, '__call__'): - return attr - # NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry - # DB API methods, decorated with @safe_for_db_retry - # on disconnect. 
- if self.use_db_reconnect and hasattr(attr, 'enable_retry'): - attr = wrap_db_retry( - retry_interval=self.retry_interval, - max_retries=self.max_retries, - inc_retry_interval=self.inc_retry_interval, - max_retry_interval=self.max_retry_interval)(attr) - - return attr diff --git a/designate/openstack/common/db/exception.py b/designate/openstack/common/db/exception.py deleted file mode 100644 index fd3ba2464..000000000 --- a/designate/openstack/common/db/exception.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""DB related custom exceptions.""" - -import six - -from designate.openstack.common.gettextutils import _ - - -class DBError(Exception): - """Wraps an implementation specific exception.""" - def __init__(self, inner_exception=None): - self.inner_exception = inner_exception - super(DBError, self).__init__(six.text_type(inner_exception)) - - -class DBDuplicateEntry(DBError): - """Wraps an implementation specific exception.""" - def __init__(self, columns=[], inner_exception=None): - self.columns = columns - super(DBDuplicateEntry, self).__init__(inner_exception) - - -class DBDeadlock(DBError): - def __init__(self, inner_exception=None): - super(DBDeadlock, self).__init__(inner_exception) - - -class DBInvalidUnicodeParameter(Exception): - message = _("Invalid Parameter: " - "Unicode is not supported by the current database.") - - -class DbMigrationError(DBError): - """Wraps migration specific exception.""" - def __init__(self, message=None): - super(DbMigrationError, self).__init__(message) - - -class DBConnectionError(DBError): - """Wraps connection specific exception.""" - pass diff --git a/designate/openstack/common/db/options.py b/designate/openstack/common/db/options.py deleted file mode 100644 index 6cc30ecfa..000000000 --- a/designate/openstack/common/db/options.py +++ /dev/null @@ -1,171 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
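The [database] option definitions in the module deleted below now come from the oslo.db library itself. A minimal sketch of consuming them from oslo.db instead, assuming the options module and set_defaults() helper oslo.db shipped at the time of this patch; the connection URL is a placeholder, not Designate's real default:

.. code:: python

    from oslo.config import cfg
    from oslo.db import options as db_options

    CONF = cfg.CONF

    # Register the shared [database] options on our config object and
    # seed a default connection (placeholder URL, for illustration only).
    db_options.set_defaults(CONF,
                            connection='sqlite:///designate.sqlite',
                            sqlite_db='designate.sqlite')

    CONF([])  # parse an empty argv so the options become readable
    print(CONF.database.connection)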
- -import copy - -from oslo.config import cfg - - -database_opts = [ - cfg.StrOpt('sqlite_db', - deprecated_group='DEFAULT', - default='designate.sqlite', - help='The file name to use with SQLite'), - cfg.BoolOpt('sqlite_synchronous', - deprecated_group='DEFAULT', - default=True, - help='If True, SQLite uses synchronous mode'), - cfg.StrOpt('backend', - default='sqlalchemy', - deprecated_name='db_backend', - deprecated_group='DEFAULT', - help='The backend to use for db'), - cfg.StrOpt('connection', - help='The SQLAlchemy connection string used to connect to the ' - 'database', - secret=True, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_connection', - group='DATABASE'), - cfg.DeprecatedOpt('connection', - group='sql'), ]), - cfg.StrOpt('mysql_sql_mode', - default='TRADITIONAL', - help='The SQL mode to be used for MySQL sessions. ' - 'This option, including the default, overrides any ' - 'server-set SQL mode. To use whatever SQL mode ' - 'is set by the server configuration, ' - 'set this to no value. Example: mysql_sql_mode='), - cfg.IntOpt('idle_timeout', - default=3600, - deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_idle_timeout', - group='DATABASE'), - cfg.DeprecatedOpt('idle_timeout', - group='sql')], - help='Timeout before idle sql connections are reaped'), - cfg.IntOpt('min_pool_size', - default=1, - deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_min_pool_size', - group='DATABASE')], - help='Minimum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_pool_size', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_max_pool_size', - group='DATABASE')], - help='Maximum number of SQL connections to keep open in a ' - 'pool'), - cfg.IntOpt('max_retries', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries', - group='DEFAULT'), - cfg.DeprecatedOpt('sql_max_retries', - group='DATABASE')], - help='Maximum db connection retries during startup. ' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('retry_interval', - default=10, - deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval', - group='DEFAULT'), - cfg.DeprecatedOpt('reconnect_interval', - group='DATABASE')], - help='Interval between retries of opening a sql connection'), - cfg.IntOpt('max_overflow', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow', - group='DEFAULT'), - cfg.DeprecatedOpt('sqlalchemy_max_overflow', - group='DATABASE')], - help='If set, use this value for max_overflow with sqlalchemy'), - cfg.IntOpt('connection_debug', - default=0, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug', - group='DEFAULT')], - help='Verbosity of SQL debugging information. 
0=None, ' - '100=Everything'), - cfg.BoolOpt('connection_trace', - default=False, - deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace', - group='DEFAULT')], - help='Add python stack traces to SQL as comment strings'), - cfg.IntOpt('pool_timeout', - default=None, - deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout', - group='DATABASE')], - help='If set, use this value for pool_timeout with sqlalchemy'), - cfg.BoolOpt('use_db_reconnect', - default=False, - help='Enable the experimental use of database reconnect ' - 'on connection lost'), - cfg.IntOpt('db_retry_interval', - default=1, - help='seconds between db connection retries'), - cfg.BoolOpt('db_inc_retry_interval', - default=True, - help='Whether to increase interval between db connection ' - 'retries, up to db_max_retry_interval'), - cfg.IntOpt('db_max_retry_interval', - default=10, - help='max seconds between db connection retries, if ' - 'db_inc_retry_interval is enabled'), - cfg.IntOpt('db_max_retries', - default=20, - help='maximum db connection retries before error is raised. ' - '(setting -1 implies an infinite retry count)'), -] - -CONF = cfg.CONF -CONF.register_opts(database_opts, 'database') - - -def set_defaults(sql_connection, sqlite_db, max_pool_size=None, - max_overflow=None, pool_timeout=None): - """Set defaults for configuration variables.""" - cfg.set_defaults(database_opts, - connection=sql_connection, - sqlite_db=sqlite_db) - # Update the QueuePool defaults - if max_pool_size is not None: - cfg.set_defaults(database_opts, - max_pool_size=max_pool_size) - if max_overflow is not None: - cfg.set_defaults(database_opts, - max_overflow=max_overflow) - if pool_timeout is not None: - cfg.set_defaults(database_opts, - pool_timeout=pool_timeout) - - -def list_opts(): - """Returns a list of oslo.config options available in the library. - - The returned list includes all oslo.config options which may be registered - at runtime by the library. - - Each element of the list is a tuple. The first element is the name of the - group under which the list of elements in the second element will be - registered. A group name of None corresponds to the [DEFAULT] group in - config files. - - The purpose of this is to allow tools like the Oslo sample config file - generator to discover the options exposed to users by this library. - - :returns: a list of (group_name, opts) tuples - """ - return [('database', copy.deepcopy(database_opts))] diff --git a/designate/openstack/common/db/sqlalchemy/__init__.py b/designate/openstack/common/db/sqlalchemy/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/designate/openstack/common/db/sqlalchemy/migration.py b/designate/openstack/common/db/sqlalchemy/migration.py deleted file mode 100644 index c01d85dfc..000000000 --- a/designate/openstack/common/db/sqlalchemy/migration.py +++ /dev/null @@ -1,278 +0,0 @@ -# coding: utf-8 -# -# Copyright (c) 2013 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
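The sqlalchemy-migrate wrapper deleted below likewise graduated into oslo.db, which is what designate-manage database sync drives after this patch. A hedged sketch of the equivalent calls: db_sync()/db_version() are assumed to keep the signatures of the incubator copy shown below, and the engine URL and migrate-repo path are made-up examples:

.. code:: python

    import sqlalchemy

    from oslo.db.sqlalchemy import migration

    # Placeholder connection URL and repository path.
    engine = sqlalchemy.create_engine('sqlite:///designate.sqlite')
    repo = '/opt/stack/designate/designate/storage/impl_sqlalchemy/migrate_repo'

    # Upgrade to the latest available version (version=None), as
    # "designate-manage database sync" does.
    migration.db_sync(engine, repo, version=None)
    print(migration.db_version(engine, repo, init_version=0))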
-# -# Base on code in migrate/changeset/databases/sqlite.py which is under -# the following license: -# -# The MIT License -# -# Copyright (c) 2009 Evan Rosson, Jan Dittberner, Domen Kožar -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. - -import os -import re - -from migrate.changeset import ansisql -from migrate.changeset.databases import sqlite -from migrate import exceptions as versioning_exceptions -from migrate.versioning import api as versioning_api -from migrate.versioning.repository import Repository -import sqlalchemy -from sqlalchemy.schema import UniqueConstraint - -from designate.openstack.common.db import exception -from designate.openstack.common.gettextutils import _ - - -def _get_unique_constraints(self, table): - """Retrieve information about existing unique constraints of the table - - This feature is needed for _recreate_table() to work properly. - Unfortunately, it's not available in sqlalchemy 0.7.x/0.8.x. - - """ - - data = table.metadata.bind.execute( - """SELECT sql - FROM sqlite_master - WHERE - type='table' AND - name=:table_name""", - table_name=table.name - ).fetchone()[0] - - UNIQUE_PATTERN = "CONSTRAINT (\w+) UNIQUE \(([^\)]+)\)" - return [ - UniqueConstraint( - *[getattr(table.columns, c.strip(' "')) for c in cols.split(",")], - name=name - ) - for name, cols in re.findall(UNIQUE_PATTERN, data) - ] - - -def _recreate_table(self, table, column=None, delta=None, omit_uniques=None): - """Recreate the table properly - - Unlike the corresponding original method of sqlalchemy-migrate this one - doesn't drop existing unique constraints when creating a new one. 
- - """ - - table_name = self.preparer.format_table(table) - - # we remove all indexes so as not to have - # problems during copy and re-create - for index in table.indexes: - index.drop() - - # reflect existing unique constraints - for uc in self._get_unique_constraints(table): - table.append_constraint(uc) - # omit given unique constraints when creating a new table if required - table.constraints = set([ - cons for cons in table.constraints - if omit_uniques is None or cons.name not in omit_uniques - ]) - - self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) - self.execute() - - insertion_string = self._modify_table(table, column, delta) - - table.create(bind=self.connection) - self.append(insertion_string % {'table_name': table_name}) - self.execute() - self.append('DROP TABLE migration_tmp') - self.execute() - - -def _visit_migrate_unique_constraint(self, *p, **k): - """Drop the given unique constraint - - The corresponding original method of sqlalchemy-migrate just - raises NotImplemented error - - """ - - self.recreate_table(p[0].table, omit_uniques=[p[0].name]) - - -def patch_migrate(): - """A workaround for SQLite's inability to alter things - - SQLite abilities to alter tables are very limited (please read - http://www.sqlite.org/lang_altertable.html for more details). - E. g. one can't drop a column or a constraint in SQLite. The - workaround for this is to recreate the original table omitting - the corresponding constraint (or column). - - sqlalchemy-migrate library has recreate_table() method that - implements this workaround, but it does it wrong: - - - information about unique constraints of a table - is not retrieved. So if you have a table with one - unique constraint and a migration adding another one - you will end up with a table that has only the - latter unique constraint, and the former will be lost - - - dropping of unique constraints is not supported at all - - The proper way to fix this is to provide a pull-request to - sqlalchemy-migrate, but the project seems to be dead. So we - can go on with monkey-patching of the lib at least for now. - - """ - - # this patch is needed to ensure that recreate_table() doesn't drop - # existing unique constraints of the table when creating a new one - helper_cls = sqlite.SQLiteHelper - helper_cls.recreate_table = _recreate_table - helper_cls._get_unique_constraints = _get_unique_constraints - - # this patch is needed to be able to drop existing unique constraints - constraint_cls = sqlite.SQLiteConstraintDropper - constraint_cls.visit_migrate_unique_constraint = \ - _visit_migrate_unique_constraint - constraint_cls.__bases__ = (ansisql.ANSIColumnDropper, - sqlite.SQLiteConstraintGenerator) - - -def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True): - """Upgrade or downgrade a database. - - Function runs the upgrade() or downgrade() functions in change scripts. - - :param engine: SQLAlchemy engine instance for a given database - :param abs_path: Absolute path to migrate repository. - :param version: Database will upgrade/downgrade until this version. - If None - database will update to the latest - available version. 
- :param init_version: Initial database version - :param sanity_check: Require schema sanity checking for all tables - """ - - if version is not None: - try: - version = int(version) - except ValueError: - raise exception.DbMigrationError( - message=_("version should be an integer")) - - current_version = db_version(engine, abs_path, init_version) - repository = _find_migrate_repo(abs_path) - if sanity_check: - _db_schema_sanity_check(engine) - if version is None or version > current_version: - return versioning_api.upgrade(engine, repository, version) - else: - return versioning_api.downgrade(engine, repository, - version) - - -def _db_schema_sanity_check(engine): - """Ensure all database tables were created with required parameters. - - :param engine: SQLAlchemy engine instance for a given database - - """ - - if engine.name == 'mysql': - onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION ' - 'from information_schema.TABLES ' - 'where TABLE_SCHEMA=%s and ' - 'TABLE_COLLATION NOT LIKE "%%utf8%%"') - - # NOTE(morganfainberg): exclude the sqlalchemy-migrate and alembic - # versioning tables from the tables we need to verify utf8 status on. - # Non-standard table names are not supported. - EXCLUDED_TABLES = ['migrate_version', 'alembic_version'] - - table_names = [res[0] for res in - engine.execute(onlyutf8_sql, engine.url.database) if - res[0].lower() not in EXCLUDED_TABLES] - - if len(table_names) > 0: - raise ValueError(_('Tables "%s" have non utf8 collation, ' - 'please make sure all tables are CHARSET=utf8' - ) % ','.join(table_names)) - - -def db_version(engine, abs_path, init_version): - """Show the current version of the repository. - - :param engine: SQLAlchemy engine instance for a given database - :param abs_path: Absolute path to migrate repository - :param version: Initial database version - """ - repository = _find_migrate_repo(abs_path) - try: - return versioning_api.db_version(engine, repository) - except versioning_exceptions.DatabaseNotControlledError: - meta = sqlalchemy.MetaData() - meta.reflect(bind=engine) - tables = meta.tables - if len(tables) == 0 or 'alembic_version' in tables: - db_version_control(engine, abs_path, version=init_version) - return versioning_api.db_version(engine, repository) - else: - raise exception.DbMigrationError( - message=_( - "The database is not under version control, but has " - "tables. Please stamp the current version of the schema " - "manually.")) - - -def db_version_control(engine, abs_path, version=None): - """Mark a database as under this repository's version control. - - Once a database is under version control, schema changes should - only be done via change scripts in this repository. 
- - :param engine: SQLAlchemy engine instance for a given database - :param abs_path: Absolute path to migrate repository - :param version: Initial database version - """ - repository = _find_migrate_repo(abs_path) - versioning_api.version_control(engine, repository, version) - return version - - -def _find_migrate_repo(abs_path): - """Get the project's change script repository - - :param abs_path: Absolute path to migrate repository - """ - if not os.path.exists(abs_path): - raise exception.DbMigrationError("Path %s not found" % abs_path) - return Repository(abs_path) diff --git a/designate/openstack/common/db/sqlalchemy/models.py b/designate/openstack/common/db/sqlalchemy/models.py deleted file mode 100644 index 8dbd9fe99..000000000 --- a/designate/openstack/common/db/sqlalchemy/models.py +++ /dev/null @@ -1,119 +0,0 @@ -# Copyright (c) 2011 X.commerce, a business unit of eBay Inc. -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2011 Piston Cloud Computing, Inc. -# Copyright 2012 Cloudscaling Group, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -SQLAlchemy models. -""" - -import six - -from sqlalchemy import Column, Integer -from sqlalchemy import DateTime -from sqlalchemy.orm import object_mapper - -from designate.openstack.common import timeutils - - -class ModelBase(six.Iterator): - """Base class for models.""" - __table_initialized__ = False - - def save(self, session): - """Save this object.""" - - # NOTE(boris-42): This part of code should be look like: - # session.add(self) - # session.flush() - # But there is a bug in sqlalchemy and eventlet that - # raises NoneType exception if there is no running - # transaction and rollback is called. As long as - # sqlalchemy has this bug we have to create transaction - # explicitly. - with session.begin(subtransactions=True): - session.add(self) - session.flush() - - def __setitem__(self, key, value): - setattr(self, key, value) - - def __getitem__(self, key): - return getattr(self, key) - - def get(self, key, default=None): - return getattr(self, key, default) - - @property - def _extra_keys(self): - """Specifies custom fields - - Subclasses can override this property to return a list - of custom fields that should be included in their dict - representation. - - For reference check tests/db/sqlalchemy/test_models.py - """ - return [] - - def __iter__(self): - columns = dict(object_mapper(self).columns).keys() - # NOTE(russellb): Allow models to specify other keys that can be looked - # up, beyond the actual db columns. An example would be the 'name' - # property for an Instance. - columns.extend(self._extra_keys) - self._i = iter(columns) - return self - - # In Python 3, __next__() has replaced next(). 
- def __next__(self): - n = six.advance_iterator(self._i) - return n, getattr(self, n) - - def next(self): - return self.__next__() - - def update(self, values): - """Make the model object behave like a dict.""" - for k, v in six.iteritems(values): - setattr(self, k, v) - - def iteritems(self): - """Make the model object behave like a dict. - - Includes attributes from joins. - """ - local = dict(self) - joined = dict([(k, v) for k, v in six.iteritems(self.__dict__) - if not k[0] == '_']) - local.update(joined) - return six.iteritems(local) - - -class TimestampMixin(object): - created_at = Column(DateTime, default=lambda: timeutils.utcnow()) - updated_at = Column(DateTime, onupdate=lambda: timeutils.utcnow()) - - -class SoftDeleteMixin(object): - deleted_at = Column(DateTime) - deleted = Column(Integer, default=0) - - def soft_delete(self, session): - """Mark this object as deleted.""" - self.deleted = self.id - self.deleted_at = timeutils.utcnow() - self.save(session=session) diff --git a/designate/openstack/common/db/sqlalchemy/provision.py b/designate/openstack/common/db/sqlalchemy/provision.py deleted file mode 100644 index a53b059d2..000000000 --- a/designate/openstack/common/db/sqlalchemy/provision.py +++ /dev/null @@ -1,157 +0,0 @@ -# Copyright 2013 Mirantis.inc -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Provision test environment for specific DB backends""" - -import argparse -import logging -import os -import random -import string - -from six import moves -import sqlalchemy - -from designate.openstack.common.db import exception as exc - - -LOG = logging.getLogger(__name__) - - -def get_engine(uri): - """Engine creation - - Call the function without arguments to get admin connection. Admin - connection required to create temporary user and database for each - particular test. Otherwise use existing connection to recreate connection - to the temporary database. 
- """ - return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool) - - -def _execute_sql(engine, sql, driver): - """Initialize connection, execute sql query and close it.""" - try: - with engine.connect() as conn: - if driver == 'postgresql': - conn.connection.set_isolation_level(0) - for s in sql: - conn.execute(s) - except sqlalchemy.exc.OperationalError: - msg = ('%s does not match database admin ' - 'credentials or database does not exist.') - LOG.exception(msg % engine.url) - raise exc.DBConnectionError(msg % engine.url) - - -def create_database(engine): - """Provide temporary user and database for each particular test.""" - driver = engine.name - - auth = { - 'database': ''.join(random.choice(string.ascii_lowercase) - for i in moves.range(10)), - 'user': engine.url.username, - 'passwd': engine.url.password, - } - - sqls = [ - "drop database if exists %(database)s;", - "create database %(database)s;" - ] - - if driver == 'sqlite': - return 'sqlite:////tmp/%s' % auth['database'] - elif driver in ['mysql', 'postgresql']: - sql_query = map(lambda x: x % auth, sqls) - _execute_sql(engine, sql_query, driver) - else: - raise ValueError('Unsupported RDBMS %s' % driver) - - params = auth.copy() - params['backend'] = driver - return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params - - -def drop_database(admin_engine, current_uri): - """Drop temporary database and user after each particular test.""" - - engine = get_engine(current_uri) - driver = engine.name - auth = {'database': engine.url.database, 'user': engine.url.username} - - if driver == 'sqlite': - try: - os.remove(auth['database']) - except OSError: - pass - elif driver in ['mysql', 'postgresql']: - sql = "drop database if exists %(database)s;" - _execute_sql(admin_engine, [sql % auth], driver) - else: - raise ValueError('Unsupported RDBMS %s' % driver) - - -def main(): - """Controller to handle commands - - ::create: Create test user and database with random names. - ::drop: Drop user and database created by previous command. - """ - parser = argparse.ArgumentParser( - description='Controller to handle database creation and dropping' - ' commands.', - epilog='Under normal circumstances is not used directly.' 
- ' Used in .testr.conf to automate test database creation' - ' and dropping processes.') - subparsers = parser.add_subparsers( - help='Subcommands to manipulate temporary test databases.') - - create = subparsers.add_parser( - 'create', - help='Create temporary test ' - 'databases and users.') - create.set_defaults(which='create') - create.add_argument( - 'instances_count', - type=int, - help='Number of databases to create.') - - drop = subparsers.add_parser( - 'drop', - help='Drop temporary test databases and users.') - drop.set_defaults(which='drop') - drop.add_argument( - 'instances', - nargs='+', - help='List of databases uri to be dropped.') - - args = parser.parse_args() - - connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', - 'sqlite://') - engine = get_engine(connection_string) - which = args.which - - if which == "create": - for i in range(int(args.instances_count)): - print(create_database(engine)) - elif which == "drop": - for db in args.instances: - drop_database(engine, db) - - -if __name__ == "__main__": - main() diff --git a/designate/openstack/common/db/sqlalchemy/session.py b/designate/openstack/common/db/sqlalchemy/session.py deleted file mode 100644 index f971661b3..000000000 --- a/designate/openstack/common/db/sqlalchemy/session.py +++ /dev/null @@ -1,904 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Session Handling for SQLAlchemy backend. - -Recommended ways to use sessions within this framework: - -* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``. - `model_query()` will implicitly use a session when called without one - supplied. This is the ideal situation because it will allow queries - to be automatically retried if the database connection is interrupted. - - .. note:: Automatic retry will be enabled in a future patch. - - It is generally fine to issue several queries in a row like this. Even though - they may be run in separate transactions and/or separate sessions, each one - will see the data from the prior calls. If needed, undo- or rollback-like - functionality should be handled at a logical level. For an example, look at - the code around quotas and `reservation_rollback()`. - - Examples: - - .. code:: python - - def get_foo(context, foo): - return (model_query(context, models.Foo). - filter_by(foo=foo). - first()) - - def update_foo(context, id, newfoo): - (model_query(context, models.Foo). - filter_by(id=id). - update({'foo': newfoo})) - - def create_foo(context, values): - foo_ref = models.Foo() - foo_ref.update(values) - foo_ref.save() - return foo_ref - - -* Within the scope of a single method, keep all the reads and writes within - the context managed by a single session. In this way, the session's - `__exit__` handler will take care of calling `flush()` and `commit()` for - you. 
If using this approach, you should not explicitly call `flush()` or - `commit()`. Any error within the context of the session will cause the - session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be - raised in `session`'s `__exit__` handler, and any try/except within the - context managed by `session` will not be triggered. And catching other - non-database errors in the session will not trigger the ROLLBACK, so - exception handlers should always be outside the session, unless the - developer wants to do a partial commit on purpose. If the connection is - dropped before this is possible, the database will implicitly roll back the - transaction. - - .. note:: Statements in the session scope will not be automatically retried. - - If you create models within the session, they need to be added, but you - do not need to call `model.save()`: - - .. code:: python - - def create_many_foo(context, foos): - session = sessionmaker() - with session.begin(): - for foo in foos: - foo_ref = models.Foo() - foo_ref.update(foo) - session.add(foo_ref) - - def update_bar(context, foo_id, newbar): - session = sessionmaker() - with session.begin(): - foo_ref = (model_query(context, models.Foo, session). - filter_by(id=foo_id). - first()) - (model_query(context, models.Bar, session). - filter_by(id=foo_ref['bar_id']). - update({'bar': newbar})) - - .. note:: `update_bar` is a trivially simple example of using - ``with session.begin``. Whereas `create_many_foo` is a good example of - when a transaction is needed, it is always best to use as few queries as - possible. - - The two queries in `update_bar` can be better expressed using a single query - which avoids the need for an explicit transaction. It can be expressed like - so: - - .. code:: python - - def update_bar(context, foo_id, newbar): - subq = (model_query(context, models.Foo.id). - filter_by(id=foo_id). - limit(1). - subquery()) - (model_query(context, models.Bar). - filter_by(id=subq.as_scalar()). - update({'bar': newbar})) - - For reference, this emits approximately the following SQL statement: - - .. code:: sql - - UPDATE bar SET bar = ${newbar} - WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1); - - .. note:: `create_duplicate_foo` is a trivially simple example of catching an - exception while using ``with session.begin``. Here create two duplicate - instances with same primary key, must catch the exception out of context - managed by a single session: - - .. code:: python - - def create_duplicate_foo(context): - foo1 = models.Foo() - foo2 = models.Foo() - foo1.id = foo2.id = 1 - session = sessionmaker() - try: - with session.begin(): - session.add(foo1) - session.add(foo2) - except exception.DBDuplicateEntry as e: - handle_error(e) - -* Passing an active session between methods. Sessions should only be passed - to private methods. The private method must use a subtransaction; otherwise - SQLAlchemy will throw an error when you call `session.begin()` on an existing - transaction. Public methods should not accept a session parameter and should - not be involved in sessions within the caller's scope. - - Note that this incurs more overhead in SQLAlchemy than the above means - due to nesting transactions, and it is not possible to implicitly retry - failed database operations when using this approach. - - This also makes code somewhat more difficult to read and debug, because a - single database transaction spans more than one method. Error handling - becomes less clear in this situation. 
When this is needed for code clarity, - it should be clearly documented. - - .. code:: python - - def myfunc(foo): - session = sessionmaker() - with session.begin(): - # do some database things - bar = _private_func(foo, session) - return bar - - def _private_func(foo, session=None): - if not session: - session = sessionmaker() - with session.begin(subtransaction=True): - # do some other database things - return bar - - -There are some things which it is best to avoid: - -* Don't keep a transaction open any longer than necessary. - - This means that your ``with session.begin()`` block should be as short - as possible, while still containing all the related calls for that - transaction. - -* Avoid ``with_lockmode('UPDATE')`` when possible. - - In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match - any rows, it will take a gap-lock. This is a form of write-lock on the - "gap" where no rows exist, and prevents any other writes to that space. - This can effectively prevent any INSERT into a table by locking the gap - at the end of the index. Similar problems will occur if the SELECT FOR UPDATE - has an overly broad WHERE clause, or doesn't properly use an index. - - One idea proposed at ODS Fall '12 was to use a normal SELECT to test the - number of rows matching a query, and if only one row is returned, - then issue the SELECT FOR UPDATE. - - The better long-term solution is to use - ``INSERT .. ON DUPLICATE KEY UPDATE``. - However, this can not be done until the "deleted" columns are removed and - proper UNIQUE constraints are added to the tables. - - -Enabling soft deletes: - -* To use/enable soft-deletes, the `SoftDeleteMixin` must be added - to your model class. For example: - - .. code:: python - - class NovaBase(models.SoftDeleteMixin, models.ModelBase): - pass - - -Efficient use of soft deletes: - -* There are two possible ways to mark a record as deleted: - `model.soft_delete()` and `query.soft_delete()`. - - The `model.soft_delete()` method works with a single already-fetched entry. - `query.soft_delete()` makes only one db request for all entries that - correspond to the query. - -* In almost all cases you should use `query.soft_delete()`. Some examples: - - .. code:: python - - def soft_delete_bar(): - count = model_query(BarModel).find(some_condition).soft_delete() - if count == 0: - raise Exception("0 entries were soft deleted") - - def complex_soft_delete_with_synchronization_bar(session=None): - if session is None: - session = sessionmaker() - with session.begin(subtransactions=True): - count = (model_query(BarModel). - find(some_condition). - soft_delete(synchronize_session=True)) - # Here synchronize_session is required, because we - # don't know what is going on in outer session. - if count == 0: - raise Exception("0 entries were soft deleted") - -* There is only one situation where `model.soft_delete()` is appropriate: when - you fetch a single record, work with it, and mark it as deleted in the same - transaction. - - .. code:: python - - def soft_delete_bar_model(): - session = sessionmaker() - with session.begin(): - bar_ref = model_query(BarModel).find(some_condition).first() - # Work with bar_ref - bar_ref.soft_delete(session=session) - - However, if you need to work with all entries that correspond to query and - then soft delete them you should use the `query.soft_delete()` method: - - .. code:: python - - def soft_delete_multi_models(): - session = sessionmaker() - with session.begin(): - query = (model_query(BarModel, session=session). 
- find(some_condition)) - model_refs = query.all() - # Work with model_refs - query.soft_delete(synchronize_session=False) - # synchronize_session=False should be set if there is no outer - # session and these entries are not used after this. - - When working with many rows, it is very important to use query.soft_delete, - which issues a single query. Using `model.soft_delete()`, as in the following - example, is very inefficient. - - .. code:: python - - for bar_ref in bar_refs: - bar_ref.soft_delete(session=session) - # This will produce count(bar_refs) db requests. - -""" - -import functools -import logging -import re -import time - -import six -from sqlalchemy import exc as sqla_exc -from sqlalchemy.interfaces import PoolListener -import sqlalchemy.orm -from sqlalchemy.pool import NullPool, StaticPool -from sqlalchemy.sql.expression import literal_column - -from designate.openstack.common.db import exception -from designate.openstack.common.gettextutils import _LE, _LW -from designate.openstack.common import timeutils - - -LOG = logging.getLogger(__name__) - - -class SqliteForeignKeysListener(PoolListener): - """Ensures that the foreign key constraints are enforced in SQLite. - - The foreign key constraints are disabled by default in SQLite, - so the foreign key constraints will be enabled here for every - database connection - """ - def connect(self, dbapi_con, con_record): - dbapi_con.execute('pragma foreign_keys=ON') - - -# note(boris-42): In current versions of DB backends unique constraint -# violation messages follow the structure: -# -# sqlite: -# 1 column - (IntegrityError) column c1 is not unique -# N columns - (IntegrityError) column c1, c2, ..., N are not unique -# -# sqlite since 3.7.16: -# 1 column - (IntegrityError) UNIQUE constraint failed: tbl.k1 -# -# N columns - (IntegrityError) UNIQUE constraint failed: tbl.k1, tbl.k2 -# -# postgres: -# 1 column - (IntegrityError) duplicate key value violates unique -# constraint "users_c1_key" -# N columns - (IntegrityError) duplicate key value violates unique -# constraint "name_of_our_constraint" -# -# mysql: -# 1 column - (IntegrityError) (1062, "Duplicate entry 'value_of_c1' for key -# 'c1'") -# N columns - (IntegrityError) (1062, "Duplicate entry 'values joined -# with -' for key 'name_of_our_constraint'") -# -# ibm_db_sa: -# N columns - (IntegrityError) SQL0803N One or more values in the INSERT -# statement, UPDATE statement, or foreign key update caused by a -# DELETE statement are not valid because the primary key, unique -# constraint or unique index identified by "2" constrains table -# "NOVA.KEY_PAIRS" from having duplicate values for the index -# key. -_DUP_KEY_RE_DB = { - "sqlite": (re.compile(r"^.*columns?([^)]+)(is|are)\s+not\s+unique$"), - re.compile(r"^.*UNIQUE\s+constraint\s+failed:\s+(.+)$")), - "postgresql": (re.compile(r"^.*duplicate\s+key.*\"([^\"]+)\"\s*\n.*$"),), - "mysql": (re.compile(r"^.*\(1062,.*'([^\']+)'\"\)$"),), - "ibm_db_sa": (re.compile(r"^.*SQL0803N.*$"),), -} - - -def _raise_if_duplicate_entry_error(integrity_error, engine_name): - """Raise exception if two entries are duplicated. - - In this function will be raised DBDuplicateEntry exception if integrity - error wrap unique constraint violation. - """ - - def get_columns_from_uniq_cons_or_name(columns): - # note(vsergeyev): UniqueConstraint name convention: "uniq_t0c10c2" - # where `t` it is table name and columns `c1`, `c2` - # are in UniqueConstraint. 
- uniqbase = "uniq_" - if not columns.startswith(uniqbase): - if engine_name == "postgresql": - return [columns[columns.index("_") + 1:columns.rindex("_")]] - return [columns] - return columns[len(uniqbase):].split("0")[1:] - - if engine_name not in ["ibm_db_sa", "mysql", "sqlite", "postgresql"]: - return - - # FIXME(johannes): The usage of the .message attribute has been - # deprecated since Python 2.6. However, the exceptions raised by - # SQLAlchemy can differ when using unicode() and accessing .message. - # An audit across all three supported engines will be necessary to - # ensure there are no regressions. - for pattern in _DUP_KEY_RE_DB[engine_name]: - match = pattern.match(integrity_error.message) - if match: - break - else: - return - - # NOTE(mriedem): The ibm_db_sa integrity error message doesn't provide the - # columns so we have to omit that from the DBDuplicateEntry error. - columns = '' - - if engine_name != 'ibm_db_sa': - columns = match.group(1) - - if engine_name == "sqlite": - columns = [c.split('.')[-1] for c in columns.strip().split(", ")] - else: - columns = get_columns_from_uniq_cons_or_name(columns) - raise exception.DBDuplicateEntry(columns, integrity_error) - - -# NOTE(comstud): In current versions of DB backends, Deadlock violation -# messages follow the structure: -# -# mysql: -# (OperationalError) (1213, 'Deadlock found when trying to get lock; try ' -# 'restarting transaction') -_DEADLOCK_RE_DB = { - "mysql": re.compile(r"^.*\(1213, 'Deadlock.*") -} - - -def _raise_if_deadlock_error(operational_error, engine_name): - """Raise exception on deadlock condition. - - Raise DBDeadlock exception if OperationalError contains a Deadlock - condition. - """ - re = _DEADLOCK_RE_DB.get(engine_name) - if re is None: - return - # FIXME(johannes): The usage of the .message attribute has been - # deprecated since Python 2.6. However, the exceptions raised by - # SQLAlchemy can differ when using unicode() and accessing .message. - # An audit across all three supported engines will be necessary to - # ensure there are no regressions. - m = re.match(operational_error.message) - if not m: - return - raise exception.DBDeadlock(operational_error) - - -def _wrap_db_error(f): - @functools.wraps(f) - def _wrap(self, *args, **kwargs): - try: - assert issubclass( - self.__class__, sqlalchemy.orm.session.Session - ), ('_wrap_db_error() can only be applied to methods of ' - 'subclasses of sqlalchemy.orm.session.Session.') - - return f(self, *args, **kwargs) - except UnicodeEncodeError: - raise exception.DBInvalidUnicodeParameter() - except sqla_exc.OperationalError as e: - _raise_if_db_connection_lost(e, self.bind) - _raise_if_deadlock_error(e, self.bind.dialect.name) - # NOTE(comstud): A lot of code is checking for OperationalError - # so let's not wrap it for now. - raise - # note(boris-42): We should catch unique constraint violation and - # wrap it by our own DBDuplicateEntry exception. Unique constraint - # violation is wrapped by IntegrityError. - except sqla_exc.IntegrityError as e: - # note(boris-42): SqlAlchemy doesn't unify errors from different - # DBs so we must do this. Also in some tables (for example - # instance_types) there are more than one unique constraint. This - # means we should get names of columns, which values violate - # unique constraint, from error message. 
- _raise_if_duplicate_entry_error(e, self.bind.dialect.name) - raise exception.DBError(e) - except Exception as e: - LOG.exception(_LE('DB exception wrapped.')) - raise exception.DBError(e) - return _wrap - - -def _synchronous_switch_listener(dbapi_conn, connection_rec): - """Switch sqlite connections to non-synchronous mode.""" - dbapi_conn.execute("PRAGMA synchronous = OFF") - - -def _add_regexp_listener(dbapi_con, con_record): - """Add REGEXP function to sqlite connections.""" - - def regexp(expr, item): - reg = re.compile(expr) - return reg.search(six.text_type(item)) is not None - dbapi_con.create_function('regexp', 2, regexp) - - -def _thread_yield(dbapi_con, con_record): - """Ensure other greenthreads get a chance to be executed. - - If we use eventlet.monkey_patch(), eventlet.greenthread.sleep(0) will - execute instead of time.sleep(0). - Force a context switch. With common database backends (eg MySQLdb and - sqlite), there is no implicit yield caused by network I/O since they are - implemented by C libraries that eventlet cannot monkey patch. - """ - time.sleep(0) - - -def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy): - """Ensures that MySQL and DB2 connections are alive. - - Borrowed from: - http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f - """ - cursor = dbapi_conn.cursor() - try: - ping_sql = 'select 1' - if engine.name == 'ibm_db_sa': - # DB2 requires a table expression - ping_sql = 'select 1 from (values (1)) AS t1' - cursor.execute(ping_sql) - except Exception as ex: - if engine.dialect.is_disconnect(ex, dbapi_conn, cursor): - msg = _LW('Database server has gone away: %s') % ex - LOG.warning(msg) - - # if the database server has gone away, all connections in the pool - # have become invalid and we can safely close all of them here, - # rather than waste time on checking of every single connection - engine.dispose() - - # this will be handled by SQLAlchemy and will force it to create - # a new connection and retry the original action - raise sqla_exc.DisconnectionError(msg) - else: - raise - - -def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None): - """Set the sql_mode session variable. - - MySQL supports several server modes. The default is None, but sessions - may choose to enable server modes like TRADITIONAL, ANSI, - several STRICT_* modes and others. - - Note: passing in '' (empty string) for sql_mode clears - the SQL mode for the session, overriding a potentially set - server default. - """ - - cursor = dbapi_con.cursor() - cursor.execute("SET SESSION sql_mode = %s", [sql_mode]) - - -def _mysql_get_effective_sql_mode(engine): - """Returns the effective SQL mode for connections from the engine pool. - - Returns ``None`` if the mode isn't available, otherwise returns the mode. - - """ - # Get the real effective SQL mode. Even when unset by - # our own config, the server may still be operating in a specific - # SQL mode as set by the server configuration. - # Also note that the checkout listener will be called on execute to - # set the mode if it's registered. 
- row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone() - if row is None: - return - return row[1] - - -def _mysql_check_effective_sql_mode(engine): - """Logs a message based on the effective SQL mode for MySQL connections.""" - realmode = _mysql_get_effective_sql_mode(engine) - - if realmode is None: - LOG.warning(_LW('Unable to detect effective SQL mode')) - return - - LOG.debug('MySQL server mode set to %s', realmode) - # 'TRADITIONAL' mode enables several other modes, so - # we need a substring match here - if not ('TRADITIONAL' in realmode.upper() or - 'STRICT_ALL_TABLES' in realmode.upper()): - LOG.warning(_LW("MySQL SQL mode is '%s', " - "consider enabling TRADITIONAL or STRICT_ALL_TABLES"), - realmode) - - -def _mysql_set_mode_callback(engine, sql_mode): - if sql_mode is not None: - mode_callback = functools.partial(_set_session_sql_mode, - sql_mode=sql_mode) - sqlalchemy.event.listen(engine, 'connect', mode_callback) - _mysql_check_effective_sql_mode(engine) - - -def _is_db_connection_error(args): - """Return True if error in connecting to db.""" - # NOTE(adam_g): This is currently MySQL specific and needs to be extended - # to support Postgres and others. - # For the db2, the error code is -30081 since the db2 is still not ready - conn_err_codes = ('2002', '2003', '2006', '2013', '-30081') - for err_code in conn_err_codes: - if args.find(err_code) != -1: - return True - return False - - -def _raise_if_db_connection_lost(error, engine): - # NOTE(vsergeyev): Function is_disconnect(e, connection, cursor) - # requires connection and cursor in incoming parameters, - # but we have no possibility to create connection if DB - # is not available, so in such case reconnect fails. - # But is_disconnect() ignores these parameters, so it - # makes sense to pass to function None as placeholder - # instead of connection and cursor. 
- if engine.dialect.is_disconnect(error, None, None): - raise exception.DBConnectionError(error) - - -def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, - idle_timeout=3600, - connection_debug=0, max_pool_size=None, max_overflow=None, - pool_timeout=None, sqlite_synchronous=True, - connection_trace=False, max_retries=10, retry_interval=10): - """Return a new SQLAlchemy engine.""" - - connection_dict = sqlalchemy.engine.url.make_url(sql_connection) - - engine_args = { - "pool_recycle": idle_timeout, - 'convert_unicode': True, - } - - logger = logging.getLogger('sqlalchemy.engine') - - # Map SQL debug level to Python log level - if connection_debug >= 100: - logger.setLevel(logging.DEBUG) - elif connection_debug >= 50: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.WARNING) - - if "sqlite" in connection_dict.drivername: - if sqlite_fk: - engine_args["listeners"] = [SqliteForeignKeysListener()] - engine_args["poolclass"] = NullPool - - if sql_connection == "sqlite://": - engine_args["poolclass"] = StaticPool - engine_args["connect_args"] = {'check_same_thread': False} - else: - if max_pool_size is not None: - engine_args['pool_size'] = max_pool_size - if max_overflow is not None: - engine_args['max_overflow'] = max_overflow - if pool_timeout is not None: - engine_args['pool_timeout'] = pool_timeout - - engine = sqlalchemy.create_engine(sql_connection, **engine_args) - - sqlalchemy.event.listen(engine, 'checkin', _thread_yield) - - if engine.name in ['mysql', 'ibm_db_sa']: - ping_callback = functools.partial(_ping_listener, engine) - sqlalchemy.event.listen(engine, 'checkout', ping_callback) - if engine.name == 'mysql': - if mysql_sql_mode: - _mysql_set_mode_callback(engine, mysql_sql_mode) - elif 'sqlite' in connection_dict.drivername: - if not sqlite_synchronous: - sqlalchemy.event.listen(engine, 'connect', - _synchronous_switch_listener) - sqlalchemy.event.listen(engine, 'connect', _add_regexp_listener) - - if connection_trace and engine.dialect.dbapi.__name__ == 'MySQLdb': - _patch_mysqldb_with_stacktrace_comments() - - try: - engine.connect() - except sqla_exc.OperationalError as e: - if not _is_db_connection_error(e.args[0]): - raise - - remaining = max_retries - if remaining == -1: - remaining = 'infinite' - while True: - msg = _LW('SQL connection failed. 
%s attempts left.') - LOG.warning(msg % remaining) - if remaining != 'infinite': - remaining -= 1 - time.sleep(retry_interval) - try: - engine.connect() - break - except sqla_exc.OperationalError as e: - if (remaining != 'infinite' and remaining == 0) or \ - not _is_db_connection_error(e.args[0]): - raise - return engine - - -class Query(sqlalchemy.orm.query.Query): - """Subclass of sqlalchemy.query with soft_delete() method.""" - def soft_delete(self, synchronize_session='evaluate'): - return self.update({'deleted': literal_column('id'), - 'updated_at': literal_column('updated_at'), - 'deleted_at': timeutils.utcnow()}, - synchronize_session=synchronize_session) - - -class Session(sqlalchemy.orm.session.Session): - """Custom Session class to avoid SqlAlchemy Session monkey patching.""" - @_wrap_db_error - def query(self, *args, **kwargs): - return super(Session, self).query(*args, **kwargs) - - @_wrap_db_error - def flush(self, *args, **kwargs): - return super(Session, self).flush(*args, **kwargs) - - @_wrap_db_error - def execute(self, *args, **kwargs): - return super(Session, self).execute(*args, **kwargs) - - -def get_maker(engine, autocommit=True, expire_on_commit=False): - """Return a SQLAlchemy sessionmaker using the given engine.""" - return sqlalchemy.orm.sessionmaker(bind=engine, - class_=Session, - autocommit=autocommit, - expire_on_commit=expire_on_commit, - query_cls=Query) - - -def _patch_mysqldb_with_stacktrace_comments(): - """Adds current stack trace as a comment in queries. - - Patches MySQLdb.cursors.BaseCursor._do_query. - """ - import MySQLdb.cursors - import traceback - - old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query - - def _do_query(self, q): - stack = '' - for filename, line, method, function in traceback.extract_stack(): - # exclude various common things from trace - if filename.endswith('session.py') and method == '_do_query': - continue - if filename.endswith('api.py') and method == 'wrapper': - continue - if filename.endswith('utils.py') and method == '_inner': - continue - if filename.endswith('exception.py') and method == '_wrap': - continue - # db/api is just a wrapper around db/sqlalchemy/api - if filename.endswith('db/api.py'): - continue - # only trace inside designate - index = filename.rfind('designate') - if index == -1: - continue - stack += "File:%s:%s Method:%s() Line:%s | " \ - % (filename[index:], line, method, function) - - # strip trailing " | " from stack - if stack: - stack = stack[:-3] - qq = "%s /* %s */" % (q, stack) - else: - qq = q - old_mysql_do_query(self, qq) - - setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) - - -class EngineFacade(object): - """A helper class for removing of global engine instances from designate.db. - - As a library, designate.db can't decide where to store/when to create engine - and sessionmaker instances, so this must be left for a target application. - - On the other hand, in order to simplify the adoption of designate.db changes, - we'll provide a helper class, which creates engine and sessionmaker - on its instantiation and provides get_engine()/get_session() methods - that are compatible with corresponding utility functions that currently - exist in target projects, e.g. in Nova. - - engine/sessionmaker instances will still be global (and they are meant to - be global), but they will be stored in the app context, rather that in the - designate.db context. 
- - Note: using of this helper is completely optional and you are encouraged to - integrate engine/sessionmaker instances into your apps any way you like - (e.g. one might want to bind a session to a request context). Two important - things to remember: - - 1. An Engine instance is effectively a pool of DB connections, so it's - meant to be shared (and it's thread-safe). - 2. A Session instance is not meant to be shared and represents a DB - transactional context (i.e. it's not thread-safe). sessionmaker is - a factory of sessions. - - """ - - def __init__(self, sql_connection, - sqlite_fk=False, autocommit=True, - expire_on_commit=False, **kwargs): - """Initialize engine and sessionmaker instances. - - :param sqlite_fk: enable foreign keys in SQLite - :type sqlite_fk: bool - - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - - :param expire_on_commit: expire session objects on commit - :type expire_on_commit: bool - - Keyword arguments: - - :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. - (defaults to TRADITIONAL) - :keyword idle_timeout: timeout before idle sql connections are reaped - (defaults to 3600) - :keyword connection_debug: verbosity of SQL debugging information. - 0=None, 100=Everything (defaults to 0) - :keyword max_pool_size: maximum number of SQL connections to keep open - in a pool (defaults to SQLAlchemy settings) - :keyword max_overflow: if set, use this value for max_overflow with - sqlalchemy (defaults to SQLAlchemy settings) - :keyword pool_timeout: if set, use this value for pool_timeout with - sqlalchemy (defaults to SQLAlchemy settings) - :keyword sqlite_synchronous: if True, SQLite uses synchronous mode - (defaults to True) - :keyword connection_trace: add python stack traces to SQL as comment - strings (defaults to False) - :keyword max_retries: maximum db connection retries during startup. - (setting -1 implies an infinite retry count) - (defaults to 10) - :keyword retry_interval: interval between retries of opening a sql - connection (defaults to 10) - - """ - - super(EngineFacade, self).__init__() - - self._engine = create_engine( - sql_connection=sql_connection, - sqlite_fk=sqlite_fk, - mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'), - idle_timeout=kwargs.get('idle_timeout', 3600), - connection_debug=kwargs.get('connection_debug', 0), - max_pool_size=kwargs.get('max_pool_size'), - max_overflow=kwargs.get('max_overflow'), - pool_timeout=kwargs.get('pool_timeout'), - sqlite_synchronous=kwargs.get('sqlite_synchronous', True), - connection_trace=kwargs.get('connection_trace', False), - max_retries=kwargs.get('max_retries', 10), - retry_interval=kwargs.get('retry_interval', 10)) - self._session_maker = get_maker( - engine=self._engine, - autocommit=autocommit, - expire_on_commit=expire_on_commit) - - def get_engine(self): - """Get the engine instance (note, that it's shared).""" - - return self._engine - - def get_session(self, **kwargs): - """Get a Session instance. - - If passed, keyword arguments values override the ones used when the - sessionmaker instance was created. 
- - :keyword autocommit: use autocommit mode for created Session instances - :type autocommit: bool - - :keyword expire_on_commit: expire session objects on commit - :type expire_on_commit: bool - - """ - - for arg in kwargs: - if arg not in ('autocommit', 'expire_on_commit'): - del kwargs[arg] - - return self._session_maker(**kwargs) - - @classmethod - def from_config(cls, connection_string, conf, - sqlite_fk=False, autocommit=True, expire_on_commit=False): - """Initialize EngineFacade using oslo.config config instance options. - - :param connection_string: SQLAlchemy connection string - :type connection_string: string - - :param conf: oslo.config config instance - :type conf: oslo.config.cfg.ConfigOpts - - :param sqlite_fk: enable foreign keys in SQLite - :type sqlite_fk: bool - - :param autocommit: use autocommit mode for created Session instances - :type autocommit: bool - - :param expire_on_commit: expire session objects on commit - :type expire_on_commit: bool - - """ - - return cls(sql_connection=connection_string, - sqlite_fk=sqlite_fk, - autocommit=autocommit, - expire_on_commit=expire_on_commit, - **dict(conf.database.items())) diff --git a/designate/openstack/common/db/sqlalchemy/test_base.py b/designate/openstack/common/db/sqlalchemy/test_base.py deleted file mode 100644 index 502713a67..000000000 --- a/designate/openstack/common/db/sqlalchemy/test_base.py +++ /dev/null @@ -1,153 +0,0 @@ -# Copyright (c) 2013 OpenStack Foundation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import functools -import os - -import fixtures -import six - -from designate.openstack.common.db.sqlalchemy import session -from designate.openstack.common.db.sqlalchemy import utils -from designate.openstack.common.fixture import lockutils -from designate.openstack.common import test - - -class DbFixture(fixtures.Fixture): - """Basic database fixture. - - Allows to run tests on various db backends, such as SQLite, MySQL and - PostgreSQL. By default use sqlite backend. To override default backend - uri set env variable OS_TEST_DBAPI_CONNECTION with database admin - credentials for specific backend. - """ - - def _get_uri(self): - return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://') - - def __init__(self, test): - super(DbFixture, self).__init__() - - self.test = test - - def setUp(self): - super(DbFixture, self).setUp() - - self.test.engine = session.create_engine(self._get_uri()) - self.test.sessionmaker = session.get_maker(self.test.engine) - self.addCleanup(self.test.engine.dispose) - - -class DbTestCase(test.BaseTestCase): - """Base class for testing of DB code. - - Using `DbFixture`. Intended to be the main database test case to use all - the tests on a given backend with user defined uri. Backend specific - tests should be decorated with `backend_specific` decorator. 
- """ - - FIXTURE = DbFixture - - def setUp(self): - super(DbTestCase, self).setUp() - self.useFixture(self.FIXTURE(self)) - - -ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql'] - - -def backend_specific(*dialects): - """Decorator to skip backend specific tests on inappropriate engines. - - ::dialects: list of dialects names under which the test will be launched. - """ - def wrap(f): - @functools.wraps(f) - def ins_wrap(self): - if not set(dialects).issubset(ALLOWED_DIALECTS): - raise ValueError( - "Please use allowed dialects: %s" % ALLOWED_DIALECTS) - if self.engine.name not in dialects: - msg = ('The test "%s" can be run ' - 'only on %s. Current engine is %s.') - args = (f.__name__, ' '.join(dialects), self.engine.name) - self.skip(msg % args) - else: - return f(self) - return ins_wrap - return wrap - - -@six.add_metaclass(abc.ABCMeta) -class OpportunisticFixture(DbFixture): - """Base fixture to use default CI databases. - - The databases exist in OpenStack CI infrastructure. But for the - correct functioning in local environment the databases must be - created manually. - """ - - DRIVER = abc.abstractproperty(lambda: None) - DBNAME = PASSWORD = USERNAME = 'openstack_citest' - - def _get_uri(self): - return utils.get_connect_string(backend=self.DRIVER, - user=self.USERNAME, - passwd=self.PASSWORD, - database=self.DBNAME) - - -@six.add_metaclass(abc.ABCMeta) -class OpportunisticTestCase(DbTestCase): - """Base test case to use default CI databases. - - The subclasses of the test case are running only when openstack_citest - database is available otherwise a tests will be skipped. - """ - - FIXTURE = abc.abstractproperty(lambda: None) - - def setUp(self): - # TODO(bnemec): Remove this once infra is ready for - # https://review.openstack.org/#/c/74963/ to merge. - self.useFixture(lockutils.LockFixture('opportunistic-db')) - credentials = { - 'backend': self.FIXTURE.DRIVER, - 'user': self.FIXTURE.USERNAME, - 'passwd': self.FIXTURE.PASSWORD, - 'database': self.FIXTURE.DBNAME} - - if self.FIXTURE.DRIVER and not utils.is_backend_avail(**credentials): - msg = '%s backend is not available.' % self.FIXTURE.DRIVER - return self.skip(msg) - - super(OpportunisticTestCase, self).setUp() - - -class MySQLOpportunisticFixture(OpportunisticFixture): - DRIVER = 'mysql' - - -class PostgreSQLOpportunisticFixture(OpportunisticFixture): - DRIVER = 'postgresql' - - -class MySQLOpportunisticTestCase(OpportunisticTestCase): - FIXTURE = MySQLOpportunisticFixture - - -class PostgreSQLOpportunisticTestCase(OpportunisticTestCase): - FIXTURE = PostgreSQLOpportunisticFixture diff --git a/designate/openstack/common/db/sqlalchemy/test_migrations.py b/designate/openstack/common/db/sqlalchemy/test_migrations.py deleted file mode 100644 index 159c19228..000000000 --- a/designate/openstack/common/db/sqlalchemy/test_migrations.py +++ /dev/null @@ -1,269 +0,0 @@ -# Copyright 2010-2011 OpenStack Foundation -# Copyright 2012-2013 IBM Corp. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import functools -import logging -import os -import subprocess - -import lockfile -from six import moves -from six.moves.urllib import parse -import sqlalchemy -import sqlalchemy.exc - -from designate.openstack.common.db.sqlalchemy import utils -from designate.openstack.common.gettextutils import _LE -from designate.openstack.common import test - -LOG = logging.getLogger(__name__) - - -def _have_mysql(user, passwd, database): - present = os.environ.get('TEST_MYSQL_PRESENT') - if present is None: - return utils.is_backend_avail(backend='mysql', - user=user, - passwd=passwd, - database=database) - return present.lower() in ('', 'true') - - -def _have_postgresql(user, passwd, database): - present = os.environ.get('TEST_POSTGRESQL_PRESENT') - if present is None: - return utils.is_backend_avail(backend='postgres', - user=user, - passwd=passwd, - database=database) - return present.lower() in ('', 'true') - - -def _set_db_lock(lock_path=None, lock_prefix=None): - def decorator(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - try: - path = lock_path or os.environ.get("DESIGNATE_LOCK_PATH") - lock = lockfile.FileLock(os.path.join(path, lock_prefix)) - with lock: - LOG.debug('Got lock "%s"' % f.__name__) - return f(*args, **kwargs) - finally: - LOG.debug('Lock released "%s"' % f.__name__) - return wrapper - return decorator - - -class BaseMigrationTestCase(test.BaseTestCase): - """Base class fort testing of migration utils.""" - - def __init__(self, *args, **kwargs): - super(BaseMigrationTestCase, self).__init__(*args, **kwargs) - - self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), - 'test_migrations.conf') - # Test machines can set the TEST_MIGRATIONS_CONF variable - # to override the location of the config file for migration testing - self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF', - self.DEFAULT_CONFIG_FILE) - self.test_databases = {} - self.migration_api = None - - def setUp(self): - super(BaseMigrationTestCase, self).setUp() - - # Load test databases from the config file. Only do this - # once. No need to re-run this on each test... - LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH) - if os.path.exists(self.CONFIG_FILE_PATH): - cp = moves.configparser.RawConfigParser() - try: - cp.read(self.CONFIG_FILE_PATH) - defaults = cp.defaults() - for key, value in defaults.items(): - self.test_databases[key] = value - except moves.configparser.ParsingError as e: - self.fail("Failed to read test_migrations.conf config " - "file. Got error: %s" % e) - else: - self.fail("Failed to find test_migrations.conf config " - "file.") - - self.engines = {} - for key, value in self.test_databases.items(): - self.engines[key] = sqlalchemy.create_engine(value) - - # We start each test case with a completely blank slate. 
- self._reset_databases() - - def tearDown(self): - # We destroy the test data store between each test case, - # and recreate it, which ensures that we have no side-effects - # from the tests - self._reset_databases() - super(BaseMigrationTestCase, self).tearDown() - - def execute_cmd(self, cmd=None): - process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - output = process.communicate()[0] - LOG.debug(output) - self.assertEqual(0, process.returncode, - "Failed to run: %s\n%s" % (cmd, output)) - - def _reset_pg(self, conn_pieces): - (user, - password, - database, - host) = utils.get_db_connection_info(conn_pieces) - os.environ['PGPASSWORD'] = password - os.environ['PGUSER'] = user - # note(boris-42): We must create and drop database, we can't - # drop database which we have connected to, so for such - # operations there is a special database template1. - sqlcmd = ("psql -w -U %(user)s -h %(host)s -c" - " '%(sql)s' -d template1") - - sql = ("drop database if exists %s;") % database - droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql} - self.execute_cmd(droptable) - - sql = ("create database %s;") % database - createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql} - self.execute_cmd(createtable) - - os.unsetenv('PGPASSWORD') - os.unsetenv('PGUSER') - - @_set_db_lock(lock_prefix='migration_tests-') - def _reset_databases(self): - for key, engine in self.engines.items(): - conn_string = self.test_databases[key] - conn_pieces = parse.urlparse(conn_string) - engine.dispose() - if conn_string.startswith('sqlite'): - # We can just delete the SQLite database, which is - # the easiest and cleanest solution - db_path = conn_pieces.path.strip('/') - if os.path.exists(db_path): - os.unlink(db_path) - # No need to recreate the SQLite DB. SQLite will - # create it for us if it's not there... - elif conn_string.startswith('mysql'): - # We can execute the MySQL client to destroy and re-create - # the MYSQL database, which is easier and less error-prone - # than using SQLAlchemy to do this via MetaData...trust me. - (user, password, database, host) = \ - utils.get_db_connection_info(conn_pieces) - sql = ("drop database if exists %(db)s; " - "create database %(db)s;") % {'db': database} - cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s " - "-e \"%(sql)s\"") % {'user': user, 'password': password, - 'host': host, 'sql': sql} - self.execute_cmd(cmd) - elif conn_string.startswith('postgresql'): - self._reset_pg(conn_pieces) - - -class WalkVersionsMixin(object): - def _walk_versions(self, engine=None, snake_walk=False, downgrade=True): - # Determine latest version script from the repo, then - # upgrade from 1 through to the latest, with no data - # in the databases. This just checks that the schema itself - # upgrades successfully. - - # Place the database under version control - self.migration_api.version_control(engine, self.REPOSITORY, - self.INIT_VERSION) - self.assertEqual(self.INIT_VERSION, - self.migration_api.db_version(engine, - self.REPOSITORY)) - - LOG.debug('latest version is %s' % self.REPOSITORY.latest) - versions = range(self.INIT_VERSION + 1, self.REPOSITORY.latest + 1) - - for version in versions: - # upgrade -> downgrade -> upgrade - self._migrate_up(engine, version, with_data=True) - if snake_walk: - downgraded = self._migrate_down( - engine, version - 1, with_data=True) - if downgraded: - self._migrate_up(engine, version) - - if downgrade: - # Now walk it back down to 0 from the latest, testing - # the downgrade paths. 
- for version in reversed(versions): - # downgrade -> upgrade -> downgrade - downgraded = self._migrate_down(engine, version - 1) - - if snake_walk and downgraded: - self._migrate_up(engine, version) - self._migrate_down(engine, version - 1) - - def _migrate_down(self, engine, version, with_data=False): - try: - self.migration_api.downgrade(engine, self.REPOSITORY, version) - except NotImplementedError: - # NOTE(sirp): some migrations, namely release-level - # migrations, don't support a downgrade. - return False - - self.assertEqual( - version, self.migration_api.db_version(engine, self.REPOSITORY)) - - # NOTE(sirp): `version` is what we're downgrading to (i.e. the 'target' - # version). So if we have any downgrade checks, they need to be run for - # the previous (higher numbered) migration. - if with_data: - post_downgrade = getattr( - self, "_post_downgrade_%03d" % (version + 1), None) - if post_downgrade: - post_downgrade(engine) - - return True - - def _migrate_up(self, engine, version, with_data=False): - """migrate up to a new version of the db. - - We allow for data insertion and post checks at every - migration version with special _pre_upgrade_### and - _check_### functions in the main test. - """ - # NOTE(sdague): try block is here because it's impossible to debug - # where a failed data migration happens otherwise - try: - if with_data: - data = None - pre_upgrade = getattr( - self, "_pre_upgrade_%03d" % version, None) - if pre_upgrade: - data = pre_upgrade(engine) - - self.migration_api.upgrade(engine, self.REPOSITORY, version) - self.assertEqual(version, - self.migration_api.db_version(engine, - self.REPOSITORY)) - if with_data: - check = getattr(self, "_check_%03d" % version, None) - if check: - check(engine, data) - except Exception: - LOG.error(_LE("Failed to migrate to version %s on engine %s") % - (version, engine)) - raise diff --git a/designate/openstack/common/db/sqlalchemy/utils.py b/designate/openstack/common/db/sqlalchemy/utils.py deleted file mode 100644 index 177cea460..000000000 --- a/designate/openstack/common/db/sqlalchemy/utils.py +++ /dev/null @@ -1,647 +0,0 @@ -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# Copyright 2010-2011 OpenStack Foundation. -# Copyright 2012 Justin Santa Barbara -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. 
- -import logging -import re - -import sqlalchemy -from sqlalchemy import Boolean -from sqlalchemy import CheckConstraint -from sqlalchemy import Column -from sqlalchemy.engine import reflection -from sqlalchemy.ext.compiler import compiles -from sqlalchemy import func -from sqlalchemy import Index -from sqlalchemy import Integer -from sqlalchemy import MetaData -from sqlalchemy import or_ -from sqlalchemy.sql.expression import literal_column -from sqlalchemy.sql.expression import UpdateBase -from sqlalchemy import String -from sqlalchemy import Table -from sqlalchemy.types import NullType - -from designate.openstack.common import context as request_context -from designate.openstack.common.db.sqlalchemy import models -from designate.openstack.common.gettextutils import _, _LI, _LW -from designate.openstack.common import timeutils - - -LOG = logging.getLogger(__name__) - -_DBURL_REGEX = re.compile(r"[^:]+://([^:]+):([^@]+)@.+") - - -def sanitize_db_url(url): - match = _DBURL_REGEX.match(url) - if match: - return '%s****:****%s' % (url[:match.start(1)], url[match.end(2):]) - return url - - -class InvalidSortKey(Exception): - message = _("Sort key supplied was not valid.") - - -# copy from glance/db/sqlalchemy/api.py -def paginate_query(query, model, limit, sort_keys, marker=None, - sort_dir=None, sort_dirs=None): - """Returns a query with sorting / pagination criteria added. - - Pagination works by requiring a unique sort_key, specified by sort_keys. - (If sort_keys is not unique, then we risk looping through values.) - We use the last row in the previous page as the 'marker' for pagination. - So we must return values that follow the passed marker in the order. - With a single-valued sort_key, this would be easy: sort_key > X. - With a compound-values sort_key, (k1, k2, k3) we must do this to repeat - the lexicographical ordering: - (k1 > X1) or (k1 == X1 && k2 > X2) or (k1 == X1 && k2 == X2 && k3 > X3) - - We also have to cope with different sort_directions. - - Typically, the id of the last row is used as the client-facing pagination - marker, then the actual marker object must be fetched from the db and - passed in to us as marker. - - :param query: the query object to which we should add paging/sorting - :param model: the ORM model class - :param limit: maximum number of items to return - :param sort_keys: array of attributes by which results should be sorted - :param marker: the last item of the previous page; we returns the next - results after this value. - :param sort_dir: direction in which results should be sorted (asc, desc) - :param sort_dirs: per-column array of sort_dirs, corresponding to sort_keys - - :rtype: sqlalchemy.orm.query.Query - :return: The query with sorting/pagination added. 
- """ - - if 'id' not in sort_keys: - # TODO(justinsb): If this ever gives a false-positive, check - # the actual primary key, rather than assuming its id - LOG.warning(_LW('Id not in sort_keys; is sort_keys unique?')) - - assert(not (sort_dir and sort_dirs)) - - # Default the sort direction to ascending - if sort_dirs is None and sort_dir is None: - sort_dir = 'asc' - - # Ensure a per-column sort direction - if sort_dirs is None: - sort_dirs = [sort_dir for _sort_key in sort_keys] - - assert(len(sort_dirs) == len(sort_keys)) - - # Add sorting - for current_sort_key, current_sort_dir in zip(sort_keys, sort_dirs): - try: - sort_dir_func = { - 'asc': sqlalchemy.asc, - 'desc': sqlalchemy.desc, - }[current_sort_dir] - except KeyError: - raise ValueError(_("Unknown sort direction, " - "must be 'desc' or 'asc'")) - try: - sort_key_attr = getattr(model, current_sort_key) - except AttributeError: - raise InvalidSortKey() - query = query.order_by(sort_dir_func(sort_key_attr)) - - # Add pagination - if marker is not None: - marker_values = [] - for sort_key in sort_keys: - v = getattr(marker, sort_key) - marker_values.append(v) - - # Build up an array of sort criteria as in the docstring - criteria_list = [] - for i in range(len(sort_keys)): - crit_attrs = [] - for j in range(i): - model_attr = getattr(model, sort_keys[j]) - crit_attrs.append((model_attr == marker_values[j])) - - model_attr = getattr(model, sort_keys[i]) - if sort_dirs[i] == 'desc': - crit_attrs.append((model_attr < marker_values[i])) - else: - crit_attrs.append((model_attr > marker_values[i])) - - criteria = sqlalchemy.sql.and_(*crit_attrs) - criteria_list.append(criteria) - - f = sqlalchemy.sql.or_(*criteria_list) - query = query.filter(f) - - if limit is not None: - query = query.limit(limit) - - return query - - -def _read_deleted_filter(query, db_model, read_deleted): - if 'deleted' not in db_model.__table__.columns: - raise ValueError(_("There is no `deleted` column in `%s` table. " - "Project doesn't use soft-deleted feature.") - % db_model.__name__) - - default_deleted_value = db_model.__table__.c.deleted.default.arg - if read_deleted == 'no': - query = query.filter(db_model.deleted == default_deleted_value) - elif read_deleted == 'yes': - pass # omit the filter to include deleted and active - elif read_deleted == 'only': - query = query.filter(db_model.deleted != default_deleted_value) - else: - raise ValueError(_("Unrecognized read_deleted value '%s'") - % read_deleted) - return query - - -def _project_filter(query, db_model, context, project_only): - if project_only and 'project_id' not in db_model.__table__.columns: - raise ValueError(_("There is no `project_id` column in `%s` table.") - % db_model.__name__) - - if request_context.is_user_context(context) and project_only: - if project_only == 'allow_none': - is_none = None - query = query.filter(or_(db_model.project_id == context.project_id, - db_model.project_id == is_none)) - else: - query = query.filter(db_model.project_id == context.project_id) - - return query - - -def model_query(context, model, session, args=None, project_only=False, - read_deleted=None): - """Query helper that accounts for context's `read_deleted` field. - - :param context: context to query under - - :param model: Model to query. Must be a subclass of ModelBase. - :type model: models.ModelBase - - :param session: The session to use. - :type session: sqlalchemy.orm.session.Session - - :param args: Arguments to query. If None - model is used. 
- :type args: tuple - - :param project_only: If present and context is user-type, then restrict - query to match the context's project_id. If set to - 'allow_none', restriction includes project_id = None. - :type project_only: bool - - :param read_deleted: If present, overrides context's read_deleted field. - :type read_deleted: bool - - Usage: - - ..code:: python - - result = (utils.model_query(context, models.Instance, session=session) - .filter_by(uuid=instance_uuid) - .all()) - - query = utils.model_query( - context, Node, - session=session, - args=(func.count(Node.id), func.sum(Node.ram)) - ).filter_by(project_id=project_id) - - """ - - if not read_deleted: - if hasattr(context, 'read_deleted'): - # NOTE(viktors): some projects use `read_deleted` attribute in - # their contexts instead of `show_deleted`. - read_deleted = context.read_deleted - else: - read_deleted = context.show_deleted - - if not issubclass(model, models.ModelBase): - raise TypeError(_("model should be a subclass of ModelBase")) - - query = session.query(model) if not args else session.query(*args) - query = _read_deleted_filter(query, model, read_deleted) - query = _project_filter(query, model, context, project_only) - - return query - - -def get_table(engine, name): - """Returns an sqlalchemy table dynamically from db. - - Needed because the models don't work for us in migrations - as models will be far out of sync with the current data. - """ - metadata = MetaData() - metadata.bind = engine - return Table(name, metadata, autoload=True) - - -class InsertFromSelect(UpdateBase): - """Form the base for `INSERT INTO table (SELECT ... )` statement.""" - def __init__(self, table, select): - self.table = table - self.select = select - - -@compiles(InsertFromSelect) -def visit_insert_from_select(element, compiler, **kw): - """Form the `INSERT INTO table (SELECT ... )` statement.""" - return "INSERT INTO %s %s" % ( - compiler.process(element.table, asfrom=True), - compiler.process(element.select)) - - -class ColumnError(Exception): - """Error raised when no column or an invalid column is found.""" - - -def _get_not_supported_column(col_name_col_instance, column_name): - try: - column = col_name_col_instance[column_name] - except KeyError: - msg = _("Please specify column %s in col_name_col_instance " - "param. It is required because column has unsupported " - "type by sqlite).") - raise ColumnError(msg % column_name) - - if not isinstance(column, Column): - msg = _("col_name_col_instance param has wrong type of " - "column instance for column %s It should be instance " - "of sqlalchemy.Column.") - raise ColumnError(msg % column_name) - return column - - -def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns, - **col_name_col_instance): - """Drop unique constraint from table. - - DEPRECATED: this function is deprecated and will be removed from designate.db - in a few releases. Please use UniqueConstraint.drop() method directly for - sqlalchemy-migrate migration scripts. - - This method drops UC from table and works for mysql, postgresql and sqlite. - In mysql and postgresql we are able to use "alter table" construction. - Sqlalchemy doesn't support some sqlite column types and replaces their - type with NullType in metadata. We process these columns and replace - NullType with the correct column type. - - :param migrate_engine: sqlalchemy engine - :param table_name: name of table that contains uniq constraint. - :param uc_name: name of uniq constraint that will be dropped. 
- :param columns: columns that are in uniq constraint. - :param col_name_col_instance: contains pair column_name=column_instance. - column_instance is instance of Column. These params - are required only for columns that have unsupported - types by sqlite. For example BigInteger. - """ - - from migrate.changeset import UniqueConstraint - - meta = MetaData() - meta.bind = migrate_engine - t = Table(table_name, meta, autoload=True) - - if migrate_engine.name == "sqlite": - override_cols = [ - _get_not_supported_column(col_name_col_instance, col.name) - for col in t.columns - if isinstance(col.type, NullType) - ] - for col in override_cols: - t.columns.replace(col) - - uc = UniqueConstraint(*columns, table=t, name=uc_name) - uc.drop() - - -def drop_old_duplicate_entries_from_table(migrate_engine, table_name, - use_soft_delete, *uc_column_names): - """Drop all old rows having the same values for columns in uc_columns. - - This method drop (or mark ad `deleted` if use_soft_delete is True) old - duplicate rows form table with name `table_name`. - - :param migrate_engine: Sqlalchemy engine - :param table_name: Table with duplicates - :param use_soft_delete: If True - values will be marked as `deleted`, - if False - values will be removed from table - :param uc_column_names: Unique constraint columns - """ - meta = MetaData() - meta.bind = migrate_engine - - table = Table(table_name, meta, autoload=True) - columns_for_group_by = [table.c[name] for name in uc_column_names] - - columns_for_select = [func.max(table.c.id)] - columns_for_select.extend(columns_for_group_by) - - duplicated_rows_select = sqlalchemy.sql.select( - columns_for_select, group_by=columns_for_group_by, - having=func.count(table.c.id) > 1) - - for row in migrate_engine.execute(duplicated_rows_select): - # NOTE(boris-42): Do not remove row that has the biggest ID. 
- delete_condition = table.c.id != row[0] - is_none = None # workaround for pyflakes - delete_condition &= table.c.deleted_at == is_none - for name in uc_column_names: - delete_condition &= table.c[name] == row[name] - - rows_to_delete_select = sqlalchemy.sql.select( - [table.c.id]).where(delete_condition) - for row in migrate_engine.execute(rows_to_delete_select).fetchall(): - LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: " - "%(table)s") % dict(id=row[0], table=table_name)) - - if use_soft_delete: - delete_statement = table.update().\ - where(delete_condition).\ - values({ - 'deleted': literal_column('id'), - 'updated_at': literal_column('updated_at'), - 'deleted_at': timeutils.utcnow() - }) - else: - delete_statement = table.delete().where(delete_condition) - migrate_engine.execute(delete_statement) - - -def _get_default_deleted_value(table): - if isinstance(table.c.id.type, Integer): - return 0 - if isinstance(table.c.id.type, String): - return "" - raise ColumnError(_("Unsupported id columns type")) - - -def _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes): - table = get_table(migrate_engine, table_name) - - insp = reflection.Inspector.from_engine(migrate_engine) - real_indexes = insp.get_indexes(table_name) - existing_index_names = dict( - [(index['name'], index['column_names']) for index in real_indexes]) - - # NOTE(boris-42): Restore indexes on `deleted` column - for index in indexes: - if 'deleted' not in index['column_names']: - continue - name = index['name'] - if name in existing_index_names: - column_names = [table.c[c] for c in existing_index_names[name]] - old_index = Index(name, *column_names, unique=index["unique"]) - old_index.drop(migrate_engine) - - column_names = [table.c[c] for c in index['column_names']] - new_index = Index(index["name"], *column_names, unique=index["unique"]) - new_index.create(migrate_engine) - - -def change_deleted_column_type_to_boolean(migrate_engine, table_name, - **col_name_col_instance): - if migrate_engine.name == "sqlite": - return _change_deleted_column_type_to_boolean_sqlite( - migrate_engine, table_name, **col_name_col_instance) - insp = reflection.Inspector.from_engine(migrate_engine) - indexes = insp.get_indexes(table_name) - - table = get_table(migrate_engine, table_name) - - old_deleted = Column('old_deleted', Boolean, default=False) - old_deleted.create(table, populate_default=False) - - table.update().\ - where(table.c.deleted == table.c.id).\ - values(old_deleted=True).\ - execute() - - table.c.deleted.drop() - table.c.old_deleted.alter(name="deleted") - - _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) - - -def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name, - **col_name_col_instance): - insp = reflection.Inspector.from_engine(migrate_engine) - table = get_table(migrate_engine, table_name) - - columns = [] - for column in table.columns: - column_copy = None - if column.name != "deleted": - if isinstance(column.type, NullType): - column_copy = _get_not_supported_column(col_name_col_instance, - column.name) - else: - column_copy = column.copy() - else: - column_copy = Column('deleted', Boolean, default=0) - columns.append(column_copy) - - constraints = [constraint.copy() for constraint in table.constraints] - - meta = table.metadata - new_table = Table(table_name + "__tmp__", meta, - *(columns + constraints)) - new_table.create() - - indexes = [] - for index in insp.get_indexes(table_name): - column_names = [new_table.c[c] for c in 
index['column_names']] - indexes.append(Index(index["name"], *column_names, - unique=index["unique"])) - - c_select = [] - for c in table.c: - if c.name != "deleted": - c_select.append(c) - else: - c_select.append(table.c.deleted == table.c.id) - - ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select)) - migrate_engine.execute(ins) - - table.drop() - [index.create(migrate_engine) for index in indexes] - - new_table.rename(table_name) - new_table.update().\ - where(new_table.c.deleted == new_table.c.id).\ - values(deleted=True).\ - execute() - - -def change_deleted_column_type_to_id_type(migrate_engine, table_name, - **col_name_col_instance): - if migrate_engine.name == "sqlite": - return _change_deleted_column_type_to_id_type_sqlite( - migrate_engine, table_name, **col_name_col_instance) - insp = reflection.Inspector.from_engine(migrate_engine) - indexes = insp.get_indexes(table_name) - - table = get_table(migrate_engine, table_name) - - new_deleted = Column('new_deleted', table.c.id.type, - default=_get_default_deleted_value(table)) - new_deleted.create(table, populate_default=True) - - deleted = True # workaround for pyflakes - table.update().\ - where(table.c.deleted == deleted).\ - values(new_deleted=table.c.id).\ - execute() - table.c.deleted.drop() - table.c.new_deleted.alter(name="deleted") - - _restore_indexes_on_deleted_columns(migrate_engine, table_name, indexes) - - -def _change_deleted_column_type_to_id_type_sqlite(migrate_engine, table_name, - **col_name_col_instance): - # NOTE(boris-42): sqlaclhemy-migrate can't drop column with check - # constraints in sqlite DB and our `deleted` column has - # 2 check constraints. So there is only one way to remove - # these constraints: - # 1) Create new table with the same columns, constraints - # and indexes. (except deleted column). - # 2) Copy all data from old to new table. - # 3) Drop old table. - # 4) Rename new table to old table name. - insp = reflection.Inspector.from_engine(migrate_engine) - meta = MetaData(bind=migrate_engine) - table = Table(table_name, meta, autoload=True) - default_deleted_value = _get_default_deleted_value(table) - - columns = [] - for column in table.columns: - column_copy = None - if column.name != "deleted": - if isinstance(column.type, NullType): - column_copy = _get_not_supported_column(col_name_col_instance, - column.name) - else: - column_copy = column.copy() - else: - column_copy = Column('deleted', table.c.id.type, - default=default_deleted_value) - columns.append(column_copy) - - def is_deleted_column_constraint(constraint): - # NOTE(boris-42): There is no other way to check is CheckConstraint - # associated with deleted column. 
- if not isinstance(constraint, CheckConstraint): - return False - sqltext = str(constraint.sqltext) - return (sqltext.endswith("deleted in (0, 1)") or - sqltext.endswith("deleted IN (:deleted_1, :deleted_2)")) - - constraints = [] - for constraint in table.constraints: - if not is_deleted_column_constraint(constraint): - constraints.append(constraint.copy()) - - new_table = Table(table_name + "__tmp__", meta, - *(columns + constraints)) - new_table.create() - - indexes = [] - for index in insp.get_indexes(table_name): - column_names = [new_table.c[c] for c in index['column_names']] - indexes.append(Index(index["name"], *column_names, - unique=index["unique"])) - - ins = InsertFromSelect(new_table, table.select()) - migrate_engine.execute(ins) - - table.drop() - [index.create(migrate_engine) for index in indexes] - - new_table.rename(table_name) - deleted = True # workaround for pyflakes - new_table.update().\ - where(new_table.c.deleted == deleted).\ - values(deleted=new_table.c.id).\ - execute() - - # NOTE(boris-42): Fix value of deleted column: False -> "" or 0. - deleted = False # workaround for pyflakes - new_table.update().\ - where(new_table.c.deleted == deleted).\ - values(deleted=default_deleted_value).\ - execute() - - -def get_connect_string(backend, database, user=None, passwd=None): - """Get database connection - - Try to get a connection with a very specific set of values, if we get - these then we'll run the tests, otherwise they are skipped - """ - args = {'backend': backend, - 'user': user, - 'passwd': passwd, - 'database': database} - if backend == 'sqlite': - template = '%(backend)s:///%(database)s' - else: - template = "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" - return template % args - - -def is_backend_avail(backend, database, user=None, passwd=None): - try: - connect_uri = get_connect_string(backend=backend, - database=database, - user=user, - passwd=passwd) - engine = sqlalchemy.create_engine(connect_uri) - connection = engine.connect() - except Exception: - # intentionally catch all to handle exceptions even if we don't - # have any backend code loaded. - return False - else: - connection.close() - engine.dispose() - return True - - -def get_db_connection_info(conn_pieces): - database = conn_pieces.path.strip('/') - loc_pieces = conn_pieces.netloc.split('@') - host = loc_pieces[1] - - auth_pieces = loc_pieces[0].split(':') - user = auth_pieces[0] - password = "" - if len(auth_pieces) > 1: - password = auth_pieces[1].strip() - - return (user, password, database, host) diff --git a/designate/sqlalchemy/models.py b/designate/sqlalchemy/models.py index 55dd3a7cf..fdb8258db 100644 --- a/designate/sqlalchemy/models.py +++ b/designate/sqlalchemy/models.py @@ -13,87 +13,40 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from oslo.db.sqlalchemy import models +from oslo.db import exception as oslo_db_exc from sqlalchemy import Column, DateTime from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import object_mapper from sqlalchemy.types import CHAR from designate.openstack.common import timeutils from designate import exceptions -class Base(object): - __abstract__ = True - __table_initialized__ = False - +class Base(models.ModelBase): + # TODO(ekarlso): Remove me when o.db patch lands for this. 
def save(self, session): """ Save this object """ session.add(self) try: session.flush() - except IntegrityError as e: - non_unique_strings = ( - 'duplicate entry', - 'not unique', - 'unique constraint failed' - ) - - for non_unique_string in non_unique_strings: - if non_unique_string in str(e).lower(): - raise exceptions.Duplicate(str(e)) - - # Not a Duplicate error.. Re-raise. + except oslo_db_exc.DBDuplicateEntry as e: + raise exceptions.Duplicate(str(e)) + except IntegrityError: raise def delete(self, session): - """ Delete this object """ session.delete(self) session.flush() - def __setitem__(self, key, value): - setattr(self, key, value) - - def __getitem__(self, key): - return getattr(self, key) - - def __iter__(self): - columns = dict(object_mapper(self).columns).keys() - # NOTE(russellb): Allow models to specify other keys that can be looked - # up, beyond the actual db columns. An example would be the 'name' - # property for an Instance. - if hasattr(self, '_extra_keys'): - columns.extend(self._extra_keys()) - self._i = iter(columns) - return self - - def next(self): - n = self._i.next() - return n, getattr(self, n) - - def update(self, values): - """ Make the model object behave like a dict """ - for k, v in values.iteritems(): - setattr(self, k, v) - - def iteritems(self): - """ - Make the model object behave like a dict. - - Includes attributes from joins. - """ - local = dict(self) - joined = dict([(k, v) for k, v in self.__dict__.iteritems() - if not k[0] == '_']) - local.update(joined) - return local.iteritems() - +# TODO(ekarlso): Get this into o.db? class SoftDeleteMixin(object): deleted = Column(CHAR(32), nullable=False, default="0", server_default="0") deleted_at = Column(DateTime, nullable=True, default=None) - def soft_delete(self, session=None): + def soft_delete(self, session): """ Mark this object as deleted. """ self.deleted = self.id.replace('-', '') self.deleted_at = timeutils.utcnow() diff --git a/designate/sqlalchemy/session.py b/designate/sqlalchemy/session.py index 1b1f39a3a..b816fc4f5 100644 --- a/designate/sqlalchemy/session.py +++ b/designate/sqlalchemy/session.py @@ -16,229 +16,34 @@ """Session Handling for SQLAlchemy backend.""" -import re -import time - -import sqlalchemy -from sqlalchemy.exc import DisconnectionError, OperationalError -import sqlalchemy.orm -from sqlalchemy.pool import NullPool, StaticPool from oslo.config import cfg +from oslo.db.sqlalchemy import session from designate.openstack.common import log as logging -from designate.openstack.common.gettextutils import _LW LOG = logging.getLogger(__name__) -_MAKERS = {} -_ENGINES = {} +CONF = cfg.CONF -SQLOPTS = [ - cfg.StrOpt('database_connection', - default='sqlite:///$state_path/designate.sqlite', - secret=True, - help='The database driver to use'), - cfg.IntOpt('connection_debug', default=0, - help='Verbosity of SQL debugging information. 0=None,' - ' 100=Everything'), - cfg.BoolOpt('connection_trace', default=False, - help='Add python stack traces to SQL as comment strings'), - cfg.BoolOpt('sqlite_synchronous', default=True, - help='If passed, use synchronous mode for sqlite'), - cfg.IntOpt('idle_timeout', default=3600, - help='timeout before idle sql connections are reaped'), - cfg.IntOpt('max_retries', default=10, - help='maximum db connection retries during startup. 
' - '(setting -1 implies an infinite retry count)'), - cfg.IntOpt('retry_interval', default=10, - help='interval between retries of opening a sql connection') -] +_FACADES = {} -def get_session(config_group, - autocommit=True, - expire_on_commit=False, - autoflush=True): - """Return a SQLAlchemy session.""" - global _MAKERS +def _create_facade_lazily(name): + if name not in _FACADES: + _FACADES[name] = session.EngineFacade( + cfg.CONF[name].connection, + **dict(cfg.CONF[name].iteritems())) - if config_group not in _MAKERS: - engine = get_engine(config_group) - _MAKERS[config_group] = get_maker(engine, - autocommit, - expire_on_commit, - autoflush) - - session = _MAKERS[config_group]() - return session + return _FACADES[name] -def synchronous_switch_listener(dbapi_conn, connection_rec): - """Switch sqlite connections to non-synchronous mode""" - dbapi_conn.execute("PRAGMA synchronous = OFF") +def get_engine(name): + facade = _create_facade_lazily(name) + return facade.get_engine() -def add_regexp_listener(dbapi_con, con_record): - """Add REGEXP function to sqlite connections.""" - - def regexp(expr, item): - reg = re.compile(expr) - return reg.search(unicode(item)) is not None - dbapi_con.create_function('regexp', 2, regexp) - - -def ping_listener(dbapi_conn, connection_rec, connection_proxy): - """ - Ensures that MySQL connections checked out of the - pool are alive. - - Borrowed from: - http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f - """ - try: - dbapi_conn.cursor().execute('select 1') - except dbapi_conn.OperationalError as ex: - if ex.args[0] in (2006, 2013, 2014, 2045, 2055): - LOG.warn(_LW('Got mysql server has gone away: %s'), ex) - raise DisconnectionError("Database server went away") - else: - raise - - -def is_db_connection_error(args): - """Return True if error in connecting to db.""" - # NOTE(adam_g): This is currently MySQL specific and needs to be extended - # to support Postgres and others. 
- conn_err_codes = ('2002', '2003', '2006') - for err_code in conn_err_codes: - if args.find(err_code) != -1: - return True - return False - - -def get_engine(config_group): - """Return a SQLAlchemy engine.""" - global _ENGINES - - database_connection = cfg.CONF[config_group].database_connection - - if config_group not in _ENGINES: - connection_dict = sqlalchemy.engine.url.make_url( - database_connection) - - engine_args = { - "pool_recycle": cfg.CONF[config_group].idle_timeout, - "echo": False, - 'convert_unicode': True, - } - - # Map our SQL debug level to SQLAlchemy's options - if cfg.CONF[config_group].connection_debug >= 100: - engine_args['echo'] = 'debug' - elif cfg.CONF[config_group].connection_debug >= 50: - engine_args['echo'] = True - - if "sqlite" in connection_dict.drivername: - engine_args["poolclass"] = NullPool - - if database_connection == "sqlite://": - engine_args["poolclass"] = StaticPool - engine_args["connect_args"] = {'check_same_thread': False} - - _ENGINES[config_group] = sqlalchemy.create_engine(database_connection, - **engine_args) - - if 'mysql' in connection_dict.drivername: - sqlalchemy.event.listen(_ENGINES[config_group], - 'checkout', - ping_listener) - elif "sqlite" in connection_dict.drivername: - if not cfg.CONF[config_group].sqlite_synchronous: - sqlalchemy.event.listen(_ENGINES[config_group], - 'connect', - synchronous_switch_listener) - sqlalchemy.event.listen(_ENGINES[config_group], - 'connect', - add_regexp_listener) - - if (cfg.CONF[config_group].connection_trace and - _ENGINES[config_group].dialect.dbapi.__name__ == 'MySQLdb'): - import MySQLdb.cursors - _do_query = debug_mysql_do_query() - setattr(MySQLdb.cursors.BaseCursor, '_do_query', _do_query) - - try: - _ENGINES[config_group].connect() - except OperationalError as e: - if not is_db_connection_error(e.args[0]): - raise - - remaining = cfg.CONF[config_group].max_retries - if remaining == -1: - remaining = 'infinite' - while True: - LOG.warn(_LW('SQL connection failed. 
%s attempts left.') % - remaining) - if remaining != 'infinite': - remaining -= 1 - time.sleep(cfg.CONF[config_group].retry_interval) - try: - _ENGINES[config_group].connect() - break - except OperationalError as e: - if (remaining != 'infinite' and remaining == 0) or \ - not is_db_connection_error(e.args[0]): - raise - return _ENGINES[config_group] - - -def get_maker(engine, autocommit=True, expire_on_commit=False, autoflush=True): - """Return a SQLAlchemy sessionmaker using the given engine.""" - return sqlalchemy.orm.sessionmaker(bind=engine, - autocommit=autocommit, - autoflush=autoflush, - expire_on_commit=expire_on_commit) - - -def debug_mysql_do_query(): - """Return a debug version of MySQLdb.cursors._do_query""" - import traceback - - import MySQLdb.cursors - - old_mysql_do_query = MySQLdb.cursors.BaseCursor._do_query - - def _do_query(self, q): - stack = '' - for file, line, method, function in traceback.extract_stack(): - # exclude various common things from trace - if file.endswith('session.py') and method == '_do_query': - continue - if file.endswith('api.py') and method == 'wrapper': - continue - if file.endswith('utils.py') and method == '_inner': - continue - if file.endswith('exception.py') and method == '_wrap': - continue - # nova/db/api is just a wrapper around nova/db/sqlalchemy/api - if file.endswith('nova/db/api.py'): - continue - # only trace inside nova - index = file.rfind('nova') - if index == -1: - continue - stack += "File:%s:%s Method:%s() Line:%s | " \ - % (file[index:], line, method, function) - - # strip trailing " | " from stack - if stack: - stack = stack[:-3] - qq = "%s /* %s */" % (q, stack) - else: - qq = q - old_mysql_do_query(self, qq) - - # return the new _do_query method - return _do_query +def get_session(name, **kwargs): + facade = _create_facade_lazily(name) + return facade.get_session(**kwargs) diff --git a/designate/storage/impl_sqlalchemy/__init__.py b/designate/storage/impl_sqlalchemy/__init__.py index 21199e4ce..f51b284fc 100644 --- a/designate/storage/impl_sqlalchemy/__init__.py +++ b/designate/storage/impl_sqlalchemy/__init__.py @@ -15,22 +15,20 @@ # under the License. 
import time +from oslo.config import cfg +from oslo.db.sqlalchemy import utils as oslo_utils +from oslo.db import options from sqlalchemy.orm import exc from sqlalchemy import exc as sqlalchemy_exc from sqlalchemy import distinct, func -from oslo.config import cfg from designate.openstack.common import log as logging -from designate.openstack.common.db.sqlalchemy.utils import paginate_query -from designate.openstack.common.db.sqlalchemy.utils import InvalidSortKey from designate import exceptions from designate import objects +from designate.sqlalchemy import session from designate.storage import base from designate.storage.impl_sqlalchemy import models from designate.sqlalchemy.models import SoftDeleteMixin -from designate.sqlalchemy.session import get_session -from designate.sqlalchemy.session import get_engine -from designate.sqlalchemy.session import SQLOPTS LOG = logging.getLogger(__name__) @@ -39,7 +37,7 @@ cfg.CONF.register_group(cfg.OptGroup( name='storage:sqlalchemy', title="Configuration for SQLAlchemy Storage" )) -cfg.CONF.register_opts(SQLOPTS, group='storage:sqlalchemy') +cfg.CONF.register_opts(options.database_opts, group='storage:sqlalchemy') class SQLAlchemyStorage(base.Storage): @@ -49,8 +47,8 @@ class SQLAlchemyStorage(base.Storage): def __init__(self): super(SQLAlchemyStorage, self).__init__() - self.engine = get_engine(self.name) - self.session = get_session(self.name) + self.engine = session.get_engine(self.name) + self.session = session.get_session(self.name) def begin(self): self.session.begin(subtransactions=True) @@ -136,13 +134,13 @@ class SQLAlchemyStorage(base.Storage): sort_dir = sort_dir or 'asc' try: - query = paginate_query( + query = oslo_utils.paginate_query( query, model, limit, [sort_key, 'id', 'created_at'], marker=marker, sort_dir=sort_dir) return query.all() - except InvalidSortKey as sort_key_error: + except oslo_utils.InvalidSortKey as sort_key_error: raise exceptions.InvalidSortKey(sort_key_error.message) # Any ValueErrors are propagated back to the user as is. # Limits, sort_dir and sort_key are checked at the API layer. 
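The storage driver above consumes the new per-config-group facade in designate/sqlalchemy/session.py. A minimal sketch of the intended usage, with an in-memory SQLite URL chosen purely for illustration::

    # Sketch of the new _FACADES-based API; the connection URL is
    # illustrative, the group name matches what this patch registers.
    from oslo.config import cfg
    from oslo.db import options

    from designate.sqlalchemy import session

    # Consumers register oslo.db's shared option set under their own
    # group, as storage:sqlalchemy and backend:powerdns now do.
    cfg.CONF.register_group(cfg.OptGroup(name='storage:sqlalchemy'))
    cfg.CONF.register_opts(options.database_opts, group='storage:sqlalchemy')
    cfg.CONF.set_override('connection', 'sqlite://',
                          group='storage:sqlalchemy')

    # The first call builds an EngineFacade and caches it in _FACADES;
    # later calls with the same group name reuse the same engine and
    # sessionmaker instead of reconnecting.
    engine = session.get_engine('storage:sqlalchemy')
    db_session = session.get_session('storage:sqlalchemy')

Because the facade cache is keyed on the config group name, storage:sqlalchemy and backend:powerdns each get their own engine, which is what replaces the old module-level _ENGINES and _MAKERS dicts.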
diff --git a/designate/storage/impl_sqlalchemy/models.py b/designate/storage/impl_sqlalchemy/models.py
index f347c0ddc..2fcef21bf 100644
--- a/designate/storage/impl_sqlalchemy/models.py
+++ b/designate/storage/impl_sqlalchemy/models.py
@@ -18,7 +18,8 @@
 import hashlib
 
 from oslo.config import cfg
-from sqlalchemy import (Column, DateTime, String, Text, Integer, ForeignKey,
+from oslo.db.sqlalchemy import models as oslo_models
+from sqlalchemy import (Column, String, Text, Integer, ForeignKey,
                         Enum, Boolean, Unicode, UniqueConstraint, event)
 from sqlalchemy.orm import relationship, backref
 from sqlalchemy.ext.declarative import declarative_base
@@ -26,8 +27,7 @@ from sqlalchemy.ext.declarative import declarative_base
 
 from designate.openstack.common import log as logging
 from designate.openstack.common import timeutils
 from designate.sqlalchemy.types import UUID
-from designate.sqlalchemy.models import Base as CommonBase
-from designate.sqlalchemy.models import SoftDeleteMixin
+from designate.sqlalchemy import models
 from designate import utils
@@ -41,11 +41,9 @@
 TSIG_ALGORITHMS = ['hmac-md5', 'hmac-sha1', 'hmac-sha224', 'hmac-sha256',
                    'hmac-sha384', 'hmac-sha512']
 
 
-class Base(CommonBase):
+class Base(models.Base, oslo_models.TimestampMixin):
     id = Column(UUID, default=utils.generate_uuid, primary_key=True)
     version = Column(Integer, default=1, nullable=False)
-    created_at = Column(DateTime, default=timeutils.utcnow)
-    updated_at = Column(DateTime, onupdate=timeutils.utcnow)
 
     __mapper_args__ = {
         'version_id_col': version
@@ -82,7 +80,7 @@ class Tld(Base):
     description = Column(Unicode(160), nullable=True)
 
 
-class Domain(SoftDeleteMixin, Base):
+class Domain(models.SoftDeleteMixin, Base):
     __tablename__ = 'domains'
     __table_args__ = (
         UniqueConstraint('name', 'deleted', name='unique_domain_name'),
diff --git a/designate/tests/__init__.py b/designate/tests/__init__.py
index a41a4f819..7f8f8ce2c 100644
--- a/designate/tests/__init__.py
+++ b/designate/tests/__init__.py
@@ -55,7 +55,7 @@
 cfg.CONF.import_opt('backend_driver', 'designate.agent', group='service:agent')
 cfg.CONF.import_opt('auth_strategy', 'designate.api', group='service:api')
-cfg.CONF.import_opt('database_connection', 'designate.storage.impl_sqlalchemy',
+cfg.CONF.import_opt('connection', 'designate.storage.impl_sqlalchemy',
                     group='storage:sqlalchemy')
@@ -274,7 +274,7 @@ class TestCase(base.BaseTestCase):
         self.db_fixture = self.useFixture(
             DatabaseFixture.get_fixture(REPOSITORY))
         self.config(
-            database_connection=self.db_fixture.url,
+            connection=self.db_fixture.url,
             group='storage:sqlalchemy'
         )
diff --git a/designate/tests/test_backend/test_powerdns.py b/designate/tests/test_backend/test_powerdns.py
index 0407a76fd..ee670b7e6 100644
--- a/designate/tests/test_backend/test_powerdns.py
+++ b/designate/tests/test_backend/test_powerdns.py
@@ -69,7 +69,7 @@ class PowerDNSBackendTestCase(tests.TestCase, BackendTestMixin):
         self.db_fixture = DatabaseFixture.get_fixture(REPOSITORY)
         self.useFixture(self.db_fixture)
         self.config(backend_driver='powerdns', group='service:agent')
-        self.config(database_connection=self.db_fixture.url,
+        self.config(connection=self.db_fixture.url,
                     group='backend:powerdns')
         self.backend = self.get_backend_driver()
         self.backend.start()
diff --git a/doc/source/backends/powerdns.rst b/doc/source/backends/powerdns.rst
index 9723545f2..093098dd4 100644
--- a/doc/source/backends/powerdns.rst
+++ b/doc/source/backends/powerdns.rst
@@ -26,7 +26,7 @@ Parameter                       Default                                Note
 =============================== ====================================== ==============================================================
 domain_type                     NATIVE                                 PowerDNS Domain Type
 also_notify                     []                                     List of additional IPs to send NOTIFYs to.
-database_connection             sqlite:///$pystatepath/powerdns.sqlite Database connection string
+connection                      sqlite:///$pystatepath/powerdns.sqlite Database connection string
 connection_debug                0                                      Verbosity of SQL debugging information. 0=None, 100=Everything
 connection_trace                False                                  Add python stack traces to SQL as comment strings
 idle_timeout                    3600                                   timeout before idle sql connections are reaped
@@ -60,10 +60,10 @@ You need to configure PowerDNS to use the MySQL backend.
 
 .. note:: PowerDNS can connect via socket or host/port.
 
-3. Configure the options for designate-central - specifaclly "database_connection" to point to your MySQL database::
+3. Configure the options for designate-central - specifically "connection" to point to your MySQL database::
 
    [backend:powerdns]
-   database_connection = mysql://<user>:<password>@<host>:<port>/<database>
+   connection = mysql://<user>:<password>@<host>:<port>/<database>
 
 4. Setup the database schema.
diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst
index b5d602fad..d614c0b73 100644
--- a/doc/source/configuration.rst
+++ b/doc/source/configuration.rst
@@ -40,7 +40,7 @@ os-password                     admin                                Password
 os-tenant-id                                                         Tenant ID to use for openstack service access
 os-tenant-name                  admin                                Tenant name to use for openstack service access
 os-auth-url                     http://localhost:5000/v2.0           Auth URL to use for openstack service access
-database_connection             sqlite:///$pystatepath/designate.db  Database connection string
+connection                      sqlite:///$pystatepath/designate.db  Database connection string
 =============================== ==================================== ==============================================================
 
 Storage - SQL Alchemy
diff --git a/doc/source/examples/basic-config-sample.conf b/doc/source/examples/basic-config-sample.conf
index 19f5d8d3c..06ec9505e 100644
--- a/doc/source/examples/basic-config-sample.conf
+++ b/doc/source/examples/basic-config-sample.conf
@@ -63,7 +63,7 @@ enabled_extensions_v1 = diagnostics, quotas, reports, sync
 [storage:sqlalchemy]
 # Database connection string - to configure options for a given implementation
 # like sqlalchemy or other see below
-database_connection = sqlite:///$state_path/designate.sqlite
+connection = sqlite:///$state_path/designate.sqlite
 connection_debug = 100
 connection_trace = True
 sqlite_synchronous = True
@@ -78,7 +78,7 @@ retry_interval = 10
 # PowerDNS Backend
 #-----------------------
 [backend:powerdns]
-database_connection = sqlite:///$state_path/pdns.sqlite
+connection = sqlite:///$state_path/pdns.sqlite
 connection_debug = 100
 connection_trace = True
 sqlite_synchronous = True
diff --git a/doc/source/examples/config-mysql-pdns.conf b/doc/source/examples/config-mysql-pdns.conf
index 4cf99dce0..31415b743 100644
--- a/doc/source/examples/config-mysql-pdns.conf
+++ b/doc/source/examples/config-mysql-pdns.conf
@@ -74,7 +74,7 @@ admin_password = designate
 [storage:sqlalchemy]
 # Database connection string - to configure options for a given implementation
 # like sqlalchemy or other see below
-database_connection = mysql://designate:designate@localhost/designate
+connection = mysql://designate:designate@localhost/designate
 #connection_debug = 100
 #connection_trace = True
 #sqlite_synchronous = True
@@ -89,7 +89,7 @@ retry_interval = 10
 # PowerDNS Backend
 #-----------------------
 [backend:powerdns]
-database_connection = mysql://powerdns:powerdns@localhost/powerdns
+connection = mysql://powerdns:powerdns@localhost/powerdns
 #connection_debug = 100
 #connection_trace = True
 #sqlite_synchronous = True
diff --git a/doc/source/getting-started.rst b/doc/source/getting-started.rst
index d0e21ea38..45b2ae2b4 100644
--- a/doc/source/getting-started.rst
+++ b/doc/source/getting-started.rst
@@ -133,11 +133,9 @@ Initialize & Start the Central Service
 ::
 
-    #Initialize and sync the Designate database:
-    $ designate-manage database init
+    #Sync the Designate database:
     $ designate-manage database sync
 
-    #Initialize and sync the PowerDNS database:
-    $ designate-manage powerdns init
+    #Sync the PowerDNS database:
     $ designate-manage powerdns sync
     #Restart PowerDNS
     $ service pdns restart
diff --git a/doc/source/install/ubuntu.rst b/doc/source/install/ubuntu.rst
index 8ef30b039..73f30c98b 100644
--- a/doc/source/install/ubuntu.rst
+++ b/doc/source/install/ubuntu.rst
@@ -186,11 +186,9 @@ Sync Database schemas
 
 Initialize and sync the database schemas for Designate and PowerDNS::
 
-    $ designate-manage database-init
-    $ designate-manage database-sync
+    $ designate-manage database sync
 
-    $ designate-manage powerdns database-init
-    $ designate-manage powerdns database-sync
+    $ designate-manage powerdns sync
 
 Register Designate with Keystone
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/etc/designate/designate.conf.sample b/etc/designate/designate.conf.sample
index 9a5d6776b..6be6594c8 100644
--- a/etc/designate/designate.conf.sample
+++ b/etc/designate/designate.conf.sample
@@ -149,7 +149,7 @@ debug = False
 [storage:sqlalchemy]
 # Database connection string - to configure options for a given implementation
 # like sqlalchemy or other see below
-#database_connection = sqlite:///$state_path/designate.sqlite
+#connection = sqlite:///$state_path/designate.sqlite
 #connection_debug = 100
 #connection_trace = False
 #sqlite_synchronous = True
@@ -195,7 +195,7 @@ debug = False
 # Bind9+MySQL Backend
 #-----------------------
 [backend:mysqlbind9]
-#database_connection = mysql://user:password@host/schema
+#connection = mysql://user:password@host/schema
 #rndc_host = 127.0.0.1
 #rndc_port = 953
 #rndc_config_file = /etc/rndc.conf
@@ -207,7 +207,7 @@ debug = False
 # PowerDNS Backend
 #-----------------------
 [backend:powerdns]
-#database_connection = mysql://user:password@host/pdns
+#connection = mysql://user:password@host/pdns
 #connection_debug = 100
 #connection_trace = False
 #sqlite_synchronous = True
diff --git a/openstack-common.conf b/openstack-common.conf
index 6126de692..cfdb98e61 100644
--- a/openstack-common.conf
+++ b/openstack-common.conf
@@ -2,8 +2,6 @@
 # The list of modules to copy from oslo-incubator.git
 module=context
-module=db
-module=db.sqlalchemy
 module=excutils
 module=fixture
 module=gettextutils
diff --git a/requirements.txt b/requirements.txt
index c7f076274..19b4979e0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -25,3 +25,4 @@ WebOb>=1.2.3
 dnspython>=1.9.4
 posix_ipc
 oslotest
+oslo.db>=0.2.0 # Apache-2.0
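A note on the _FACADE dict the commit message refers to: it is not visible in this excerpt (it lives in designate/sqlalchemy/session.py), but the idea is a per-config-group cache, so that each section carrying its own `connection` option, such as storage:sqlalchemy and backend:powerdns, gets its own engine. The following is a hand-written sketch of that pattern in plain SQLAlchemy, assuming oslo.db >= 0.2.0 option names; it is not the actual designate.sqlalchemy.session code.

import threading

from oslo.config import cfg
from oslo.db import options
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

_FACADES = {}  # config group name -> (engine, sessionmaker)
_LOCK = threading.Lock()


def _get_facade(config_group):
    # Build the engine/sessionmaker pair for a group on first use;
    # later callers for the same group share the engine and its pool.
    with _LOCK:
        if config_group not in _FACADES:
            url = cfg.CONF[config_group].connection
            engine = create_engine(url)
            _FACADES[config_group] = (engine, sessionmaker(bind=engine))
    return _FACADES[config_group]


def get_engine(config_group):
    return _get_facade(config_group)[0]


def get_session(config_group):
    # A fresh session per call, bound to the group's shared engine.
    return _get_facade(config_group)[1]()


if __name__ == '__main__':
    # Register oslo.db's option list (which includes `connection`) under a
    # group, the same way the storage driver hunk above does, then point
    # it at an in-memory SQLite database for the demo.
    cfg.CONF([], project='example')  # initialize without CLI args
    cfg.CONF.register_opts(options.database_opts, group='storage:sqlalchemy')
    cfg.CONF.set_override('connection', 'sqlite://',
                          group='storage:sqlalchemy')
    print(get_engine('storage:sqlalchemy').execute('SELECT 1').scalar())

One consequence of caching per group rather than per URL: two groups configured with the same connection string still get separate engines, which keeps the PowerDNS backend database independent of Designate's own storage.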