Enable MySQL Storage Engine selection

Enables new functionality for selecting InnoDB or MySQL Cluster
as a DB storage backend in OpenStack services.

Closes-Bug: 1564110
Change-Id: I9f1fd2a87fdf75332de2339d3ff4f08ce9220dcf
This commit is contained in:
oorgeron 2017-01-27 15:00:56 -07:00
parent d5f1f288a4
commit c5db636d4e
9 changed files with 371 additions and 0 deletions

View File

@ -243,6 +243,10 @@ class DBDataError(DBError):
"""
class DBNotSupportedError(DBError):
"""Raised when a database backend has raised sqla.exc.NotSupportedError"""
class InvalidSortKey(Exception):
"""A sort key destined for database query usage is invalid."""

View File

@ -44,6 +44,10 @@ database_opts = [
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.BoolOpt('mysql_enable_ndb',
default=False,
help='If True, transparently enables support for handling '
'MySQL Cluster (NDB).'),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',

View File

@ -134,6 +134,7 @@ class _TransactionFactory(object):
self._engine_cfg = {
'sqlite_fk': _Default(False),
'mysql_sql_mode': _Default('TRADITIONAL'),
'mysql_enable_ndb': _Default(False),
'idle_timeout': _Default(3600),
'connection_debug': _Default(0),
'max_pool_size': _Default(),
@ -205,6 +206,8 @@ class _TransactionFactory(object):
:param mysql_sql_mode: MySQL SQL mode, defaults to TRADITIONAL
:param mysql_enable_ndb: enable MySQL Cluster (NDB) support
:param idle_timeout: connection pool recycle time,
defaults to 3600. Note the connection does not actually have to
be "idle" to be recycled.
@ -1176,6 +1179,9 @@ class LegacyEngineFacade(object):
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
(defaults to TRADITIONAL)
:keyword mysql_enable_ndb: If True, transparently enables support for
handling MySQL Cluster (NDB).
(defaults to False)
:keyword idle_timeout: timeout before idle sql connections are reaped
(defaults to 3600)
:keyword connection_debug: verbosity of SQL debugging information.

View File

@ -32,6 +32,7 @@ from sqlalchemy.sql.expression import select
from oslo_db import exception
from oslo_db.sqlalchemy import exc_filters
from oslo_db.sqlalchemy import ndb
from oslo_db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
@ -101,6 +102,7 @@ def _setup_logging(connection_debug=0):
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
mysql_enable_ndb=False,
idle_timeout=3600,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
@ -132,6 +134,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
engine = sqlalchemy.create_engine(url, **engine_args)
if mysql_enable_ndb:
ndb.enable_ndb_support(engine)
_init_events(
engine,
mysql_sql_mode=mysql_sql_mode,
@ -265,6 +270,9 @@ def _init_events(engine, mysql_sql_mode=None, **kw):
"consider enabling TRADITIONAL or STRICT_ALL_TABLES",
realmode)
if ndb.ndb_status(engine):
ndb.init_ndb_events(engine)
@_init_events.dispatch_for("sqlite")
def _init_events(engine, sqlite_synchronous=True, sqlite_fk=False, **kw):

View File

@ -395,6 +395,11 @@ def _is_db_connection_error(operational_error, match, engine_name,
raise exception.DBConnectionError(operational_error)
@filters("*", sqla_exc.NotSupportedError, r".*")
def _raise_for_NotSupportedError(error, match, engine_name, is_disconnect):
raise exception.DBNotSupportedError(error)
@filters("*", sqla_exc.DBAPIError, r".*")
def _raise_for_remaining_DBAPIError(error, match, engine_name, is_disconnect):
"""Filter for remaining DBAPIErrors.

137
oslo_db/sqlalchemy/ndb.py Normal file
View File

