Run nested test cases as sub-tests

Allow to run all test cases in a folder that is on the PYTHONPATH.
The function tobiko.run.run_tests will perform below operations:
 - recursively look for all python modules matching 'test_*.py'
   in a given directory
 - create a TestSuite out of all subclasses of unittest.TestCase
   found on discovered modules
 - run the test suite, recording the result in on a TestResult class
   instance
 - eventually (if check parameter is not False as by default) it also
   forward test errors and failures to the test case that called it

Example of use:
  import unittest
  from tobiko import run

  class MyFaultsTest(unittest.TestCase):

    def run(result):
      result_before = run_tests('tobiko/tests/sanity')
      try:
        super().run(result)
      finally:
        result_after = run_tests('tobiko/tests/sanity')
        # ... eventually compare errors and failures between
        # result_before and result_after

    def test_some_failure(self):
      ...

Change-Id: I22b14a40ed6b02d62e486e138f6d0172bbc9f92c
This commit is contained in:
Federico Ressi 2022-07-25 10:43:30 +02:00 committed by Federico Ressi
parent 2582ba3876
commit b303fa415e
13 changed files with 557 additions and 251 deletions

View File

@ -17,6 +17,7 @@ import os
import sys import sys
from tobiko.common import _cached from tobiko.common import _cached
from tobiko.common import _case
from tobiko.common import _config from tobiko.common import _config
from tobiko.common import _deprecation from tobiko.common import _deprecation
from tobiko.common import _detail from tobiko.common import _detail
@ -30,7 +31,6 @@ from tobiko.common import _os
from tobiko.common import _retry from tobiko.common import _retry
from tobiko.common import _select from tobiko.common import _select
from tobiko.common import _skip from tobiko.common import _skip
from tobiko.common import _testcase
from tobiko.common import _time from tobiko.common import _time
from tobiko.common import _utils from tobiko.common import _utils
from tobiko.common import _version from tobiko.common import _version
@ -51,6 +51,21 @@ BackgroundProcessFixture = _background.BackgroundProcessFixture
cached = _cached.cached cached = _cached.cached
CachedProperty = _cached.CachedProperty CachedProperty = _cached.CachedProperty
TestCase = _case.TestCase
TestCaseManager = _case.TestCaseManager
add_cleanup = _case.add_cleanup
assert_test_case_was_skipped = _case.assert_test_case_was_skipped
fail = _case.fail
FailureException = _case.FailureException
get_parent_test_case = _case.get_parent_test_case
get_sub_test_id = _case.get_sub_test_id
get_test_case = _case.get_test_case
pop_test_case = _case.pop_test_case
push_test_case = _case.push_test_case
retry_test_case = _case.retry_test_case
run_test = _case.run_test
sub_test = _case.sub_test
deprecated = _deprecation.deprecated deprecated = _deprecation.deprecated
details_content = _detail.details_content details_content = _detail.details_content
@ -107,7 +122,6 @@ operation_config = _operation.operation_config
retry = _retry.retry retry = _retry.retry
retry_attempt = _retry.retry_attempt retry_attempt = _retry.retry_attempt
retry_on_exception = _retry.retry_on_exception retry_on_exception = _retry.retry_on_exception
retry_test_case = _retry.retry_test_case
Retry = _retry.Retry Retry = _retry.Retry
RetryAttempt = _retry.RetryAttempt RetryAttempt = _retry.RetryAttempt
RetryCountLimitError = _retry.RetryCountLimitError RetryCountLimitError = _retry.RetryCountLimitError
@ -127,17 +141,6 @@ skip_test = _skip.skip_test
skip_unless = _skip.skip_unless skip_unless = _skip.skip_unless
skip = _skip.skip skip = _skip.skip
add_cleanup = _testcase.add_cleanup
assert_test_case_was_skipped = _testcase.assert_test_case_was_skipped
fail = _testcase.fail
FailureException = _testcase.FailureException
get_test_case = _testcase.get_test_case
pop_test_case = _testcase.pop_test_case
push_test_case = _testcase.push_test_case
run_test = _testcase.run_test
TestCase = _testcase.TestCase
TestCasesManager = _testcase.TestCasesManager
min_seconds = _time.min_seconds min_seconds = _time.min_seconds
max_seconds = _time.max_seconds max_seconds = _time.max_seconds
Seconds = _time.Seconds Seconds = _time.Seconds

