diff --git a/neutron/tests/common/base.py b/neutron/tests/common/base.py
index 2340cd61403..d40685afb99 100644
--- a/neutron/tests/common/base.py
+++ b/neutron/tests/common/base.py
@@ -14,7 +14,6 @@
 import functools
 
 from neutron_lib import constants as n_const
-from oslo_db.sqlalchemy import test_base
 import testtools.testcase
 import unittest2.case
 
@@ -69,21 +68,3 @@ def no_skip_on_missing_deps(wrapped):
                     'is enabled, skip reason: %s' % (wrapped.__name__, e))
             raise
     return wrapper
-
-
-class MySQLTestCase(test_base.MySQLOpportunisticTestCase):
-    """Base test class for MySQL tests.
-
-    If the MySQL db is unavailable then this test is skipped, unless
-    OS_FAIL_ON_MISSING_DEPS is enabled.
-    """
-    SKIP_ON_UNAVAILABLE_DB = not base.bool_from_env('OS_FAIL_ON_MISSING_DEPS')
-
-
-class PostgreSQLTestCase(test_base.PostgreSQLOpportunisticTestCase):
-    """Base test class for PostgreSQL tests.
-
-    If the PostgreSQL db is unavailable then this test is skipped, unless
-    OS_FAIL_ON_MISSING_DEPS is enabled.
-    """
-    SKIP_ON_UNAVAILABLE_DB = not base.bool_from_env('OS_FAIL_ON_MISSING_DEPS')
diff --git a/neutron/tests/fullstack/base.py b/neutron/tests/fullstack/base.py
index 65d8984dbb5..ea35b092f6f 100644
--- a/neutron/tests/fullstack/base.py
+++ b/neutron/tests/fullstack/base.py
@@ -15,14 +15,11 @@
 import os
 
 from oslo_config import cfg
-from oslo_db.sqlalchemy import test_base
 
-from neutron.db.migration import cli as migration
-from neutron.tests import base as tests_base
-from neutron.tests.common import base
 from neutron.tests.common import helpers
 from neutron.tests.fullstack.resources import client as client_resource
 from neutron.tests import tools
+from neutron.tests.unit import testlib_api
 
 
 # This is the directory from which infra fetches log files for fullstack tests
@@ -30,20 +27,31 @@ DEFAULT_LOG_DIR = os.path.join(helpers.get_test_log_path(),
                                'dsvm-fullstack-logs')
 
 
-class BaseFullStackTestCase(base.MySQLTestCase):
+class BaseFullStackTestCase(testlib_api.MySQLTestCaseMixin,
+                            testlib_api.SqlTestCase):
     """Base test class for full-stack tests."""
 
+    BUILD_WITH_MIGRATIONS = True
+
     def setUp(self, environment):
         super(BaseFullStackTestCase, self).setUp()
 
-        tests_base.setup_test_logging(
-            cfg.CONF, DEFAULT_LOG_DIR, '%s.txt' % self.get_name())
+        # NOTE(zzzeek): the opportunistic DB fixtures have built for
+        # us a per-test (or per-process) database.  Set the URL of this
+        # database in CONF as the full stack tests need to actually run a
+        # neutron server against this database.
+        _orig_db_url = cfg.CONF.database.connection
+        cfg.CONF.set_override(
+            'connection', str(self.engine.url), group='database')
+        self.addCleanup(
+            cfg.CONF.set_override,
+            "connection", _orig_db_url, group="database"
+        )
 
         # NOTE(ihrachys): seed should be reset before environment fixture below
         # since the latter starts services that may rely on generated port
         # numbers
         tools.reset_random_seed()
-        self.create_db_tables()
         self.environment = environment
         self.environment.test_name = self.get_name()
         self.useFixture(self.environment)
@@ -54,35 +62,3 @@ class BaseFullStackTestCase(base.MySQLTestCase):
     def get_name(self):
         class_name, test_name = self.id().split(".")[-2:]
         return "%s.%s" % (class_name, test_name)