@ -0,0 +1,137 @@
# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Core functions for MySQL Cluster (NDB) Support."""
import re
from sqlalchemy import String, event, schema
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.types import VARCHAR
engine_regex = re.compile("engine=innodb", re.IGNORECASE)
trans_regex = re.compile("savepoint|rollback|release savepoint", re.IGNORECASE)
def enable_ndb_support(engine):
"""Enable NDB Support.
Function to flag the MySQL engine dialect to support features specific
to MySQL Cluster (NDB).
"""
engine.dialect._oslodb_enable_ndb_support = True
def ndb_status(engine_or_compiler):
"""Test if NDB Support is enabled.
Function to test if NDB support is enabled or not.
"""
return getattr(engine_or_compiler.dialect,
'_oslodb_enable_ndb_support',
False)
def init_ndb_events(engine):
"""Initialize NDB Events.
Function starts NDB specific events.
"""
@event.listens_for(engine, "before_cursor_execute", retval=True)
def before_cursor_execute(conn, cursor, statement, parameters, context,
executemany):
"""Listen for specific SQL strings and replace automatically.
Function will intercept any raw execute calls and automatically
convert InnoDB to NDBCLUSTER, drop SAVEPOINT requests, drop
ROLLBACK requests, and drop RELEASE SAVEPOINT requests.
"""
if ndb_status(engine):
statement = engine_regex.sub("ENGINE=NDBCLUSTER", statement)
if re.match(trans_regex, statement):
statement = "SET @oslo_db_ndb_savepoint_rollback_disabled = 0;"
return statement, parameters
@compiles(schema.CreateTable, "mysql")
def prefix_inserts(create_table, compiler, **kw):
"""Replace InnoDB with NDBCLUSTER automatically.
Function will intercept CreateTable() calls and automatically
convert InnoDB to NDBCLUSTER. Targets compiler events.
"""
existing = compiler.visit_create_table(create_table, **kw)
if ndb_status(compiler):
existing = engine_regex.sub("ENGINE=NDBCLUSTER", existing)
return existing
class AutoStringTinyText(String):
"""Class definition for AutoStringTinyText.
Class is used by compiler function _auto-string_tiny_text().
"""
pass
@compiles(AutoStringTinyText, 'mysql')
def _auto_string_tiny_text(element, compiler, **kw):
if ndb_status(compiler):
return "TINYTEXT"
else:
return compiler.visit_string(element, **kw)
class AutoStringText(String):
"""Class definition for AutoStringText.
Class is used by compiler function _auto_string_text().
"""
pass
@compiles(AutoStringText, 'mysql')
def _auto_string_text(element, compiler, **kw):
if ndb_status(compiler):
return "TEXT"
else:
return compiler.visit_string(element, **kw)
class AutoStringSize(String):
"""Class definition for AutoStringSize.
Class is used by the compiler function _auto_string_size().
"""
def __init__(self, length, ndb_size, **kw):
"""Initialize and extend the String arguments.
Function adds the innodb_size and ndb_size arguments to the
function String().
"""
super(AutoStringSize, self).__init__(length=length, **kw)
self.ndb_size = ndb_size
self.length = length
@compiles(AutoStringSize, 'mysql')
def _auto_string_size(element, compiler, **kw):
if ndb_status(compiler):
return compiler.process(VARCHAR(element.ndb_size), **kw)
else:
return compiler.visit_string(element, **kw)

View File

