diff --git a/oslo_db/exception.py b/oslo_db/exception.py index e66fe98c..c8da9966 100644 --- a/oslo_db/exception.py +++ b/oslo_db/exception.py @@ -243,6 +243,10 @@ class DBDataError(DBError): """ +class DBNotSupportedError(DBError): + """Raised when a database backend has raised sqla.exc.NotSupportedError""" + + class InvalidSortKey(Exception): """A sort key destined for database query usage is invalid.""" diff --git a/oslo_db/options.py b/oslo_db/options.py index 1ce23815..824661fe 100644 --- a/oslo_db/options.py +++ b/oslo_db/options.py @@ -44,6 +44,10 @@ database_opts = [ 'server-set SQL mode. To use whatever SQL mode ' 'is set by the server configuration, ' 'set this to no value. Example: mysql_sql_mode='), + cfg.BoolOpt('mysql_enable_ndb', + default=False, + help='If True, transparently enables support for handling ' + 'MySQL Cluster (NDB).'), cfg.IntOpt('idle_timeout', default=3600, deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout', diff --git a/oslo_db/sqlalchemy/enginefacade.py b/oslo_db/sqlalchemy/enginefacade.py index 88bca6b0..5cc5bf2c 100644 --- a/oslo_db/sqlalchemy/enginefacade.py +++ b/oslo_db/sqlalchemy/enginefacade.py @@ -134,6 +134,7 @@ class _TransactionFactory(object): self._engine_cfg = { 'sqlite_fk': _Default(False), 'mysql_sql_mode': _Default('TRADITIONAL'), + 'mysql_enable_ndb': _Default(False), 'idle_timeout': _Default(3600), 'connection_debug': _Default(0), 'max_pool_size': _Default(), @@ -205,6 +206,8 @@ class _TransactionFactory(object): :param mysql_sql_mode: MySQL SQL mode, defaults to TRADITIONAL + :param mysql_enable_ndb: enable MySQL Cluster (NDB) support + :param idle_timeout: connection pool recycle time, defaults to 3600. Note the connection does not actually have to be "idle" to be recycled. @@ -1176,6 +1179,9 @@ class LegacyEngineFacade(object): :keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions. (defaults to TRADITIONAL) + :keyword mysql_enable_ndb: If True, transparently enables support for + handling MySQL Cluster (NDB). + (defaults to False) :keyword idle_timeout: timeout before idle sql connections are reaped (defaults to 3600) :keyword connection_debug: verbosity of SQL debugging information. diff --git a/oslo_db/sqlalchemy/engines.py b/oslo_db/sqlalchemy/engines.py index 601be765..e2ad028b 100644 --- a/oslo_db/sqlalchemy/engines.py +++ b/oslo_db/sqlalchemy/engines.py @@ -32,6 +32,7 @@ from sqlalchemy.sql.expression import select from oslo_db import exception from oslo_db.sqlalchemy import exc_filters +from oslo_db.sqlalchemy import ndb from oslo_db.sqlalchemy import utils LOG = logging.getLogger(__name__) @@ -101,6 +102,7 @@ def _setup_logging(connection_debug=0): def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, + mysql_enable_ndb=False, idle_timeout=3600, connection_debug=0, max_pool_size=None, max_overflow=None, pool_timeout=None, sqlite_synchronous=True, @@ -132,6 +134,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None, engine = sqlalchemy.create_engine(url, **engine_args) + if mysql_enable_ndb: + ndb.enable_ndb_support(engine) + _init_events( engine, mysql_sql_mode=mysql_sql_mode, @@ -265,6 +270,9 @@ def _init_events(engine, mysql_sql_mode=None, **kw): "consider enabling TRADITIONAL or STRICT_ALL_TABLES", realmode) + if ndb.ndb_status(engine): + ndb.init_ndb_events(engine) + @_init_events.dispatch_for("sqlite") def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw): diff --git a/oslo_db/sqlalchemy/exc_filters.py b/oslo_db/sqlalchemy/exc_filters.py index 5743e89e..2f575d84 100644 --- a/oslo_db/sqlalchemy/exc_filters.py +++ b/oslo_db/sqlalchemy/exc_filters.py @@ -395,6 +395,11 @@ def _is_db_connection_error(operational_error, match, engine_name, raise exception.DBConnectionError(operational_error) +@filters("*", sqla_exc.NotSupportedError, r".*") +def _raise_for_NotSupportedError(error, match, engine_name, is_disconnect): + raise exception.DBNotSupportedError(error) + + @filters("*", sqla_exc.DBAPIError, r".*") def _raise_for_remaining_DBAPIError(error, match, engine_name, is_disconnect): """Filter for remaining DBAPIErrors. diff --git a/oslo_db/sqlalchemy/ndb.py b/oslo_db/sqlalchemy/ndb.py new file mode 100644 index 00000000..c7a3de90 --- /dev/null +++ b/oslo_db/sqlalchemy/ndb.py @@ -0,0 +1,137 @@ +# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Core functions for MySQL Cluster (NDB) Support.""" + +import re + +from sqlalchemy import String, event, schema +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.types import VARCHAR + +engine_regex = re.compile("engine=innodb", re.IGNORECASE) +trans_regex = re.compile("savepoint|rollback|release savepoint", re.IGNORECASE) + + +def enable_ndb_support(engine): + """Enable NDB Support. + + Function to flag the MySQL engine dialect to support features specific + to MySQL Cluster (NDB). + """ + engine.dialect._oslodb_enable_ndb_support = True + + +def ndb_status(engine_or_compiler): + """Test if NDB Support is enabled. + + Function to test if NDB support is enabled or not. + """ + return getattr(engine_or_compiler.dialect, + '_oslodb_enable_ndb_support', + False) + + +def init_ndb_events(engine): + """Initialize NDB Events. + + Function starts NDB specific events. + """ + @event.listens_for(engine, "before_cursor_execute", retval=True) + def before_cursor_execute(conn, cursor, statement, parameters, context, + executemany): + """Listen for specific SQL strings and replace automatically. + + Function will intercept any raw execute calls and automatically + convert InnoDB to NDBCLUSTER, drop SAVEPOINT requests, drop + ROLLBACK requests, and drop RELEASE SAVEPOINT requests. + """ + if ndb_status(engine): + statement = engine_regex.sub("ENGINE=NDBCLUSTER", statement) + if re.match(trans_regex, statement): + statement = "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;" + + return statement, parameters + + +@compiles(schema.CreateTable, "mysql") +def prefix_inserts(create_table, compiler, **kw): + """Replace InnoDB with NDBCLUSTER automatically. + + Function will intercept CreateTable() calls and automatically + convert InnoDB to NDBCLUSTER. Targets compiler events. + """ + existing = compiler.visit_create_table(create_table, **kw) + if ndb_status(compiler): + existing = engine_regex.sub("ENGINE=NDBCLUSTER", existing) + + return existing + + +class AutoStringTinyText(String): + """Class definition for AutoStringTinyText. + + Class is used by compiler function _auto-string_tiny_text(). + """ + + pass + + +@compiles(AutoStringTinyText, 'mysql') +def _auto_string_tiny_text(element, compiler, **kw): + if ndb_status(compiler): + return "TINYTEXT" + else: + return compiler.visit_string(element, **kw) + + +class AutoStringText(String): + """Class definition for AutoStringText. + + Class is used by compiler function _auto_string_text(). + """ + + pass + + +@compiles(AutoStringText, 'mysql') +def _auto_string_text(element, compiler, **kw): + if ndb_status(compiler): + return "TEXT" + else: + return compiler.visit_string(element, **kw) + + +class AutoStringSize(String): + """Class definition for AutoStringSize. + + Class is used by the compiler function _auto_string_size(). + """ + + def __init__(self, length, ndb_size, **kw): + """Initialize and extend the String arguments. + + Function adds the innodb_size and ndb_size arguments to the + function String(). + """ + super(AutoStringSize, self).__init__(length=length, **kw) + self.ndb_size = ndb_size + self.length = length + + +@compiles(AutoStringSize, 'mysql') +def _auto_string_size(element, compiler, **kw): + if ndb_status(compiler): + return compiler.process(VARCHAR(element.ndb_size), **kw) + else: + return compiler.visit_string(element, **kw) diff --git a/oslo_db/sqlalchemy/utils.py b/oslo_db/sqlalchemy/utils.py index 120b1290..ac027cab 100644 --- a/oslo_db/sqlalchemy/utils.py +++ b/oslo_db/sqlalchemy/utils.py @@ -1139,6 +1139,36 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version', return [i[0] for i in noninnodb] +def get_non_ndbcluster_tables(connectable, skip_tables=None): + """Get a list of tables which don't use MySQL Cluster (NDB) storage engine. + + :param connectable: a SQLAlchemy Engine or Connection instance + :param skip_tables: a list of tables which might have a different + storage engine + """ + query_str = """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = :database AND + engine != 'ndbcluster' + """ + + params = {} + if skip_tables: + params = dict( + ('skip_%s' % i, table_name) + for i, table_name in enumerate(skip_tables) + ) + + placeholders = ', '.join(':' + p for p in params) + query_str += ' AND table_name NOT IN (%s)' % placeholders + + params['database'] = connectable.engine.url.database + query = text(query_str) + nonndbcluster = connectable.execute(query, **params) + return [i[0] for i in nonndbcluster] + + class NonCommittingConnectable(object): """A ``Connectable`` substitute which rolls all operations back. diff --git a/oslo_db/tests/sqlalchemy/test_ndb.py b/oslo_db/tests/sqlalchemy/test_ndb.py new file mode 100644 index 00000000..a5a811ba --- /dev/null +++ b/oslo_db/tests/sqlalchemy/test_ndb.py @@ -0,0 +1,176 @@ +# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Tests for MySQL Cluster (NDB) Support.""" + +import logging + +import mock + +from oslo_db import exception +from oslo_db.sqlalchemy import enginefacade +from oslo_db.sqlalchemy import engines +from oslo_db.sqlalchemy import ndb +from oslo_db.sqlalchemy import test_fixtures +from oslo_db.sqlalchemy import utils + +from oslotest import base as test_base + +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy import Text + +from sqlalchemy import create_engine +from sqlalchemy import schema + +from sqlalchemy.dialects.mysql.types import TINYTEXT + +LOG = logging.getLogger(__name__) + +_MOCK_CONNECTION = 'mysql+pymysql://' +_TEST_TABLE = Table("test_ndb", MetaData(), + Column('id', Integer, primary_key=True), + Column('test1', ndb.AutoStringTinyText(255)), + Column('test2', ndb.AutoStringText(4096)), + Column('test3', ndb.AutoStringSize(255, 64)), + mysql_engine='InnoDB') + + +class NDBMockTestBase(test_base.BaseTestCase): + def setUp(self): + super(NDBMockTestBase, self).setUp() + mock_dbapi = mock.Mock() + self.test_engine = test_engine = create_engine( + _MOCK_CONNECTION, module=mock_dbapi) + test_engine.dialect._oslodb_enable_ndb_support = True + ndb.init_ndb_events(test_engine) + + +class NDBEventTestCase(NDBMockTestBase): + + def test_ndb_createtable_override(self): + test_engine = self.test_engine + self.assertRegex( + str(schema.CreateTable(_TEST_TABLE).compile( + dialect=test_engine.dialect)), + "ENGINE=NDBCLUSTER") + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_engine_override(self): + test_engine = self.test_engine + statement = "ENGINE=InnoDB" + for fn in test_engine.dispatch.before_cursor_execute: + statement, dialect = fn( + mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False) + self.assertEqual(statement, "ENGINE=NDBCLUSTER") + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_savepoint_override(self): + test_engine = self.test_engine + statement = "SAVEPOINT xyx" + for fn in test_engine.dispatch.before_cursor_execute: + statement, dialect = fn( + mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False) + self.assertEqual(statement, + "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;") + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_rollback_override(self): + test_engine = self.test_engine + statement = "ROLLBACK TO SAVEPOINT xyz" + for fn in test_engine.dispatch.before_cursor_execute: + statement, dialect = fn( + mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False) + self.assertEqual(statement, + "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;") + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_rollback_release_override(self): + test_engine = self.test_engine + statement = "RELEASE SAVEPOINT xyz" + for fn in test_engine.dispatch.before_cursor_execute: + statement, dialect = fn( + mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False) + self.assertEqual(statement, + "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;") + test_engine.dialect._oslodb_enable_ndb_support = False + + +class NDBDatatypesTestCase(NDBMockTestBase): + def test_ndb_autostringtinytext(self): + test_engine = self.test_engine + self.assertEqual("TINYTEXT", + str(ndb.AutoStringTinyText(255).compile( + dialect=test_engine.dialect))) + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_autostringtext(self): + test_engine = self.test_engine + self.assertEqual("TEXT", + str(ndb.AutoStringText(4096).compile( + dialect=test_engine.dialect))) + test_engine.dialect._oslodb_enable_ndb_support = False + + def test_ndb_autostringsize(self): + test_engine = self.test_engine + self.assertEqual('VARCHAR(64)', + str(ndb.AutoStringSize(255, 64).compile( + dialect=test_engine.dialect))) + test_engine.dialect._oslodb_enable_ndb_support = False + + +class NDBOpportunisticTestCase( + test_fixtures.OpportunisticDBTestMixin, test_base.BaseTestCase): + + FIXTURE = test_fixtures.MySQLOpportunisticFixture + + def init_db(self, use_ndb): + # get the MySQL engine created by the opportunistic + # provisioning system + self.engine = enginefacade.writer.get_engine() + if use_ndb: + # if we want NDB, make a new local engine that uses the + # URL / database / schema etc. of the provisioned engine, + # since NDB-ness is a per-table thing + self.engine = engines.create_engine( + self.engine.url, mysql_enable_ndb=True + ) + self.addCleanup(self.engine.dispose) + self.test_table = _TEST_TABLE + try: + self.test_table.create(self.engine) + except exception.DBNotSupportedError: + self.skip("MySQL NDB Cluster not available") + + def test_ndb_enabled(self): + self.init_db(True) + self.assertTrue(ndb.ndb_status(self.engine)) + self.assertIsInstance(self.test_table.c.test1.type, TINYTEXT) + self.assertIsInstance(self.test_table.c.test2.type, Text) + self.assertIsInstance(self.test_table.c.test3.type, String) + self.assertEqual(64, self.test_table.c.test3.type.length) + self.assertEqual([], utils.get_non_ndbcluster_tables(self.engine)) + + def test_ndb_disabled(self): + self.init_db(False) + self.assertFalse(ndb.ndb_status(self.engine)) + self.assertIsInstance(self.test_table.c.test1.type, String) + self.assertEqual(255, self.test_table.c.test1.type.length) + self.assertIsInstance(self.test_table.c.test2.type, String) + self.assertEqual(4096, self.test_table.c.test2.type.length) + self.assertIsInstance(self.test_table.c.test3.type, String) + self.assertEqual(255, self.test_table.c.test3.type.length) + self.assertEqual([], utils.get_non_innodb_tables(self.engine)) diff --git a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py index a37cdc69..5f1df726 100644 --- a/oslo_db/tests/sqlalchemy/test_sqlalchemy.py +++ b/oslo_db/tests/sqlalchemy/test_sqlalchemy.py @@ -341,6 +341,7 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): connection_debug=100, max_pool_size=10, mysql_sql_mode='TRADITIONAL', + mysql_enable_ndb=False, sqlite_fk=False, idle_timeout=mock.ANY, retry_interval=mock.ANY,