-
-    def create_db_tables(self):
-        """Populate the new database.
-
-        MySQLTestCase creates a new database for each test, but these need to
-        be populated with the appropriate tables. Before we can do that, we
-        must change the 'connection' option which the Neutron code knows to
-        look at.
-
-        Currently, the username and password options are hard-coded by
-        oslo.db and neutron/tests/functional/contrib/gate_hook.sh. Also,
-        we only support MySQL for now, but the groundwork for adding Postgres
-        is already laid.
-        """
-        conn = ("mysql+pymysql://%(username)s:%(password)s"
-                "@127.0.0.1/%(db_name)s" % {
-                    'username': test_base.DbFixture.USERNAME,
-                    'password': test_base.DbFixture.PASSWORD,
-                    'db_name': self.engine.url.database})
-
-        alembic_config = migration.get_neutron_config()
-        alembic_config.neutron_config = cfg.CONF
-        self.original_conn = cfg.CONF.database.connection
-        self.addCleanup(self._revert_connection_address)
-        cfg.CONF.set_override('connection', conn, group='database')
-
-        migration.do_alembic_command(alembic_config, 'upgrade', 'heads')
-
-    def _revert_connection_address(self):
-        cfg.CONF.set_override('connection',
-                              self.original_conn,
-                              group='database')
diff --git a/neutron/tests/fullstack/test_connectivity.py b/neutron/tests/fullstack/test_connectivity.py
index 8701e4ef979..1fef368a66c 100644
--- a/neutron/tests/fullstack/test_connectivity.py
+++ b/neutron/tests/fullstack/test_connectivity.py
@@ -19,9 +19,9 @@ import testscenarios
 from neutron.tests.fullstack import base
 from neutron.tests.fullstack.resources import environment
 from neutron.tests.fullstack.resources import machine
+from neutron.tests.unit import testlib_api
 
-
-load_tests = testscenarios.load_tests_apply_scenarios
+load_tests = testlib_api.module_load_tests
 
 
 class BaseConnectivitySameNetworkTest(base.BaseFullStackTestCase):
diff --git a/neutron/tests/fullstack/test_l3_agent.py b/neutron/tests/fullstack/test_l3_agent.py
index c2b248fed01..a3a94551891 100644
--- a/neutron/tests/fullstack/test_l3_agent.py
+++ b/neutron/tests/fullstack/test_l3_agent.py
@@ -28,6 +28,9 @@ from neutron.tests.common import machine_fixtures
 from neutron.tests.fullstack import base
 from neutron.tests.fullstack.resources import environment
 from neutron.tests.fullstack.resources import machine
+from neutron.tests.unit import testlib_api
+
+load_tests = testlib_api.module_load_tests
 
 
 class TestL3Agent(base.BaseFullStackTestCase):
diff --git a/neutron/tests/fullstack/test_qos.py b/neutron/tests/fullstack/test_qos.py
index 73e7b4f93f7..97981ffcb30 100644
--- a/neutron/tests/fullstack/test_qos.py
+++ b/neutron/tests/fullstack/test_qos.py
@@ -16,7 +16,6 @@ import functools
 
 from neutron_lib import constants
 from oslo_utils import uuidutils
-import testscenarios
 
 from neutron.agent.common import ovs_lib
 from neutron.agent.linux import bridge_lib
@@ -27,6 +26,7 @@ from neutron.tests.common.agents import l2_extensions
 from neutron.tests.fullstack import base
 from neutron.tests.fullstack.resources import environment
 from neutron.tests.fullstack.resources import machine
+from neutron.tests.unit import testlib_api
 
 from neutron.plugins.ml2.drivers.linuxbridge.agent.common import \
     config as linuxbridge_agent_config
@@ -36,8 +36,7 @@ from neutron.plugins.ml2.drivers.openvswitch.mech_driver import \
     mech_openvswitch as mech_ovs
 
 
-load_tests = testscenarios.load_tests_apply_scenarios
-
+load_tests = testlib_api.module_load_tests
 
 BANDWIDTH_BURST = 100
 BANDWIDTH_LIMIT = 500
diff --git a/neutron/tests/functional/db/test_ipam.py b/neutron/tests/functional/db/test_ipam.py
index 0f28f74b017..2520fa48a57 100644
--- a/neutron/tests/functional/db/test_ipam.py
+++ b/neutron/tests/functional/db/test_ipam.py
@@ -22,12 +22,18 @@ from neutron import context
 from neutron.db import db_base_plugin_v2 as base_plugin
 from neutron.db import models_v2
 from neutron.ipam.drivers.neutrondb_ipam import db_models as ipam_models
-from neutron.tests import base
-from neutron.tests.common import base as common_base
 from neutron.tests.unit import testlib_api
 
 
-class IpamTestCase(base.BaseTestCase):
+# required in order for testresources to optimize same-backend
+# tests together
+load_tests = testlib_api.module_load_tests
+# FIXME(zzzeek): needs to be provided by oslo.db, current version
+# is not working
+# load_tests = test_base.optimize_db_test_loader(__file__)
+
+
+class IpamTestCase(testlib_api.SqlTestCase):
     """
     Base class for tests that aim to test ip allocation.
     """
@@ -36,7 +42,6 @@ class IpamTestCase(base.BaseTestCase):
     def setUp(self):
         super(IpamTestCase, self).setUp()
         cfg.CONF.set_override('notify_nova_on_port_status_changes', False)
-        self.useFixture(testlib_api.SqlFixture())
         if self.use_pluggable_ipam:
             self._turn_on_pluggable_ipam()
         else:
@@ -155,17 +160,17 @@ class IpamTestCase(base.BaseTestCase):
             self._create_port(self.port_id)
 
 
