Handle slave database connection in EngineFacade
Make it possible to use an additional (aka, 'slave') database connection. This might be useful for offloading of read operations to reduce the load on the RDBMS. Currently, this is only used in Nova, but can be ported to other projects easily. Change-Id: I4296347bb9ee0743738fe1126d1cef8e43b9f96e
This commit is contained in:
parent
f9167b0e1e
commit
1d2df1a2a3
@ -39,6 +39,10 @@ database_opts = [
|
||||
group='DATABASE'),
|
||||
cfg.DeprecatedOpt('connection',
|
||||
group='sql'), ]),
|
||||
cfg.StrOpt('slave_connection',
|
||||
secret=True,
|
||||
help='The SQLAlchemy connection string to use to connect to the'
|
||||
' slave database.'),
|
||||
cfg.StrOpt('mysql_sql_mode',
|
||||
default='TRADITIONAL',
|
||||
help='The SQL mode to be used for MySQL sessions. '
|
||||
|
@ -793,11 +793,22 @@ class EngineFacade(object):
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, sql_connection,
|
||||
def __init__(self, sql_connection, slave_connection=None,
|
||||
sqlite_fk=False, autocommit=True,
|
||||
expire_on_commit=False, **kwargs):
|
||||
"""Initialize engine and sessionmaker instances.
|
||||
|
||||
:param sql_connection: the connection string for the database to use
|
||||
:type sql_connection: string
|
||||
|
||||
:param slave_connection: the connection string for the 'slave' database
|
||||
to use. If not provided, the master database
|
||||
will be used for all operations. Note: this
|
||||
is meant to be used for offloading of read
|
||||
operations to asynchronously replicated slaves
|
||||
to reduce the load on the master database.
|
||||
:type slave_connection: string
|
||||
|
||||
:param sqlite_fk: enable foreign keys in SQLite
|
||||
:type sqlite_fk: bool
|
||||
|
||||
@ -839,39 +850,73 @@ class EngineFacade(object):
|
||||
|
||||
super(EngineFacade, self).__init__()
|
||||
|
||||
self._engine = create_engine(
|
||||
sql_connection=sql_connection,
|
||||
sqlite_fk=sqlite_fk,
|
||||
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
|
||||
idle_timeout=kwargs.get('idle_timeout', 3600),
|
||||
connection_debug=kwargs.get('connection_debug', 0),
|
||||
max_pool_size=kwargs.get('max_pool_size'),
|
||||
max_overflow=kwargs.get('max_overflow'),
|
||||
pool_timeout=kwargs.get('pool_timeout'),
|
||||
sqlite_synchronous=kwargs.get('sqlite_synchronous', True),
|
||||
connection_trace=kwargs.get('connection_trace', False),
|
||||
max_retries=kwargs.get('max_retries', 10),
|
||||
retry_interval=kwargs.get('retry_interval', 10),
|
||||
thread_checkin=kwargs.get('thread_checkin', True))
|
||||
self._session_maker = get_maker(
|
||||
engine=self._engine,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit)
|
||||
engine_kwargs = {
|
||||
'sqlite_fk': sqlite_fk,
|
||||
'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
|
||||
'idle_timeout': kwargs.get('idle_timeout', 3600),
|
||||
'connection_debug': kwargs.get('connection_debug', 0),
|
||||
'max_pool_size': kwargs.get('max_pool_size'),
|
||||
'max_overflow': kwargs.get('max_overflow'),
|
||||
'pool_timeout': kwargs.get('pool_timeout'),
|
||||
'sqlite_synchronous': kwargs.get('sqlite_synchronous', True),
|
||||
'connection_trace': kwargs.get('connection_trace', False),
|
||||
'max_retries': kwargs.get('max_retries', 10),
|
||||
'retry_interval': kwargs.get('retry_interval', 10),
|
||||
'thread_checkin': kwargs.get('thread_checkin', True)
|
||||
}
|
||||
maker_kwargs = {
|
||||
'autocommit': autocommit,
|
||||
'expire_on_commit': expire_on_commit
|
||||
}
|
||||
|
||||
def get_engine(self):
|
||||
"""Get the engine instance (note, that it's shared)."""
|
||||
self._engine = create_engine(sql_connection=sql_connection,
|
||||
**engine_kwargs)
|
||||
self._session_maker = get_maker(engine=self._engine,
|
||||
**maker_kwargs)
|
||||
if slave_connection:
|
||||
self._slave_engine = create_engine(sql_connection=slave_connection,
|
||||
**engine_kwargs)
|
||||
self._slave_session_maker = get_maker(engine=self._slave_engine,
|
||||
**maker_kwargs)
|
||||
else:
|
||||
self._slave_engine = None
|
||||
self._slave_session_maker = None
|
||||
|
||||
def get_engine(self, use_slave=False):
|
||||
"""Get the engine instance (note, that it's shared).
|
||||
|
||||
:param use_slave: if possible, use 'slave' database for this engine.
|
||||
If the connection string for the slave database
|
||||
wasn't provided, 'master' engine will be returned.
|
||||
(defaults to False)
|
||||
:type use_slave: bool
|
||||
|
||||
"""
|
||||
|
||||
if use_slave and self._slave_engine:
|
||||
return self._slave_engine
|
||||
|
||||
return self._engine
|
||||
|
||||
def get_session(self, **kwargs):
|
||||
def get_session(self, use_slave=False, **kwargs):
|
||||
"""Get a Session instance.
|
||||
|
||||
:param use_slave: if possible, use 'slave' database connection for
|
||||
this session. If the connection string for the
|
||||
slave database wasn't provided, a session bound
|
||||
to the 'master' engine will be returned.
|
||||
(defaults to False)
|
||||
:type use_slave: bool
|
||||
|
||||
Keyword arugments will be passed to a sessionmaker instance as is (if
|
||||
passed, they will override the ones used when the sessionmaker instance
|
||||
was created). See SQLAlchemy Session docs for details.
|
||||
|
||||
"""
|
||||
|
||||
if use_slave and self._slave_session_maker:
|
||||
return self._slave_session_maker(**kwargs)
|
||||
|
||||
return self._session_maker(**kwargs)
|
||||
|
||||
@classmethod
|
||||
@ -896,6 +941,7 @@ class EngineFacade(object):
|
||||
conf.register_opts(options.database_opts, 'database')
|
||||
|
||||
return cls(sql_connection=conf.database.connection,
|
||||
slave_connection=conf.database.slave_connection,
|
||||
sqlite_fk=sqlite_fk,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit,
|
||||
|
@ -353,6 +353,7 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
|
||||
def test_creation_from_config(self, create_engine, get_maker):
|
||||
conf = mock.MagicMock()
|
||||
conf.database.connection = 'sqlite:///:memory:'
|
||||
conf.database.slave_connection = None
|
||||
conf.database.items.return_value = [
|
||||
('connection_debug', 100),
|
||||
('max_pool_size', 10),
|
||||
@ -383,6 +384,43 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase):
|
||||
autocommit=False,
|
||||
expire_on_commit=True)
|
||||
|
||||
def test_slave_connection(self):
|
||||
paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')],
|
||||
ext='')
|
||||
master_path = 'sqlite:///' + paths[0]
|
||||
slave_path = 'sqlite:///' + paths[1]
|
||||
|
||||
facade = session.EngineFacade(
|
||||
sql_connection=master_path,
|
||||
slave_connection=slave_path
|
||||
)
|
||||
|
||||
master = facade.get_engine()
|
||||
self.assertEqual(master_path, str(master.url))
|
||||
slave = facade.get_engine(use_slave=True)
|
||||
self.assertEqual(slave_path, str(slave.url))
|
||||
|
||||
master_session = facade.get_session()
|
||||
self.assertEqual(master_path, str(master_session.bind.url))
|
||||
slave_session = facade.get_session(use_slave=True)
|
||||
self.assertEqual(slave_path, str(slave_session.bind.url))
|
||||
|
||||
def test_slave_connection_string_not_provided(self):
|
||||
master_path = 'sqlite:///' + self.create_tempfiles(
|
||||
[('db.master', '')], ext='')[0]
|
||||
|
||||
facade = session.EngineFacade(sql_connection=master_path)
|
||||
|
||||
master = facade.get_engine()
|
||||
slave = facade.get_engine(use_slave=True)
|
||||
self.assertIs(master, slave)
|
||||
self.assertEqual(master_path, str(master.url))
|
||||
|
||||
master_session = facade.get_session()
|
||||
self.assertEqual(master_path, str(master_session.bind.url))
|
||||
slave_session = facade.get_session(use_slave=True)
|
||||
self.assertEqual(master_path, str(slave_session.bind.url))
|
||||
|
||||
|
||||
class MysqlSetCallbackTest(oslo_test.BaseTestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user