@ -1139,6 +1139,36 @@ def get_non_innodb_tables(connectable, skip_tables=('migrate_version',
return [i[0] for i in noninnodb]
def get_non_ndbcluster_tables(connectable, skip_tables=None):
"""Get a list of tables which don't use MySQL Cluster (NDB) storage engine.
:param connectable: a SQLAlchemy Engine or Connection instance
:param skip_tables: a list of tables which might have a different
storage engine
"""
query_str = """
SELECT table_name
FROM information_schema.tables
WHERE table_schema = :database AND
engine != 'ndbcluster'
"""
params = {}
if skip_tables:
params = dict(
('skip_%s' % i, table_name)
for i, table_name in enumerate(skip_tables)
)
placeholders = ', '.join(':' + p for p in params)
query_str += ' AND table_name NOT IN (%s)' % placeholders
params['database'] = connectable.engine.url.database
query = text(query_str)
nonndbcluster = connectable.execute(query, **params)
return [i[0] for i in nonndbcluster]
class NonCommittingConnectable(object):
"""A ``Connectable`` substitute which rolls all operations back.

View File

@ -0,0 +1,176 @@
# Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for MySQL Cluster (NDB) Support."""
import logging
import mock
from oslo_db import exception
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import engines
from oslo_db.sqlalchemy import ndb
from oslo_db.sqlalchemy import test_fixtures
from oslo_db.sqlalchemy import utils
from oslotest import base as test_base
from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy import schema
from sqlalchemy.dialects.mysql.types import TINYTEXT
LOG = logging.getLogger(__name__)
_MOCK_CONNECTION = 'mysql+pymysql://'
_TEST_TABLE = Table("test_ndb", MetaData(),
Column('id', Integer, primary_key=True),
Column('test1', ndb.AutoStringTinyText(255)),
Column('test2', ndb.AutoStringText(4096)),
Column('test3', ndb.AutoStringSize(255, 64)),
mysql_engine='InnoDB')
class NDBMockTestBase(test_base.BaseTestCase):
def setUp(self):
super(NDBMockTestBase, self).setUp()
mock_dbapi = mock.Mock()
self.test_engine = test_engine = create_engine(
_MOCK_CONNECTION, module=mock_dbapi)
test_engine.dialect._oslodb_enable_ndb_support = True
ndb.init_ndb_events(test_engine)
class NDBEventTestCase(NDBMockTestBase):
def test_ndb_createtable_override(self):
test_engine = self.test_engine
self.assertRegex(
str(schema.CreateTable(_TEST_TABLE).compile(
dialect=test_engine.dialect)),
"ENGINE=NDBCLUSTER")
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_engine_override(self):
test_engine = self.test_engine
statement = "ENGINE=InnoDB"
for fn in test_engine.dispatch.before_cursor_execute:
statement, dialect = fn(
mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
self.assertEqual(statement, "ENGINE=NDBCLUSTER")
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_savepoint_override(self):
test_engine = self.test_engine
statement = "SAVEPOINT xyx"
for fn in test_engine.dispatch.before_cursor_execute:
statement, dialect = fn(
mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
self.assertEqual(statement,
"SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_rollback_override(self):
test_engine = self.test_engine
statement = "ROLLBACK TO SAVEPOINT xyz"
for fn in test_engine.dispatch.before_cursor_execute:
statement, dialect = fn(
mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
self.assertEqual(statement,
"SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_rollback_release_override(self):
test_engine = self.test_engine
statement = "RELEASE SAVEPOINT xyz"
for fn in test_engine.dispatch.before_cursor_execute:
statement, dialect = fn(
mock.Mock(), mock.Mock(), statement, {}, mock.Mock(), False)
self.assertEqual(statement,
"SET @oslo_db_ndb_savepoint_rollback_disabled = 0;")
test_engine.dialect._oslodb_enable_ndb_support = False
class NDBDatatypesTestCase(NDBMockTestBase):
def test_ndb_autostringtinytext(self):
test_engine = self.test_engine
self.assertEqual("TINYTEXT",
str(ndb.AutoStringTinyText(255).compile(
dialect=test_engine.dialect)))
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_autostringtext(self):
test_engine = self.test_engine
self.assertEqual("TEXT",
str(ndb.AutoStringText(4096).compile(
dialect=test_engine.dialect)))
test_engine.dialect._oslodb_enable_ndb_support = False
def test_ndb_autostringsize(self):
test_engine = self.test_engine
self.assertEqual('VARCHAR(64)',
str(ndb.AutoStringSize(255, 64).compile(
dialect=test_engine.dialect)))
test_engine.dialect._oslodb_enable_ndb_support = False
class NDBOpportunisticTestCase(
test_fixtures.OpportunisticDBTestMixin, test_base.BaseTestCase):
FIXTURE = test_fixtures.MySQLOpportunisticFixture
def init_db(self, use_ndb):
# get the MySQL engine created by the opportunistic
# provisioning system
self.engine = enginefacade.writer.get_engine()
if use_ndb:
# if we want NDB, make a new local engine that uses the
# URL / database / schema etc. of the provisioned engine,
# since NDB-ness is a per-table thing
self.engine = engines.create_engine(
self.engine.url, mysql_enable_ndb=True
)
self.addCleanup(self.engine.dispose)
self.test_table = _TEST_TABLE
try:
self.test_table.create(self.engine)
except exception.DBNotSupportedError:
self.skip("MySQL NDB Cluster not available")
def test_ndb_enabled(self):
self.init_db(True)
self.assertTrue(ndb.ndb_status(self.engine))
self.assertIsInstance(self.test_table.c.test1.type, TINYTEXT)
self.assertIsInstance(self.test_table.c.test2.type, Text)
self.assertIsInstance(self.test_table.c.test3.type, String)
self.assertEqual(64, self.test_table.c.test3.type.length)
self.assertEqual([], utils.get_non_ndbcluster_tables(self.engine))
def test_ndb_disabled(self):
self.init_db(False)
self.assertFalse(ndb.ndb_status(self.engine))
self.assertIsInstance(self.test_table.c.test1.type, String)
self.assertEqual(255, self.test_table.c.test1.type.length)
self.assertIsInstance(self.test_table.c.test2.type, String)
self.assertEqual(4096, self.test_table.c.test2.type.length)
self.assertIsInstance(self.test_table.c.test3.type, String)
self.assertEqual(255, self.test_table.c.test3.type.length)
self.assertEqual([], utils.get_non_innodb_tables(self.engine))

View File

@ -341,6 +341,7 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
connection_debug=100,
max_pool_size=10,
mysql_sql_mode='TRADITIONAL',
mysql_enable_ndb=False,
sqlite_fk=False,
idle_timeout=mock.ANY,
retry_interval=mock.ANY,