-class TestIpamMySql(common_base.MySQLTestCase, IpamTestCase):
+class TestIpamMySql(testlib_api.MySQLTestCaseMixin, IpamTestCase):
     pass
 
 
-class TestIpamPsql(common_base.PostgreSQLTestCase, IpamTestCase):
+class TestIpamPsql(testlib_api.PostgreSQLTestCaseMixin, IpamTestCase):
     pass
 
 
-class TestPluggableIpamMySql(common_base.MySQLTestCase, IpamTestCase):
+class TestPluggableIpamMySql(testlib_api.MySQLTestCaseMixin, IpamTestCase):
     use_pluggable_ipam = True
 
 
-class TestPluggableIpamPsql(common_base.PostgreSQLTestCase, IpamTestCase):
+class TestPluggableIpamPsql(testlib_api.PostgreSQLTestCaseMixin, IpamTestCase):
     use_pluggable_ipam = True
diff --git a/neutron/tests/functional/db/test_migrations.conf b/neutron/tests/functional/db/test_migrations.conf
deleted file mode 100644
index e77fb725090..00000000000
--- a/neutron/tests/functional/db/test_migrations.conf
+++ /dev/null
@@ -1,14 +0,0 @@
-[migration_dbs]
-# Migration DB details are listed separately as they can't be connected to
-# concurrently. These databases can't be the same as above
-
-# Note, sqlite:// is in-memory and unique each time it is spawned.
-# However file sqlite's are not unique.
-
-#sqlite=sqlite://
-#sqlitefile=sqlite:///test_migrations.db
-#mysql=mysql+mysqldb://user:pass@localhost/test_migrations
-#postgresql=postgresql+psycopg2://user:pass@localhost/test_migrations
-
-[walk_style]
-snake_walk=yes
diff --git a/neutron/tests/functional/db/test_migrations.py b/neutron/tests/functional/db/test_migrations.py
index 3bc8987ad3d..0957e9e471e 100644
--- a/neutron/tests/functional/db/test_migrations.py
+++ b/neutron/tests/functional/db/test_migrations.py
@@ -14,20 +14,15 @@
 
 import collections
 
-import abc
 from alembic.ddl import base as alembic_ddl
 from alembic import script as alembic_script
 from contextlib import contextmanager
-import os
 from oslo_config import cfg
 from oslo_config import fixture as config_fixture
-from oslo_db.sqlalchemy import session
-from oslo_db.sqlalchemy import test_base
 from oslo_db.sqlalchemy import test_migrations
 from oslo_db.sqlalchemy import utils as oslo_utils
+from oslotest import base as oslotest_base
 import six
-from six.moves import configparser
-from six.moves.urllib import parse
 import sqlalchemy
 from sqlalchemy import event
 from sqlalchemy.sql import ddl as sqla_ddl
@@ -38,8 +33,7 @@ import subprocess
 from neutron.db.migration.alembic_migrations import external
 from neutron.db.migration import cli as migration
 from neutron.db.migration.models import head as head_models
-from neutron.tests import base as base_tests
-from neutron.tests.common import base
+from neutron.tests.unit import testlib_api
 
 cfg.CONF.import_opt('core_plugin', 'neutron.common.config')
 
@@ -129,6 +123,8 @@ class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
         - wrong value.
     '''
 
+    BUILD_SCHEMA = False
+
     def setUp(self):
         super(_TestModelsMigrations, self).setUp()
         self.cfg = self.useFixture(config_fixture.Config())
@@ -207,8 +203,10 @@ class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
         return True
 
 
-class TestModelsMigrationsMysql(_TestModelsMigrations,
-                                base.MySQLTestCase):
+class TestModelsMigrationsMysql(testlib_api.MySQLTestCaseMixin,
+                                _TestModelsMigrations,
+                                testlib_api.SqlTestCaseLight):
+
     @contextmanager
     def _listener(self, engine, listener_func):
         try:
@@ -347,12 +345,14 @@ class TestModelsMigrationsMysql(_TestModelsMigrations,
         self._test_has_offline_migrations('heads', False)
 
 
-class TestModelsMigrationsPsql(_TestModelsMigrations,
-                               base.PostgreSQLTestCase):
+class TestModelsMigrationsPsql(testlib_api.PostgreSQLTestCaseMixin,
+                               _TestModelsMigrations,
+                               testlib_api.SqlTestCaseLight):
     pass
 
 
-class TestSanityCheck(test_base.DbTestCase):
+class TestSanityCheck(testlib_api.SqlTestCaseLight):
+    BUILD_SCHEMA = False
 
     def setUp(self):
         super(TestSanityCheck, self).setUp()
@@ -381,7 +381,7 @@ class TestSanityCheck(test_base.DbTestCase):
                               script.check_sanity, conn)
 
 
-class TestWalkDowngrade(test_base.DbTestCase):
+class TestWalkDowngrade(oslotest_base.BaseTestCase):
 
     def setUp(self):
         super(TestWalkDowngrade, self).setUp()
@@ -400,84 +400,16 @@ class TestWalkDowngrade(test_base.DbTestCase):
 
         if failed_revisions:
             self.fail('Migrations %s have downgrade' % failed_revisions)
-
-
-def _is_backend_avail(backend,
-                      user="openstack_citest",
-                      passwd="openstack_citest",
-                      database="openstack_citest"):
-        # is_backend_avail will be soon deprecated from oslo_db
-        # thats why its added here
-        try:
-            connect_uri = oslo_utils.get_connect_string(backend, user=user,
-                                                        passwd=passwd,
-                                                        database=database)
-            engine = session.create_engine(connect_uri)
-            connection = engine.connect()
-        except Exception:
-            # intentionally catch all to handle exceptions even if we don't
-            # have any backend code loaded.
-            return False
-        else:
-            connection.close()
-            engine.dispose()
             return True
 
 
-@six.add_metaclass(abc.ABCMeta)
-class _TestWalkMigrations(base_tests.BaseTestCase, test_base.DbTestCase):
+class _TestWalkMigrations(object):
     '''This will add framework for testing schema migarations
        for different backends.
 