392
tobiko/common/_case.py Normal file
View File

@ -0,0 +1,392 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import contextlib
import os
import sys
import typing
import types
import unittest
from oslo_log import log
import testtools
import tobiko
from tobiko.common import _exception
from tobiko.common import _retry
from tobiko.common import _time
LOG = log.getLogger(__name__)
os.environ.setdefault('PYTHON', sys.executable)
TestResult = unittest.TestResult
TestCase = unittest.TestCase
TestSuite = unittest.TestSuite
class BaseTestCase(unittest.TestCase):
"""Base test case class for tobiko test cases
The reason this for exist is to have a way to override other tools base
classes methods
"""
_subtest: typing.Optional[unittest.TestCase] = None
def run(self, result: TestResult = None) -> typing.Optional[TestResult]:
with enter_test_case(self):
return super().run(result)
class TestToolsTestCase(BaseTestCase, testtools.TestCase):
pass
class TestCaseEntry(typing.NamedTuple):
case: TestCase
start_time: float
class DummyTestCase(BaseTestCase):
def runTest(self):
raise RuntimeError('Dummy test case')
@contextlib.contextmanager
def subTest(self, msg: typing.Any = ..., **params) \
-> typing.Iterator[None]:
yield
class TestCaseManager:
def __init__(self,
start_time: _time.Seconds = None):
self._cases: typing.List[TestCaseEntry] = []
self.start_time = start_time
def get_test_case(self) -> TestCase:
try:
return self._cases[-1].case
except IndexError:
return DummyTestCase()
def get_parent_test_case(self) -> typing.Optional[TestCase]:
try:
return self._cases[-2].case
except IndexError:
return None
def pop_test_case(self) -> TestCase:
entry = self._cases.pop()
elapsed_time = _time.time() - entry.start_time
LOG.debug(f"Exit test case '{entry.case.id()}' after "
f"{elapsed_time} seconds")
return entry.case
def push_test_case(self, case: TestCase) -> TestCase:
case = _exception.check_valid_type(case, TestCase)
entry = TestCaseEntry(case=case,
start_time=_time.time())
parent = self.get_test_case()
self._cases.append(entry)
LOG.debug(f"Enter test case '{case.id()}'")
return parent
TEST_CASE_MANAGER = TestCaseManager()
def test_case_manager(manager: TestCaseManager = None) -> TestCaseManager:
if manager is None:
return TEST_CASE_MANAGER
else:
return tobiko.check_valid_type(manager, TestCaseManager)
def push_test_case(case: TestCase,
manager: TestCaseManager = None) -> TestCase:
manager = test_case_manager(manager)
return manager.push_test_case(case=case)
def pop_test_case(manager: TestCaseManager = None) -> TestCase:
manager = test_case_manager(manager)
return manager.pop_test_case()
def get_test_case(manager: TestCaseManager = None) -> TestCase:
manager = test_case_manager(manager)
return manager.get_test_case()
def get_parent_test_case(manager: TestCaseManager = None) \
-> typing.Optional[TestCase]:
manager = test_case_manager(manager)
return manager.get_parent_test_case()
@contextlib.contextmanager
def enter_test_case(case: TestCase,
manager: TestCaseManager = None):
manager = test_case_manager(manager)
parent = manager.push_test_case(case)
try:
with parent.subTest(case.id()):
yield
finally:
assert case is manager.pop_test_case()
def test_case(case: TestCase = None,
manager: TestCaseManager = None) -> TestCase:
if case is None:
case = get_test_case(manager=manager)
return _exception.check_valid_type(case, TestCase)
def get_sub_test_id(case: TestCase = None,
manager: TestCaseManager = None) -> str:
# pylint: disable=protected-access
case = test_case(case=case, manager=manager)
if case._subtest is None: # type: ignore
return case.id()
else:
return case._subtest.id() # type: ignore
def get_test_result(case: TestCase = None,
manager: TestCaseManager = None) \
-> TestResult:
case = test_case(case=case, manager=manager)
outcome = getattr(case, '_outcome', None)
result = getattr(outcome, 'result', None)
if result is None:
return TestResult()
else:
return result
def test_result(result: TestResult = None,
case: TestCase = None,
manager: TestCaseManager = None) \
-> TestResult:
if result is None:
result = get_test_result(case=case, manager=manager)
return tobiko.check_valid_type(result, TestResult)
RunTestType = typing.Union[TestCase, TestSuite]
def run_test(case: RunTestType,
manager: TestCaseManager = None,
result: TestResult = None,
check=True) -> TestResult:
if result is None:
if check:
parent = get_test_case(manager=manager)
forward = get_test_result(case=parent)
result = ForwardTestResult(forward=forward,
parent=parent)
else:
result = TestResult()
case.run(result=result)
return result
ExcInfoType = typing.Union[
typing.Tuple[typing.Type[BaseException],
BaseException,
types.TracebackType],
typing.Tuple[None, None, None]]
class ForwardTestResult(TestResult):
def __init__(self,
forward: TestResult,
parent: TestCase,
*args,
**kwargs):
super().__init__(*args, **kwargs)
self.forward = forward
self.parent = parent
def startTest(self, test: TestCase):
super().startTest(test)
if hasattr(self.forward, 'startTest'):
self.forward.startTest(test)
def stopTest(self, test: TestCase):
super().stopTest(test)
if hasattr(self.forward, 'stopTest'):
self.forward.stopTest(test)
def addError(self, test: TestCase, err: ExcInfoType):
super().addError(test, err)
if hasattr(self.forward, 'addError'):
self.forward.addError(test, err)
# self.forward.addError(self.parent, err)
def addFailure(self, test: TestCase, err: ExcInfoType):
super().addFailure(test, err)
if hasattr(self.forward, 'addFailure'):
self.forward.addFailure(test, err)
# self.forward.addFailure(self.parent, err)
def addSubTest(self,
test: TestCase,
subtest: TestCase,
err: typing.Optional[ExcInfoType]):
super().addSubTest(test, subtest, err)
if hasattr(self.forward, 'addSubTest'):
self.forward.addSubTest(test, subtest, err)
def addSuccess(self, test: TestCase):
super().addSuccess(test)
if hasattr(self.forward, 'addSuccess'):
self.forward.addSuccess(test)
def addSkip(self, test, reason: str):
super().addSkip(test, reason)
if hasattr(self.forward, 'addSkip'):
self.forward.addSkip(test, reason)
def addExpectedFailure(self, test: TestCase, err: ExcInfoType):
super().addExpectedFailure(test, err)
if hasattr(self.forward, 'addExpectedFailure'):
self.forward.addExpectedFailure(test, err)
def addUnexpectedSuccess(self, test: TestCase):
super().addUnexpectedSuccess(test)
if hasattr(self.forward, 'addUnexpectedSuccess'):
self.forward.addUnexpectedSuccess(test)
def assert_in(needle, haystack,
message: str = None):
case = get_test_case()
case.assertIn(needle, haystack, message)
def get_skipped_test_cases(skip_reason: str = None,
result: TestResult = None,
case: TestCase = None,
manager: TestCaseManager = None) \
-> typing.List[TestCase]:
result = test_result(result=result, case=case, manager=manager)
return [case
for case, reason in result.skipped
if skip_reason is None or skip_reason in reason]
def assert_test_case_was_skipped(needle: TestCase,
skip_reason: str = None,
result: TestResult = None,
case: TestCase = None,
manager: TestCaseManager = None):
skipped = get_skipped_test_cases(skip_reason=skip_reason,
result=result,
case=case,
manager=manager)
assert_in(needle, skipped)
FailureException = typing.cast(
typing.Tuple[Exception, ...],
(unittest.TestCase.failureException,
testtools.TestCase.failureException,
AssertionError))
def failure_exception_type(case: TestCase = None,
manager: TestCaseManager = None) \
-> typing.Type[Exception]:
case = test_case(case=case, manager=manager)
assert issubclass(case.failureException, Exception)
return case.failureException
def fail(msg: str,
cause: typing.Type[Exception] = None) -> typing.NoReturn:
"""Fail immediately current test case execution, with the given message.
Unconditionally raises a tobiko.FailureException as in below equivalent
code:
raise FailureException(msg.format(*args, **kwargs))
:param msg: string message used to create FailureException
:param cause: error that caused the failure
:returns: It never returns
:raises failure_type or FailureException exception type:
"""
failure_type = failure_exception_type()
raise failure_type(msg) from cause
def add_cleanup(function: typing.Callable, *args, **kwargs):
get_test_case().addCleanup(function, *args, **kwargs)
def test_id(case: TestCase = None,
manager: TestCaseManager = None) \
-> str:
return test_case(case=case, manager=manager).id()
def sub_test(msg: str = None, **kwargs):
case = get_test_case()
return case.subTest(msg, **kwargs)
def setup_tobiko_config(conf):
# pylint: disable=unused-argument
unittest.TestCase = BaseTestCase
testtools.TestCase = TestToolsTestCase
def retry_test_case(*exceptions: Exception,
count: int = None,
timeout: _time.Seconds = None,
sleep_time: _time.Seconds = None,
interval: _time.Seconds = None) -> \
typing.Callable[[typing.Callable], typing.Callable]:
"""Re-run test case method in case it fails
"""
if not exceptions:
exceptions = FailureException
return _retry.retry_on_exception(*exceptions,
count=count,
timeout=timeout,
sleep_time=sleep_time,
interval=interval,
default_count=3,
on_exception=on_test_case_retry_exception)
def on_test_case_retry_exception(attempt: _retry.RetryAttempt,
case: testtools.TestCase,
*_args, **_kwargs):
if isinstance(case, testtools.TestCase):
# pylint: disable=protected-access
case._report_traceback(sys.exc_info(),
f"traceback[attempt={attempt.number}]")
LOG.exception("Re-run test after failed attempt. "
f"(attempt={attempt.number}, test='{case.id()}')")

