Add DB API testing framework
This commit adds functional tests for the db api. It is still a rough implementation right now, but to start I'm not worried about that too much since it works. Things will just be a bit slow, because it drops the db and runs the migrations on an empty db on each test method, locks against every other api test method and the migration tests, and requires a local mysql db exist with the hardcoded values from the migration opportunistic test functions. But it works! Ideally we would just use the in memory sqlite db for this testing since it provides the isolation we really desire for running individual tests. However, because the migrations don't work on sqlite this makes it kinda difficult. To mitigate the locking penalty a bit a group regex is added to the testr.conf to serialize tests at the class level. This will at least mean that api tests will run serially since they're all in the same class. In an effort to decrease code duplication as part of adding additional DB functional tests the existing migration tests are refactored to use a common set of fixtures and util methods to handle the test db setup and connection configuration information. We also should slowly ramp up the number of tests here since the coverage is very basic as this was intended to mostly start the framework. But that, can be a follow up patch. Change-Id: Iea6dd86498002ab20977b5fb02b4c7d751a08bca
This commit is contained in:
parent
580af9b48a
commit
6608e63b19
@ -6,3 +6,4 @@ test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
|
||||
${PYTHON:-python} -m subunit.run discover -t ./ ./subunit2sql/tests $LISTOPT $IDOPTION
|
||||
test_id_option=--load-list $IDFILE
|
||||
test_list_option=--list
|
||||
group_regex=([^\.]*\.)*
|
||||
|
@ -1,6 +1,7 @@
|
||||
alembic>=0.4.1
|
||||
oslo.config>=1.4.0.0a3
|
||||
oslo.db!=1.12.0,<2.0.0
|
||||
oslo.concurrency
|
||||
pbr>=1.0.0
|
||||
python-subunit>=0.0.18
|
||||
six>=1.5.2
|
||||
|
@ -68,6 +68,18 @@ def _filter_runs_by_date(query, start_date=None, stop_date=None):
|
||||
return query
|
||||
|
||||
|
||||
def get_engine(use_slave=False):
|
||||
"""Get a new sqlalchemy engine instance
|
||||
|
||||
:param bool use_slave if possible, use 'slave' database for this engine
|
||||
|
||||
:return: The engine object for the database connection
|
||||
:rtype: sqlalchemy.engine.Engine
|
||||
"""
|
||||
facade = _create_facade_lazily()
|
||||
return facade.get_engine(use_slave=use_slave)
|
||||
|
||||
|
||||
def create_test(test_id, run_count=0, success=0, failure=0, run_time=0.0,
|
||||
session=None):
|
||||
"""Create a new test record in the database.
|
||||
|
69
subunit2sql/tests/db/test_api.py
Normal file
69
subunit2sql/tests/db/test_api.py
Normal file
@ -0,0 +1,69 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import testscenarios
|
||||
|
||||
from subunit2sql.db import api
|
||||
from subunit2sql.tests import base
|
||||
from subunit2sql.tests import subunit2sql_fixtures as fixtures
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
|
||||
class TestDatabaseAPI(base.TestCase):
|
||||
|
||||
scenarios = [('mysql', {'dialect': 'mysql'})]
|
||||
|
||||
def setUp(self):
|
||||
super(TestDatabaseAPI, self).setUp()
|
||||
self.useFixture(fixtures.LockFixture(self.dialect))
|
||||
if self.dialect == 'mysql':
|
||||
self.useFixture(fixtures.MySQLConfFixture())
|
||||
else:
|
||||
self.useFixture(fixtures.PostgresConfFixture())
|
||||
self.useFixture(fixtures.Database())
|
||||
|
||||
def test_create_test(self):
|
||||
api.create_test('1234')
|
||||
res = api.get_all_tests()
|
||||
self.assertEqual(len(res), 1)
|
||||
self.assertEqual(res[0].test_id, '1234')
|
||||
|
||||
def test_create_test_and_get_by_test_id(self):
|
||||
create_res = api.create_test('fake_test', 2, 1, 1, 1.2)
|
||||
res = api.get_test_by_test_id('fake_test')
|
||||
self.assertEqual(res.id, create_res.id)
|
||||
self.assertEqual(res.test_id, 'fake_test')
|
||||
self.assertEqual(res.run_time, 1.2)
|
||||
self.assertEqual(res.run_count, 2)
|
||||
|
||||
def test_get_test_by_test_id_invalid_id(self):
|
||||
res = api.get_test_by_test_id('fake_test')
|
||||
self.assertIsNone(res)
|
||||
|
||||
def test_create_run_and_list(self):
|
||||
res = api.create_run()
|
||||
self.assertIsNotNone(res)
|
||||
all_runs = api.get_all_runs()
|
||||
self.assertEqual(len(all_runs), 1)
|
||||
self.assertEqual(res.id, all_runs[0].id)
|
||||
|
||||
def test_create_test_run_and_list(self):
|
||||
run = api.create_run()
|
||||
test = api.create_test('fake_test')
|
||||
test_run = api.create_test_run(test.id, run.id, 'fail')
|
||||
self.assertIsNotNone(test_run)
|
||||
all_test_runs = api.get_all_test_runs()
|
||||
self.assertEqual(len(all_test_runs), 1)
|
||||
self.assertEqual(test_run.id, all_test_runs[0].id)
|
82
subunit2sql/tests/db_test_utils.py
Normal file
82
subunit2sql/tests/db_test_utils.py
Normal file
@ -0,0 +1,82 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from alembic import command
|
||||
from alembic import config as alembic_config
|
||||
from oslo_config import cfg
|
||||
import sqlalchemy
|
||||
|
||||
from subunit2sql.db import api as session
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
script_location = os.path.join(os.path.dirname(os.path.dirname(
|
||||
os.path.abspath(__file__))), 'migrations')
|
||||
|
||||
|
||||
def get_connect_string(backend,
|
||||
user="openstack_citest",
|
||||
passwd="openstack_citest",
|
||||
database="openstack_citest"):
|
||||
"""Generate a db uri for testing locally.
|
||||
|
||||
Try to get a connection with a very specific set of values, if we get
|
||||
these then we'll run the tests, otherwise they are skipped
|
||||
"""
|
||||
if backend == "mysql":
|
||||
backend = "mysql+mysqldb"
|
||||
elif backend == "postgres":
|
||||
backend = "postgresql+psycopg2"
|
||||
|
||||
return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
|
||||
% {'backend': backend, 'user': user, 'passwd': passwd,
|
||||
'database': database})
|
||||
|
||||
|
||||
def is_backend_avail(backend,
|
||||
user="openstack_citest",
|
||||
passwd="openstack_citest",
|
||||
database="openstack_citest"):
|
||||
try:
|
||||
if backend == "mysql":
|
||||
connect_uri = get_connect_string("mysql", user=user,
|
||||
passwd=passwd, database=database)
|
||||
elif backend == "postgres":
|
||||
connect_uri = get_connect_string("postgres", user=user,
|
||||
passwd=passwd, database=database)
|
||||
engine = sqlalchemy.create_engine(connect_uri)
|
||||
connection = engine.connect()
|
||||
except Exception:
|
||||
# intentionally catch all to handle exceptions even if we don't
|
||||
# have any backend code loaded.
|
||||
return False
|
||||
else:
|
||||
connection.close()
|
||||
engine.dispose()
|
||||
return True
|
||||
|
||||
|
||||
def run_migration(target, engine=None):
|
||||
engine = engine or session.get_engine()
|
||||
engine.connect()
|
||||
config = alembic_config.Config(os.path.join(script_location,
|
||||
'alembic.ini'))
|
||||
config.set_main_option('script_location', 'subunit2sql:migrations')
|
||||
config.subunit2sql_config = CONF
|
||||
with engine.begin() as connection:
|
||||
config.attributes['connection'] = connection
|
||||
command.upgrade(config, target)
|
||||
engine.dispose()
|
@ -17,65 +17,16 @@
|
||||
import ConfigParser
|
||||
import datetime
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
|
||||
from alembic import config
|
||||
from alembic import script
|
||||
from oslo_config import cfg
|
||||
from oslo_db import options
|
||||
import six
|
||||
from six.moves.urllib import parse
|
||||
import sqlalchemy
|
||||
|
||||
from subunit2sql import exceptions as exc
|
||||
from subunit2sql.migrations import cli
|
||||
from subunit2sql.tests import base
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_cli_opts(options.database_opts, group='database')
|
||||
|
||||
|
||||
def _get_connect_string(backend,
|
||||
user="openstack_citest",
|
||||
passwd="openstack_citest",
|
||||
database="openstack_citest"):
|
||||
"""Generate a db uri for testing locally.
|
||||
|
||||
Try to get a connection with a very specific set of values, if we get
|
||||
these then we'll run the tests, otherwise they are skipped
|
||||
"""
|
||||
if backend == "mysql":
|
||||
backend = "mysql+mysqldb"
|
||||
elif backend == "postgres":
|
||||
backend = "postgresql+psycopg2"
|
||||
|
||||
return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
|
||||
% {'backend': backend, 'user': user, 'passwd': passwd,
|
||||
'database': database})
|
||||
|
||||
|
||||
def _is_backend_avail(backend,
|
||||
user="openstack_citest",
|
||||
passwd="openstack_citest",
|
||||
database="openstack_citest"):
|
||||
try:
|
||||
if backend == "mysql":
|
||||
connect_uri = _get_connect_string("mysql", user=user,
|
||||
passwd=passwd, database=database)
|
||||
elif backend == "postgres":
|
||||
connect_uri = _get_connect_string("postgres", user=user,
|
||||
passwd=passwd, database=database)
|
||||
engine = sqlalchemy.create_engine(connect_uri)
|
||||
connection = engine.connect()
|
||||
except Exception:
|
||||
# intentionally catch all to handle exceptions even if we don't
|
||||
# have any backend code loaded.
|
||||
return False
|
||||
else:
|
||||
connection.close()
|
||||
engine.dispose()
|
||||
return True
|
||||
from subunit2sql.tests import db_test_utils
|
||||
from subunit2sql.tests import subunit2sql_fixtures as fixtures
|
||||
|
||||
|
||||
def get_table(engine, name):
|
||||
@ -123,173 +74,44 @@ class TestWalkMigrations(base.TestCase):
|
||||
for key, value in self.test_databases.items():
|
||||
self.engines[key] = sqlalchemy.create_engine(value)
|
||||
|
||||
# We start each test case with a completely blank slate.
|
||||
self._reset_databases()
|
||||
|
||||
def assertColumnExists(self, engine, table, column):
|
||||
table = get_table(engine, table)
|
||||
self.assertIn(column, table.c)
|
||||
|
||||
def _reset_databases(self):
|
||||
def execute_cmd(cmd=None):
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, shell=True)
|
||||
output = proc.communicate()[0]
|
||||
self.assertEqual(0, proc.returncode, 'Command failed with '
|
||||
'output:\n%s' % output)
|
||||
for key, engine in self.engines.items():
|
||||
conn_string = self.test_databases[key]
|
||||
conn_pieces = parse.urlparse(conn_string)
|
||||
engine.dispose()
|
||||
if conn_string.startswith('sqlite'):
|
||||
# We can just delete the SQLite database, which is
|
||||
# the easiest and cleanest solution
|
||||
db_path = conn_pieces.path[1:]
|
||||
if os.path.exists(db_path):
|
||||
os.unlink(db_path)
|
||||
# No need to recreate the SQLite DB. SQLite will
|
||||
# create it for us if it's not there...
|
||||
elif conn_string.startswith('mysql'):
|
||||
# We can execute the MySQL client to destroy and re-create
|
||||
# the MYSQL database, which is easier and less error-prone
|
||||
# than using SQLAlchemy to do this via MetaData...trust me.
|
||||
database = conn_pieces.path.strip('/')
|
||||
loc_pieces = conn_pieces.netloc.split('@')
|
||||
host = loc_pieces[1]
|
||||
auth_pieces = loc_pieces[0].split(':')
|
||||
user = auth_pieces[0]
|
||||
password = ""
|
||||
if len(auth_pieces) > 1:
|
||||
if auth_pieces[1].strip():
|
||||
password = "-p\"%s\"" % auth_pieces[1]
|
||||
sql = ("drop database if exists %(database)s; create "
|
||||
"database %(database)s;") % {'database': database}
|
||||
cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
|
||||
"-e \"%(sql)s\"") % {'user': user, 'password': password,
|
||||
'host': host, 'sql': sql}
|
||||
execute_cmd(cmd)
|
||||
elif conn_string.startswith('postgresql'):
|
||||
database = conn_pieces.path.strip('/')
|
||||
loc_pieces = conn_pieces.netloc.split('@')
|
||||
host = loc_pieces[1]
|
||||
|
||||
auth_pieces = loc_pieces[0].split(':')
|
||||
user = auth_pieces[0]
|
||||
password = ""
|
||||
if len(auth_pieces) > 1:
|
||||
password = auth_pieces[1].strip()
|
||||
# note(boris-42): This file is used for authentication
|
||||
# without password prompt.
|
||||
createpgpass = ("echo '*:*:*:%(user)s:%(password)s' > "
|
||||
"~/.pgpass && chmod 0600 ~/.pgpass" %
|
||||
{'user': user, 'password': password})
|
||||
execute_cmd(createpgpass)
|
||||
# note(boris-42): We must create and drop database, we can't
|
||||
# drop database which we have connected to, so for such
|
||||
# operations there is a special database template1.
|
||||
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
|
||||
" '%(sql)s' -d template1")
|
||||
sql = ("drop database if exists %(database)s;")
|
||||
sql = sql % {'database': database}
|
||||
droptable = sqlcmd % {'user': user, 'host': host,
|
||||
'sql': sql}
|
||||
execute_cmd(droptable)
|
||||
sql = ("create database %(database)s;")
|
||||
sql = sql % {'database': database}
|
||||
createtable = sqlcmd % {'user': user, 'host': host,
|
||||
'sql': sql}
|
||||
execute_cmd(createtable)
|
||||
|
||||
def _get_alembic_config(self, uri):
|
||||
db_config = config.Config(os.path.join(self.script_location,
|
||||
'alembic.ini'))
|
||||
db_config.set_main_option('script_location', 'subunit2sql:migrations')
|
||||
db_config.subunit2sql_config = CONF
|
||||
db_config.subunit2sql_config.set_override('connection',
|
||||
six.text_type(uri),
|
||||
group='database')
|
||||
self.script_dir = script.ScriptDirectory.from_config(db_config)
|
||||
return db_config
|
||||
|
||||
def _revisions(self, downgrade=False):
|
||||
def _revisions(self):
|
||||
"""Provides revisions and its parent revisions.
|
||||
|
||||
:param downgrade: whether to include downgrade behavior or not.
|
||||
:type downgrade: Bool
|
||||
:return: List of tuples. Every tuple contains revision and its parent
|
||||
revision.
|
||||
"""
|
||||
|
||||
revisions = list(self.script_dir.walk_revisions("base", "head"))
|
||||
|
||||
if not downgrade:
|
||||
revisions = list(reversed(revisions))
|
||||
db_config = config.Config(os.path.join(self.script_location,
|
||||
'alembic.ini'))
|
||||
db_config.set_main_option('script_location', 'subunit2sql:migrations')
|
||||
script_dir = script.ScriptDirectory.from_config(db_config)
|
||||
revisions = list(script_dir.walk_revisions("base", "head"))
|
||||
|
||||
if not revisions:
|
||||
raise exc.DbMigrationError('There is no suitable migrations.')
|
||||
|
||||
for rev in revisions:
|
||||
if downgrade:
|
||||
# Destination, current
|
||||
yield rev.down_revision, rev.revision
|
||||
else:
|
||||
# Destination, current
|
||||
yield rev.revision, rev.down_revision
|
||||
for rev in list(reversed(revisions)):
|
||||
# Destination, current
|
||||
yield rev.revision, rev.down_revision
|
||||
|
||||
def _walk_versions(self, config, engine, downgrade=True, snake_walk=False):
|
||||
"""Test migrations ability to upgrade and downgrade.
|
||||
|
||||
:param downgrade: whether to include downgrade behavior or not.
|
||||
:type downgrade: Bool
|
||||
:snake_walk: enable mode when at every upgrade revision will be
|
||||
downgraded and upgraded in previous state at upgrade and backward at
|
||||
downgrade.
|
||||
:type snake_walk: Bool
|
||||
"""
|
||||
def _walk_versions(self, engine):
|
||||
"""Test migrations ability to upgrade."""
|
||||
|
||||
revisions = self._revisions()
|
||||
for dest, curr in revisions:
|
||||
self._migrate_up(config, engine, dest, curr, with_data=True)
|
||||
self._migrate_up(engine, dest, curr, with_data=True)
|
||||
|
||||
if snake_walk and dest != 'None':
|
||||
# NOTE(I159): Pass reversed arguments into `_migrate_down`
|
||||
# method because we have been upgraded to a destination
|
||||
# revision and now we going to downgrade back.
|
||||
self._migrate_down(config, curr, dest, with_data=True)
|
||||
self._migrate_up(config, dest, curr, with_data=True)
|
||||
|
||||
if downgrade:
|
||||
revisions = self._revisions(downgrade)
|
||||
for dest, curr in revisions:
|
||||
self._migrate_down(config, engine, dest, curr, with_data=True)
|
||||
if snake_walk:
|
||||
self._migrate_up(config, engine, curr, dest,
|
||||
with_data=True)
|
||||
self._migrate_down(config, engine, dest, curr,
|
||||
with_data=True)
|
||||
|
||||
def _migrate_down(self, config, engine, dest, curr, with_data=False):
|
||||
|
||||
if dest:
|
||||
cli.do_alembic_command(config, 'downgrade', dest)
|
||||
else:
|
||||
meta = sqlalchemy.MetaData(bind=engine)
|
||||
meta.drop_all()
|
||||
|
||||
if with_data:
|
||||
post_downgrade = getattr(
|
||||
self, "_post_downgrade_%s" % curr, None)
|
||||
if post_downgrade:
|
||||
post_downgrade(engine)
|
||||
|
||||
def _migrate_up(self, config, engine, dest, curr, with_data=False):
|
||||
def _migrate_up(self, engine, dest, curr, with_data=False):
|
||||
if with_data:
|
||||
data = None
|
||||
pre_upgrade = getattr(
|
||||
self, "_pre_upgrade_%s" % dest, None)
|
||||
if pre_upgrade:
|
||||
data = pre_upgrade(engine)
|
||||
cli.do_alembic_command(config, 'upgrade', dest)
|
||||
db_test_utils.run_migration(dest, engine)
|
||||
if with_data:
|
||||
check = getattr(self, "_check_%s" % dest, None)
|
||||
if check and data:
|
||||
@ -302,8 +124,7 @@ class TestWalkMigrations(base.TestCase):
|
||||
that there are no errors in the version scripts for each engine
|
||||
"""
|
||||
for key, engine in self.engines.items():
|
||||
config = self._get_alembic_config(self.test_databases[key])
|
||||
self._walk_versions(config, engine, self.snake_walk)
|
||||
self._walk_versions(engine)
|
||||
|
||||
def test_mysql_connect_fail(self):
|
||||
"""Test graceful mysql connection failure.
|
||||
@ -311,24 +132,25 @@ class TestWalkMigrations(base.TestCase):
|
||||
Test that we can trigger a mysql connection failure and we fail
|
||||
gracefully to ensure we don't break people without mysql
|
||||
"""
|
||||
if _is_backend_avail('mysql', user="openstack_cifail"):
|
||||
if db_test_utils.is_backend_avail('mysql', user="openstack_cifail"):
|
||||
self.fail("Shouldn't have connected")
|
||||
|
||||
def test_mysql_opportunistically(self):
|
||||
if not db_test_utils.is_backend_avail('mysql'):
|
||||
raise self.skipTest('mysql is not available')
|
||||
|
||||
self.useFixture(fixtures.LockFixture('mysql'))
|
||||
self.useFixture(fixtures.MySQLConfFixture())
|
||||
# Test that table creation on mysql only builds InnoDB tables
|
||||
if not _is_backend_avail('mysql'):
|
||||
self.skipTest("mysql not available")
|
||||
# add this to the global lists to make reset work with it, it's removed
|
||||
# automatically in tearDown so no need to clean it up here.
|
||||
connect_string = _get_connect_string("mysql")
|
||||
connect_string = db_test_utils.get_connect_string("mysql")
|
||||
engine = sqlalchemy.create_engine(connect_string)
|
||||
config = self._get_alembic_config(connect_string)
|
||||
self.engines["mysqlcitest"] = engine
|
||||
self.test_databases["mysqlcitest"] = connect_string
|
||||
|
||||
# build a fully populated mysql database with all the tables
|
||||
self._reset_databases()
|
||||
self._walk_versions(config, engine, False, False)
|
||||
self._walk_versions(engine)
|
||||
|
||||
connection = engine.connect()
|
||||
# sanity check
|
||||
@ -352,24 +174,25 @@ class TestWalkMigrations(base.TestCase):
|
||||
Test that we can trigger a postgres connection failure and we fail
|
||||
gracefully to ensure we don't break people without postgres
|
||||
"""
|
||||
if _is_backend_avail('postgresql', user="openstack_cifail"):
|
||||
if db_test_utils.is_backend_avail('postgresql',
|
||||
user="openstack_cifail"):
|
||||
self.fail("Shouldn't have connected")
|
||||
|
||||
def test_postgresql_opportunistically(self):
|
||||
# Test postgresql database migration walk
|
||||
if not _is_backend_avail('postgres'):
|
||||
self.skipTest("postgresql not available")
|
||||
if not db_test_utils.is_backend_avail('postgres'):
|
||||
raise self.skipTest('postgres is not available')
|
||||
self.useFixture(fixtures.LockFixture('postgres'))
|
||||
self.useFixture(fixtures.PostgresConfFixture())
|
||||
# add this to the global lists to make reset work with it, it's removed
|
||||
# automatically in tearDown so no need to clean it up here.
|
||||
connect_string = _get_connect_string("postgres")
|
||||
connect_string = db_test_utils.get_connect_string("postgres")
|
||||
engine = sqlalchemy.create_engine(connect_string)
|
||||
config = self._get_alembic_config(connect_string)
|
||||
self.engines["postgresqlcitest"] = engine
|
||||
self.test_databases["postgresqlcitest"] = connect_string
|
||||
|
||||
# build a fully populated postgresql database with all the tables
|
||||
self._reset_databases()
|
||||
self._walk_versions(config, engine, False, False)
|
||||
self._walk_versions(engine)
|
||||
|
||||
def _pre_upgrade_1f92cfe8a6d3(self, engine):
|
||||
tests = get_table(engine, 'tests')
|
||||
|
146
subunit2sql/tests/subunit2sql_fixtures.py
Normal file
146
subunit2sql/tests/subunit2sql_fixtures.py
Normal file
@ -0,0 +1,146 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import urlparse
|
||||
|
||||
import fixtures as fix
|
||||
from oslo_concurrency.fixture import lockutils as lock_fixture
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import fixture as config_fixture
|
||||
from oslo_db import options
|
||||
|
||||
from subunit2sql.db import api as session
|
||||
from subunit2sql.migrations import cli
|
||||
from subunit2sql.tests import db_test_utils
|
||||
|
||||
DB_SCHEMA = ""
|
||||
|
||||
|
||||
def execute_cmd(cmd=None):
|
||||
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT, shell=True)
|
||||
output = proc.communicate()[0]
|
||||
if proc.returncode != 0:
|
||||
raise Exception('Command failed with output:\n%s' % output)
|
||||
|
||||
|
||||
class Database(fix.Fixture):
|
||||
def _cache_schema(self):
|
||||
global DB_SCHEMA
|
||||
if not DB_SCHEMA:
|
||||
db_test_utils.run_migration("head")
|
||||
|
||||
def cleanup(self):
|
||||
engine = session.get_engine()
|
||||
engine.dispose()
|
||||
|
||||
def reset(self):
|
||||
self._cache_schema()
|
||||
engine = session.get_engine()
|
||||
engine.dispose()
|
||||
engine.connect()
|
||||
|
||||
def setUp(self):
|
||||
super(Database, self).setUp()
|
||||
self.reset()
|
||||
self.addCleanup(self.cleanup)
|
||||
|
||||
|
||||
class MySQLConfFixture(config_fixture.Config):
|
||||
"""Fixture to manage global conf settings."""
|
||||
def _drop_db(self):
|
||||
addr = urlparse.urlparse(self.url)
|
||||
database = addr.path.strip('/')
|
||||
loc_pieces = addr.netloc.split('@')
|
||||
host = loc_pieces[1]
|
||||
auth_pieces = loc_pieces[0].split(':')
|
||||
user = auth_pieces[0]
|
||||
password = ""
|
||||
if len(auth_pieces) > 1:
|
||||
if auth_pieces[1].strip():
|
||||
password = "-p\"%s\"" % auth_pieces[1]
|
||||
sql = ("drop database if exists %(database)s; create "
|
||||
"database %(database)s;") % {'database': database}
|
||||
cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
|
||||
"-e \"%(sql)s\"") % {'user': user, 'password': password,
|
||||
'host': host, 'sql': sql}
|
||||
execute_cmd(cmd)
|
||||
|
||||
def setUp(self):
|
||||
super(MySQLConfFixture, self).setUp()
|
||||
self.register_opts(options.database_opts, group='database')
|
||||
self.url = db_test_utils.get_connect_string("mysql")
|
||||
self.set_default('connection', self.url, group='database')
|
||||
lockutils.set_defaults(lock_path='/tmp')
|
||||
self._drop_db()
|
||||
|
||||
|
||||
class PostgresConfFixture(config_fixture.Config):
|
||||
"""Fixture to manage global conf settings."""
|
||||
def _drop_db(self):
|
||||
addr = urlparse.urlparse(self.url)
|
||||
database = addr.path.strip('/')
|
||||
loc_pieces = addr.netloc.split('@')
|
||||
host = loc_pieces[1]
|
||||
|
||||
auth_pieces = loc_pieces[0].split(':')
|
||||
user = auth_pieces[0]
|
||||
password = ""
|
||||
if len(auth_pieces) > 1:
|
||||
password = auth_pieces[1].strip()
|
||||
pg_file = os.path.join(os.path.expanduser('~'), '.pgpass')
|
||||
if os.path.isfile(pg_file):
|
||||
tmp_path = os.path.join('/tmp', 'pgpass')
|
||||
shutil.move(pg_file, tmp_path)
|
||||
self.addCleanup(shutil.move, tmp_path, pg_file)
|
||||
|
||||
pg_pass = '*:*:*:%(user)s:%(password)s' % {
|
||||
'user': user, 'password': password}
|
||||
with open(pg_file, 'w') as fd:
|
||||
fd.write(pg_pass)
|
||||
os.chmod(pg_file, 384)
|
||||
# note(boris-42): We must create and drop database, we can't
|
||||
# drop database which we have connected to, so for such
|
||||
# operations there is a special database template1.
|
||||
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
|
||||
" '%(sql)s' -d template1")
|
||||
sql = ("drop database if exists %(database)s;")
|
||||
sql = sql % {'database': database}
|
||||
droptable = sqlcmd % {'user': user, 'host': host,
|
||||
'sql': sql}
|
||||
execute_cmd(droptable)
|
||||
sql = ("create database %(database)s;")
|
||||
sql = sql % {'database': database}
|
||||
createtable = sqlcmd % {'user': user, 'host': host,
|
||||
'sql': sql}
|
||||
execute_cmd(createtable)
|
||||
|
||||
def setUp(self):
|
||||
super(PostgresConfFixture, self).setUp()
|
||||
self.register_opts(options.database_opts, group='database')
|
||||
self.register_opts(cli.MIGRATION_OPTS)
|
||||
self.url = db_test_utils.get_connect_string("postgres")
|
||||
self.set_default('connection', self.url, group='database')
|
||||
self.set_default('disable_microsecond_data_migration', False)
|
||||
lockutils.set_defaults(lock_path='/tmp')
|
||||
self._drop_db()
|
||||
|
||||
|
||||
class LockFixture(lock_fixture.LockFixture):
|
||||
def __init__(self, name):
|
||||
lockutils.set_defaults(lock_path='/tmp')
|
||||
super(LockFixture, self).__init__(name, 'subunit-db-lock-')
|
@ -4,6 +4,7 @@ discover
|
||||
fixtures>=0.3.14
|
||||
mock>=1.0
|
||||
sphinx>=1.1.2,<1.2
|
||||
testscenarios>=0.4
|
||||
testrepository>=0.0.18
|
||||
testtools>=0.9.34
|
||||
oslosphinx
|
||||
|
Loading…
Reference in New Issue
Block a user