-    Right now it supports pymysql and postgresql backends. Pymysql
-    and postgresql commands are executed to walk between to do updates.
-    For upgrade and downgrade migrate_up and migrate down functions
-    have been added.
     '''
 
-    DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
-                                       'test_migrations.conf')
-    CONFIG_FILE_PATH = os.environ.get('NEUTRON_TEST_MIGRATIONS_CONF',
-                                      DEFAULT_CONFIG_FILE)
-
-    def setUp(self):
-        if not _is_backend_avail(self.BACKEND):
-            self.skipTest("%s not available" % self.BACKEND)
-
-        super(_TestWalkMigrations, self).setUp()
-
-        self.snake_walk = False
-        self.test_databases = {}
-
-        if os.path.exists(self.CONFIG_FILE_PATH):
-            cp = configparser.RawConfigParser()
-            try:
-                cp.read(self.CONFIG_FILE_PATH)
-                options = cp.options('migration_dbs')
-                for key in options:
-                    self.test_databases[key] = cp.get('migration_dbs', key)
-                self.snake_walk = cp.getboolean('walk_style', 'snake_walk')
-            except configparser.ParsingError as e:
-                self.fail("Failed to read test_migrations.conf config "
-                          "file. Got error: %s" % e)
-        else:
-            self.fail("Failed to find test_migrations.conf config "
-                      "file.")
-
-        self.engines = {}
-        for key, value in self.test_databases.items():
-            self.engines[key] = sqlalchemy.create_engine(value)
-
-        # We start each test case with a completely blank slate.
-        self._reset_databases()
-
-    def assertColumnInTable(self, engine, table_name, column):
-        table = oslo_utils.get_table(engine, table_name)
-        self.assertIn(column, table.columns)
-
-    def assertColumnNotInTables(self, engine, table_name, column):
-        table = oslo_utils.get_table(engine, table_name)
-        self.assertNotIn(column, table.columns)
+    BUILD_SCHEMA = False
 
     def execute_cmd(self, cmd=None):
         proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
@@ -486,23 +418,6 @@ class _TestWalkMigrations(base_tests.BaseTestCase, test_base.DbTestCase):
         self.assertEqual(0, proc.returncode, 'Command failed with '
                          'output:\n%s' % output)
 
-    @abc.abstractproperty
-    def BACKEND(self):
-        pass
-
-    @abc.abstractmethod
-    def _database_recreate(self, user, password, database, host):
-        pass
-
-    def _reset_databases(self):
-        for key, engine in self.engines.items():
-            conn_string = self.test_databases[key]
-            conn_pieces = parse.urlparse(conn_string)
-            engine.dispose()
-            user, password, database, host = oslo_utils.get_db_connection_info(
-                conn_pieces)
-            self._database_recreate(user, password, database, host)
-
     def _get_alembic_config(self, uri):
         db_config = migration.get_neutron_config()
         self.script_dir = alembic_script.ScriptDirectory.from_config(db_config)
@@ -512,71 +427,18 @@ class _TestWalkMigrations(base_tests.BaseTestCase, test_base.DbTestCase):
                                               group='database')
         return db_config
 
-    def _revisions(self, downgrade=False):
+    def _revisions(self):
         """Provides revisions and its parent revisions.
 
-        :param downgrade: whether to include downgrade behavior or not.
-        :type downgrade: Bool
         :return: List of tuples. Every tuple contains revision and its parent
         revision.
         """
         revisions = list(self.script_dir.walk_revisions("base", "heads"))
-        if not downgrade:
-            revisions = list(reversed(revisions))
+        revisions = list(reversed(revisions))
 
         for rev in revisions:
-            if downgrade:
-                # Destination, current
-                yield rev.down_revision, rev.revision
-            else:
-                # Destination, current
-                yield rev.revision, rev.down_revision
-
-    def _walk_versions(self, config, engine, downgrade=True, snake_walk=False):
-        """Test migrations ability to upgrade and downgrade.
-
-        :param downgrade: whether to include downgrade behavior or not.
-        :type downgrade: Bool
-        :snake_walk: enable mode when at every upgrade revision will be
-        downgraded and upgraded in previous state at upgrade and backward at
-        downgrade.
-        :type snake_walk: Bool
-        """
-
-        revisions = self._revisions()
-        for dest, curr in revisions:
-            self._migrate_up(config, engine, dest, curr, with_data=True)
-
-            if snake_walk and dest != 'None':
-                # NOTE(I159): Pass reversed arguments into `_migrate_down`
-                # method because we have been upgraded to a destination
-                # revision and now we going to downgrade back.
-                self._migrate_down(config, engine, curr, dest, with_data=True)
-                self._migrate_up(config, engine, dest, curr, with_data=True)
-
-        if downgrade:
-            revisions = self._revisions(downgrade)
-            for dest, curr in revisions:
-                self._migrate_down(config, engine, dest, curr, with_data=True)
-                if snake_walk:
-                    self._migrate_up(config, engine, curr,
-                                     dest, with_data=True)
-                    self._migrate_down(config, engine, dest,
-                                       curr, with_data=True)
-
-    def _migrate_down(self, config, engine, dest, curr, with_data=False):
-        # First upgrade it to current to do downgrade
-        if dest:
-            migration.do_alembic_command(config, 'downgrade', dest)
-        else:
-            meta = sqlalchemy.MetaData(bind=engine)
-            meta.drop_all()
-
-        if with_data:
-            post_downgrade = getattr(
-                self, "_post_downgrade_%s" % curr, None)
-            if post_downgrade:
-                post_downgrade(engine)
+            # Destination, current
+            yield rev.revision, rev.down_revision
 
     def _migrate_up(self, config, engine, dest, curr, with_data=False):
         if with_data:
@@ -591,71 +453,24 @@ class _TestWalkMigrations(base_tests.BaseTestCase, test_base.DbTestCase):
             if check and data:
                 check(engine, data)
 
+    def test_walk_versions(self):
+        """Test migrations ability to upgrade and downgrade.
 
-class TestWalkMigrationsMysql(_TestWalkMigrations):
-
-    BACKEND = 'mysql+pymysql'
-
-    def _database_recreate(self, user, password, database, host):
-        # We can execute the MySQL client to destroy and re-create
-        # the MYSQL database, which is easier and less error-prone
-        # than using SQLAlchemy to do this via MetaData...trust me.
-        sql = ("drop database if exists %(database)s; create "
-               "database %(database)s;") % {'database': database}
-        cmd = ("mysql -u \"%(user)s\" -p%(password)s -h %(host)s "
-               "-e \"%(sql)s\"") % {'user': user, 'password': password,
-                                    'host': host, 'sql': sql}
-        self.execute_cmd(cmd)
-
-    def test_mysql_opportunistically(self):
-        connect_string = oslo_utils.get_connect_string(self.BACKEND,
-            "openstack_citest", user="openstack_citest",
-            passwd="openstack_citest")
-        engine = session.create_engine(connect_string)
-        config = self._get_alembic_config(connect_string)
-        self.engines["mysqlcitest"] = engine
-        self.test_databases["mysqlcitest"] = connect_string
-
-        # build a fully populated mysql database with all the tables
-        self._reset_databases()
-        self._walk_versions(config, engine, False, False)
+        """
+        engine = self.engine
+        config = self._get_alembic_config(engine.url)
+        revisions = self._revisions()
+        for dest, curr in revisions:
+            self._migrate_up(config, engine, dest, curr, with_data=True)
 
 
-class TestWalkMigrationsPsql(_TestWalkMigrations):
+class TestWalkMigrationsMysql(testlib_api.MySQLTestCaseMixin,
+                              _TestWalkMigrations,
+                              testlib_api.SqlTestCaseLight):
+    pass
 
-    BACKEND = 'postgresql'
 
-    def _database_recreate(self, user, password, database, host):
-        os.environ['PGPASSWORD'] = password
-        os.environ['PGUSER'] = user
-        # note(boris-42): We must create and drop database, we can't
-        # drop database which we have connected to, so for such
-        # operations there is a special database template1.
-        sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
-                  " '%(sql)s' -d template1")
-        sql = "drop database if exists %(database)s;"
-        sql = sql % {'database': database}
-        droptable = sqlcmd % {'user': user, 'host': host,
-                              'sql': sql}
-        self.execute_cmd(droptable)
-        sql = "create database %(database)s;"
-        sql = sql % {'database': database}
-        createtable = sqlcmd % {'user': user, 'host': host,
-                                'sql': sql}
-        self.execute_cmd(createtable)
-
-    def test_postgresql_opportunistically(self):
-        # add this to the global lists to make reset work with it, it's removed
-        # automatically in tearDown so no need to clean it up here.
-        connect_string = oslo_utils.get_connect_string(self.BACKEND,
-                                                       "openstack_citest",
-                                                       "openstack_citest",
-                                                       "openstack_citest")
-        engine = session.create_engine(connect_string)
-        config = self._get_alembic_config(connect_string)
-        self.engines["postgresqlcitest"] = engine
-        self.test_databases["postgresqlcitest"] = connect_string
-
-        # build a fully populated postgresql database with all the tables
-        self._reset_databases()
-        self._walk_versions(config, engine, False, False)
+class TestWalkMigrationsPsql(testlib_api.PostgreSQLTestCaseMixin,
+                             _TestWalkMigrations,
+                             testlib_api.SqlTestCaseLight):
+    pass
diff --git a/neutron/tests/retargetable/client_fixtures.py b/neutron/tests/retargetable/client_fixtures.py
index 8703f43807a..e59bfdb6d94 100644
--- a/neutron/tests/retargetable/client_fixtures.py
+++ b/neutron/tests/retargetable/client_fixtures.py
@@ -73,7 +73,7 @@ class PluginClientFixture(AbstractClientFixture):
 
     def _setUp(self):
         super(PluginClientFixture, self)._setUp()
-        self.useFixture(testlib_api.SqlFixture())
+        self.useFixture(testlib_api.StaticSqlFixture())
         self.useFixture(self.plugin_conf)
         self.useFixture(base.PluginFixture(self.plugin_conf.plugin_name))
 
diff --git a/neutron/tests/unit/testlib_api.py b/neutron/tests/unit/testlib_api.py
index 30955cc221c..c6f3e50f0ad 100644
--- a/neutron/tests/unit/testlib_api.py
+++ b/neutron/tests/unit/testlib_api.py
@@ -15,10 +15,18 @@
 
 import fixtures
 import six
+import testresources
+import testscenarios
 import testtools
 
+from oslo_db import exception as oslodb_exception
+from oslo_db.sqlalchemy import enginefacade
+from oslo_db.sqlalchemy import provision
+from oslo_db.sqlalchemy import session
+
 from neutron.db import api as db_api
 # Import all data models
+from neutron.db.migration import cli as migration
 from neutron.db.migration.models import head  # noqa
 from neutron.db import model_base
 from neutron.tests import base
@@ -58,39 +66,323 @@ def create_request(path, body, content_type, method='GET',
 
 
 class SqlFixture(fixtures.Fixture):
+    """Base of a fixture which can create a schema and delete from
+    its tables.
 
-    # flag to indicate that the models have been loaded
-    _TABLES_ESTABLISHED = False
+    """
+
+    @classmethod
+    def _generate_schema(cls, engine):
+        model_base.BASEV2.metadata.create_all(engine)
+
+    def _delete_from_schema(self, engine):
+        with engine.begin() as conn:
+            for table in reversed(
+                    model_base.BASEV2.metadata.sorted_tables):
+                conn.execute(table.delete())
+
+    def _init_resources(self):
+        raise NotImplementedError()
 
     def _setUp(self):
-        # Register all data models
-        engine = db_api.get_engine()
-        if not SqlFixture._TABLES_ESTABLISHED:
-            model_base.BASEV2.metadata.create_all(engine)
-            SqlFixture._TABLES_ESTABLISHED = True
+        self._init_resources()
 
-        def clear_tables():
-            with engine.begin() as conn:
-                for table in reversed(
-                        model_base.BASEV2.metadata.sorted_tables):
-                    conn.execute(table.delete())
+        # check if the fixtures failed to get
+        # an engine.  The test setUp() itself should also be checking
+        # this and raising skipTest.
+        if not hasattr(self, 'engine'):
+            return
 
-        self.addCleanup(clear_tables)
+        engine = self.engine
+        self.addCleanup(lambda: self._delete_from_schema(engine))
+
+        self.sessionmaker = session.get_maker(engine)
+
+        self.enginefacade_factory = enginefacade._TestTransactionFactory(
+            self.engine, self.sessionmaker, apply_global=False,
+            synchronous_reader=True)
+
+        _restore_factory = db_api.context_manager._root_factory
+        _restore_facade = db_api._FACADE
+
+        db_api.context_manager._root_factory = self.enginefacade_factory
+
+        db_api._FACADE = self.enginefacade_factory.get_legacy_facade()
+
+        engine = db_api._FACADE.get_engine()
+
+        self.addCleanup(
+            lambda: setattr(
+                db_api.context_manager,
+                "_root_factory", _restore_factory))
+        self.addCleanup(
+            lambda: setattr(
+                db_api, "_FACADE", _restore_facade))
+
+        self.useFixture(EnableSQLiteFKsFixture(engine))
 
 
-class SqlTestCaseLight(base.DietTestCase):
+class EnableSQLiteFKsFixture(fixtures.Fixture):
+    """Turn SQLite PRAGMA foreign keys on and off for tests.
+
+    FIXME(zzzeek): figure out some way to get oslo.db test_base to honor
+    oslo_db.engines.create_engine() arguments like sqlite_fks as well
+    as handling that it needs to be turned off during drops.
+
+    """
+
+    def __init__(self, engine):
+        self.engine = engine
+
+    def _setUp(self):
+        if self.engine.name == 'sqlite':
+            self.engine.execute("PRAGMA foreign_keys=ON")
+
+            def disable_fks():
+                with self.engine.connect() as conn:
+                    conn.connection.rollback()
+                    conn.execute("PRAGMA foreign_keys=OFF")
+            self.addCleanup(disable_fks)
+
+
+class StaticSqlFixture(SqlFixture):
+    """Fixture which keeps a single sqlite memory database at the global
+    scope.
+
+    """
+
+    _GLOBAL_RESOURCES = False
+
+    @classmethod
+    def _init_resources(cls):
+        # this is a classlevel version of what testresources
+        # does w/ the resources attribute as well as the
+        # setUpResources() step (which requires a test instance, that
+        # SqlFixture does not have).  Because this is a SQLite memory
+        # database, we don't actually tear it down, so we can keep
+        # it running throughout all tests.
+        if cls._GLOBAL_RESOURCES:
+            return
+        else:
+            cls._GLOBAL_RESOURCES = True
+            cls.schema_resource = provision.SchemaResource(
+                provision.DatabaseResource("sqlite"),
+                cls._generate_schema, teardown=False)
+            dependency_resources = {}
+            for name, resource in cls.schema_resource.resources:
+                dependency_resources[name] = resource.getResource()
+            cls.schema_resource.make(dependency_resources)
+            cls.engine = dependency_resources['database'].engine
+
+
+class StaticSqlFixtureNoSchema(SqlFixture):
+    """Fixture which keeps a single sqlite memory database at the global
+    scope
+
+    """
+
+    _GLOBAL_RESOURCES = False
+
+    @classmethod
+    def _init_resources(cls):
+        if cls._GLOBAL_RESOURCES:
+            return
+        else:
+            cls._GLOBAL_RESOURCES = True
+            cls.database_resource = provision.DatabaseResource("sqlite")
+            dependency_resources = {}
+            for name, resource in cls.database_resource.resources:
+                dependency_resources[name] = resource.getResource()
+            cls.engine = dependency_resources['backend'].engine
+
+    def _delete_from_schema(self, engine):
+        pass
+
+
+class OpportunisticSqlFixture(SqlFixture):
+    """Fixture which uses testresources with oslo_db provisioning to
+    check for available backends and optimize test runs.
+
+    Requires that the test itself implement the resources attribute.
+
+    """
+
+    DRIVER = 'sqlite'
+
+    def __init__(self, test):
+        super(OpportunisticSqlFixture, self).__init__()
+        self.test = test
+
+    @classmethod
+    def _generate_schema_w_migrations(cls, engine):
+        alembic_config = migration.get_neutron_config()
+        with engine.connect() as conn:
+            alembic_config.attributes['connection'] = conn
+            migration.do_alembic_command(
+                alembic_config, 'upgrade', 'heads')
+
+    def _delete_from_schema(self, engine):
+        if self.test.BUILD_SCHEMA:
+            super(OpportunisticSqlFixture, self)._delete_from_schema(engine)
+
+    def _init_resources(self):
+        testresources.setUpResources(
+            self.test, self.test.resources, testresources._get_result())
+        self.addCleanup(
+            testresources.tearDownResources,
+            self.test, self.test.resources, testresources._get_result()
+        )
+
+        # unfortunately, fixtures won't let us call a skip() from
+        # here.  So the test has to check this also.
+        # see https://github.com/testing-cabal/fixtures/issues/31
+        if hasattr(self.test, 'db'):
+            self.engine = self.test.engine = self.test.db.engine
+
+    @classmethod
+    def resources_collection(cls, test):
+        # reimplement current oslo.db code.
+        # FIXME(zzzeek) The patterns here are up in the air enough
+        # that I think keeping this totally separate will give us the
+        # most leverage in being able to fix oslo.db in an upcoming
+        # release, then port neutron back to the working version.
+
+        driver = test.DRIVER
+
+        if driver not in test._database_resources:
+            try:
+                test._database_resources[driver] = \
+                    provision.DatabaseResource(driver)
+            except oslodb_exception.BackendNotAvailable:
+                test._database_resources[driver] = None
+
+        database_resource = test._database_resources[driver]
+        if database_resource is None:
+            return []
+
+        key = (driver, None)
+        if test.BUILD_SCHEMA:
+            if key not in test._schema_resources:
+                test._schema_resources[key] = provision.SchemaResource(
+                    database_resource,
+                    cls._generate_schema_w_migrations
+                    if test.BUILD_WITH_MIGRATIONS
+                    else cls._generate_schema, teardown=False)
+
+            schema_resource = test._schema_resources[key]
+            return [
+                ('schema', schema_resource),
+                ('db', database_resource)
+            ]
+        else:
+            return [
+                ('db', database_resource)
+            ]
+
+
+class BaseSqlTestCase(object):
+    BUILD_SCHEMA = True
+
+    def setUp(self):
+        super(BaseSqlTestCase, self).setUp()
+
+        self._setup_database_fixtures()
+
+    def _setup_database_fixtures(self):
+        if self.BUILD_SCHEMA:
+            fixture = StaticSqlFixture()
+        else:
+            fixture = StaticSqlFixtureNoSchema()
+        self.useFixture(fixture)
+        self.engine = fixture.engine
+
+
+class SqlTestCaseLight(BaseSqlTestCase, base.DietTestCase):
     """All SQL taste, zero plugin/rpc sugar"""
 
-    def setUp(self):
-        super(SqlTestCaseLight, self).setUp()
-        self.useFixture(SqlFixture())
+
+class SqlTestCase(BaseSqlTestCase, base.BaseTestCase):
+    """regular sql test"""
 
 
-class SqlTestCase(base.BaseTestCase):
+class OpportunisticDBTestMixin(object):
+    """Mixin that converts a BaseSqlTestCase to use the OpportunisticSqlFixture.
+    """
 
-    def setUp(self):
-        super(SqlTestCase, self).setUp()
-        self.useFixture(SqlFixture())
+    SKIP_ON_UNAVAILABLE_DB = not base.bool_from_env('OS_FAIL_ON_MISSING_DEPS')
+
+    FIXTURE = OpportunisticSqlFixture
+
+    BUILD_WITH_MIGRATIONS = False
+
+    def _setup_database_fixtures(self):
+        self.useFixture(self.FIXTURE(self))
+
+        if not hasattr(self, 'db'):
+            msg = "backend '%s' unavailable" % self.DRIVER
+            if self.SKIP_ON_UNAVAILABLE_DB:
+                self.skip(msg)
+            else:
+                self.fail(msg)
+
+    _schema_resources = {}
+    _database_resources = {}
+
+    @property
+    def resources(self):
+        """this attribute is used by testresources for optimized
+        sorting of tests.
+
+        This is the big requirement that allows testresources to sort
+        tests such that database "resources" can be kept open for
+        many tests at once.
+
+        IMO(zzzeek) "sorting" should not be needed; only that necessary
+        resources stay open as long as they are needed (or long enough to
+        reduce overhead).  testresources would be improved to not depend on
+        custom, incompatible-with-pytest "suite classes", fixture information
+        leaking out of the Fixture classes themselves, and exotic sorting
+        schemes for something that can nearly always be handled "good enough"
+        with unittest-standard setupclass/setupmodule schemes.
+
+        """
+
+        return self.FIXTURE.resources_collection(self)
+
+
+class MySQLTestCaseMixin(OpportunisticDBTestMixin):
+    """Mixin that turns any BaseSqlTestCase into a MySQL test suite.
+
+    If the MySQL db is unavailable then this test is skipped, unless
+    OS_FAIL_ON_MISSING_DEPS is enabled.
+    """
+    DRIVER = "mysql"
+
+
+class PostgreSQLTestCaseMixin(OpportunisticDBTestMixin):
+    """Mixin that turns any BaseSqlTestCase into a PostgresSQL test suite.
+
+    If the PostgreSQL db is unavailable then this test is skipped, unless
+    OS_FAIL_ON_MISSING_DEPS is enabled.
+    """
+    DRIVER = "postgresql"
+
+
+def module_load_tests(loader, found_tests, pattern):
+    """Apply OptimisingTestSuite on a per-module basis.
+
+    FIXME(zzzeek): oslo.db provides this but the contract that
+    "pattern" should be None no longer seems to behave as it used
+    to at the module level, so this function needs to be added in this
+    form.
+
+    """
+
+    result = testresources.OptimisingTestSuite()
+    found_tests = testscenarios.load_tests_apply_scenarios(
+        loader, found_tests, pattern)
+    result.addTest(found_tests)
+    return result
 
 
 class WebTestCase(SqlTestCase):