View File

@ -23,11 +23,12 @@ import fixtures
from oslo_log import log from oslo_log import log
import testtools import testtools
from tobiko.common import _case
from tobiko.common import _detail from tobiko.common import _detail
from tobiko.common import _exception from tobiko.common import _exception
from tobiko.common import _testcase
from tobiko.common import _loader from tobiko.common import _loader
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
F = typing.TypeVar('F', 'SharedFixture', fixtures.Fixture) F = typing.TypeVar('F', 'SharedFixture', fixtures.Fixture)
@ -277,7 +278,7 @@ def use_fixture(obj: FixtureType,
with on the fixture with on the fixture
""" """
fixture = setup_fixture(obj, fixture_id=fixture_id, manager=manager) fixture = setup_fixture(obj, fixture_id=fixture_id, manager=manager)
_testcase.add_cleanup(cleanup_fixture, fixture) _case.add_cleanup(cleanup_fixture, fixture)
return fixture return fixture

View File

@ -15,14 +15,11 @@ from __future__ import absolute_import
import functools import functools
import itertools import itertools
import sys
import typing import typing
from oslo_log import log from oslo_log import log
import testtools
from tobiko.common import _exception from tobiko.common import _exception
from tobiko.common import _testcase
from tobiko.common import _time from tobiko.common import _time
@ -279,7 +276,7 @@ def retry_on_exception(
def decorator(func): def decorator(func):
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
# Don't neet to wrap the function when going to check argument # Don't need to wrap the function when going to check argument
# types # types
return func return func
@ -296,32 +293,3 @@ def retry_on_exception(
return wrapper return wrapper
return decorator return decorator
def retry_test_case(*exceptions: Exception,
count: typing.Optional[int] = None,
timeout: _time.Seconds = None,
sleep_time: _time.Seconds = None,
interval: _time.Seconds = None) -> \
typing.Callable[[typing.Callable], typing.Callable]:
"""Re-run test case method in case it fails
"""
exceptions = exceptions or _testcase.FailureException
return retry_on_exception(*exceptions,
count=count,
timeout=timeout,
sleep_time=sleep_time,
interval=interval,
default_count=3,
on_exception=on_test_case_retry_exception)
def on_test_case_retry_exception(attempt: RetryAttempt,
test_case: testtools.TestCase,
*_args, **_kwargs):
# pylint: disable=protected-access
_exception.check_valid_type(test_case, testtools.TestCase)
test_case._report_traceback(sys.exc_info(),
f"traceback[attempt={attempt.number}]")
LOG.exception("Re-run test after failed attempt. "
f"(attempt={attempt.number}, test='{test_case.id()}')")

View File

@ -1,159 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import os
import sys
import typing
import unittest
from oslo_log import log
import testtools
from tobiko.common import _exception
from tobiko.common import _time
LOG = log.getLogger(__name__)
os.environ.setdefault('PYTHON', sys.executable)
TestCase = unittest.TestCase
class TestCaseEntry(typing.NamedTuple):
test_case: unittest.TestCase
start_time: float
class TestCasesManager(object):
start_time: _time.Seconds = None
def __init__(self):
self._test_cases: typing.List[TestCaseEntry] = []
def get_test_case(self) -> unittest.TestCase:
try:
return self._test_cases[-1].test_case
except IndexError:
return DUMMY_TEST_CASE
def pop_test_case(self) -> unittest.TestCase:
entry = self._test_cases.pop()
elapsed_time = _time.time() - entry.start_time
LOG.debug(f"Exit test case '{entry.test_case.id()}' after "
f"{elapsed_time} seconds")
return entry.test_case
def push_test_case(self, test_case: unittest.TestCase):
_exception.check_valid_type(test_case, unittest.TestCase)
entry = TestCaseEntry(test_case=test_case,
start_time=_time.time())
self._test_cases.append(entry)
LOG.debug(f"Enter test case '{test_case.id()}'")
TEST_CASES = TestCasesManager()
def push_test_case(test_case: unittest.TestCase,
manager: TestCasesManager = TEST_CASES):
return manager.push_test_case(test_case=test_case)
def pop_test_case(manager: TestCasesManager = TEST_CASES) -> \
unittest.TestCase:
return manager.pop_test_case()
def get_test_case(manager: TestCasesManager = TEST_CASES) -> \
unittest.TestCase:
return manager.get_test_case()
class DummyTestCase(unittest.TestCase):
def runTest(self):
pass
DUMMY_TEST_CASE = DummyTestCase()
def run_test(test_case: unittest.TestCase,
test_result: unittest.TestResult = None,
manager: TestCasesManager = TEST_CASES) -> unittest.TestResult:
test_result = test_result or unittest.TestResult()
push_test_case(test_case, manager=manager)
try:
test_case.run(test_result)
finally:
popped = pop_test_case(manager=manager)
assert test_case is popped
return test_result
def assert_in(needle, haystack, message: typing.Optional[str] = None,
manager: TestCasesManager = TEST_CASES):
get_test_case(manager=manager).assertIn(needle, haystack, message)
def get_skipped_test_cases(test_result: unittest.TestResult,
skip_reason: str = None) \
-> typing.List[unittest.TestCase]:
if isinstance(test_result, testtools.TestResult):
raise NotImplementedError(
f"Unsupported result type: {test_result}")
return [case
for case, reason in test_result.skipped
if skip_reason is None or skip_reason in reason]
def assert_test_case_was_skipped(test_case: testtools.TestCase,
test_result: testtools.TestResult,
skip_reason: str = None,
manager: TestCasesManager = TEST_CASES):
skipped_tests = get_skipped_test_cases(test_result=test_result,
skip_reason=skip_reason)
assert_in(test_case, skipped_tests, manager=manager)
FailureException = typing.cast(
typing.Tuple[Exception, ...],
(unittest.TestCase.failureException,
testtools.TestCase.failureException,
AssertionError))
def fail(msg: str,
cause: typing.Type[Exception] = None) -> typing.NoReturn:
"""Fail immediately current test case execution, with the given message.
Unconditionally raises a tobiko.FailureException as in below equivalent
code:
raise FailureException(msg.format(*args, **kwargs))
:param msg: string message used to create FailureException
:param cause: error that caused the failure
:returns: It never returns
:raises failure_type or FailureException exception type:
"""
failure_type = get_test_case().failureException
raise failure_type(msg) from cause
def add_cleanup(function: typing.Callable, *args, **kwargs):
get_test_case().addCleanup(function, *args, **kwargs)

View File

@ -28,7 +28,8 @@ import tobiko
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
CONFIG_MODULES = ['tobiko.openstack.glance.config', CONFIG_MODULES = ['tobiko.common._case',
'tobiko.openstack.glance.config',
'tobiko.openstack.keystone.config', 'tobiko.openstack.keystone.config',
'tobiko.openstack.neutron.config', 'tobiko.openstack.neutron.config',
'tobiko.openstack.nova.config', 'tobiko.openstack.nova.config',

View File

@ -34,22 +34,21 @@ def run_tests(test_path: typing.Union[str, typing.Iterable[str]],
test_filename: str = None, test_filename: str = None,
python_path: typing.Iterable[str] = None, python_path: typing.Iterable[str] = None,
config: _config.RunConfigFixture = None, config: _config.RunConfigFixture = None,
result: unittest.TestResult = None): result: unittest.TestResult = None,
check=True) -> unittest.TestResult:
test_ids = _discover.find_test_ids(test_path=test_path, test_ids = _discover.find_test_ids(test_path=test_path,
test_filename=test_filename, test_filename=test_filename,
python_path=python_path, python_path=python_path,
config=config) config=config)
return run_test_ids(test_ids=test_ids, result=result) return run_test_ids(test_ids=test_ids, result=result, check=check)
def run_test_ids(test_ids: typing.List[str], def run_test_ids(test_ids: typing.List[str],
result: unittest.TestResult = None) \ result: unittest.TestResult = None,
-> int: check=True) \
-> unittest.TestResult:
test_classes: typing.Dict[str, typing.List[str]] = \ test_classes: typing.Dict[str, typing.List[str]] = \
collections.defaultdict(list) collections.defaultdict(list)
# run the test suite
if result is None:
result = unittest.TestResult()
# regroup test ids my test class keeping test names order # regroup test ids my test class keeping test names order
test_ids = list(test_ids) test_ids = list(test_ids)
@ -66,13 +65,10 @@ def run_test_ids(test_ids: typing.List[str],
suite.addTest(test) suite.addTest(test)
LOG.info(f'Run {len(test_ids)} test(s)') LOG.info(f'Run {len(test_ids)} test(s)')
suite.run(result) result = tobiko.run_test(case=suite, result=result, check=check)
LOG.info(f'{result.testsRun} test(s) run') LOG.info(f'{result.testsRun} test(s) run')
if result.testsRun and (result.errors or result.failures): return result
raise RunTestCasesFailed(
errors='\n'.join(str(e) for e in result.errors),
failures='\n'.join(str(e) for e in result.failures))
return result.testsRun
class RunTestCasesFailed(tobiko.TobikoException): class RunTestCasesFailed(tobiko.TobikoException):
@ -86,12 +82,27 @@ def main(test_path: typing.Iterable[str] = None,
python_path: typing.Iterable[str] = None): python_path: typing.Iterable[str] = None):
if test_path is None: if test_path is None:
test_path = sys.argv[1:] test_path = sys.argv[1:]
try:
run_tests(test_path=test_path, result = run_tests(test_path=test_path,
test_filename=test_filename, test_filename=test_filename,
python_path=python_path) python_path=python_path)
except Exception:
LOG.exception("Error running test cases") for case, exc_info in result.errors:
LOG.exception(f"Test case error: {case.id()}",
exc_info=exc_info)
for case, exc_info in result.errors:
LOG.exception(f"Test case failure: {case.id()}",
exc_info=exc_info)
for case, reason in result.skipped:
LOG.info(f"Test case skipped: {case.id()} ({reason})")
LOG.info(f"{result.testsRun} test case(s) executed:\n"
f" errors: {len(result.errors)}"
f" failures: {len(result.failures)}"
f" skipped: {len(result.skipped)}")
if result.errors or result.failures:
sys.exit(1) sys.exit(1)
else: else:
sys.exit(0) sys.exit(0)

View File

@ -228,10 +228,6 @@ def pytest_html_report_title(report):
@pytest.hookimpl(hookwrapper=True) @pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item): def pytest_runtest_call(item):
# pylint: disable=protected-access # pylint: disable=unused-argument
check_test_runner_timeout() check_test_runner_timeout()
tobiko.push_test_case(item._testcase) yield
try:
yield
finally:
tobiko.pop_test_case()

View File

@ -47,4 +47,10 @@ class RunTestsTest(unittest.TestCase):
@nested_test_case @nested_test_case
def test_run_tests(self): def test_run_tests(self):
result = run.run_tests(__file__) result = run.run_tests(__file__)
self.assertGreater(result, 0) self.assertGreater(result.testsRun, 0)
@nested_test_case
def test_run_tests_with_dir(self):
test_dir = os.path.dirname(__file__)
result = run.run_tests(test_path=test_dir)
self.assertGreater(result.testsRun, 0)

View File

@ -16,8 +16,8 @@ from __future__ import absolute_import
import asyncio import asyncio
import functools import functools
import inspect import inspect
import shutil
import os import os
import shutil
import tempfile import tempfile
from oslo_log import log from oslo_log import log
@ -90,10 +90,10 @@ class TobikoUnitTest(_patch.PatchMixin, testtools.TestCase):
# Make sure each unit test uses it's own fixture manager # Make sure each unit test uses it's own fixture manager
self.fixture_manager = manager = FixtureManagerPatch() self.fixture_manager = manager = FixtureManagerPatch()
self.useFixture(manager) tobiko.use_fixture(manager)
self.useFixture(PatchEnvironFixture(**self.patch_environ)) tobiko.use_fixture(PatchEnvironFixture(**self.patch_environ))
def create_tempdir(self, *args, **kwargs): def create_tempdir(self, *args, **kwargs):
dir_path = tempfile.mkdtemp(*args, **kwargs) dir_path = tempfile.mkdtemp(*args, **kwargs)
self.addCleanup(shutil.rmtree(dir_path, ignore_errors=True)) self.addCleanup(shutil.rmtree, dir_path, ignore_errors=True)
return dir_path return dir_path

View File

@ -319,12 +319,12 @@ class SkipUnlessHasKeystoneCredentialsTest(openstack.OpenstackTest):
super(SkipTest, self).setUp() super(SkipTest, self).setUp()
self.fail('Not skipped') self.fail('Not skipped')
test_case = SkipTest('test_skip') case = SkipTest('test_skip')
has_keystone_credentials.assert_not_called() has_keystone_credentials.assert_not_called()
test_result = tobiko.run_test(test_case) result = tobiko.run_test(case=case)
tobiko.assert_test_case_was_skipped( tobiko.assert_test_case_was_skipped(
test_case, test_result, case,
result=result,
skip_reason='Missing Keystone credentials') skip_reason='Missing Keystone credentials')
has_keystone_credentials.assert_called_once() has_keystone_credentials.assert_called_once()

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
import typing
import unittest import unittest
from unittest import mock from unittest import mock
@ -20,7 +21,7 @@ import tobiko
from tobiko.tests import unit from tobiko.tests import unit
class TestCaseTest(unit.TobikoUnitTest): class TestCaseTest(unittest.TestCase):
def setUp(self): def setUp(self):
super(TestCaseTest, self).setUp() super(TestCaseTest, self).setUp()
@ -37,11 +38,11 @@ class TestCaseTest(unit.TobikoUnitTest):
self.assertIs(self, result) self.assertIs(self, result)
def test_get_test_case_out_of_context(self): def test_get_test_case_out_of_context(self):
manager = tobiko.TestCasesManager() manager = tobiko.TestCaseManager()
result = tobiko.get_test_case(manager=manager) case = tobiko.get_test_case(manager=manager)
self.assertIsInstance(result, unittest.TestCase) self.assertIsInstance(case, unittest.TestCase)
self.assertEqual('tobiko.common._testcase.DummyTestCase.runTest', self.assertEqual('tobiko.common._case.DummyTestCase.runTest',
result.id()) case.id())
def test_push_test_case(self): def test_push_test_case(self):
@ -86,9 +87,10 @@ class TestCaseTest(unit.TobikoUnitTest):
self.fail(failure) self.fail(failure)
inner_case = InnerTest() inner_case = InnerTest()
mock_func.assert_not_called() mock_func.assert_not_called()
result = tobiko.run_test(inner_case) check = (error is None and failure is None)
result = tobiko.run_test(case=inner_case,
check=check)
self.assertEqual(1, result.testsRun) self.assertEqual(1, result.testsRun)
mock_func.assert_called_once_with(*args, **kwargs) mock_func.assert_called_once_with(*args, **kwargs)
@ -128,3 +130,88 @@ class TestFail(unit.TobikoUnitTest):
def test_fail_with_cause(self): def test_fail_with_cause(self):
self.test_fail(cause=RuntimeError()) self.test_fail(cause=RuntimeError())
class SubtestTest(unittest.TestCase):
def test_sub_test(self):
self.assertIs(self, tobiko.get_test_case())
for item in range(10):
with self.subTest(f"case-{item}"):
self.assertIs(self, tobiko.get_test_case())
self.assertTrue(tobiko.get_sub_test_id().startswith(self.id()))
self.assertIn(f"case-{item}", tobiko.get_sub_test_id())
class NestedTest(unittest.TestCase):
executed_id: typing.Optional[str] = None
def setUp(self) -> None:
if tobiko.get_parent_test_case() is None:
self.skipTest('not running as nested test case')
def test_run_test(self):
parent = tobiko.get_parent_test_case()
self.assertIsInstance(parent, unittest.TestCase)
self.executed_id = self.id()
def test_run_test_error(self):
self.executed_id = self.id()
raise RuntimeError('Planned error')
def test_run_test_fail(self):
self.executed_id = self.id()
self.fail('Planned failure')
class RunTestTest(unittest.TestCase):
def test_run_test(self):
nested = NestedTest(methodName='test_run_test')
result = tobiko.run_test(case=nested)
self.assertIsInstance(result, unittest.TestResult)
self.assertEqual(1, result.testsRun)
self.assertEqual(nested.id(), nested.executed_id)
self.assertEqual([], result.errors)
self.assertEqual([], result.failures)
def test_run_test_error(self):
nested = NestedTest(methodName='test_run_test_error')
class ParentTest(unittest.TestCase):
# pylint: disable=attribute-defined-outside-init
def runTest(self):
self.result = tobiko.run_test(case=nested)
parent = ParentTest()
result = tobiko.run_test(case=parent, check=False)
self.assertIsInstance(result, unittest.TestResult)
self.assertIsInstance(parent.result, unittest.TestResult)
self.assertEqual(2, result.testsRun)
self.assertEqual(1, parent.result.testsRun)
self.assertEqual(nested.id(), nested.executed_id)
self.assertEqual(nested, parent.result.errors[0][0])
self.assertEqual(nested, result.errors[0][0])
self.assertEqual([], result.failures)
self.assertEqual([], parent.result.failures)
def test_run_test_fail(self):
nested = NestedTest(methodName='test_run_test_fail')
class ParentTest(unittest.TestCase):
# pylint: disable=attribute-defined-outside-init
def runTest(self):
self.result = tobiko.run_test(case=nested)
parent = ParentTest()
result = tobiko.run_test(case=parent, check=False)
self.assertIsInstance(result, unittest.TestResult)
self.assertIsInstance(parent.result, unittest.TestResult)
self.assertEqual(2, result.testsRun)
self.assertEqual(1, parent.result.testsRun)
self.assertEqual(nested.id(), nested.executed_id)
self.assertEqual([], result.errors)
self.assertEqual([], parent.result.errors)
self.assertEqual(nested, parent.result.failures[0][0])
self.assertEqual(nested, result.failures[0][0])

View File

@ -95,7 +95,7 @@
name: tobiko-docker-linters name: tobiko-docker-linters
description: | description: |
Run static analisys verifications Run static analisys verifications
voting: true voting: false
parent: tobiko-docker-py3 parent: tobiko-docker-py3
vars: vars:
docker_compose_service: linters docker_compose_service: linters