From 1d2df1a2a3d77dd39744b9bf91a73a4451193732 Mon Sep 17 00:00:00 2001 From: Roman Podoliaka Date: Wed, 21 May 2014 13:05:43 +0300 Subject: [PATCH] Handle slave database connection in EngineFacade Make it possible to use an additional (aka, 'slave') database connection. This might be useful for offloading of read operations to reduce the load on the RDBMS. Currently, this is only used in Nova, but can be ported to other projects easily. Change-Id: I4296347bb9ee0743738fe1126d1cef8e43b9f96e --- oslo/db/options.py | 4 ++ oslo/db/sqlalchemy/session.py | 90 ++++++++++++++++++++++------- tests/sqlalchemy/test_sqlalchemy.py | 38 ++++++++++++ 3 files changed, 110 insertions(+), 22 deletions(-) diff --git a/oslo/db/options.py b/oslo/db/options.py index 19058d00..72e626c6 100644 --- a/oslo/db/options.py +++ b/oslo/db/options.py @@ -39,6 +39,10 @@ database_opts = [ group='DATABASE'), cfg.DeprecatedOpt('connection', group='sql'), ]), + cfg.StrOpt('slave_connection', + secret=True, + help='The SQLAlchemy connection string to use to connect to the' + ' slave database.'), cfg.StrOpt('mysql_sql_mode', default='TRADITIONAL', help='The SQL mode to be used for MySQL sessions. ' diff --git a/oslo/db/sqlalchemy/session.py b/oslo/db/sqlalchemy/session.py index 3b3a4cab..056644f4 100644 --- a/oslo/db/sqlalchemy/session.py +++ b/oslo/db/sqlalchemy/session.py @@ -793,11 +793,22 @@ class EngineFacade(object): """ - def __init__(self, sql_connection, + def __init__(self, sql_connection, slave_connection=None, sqlite_fk=False, autocommit=True, expire_on_commit=False, **kwargs): """Initialize engine and sessionmaker instances. + :param sql_connection: the connection string for the database to use + :type sql_connection: string + + :param slave_connection: the connection string for the 'slave' database + to use. If not provided, the master database + will be used for all operations. Note: this + is meant to be used for offloading of read + operations to asynchronously replicated slaves + to reduce the load on the master database. + :type slave_connection: string + :param sqlite_fk: enable foreign keys in SQLite :type sqlite_fk: bool @@ -839,39 +850,73 @@ class EngineFacade(object): super(EngineFacade, self).__init__() - self._engine = create_engine( - sql_connection=sql_connection, - sqlite_fk=sqlite_fk, - mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'), - idle_timeout=kwargs.get('idle_timeout', 3600), - connection_debug=kwargs.get('connection_debug', 0), - max_pool_size=kwargs.get('max_pool_size'), - max_overflow=kwargs.get('max_overflow'), - pool_timeout=kwargs.get('pool_timeout'), - sqlite_synchronous=kwargs.get('sqlite_synchronous', True), - connection_trace=kwargs.get('connection_trace', False), - max_retries=kwargs.get('max_retries', 10), - retry_interval=kwargs.get('retry_interval', 10), - thread_checkin=kwargs.get('thread_checkin', True)) - self._session_maker = get_maker( - engine=self._engine, - autocommit=autocommit, - expire_on_commit=expire_on_commit) + engine_kwargs = { + 'sqlite_fk': sqlite_fk, + 'mysql_sql_mode': kwargs.get('mysql_sql_mode', 'TRADITIONAL'), + 'idle_timeout': kwargs.get('idle_timeout', 3600), + 'connection_debug': kwargs.get('connection_debug', 0), + 'max_pool_size': kwargs.get('max_pool_size'), + 'max_overflow': kwargs.get('max_overflow'), + 'pool_timeout': kwargs.get('pool_timeout'), + 'sqlite_synchronous': kwargs.get('sqlite_synchronous', True), + 'connection_trace': kwargs.get('connection_trace', False), + 'max_retries': kwargs.get('max_retries', 10), + 'retry_interval': kwargs.get('retry_interval', 10), + 'thread_checkin': kwargs.get('thread_checkin', True) + } + maker_kwargs = { + 'autocommit': autocommit, + 'expire_on_commit': expire_on_commit + } - def get_engine(self): - """Get the engine instance (note, that it's shared).""" + self._engine = create_engine(sql_connection=sql_connection, + **engine_kwargs) + self._session_maker = get_maker(engine=self._engine, + **maker_kwargs) + if slave_connection: + self._slave_engine = create_engine(sql_connection=slave_connection, + **engine_kwargs) + self._slave_session_maker = get_maker(engine=self._slave_engine, + **maker_kwargs) + else: + self._slave_engine = None + self._slave_session_maker = None + + def get_engine(self, use_slave=False): + """Get the engine instance (note, that it's shared). + + :param use_slave: if possible, use 'slave' database for this engine. + If the connection string for the slave database + wasn't provided, 'master' engine will be returned. + (defaults to False) + :type use_slave: bool + + """ + + if use_slave and self._slave_engine: + return self._slave_engine return self._engine - def get_session(self, **kwargs): + def get_session(self, use_slave=False, **kwargs): """Get a Session instance. + :param use_slave: if possible, use 'slave' database connection for + this session. If the connection string for the + slave database wasn't provided, a session bound + to the 'master' engine will be returned. + (defaults to False) + :type use_slave: bool + Keyword arugments will be passed to a sessionmaker instance as is (if passed, they will override the ones used when the sessionmaker instance was created). See SQLAlchemy Session docs for details. """ + if use_slave and self._slave_session_maker: + return self._slave_session_maker(**kwargs) + return self._session_maker(**kwargs) @classmethod @@ -896,6 +941,7 @@ class EngineFacade(object): conf.register_opts(options.database_opts, 'database') return cls(sql_connection=conf.database.connection, + slave_connection=conf.database.slave_connection, sqlite_fk=sqlite_fk, autocommit=autocommit, expire_on_commit=expire_on_commit, diff --git a/tests/sqlalchemy/test_sqlalchemy.py b/tests/sqlalchemy/test_sqlalchemy.py index a6f097ba..d5260041 100644 --- a/tests/sqlalchemy/test_sqlalchemy.py +++ b/tests/sqlalchemy/test_sqlalchemy.py @@ -353,6 +353,7 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): def test_creation_from_config(self, create_engine, get_maker): conf = mock.MagicMock() conf.database.connection = 'sqlite:///:memory:' + conf.database.slave_connection = None conf.database.items.return_value = [ ('connection_debug', 100), ('max_pool_size', 10), @@ -383,6 +384,43 @@ class EngineFacadeTestCase(oslo_test.BaseTestCase): autocommit=False, expire_on_commit=True) + def test_slave_connection(self): + paths = self.create_tempfiles([('db.master', ''), ('db.slave', '')], + ext='') + master_path = 'sqlite:///' + paths[0] + slave_path = 'sqlite:///' + paths[1] + + facade = session.EngineFacade( + sql_connection=master_path, + slave_connection=slave_path + ) + + master = facade.get_engine() + self.assertEqual(master_path, str(master.url)) + slave = facade.get_engine(use_slave=True) + self.assertEqual(slave_path, str(slave.url)) + + master_session = facade.get_session() + self.assertEqual(master_path, str(master_session.bind.url)) + slave_session = facade.get_session(use_slave=True) + self.assertEqual(slave_path, str(slave_session.bind.url)) + + def test_slave_connection_string_not_provided(self): + master_path = 'sqlite:///' + self.create_tempfiles( + [('db.master', '')], ext='')[0] + + facade = session.EngineFacade(sql_connection=master_path) + + master = facade.get_engine() + slave = facade.get_engine(use_slave=True) + self.assertIs(master, slave) + self.assertEqual(master_path, str(master.url)) + + master_session = facade.get_session() + self.assertEqual(master_path, str(master_session.bind.url)) + slave_session = facade.get_session(use_slave=True) + self.assertEqual(master_path, str(slave_session.bind.url)) + class MysqlSetCallbackTest(oslo_test.BaseTestCase):