Migrate base test class to testtools.
This is step one in moving towards testr. Change-Id: Ieac7fbe34ddc9ca05812faca76b68ff7d37ca708
This commit is contained in:
parent
af023fe0cc
commit
11b184f01d
@ -24,13 +24,14 @@ inline callbacks.
|
||||
"""
|
||||
|
||||
import functools
|
||||
import unittest
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mox
|
||||
import nose.plugins.skip
|
||||
from oslo.config import cfg
|
||||
import stubout
|
||||
import testtools
|
||||
|
||||
from cinder import flags
|
||||
from cinder.openstack.common import log as logging
|
||||
@ -63,7 +64,7 @@ class skip_test(object):
|
||||
@functools.wraps(func)
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
raise nose.SkipTest(self.message)
|
||||
raise testtools.TestCase.skipException(self.message)
|
||||
return _skipper
|
||||
|
||||
|
||||
@ -78,7 +79,7 @@ class skip_if(object):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
if self.condition:
|
||||
raise nose.SkipTest(self.message)
|
||||
raise testtools.TestCase.skipException(self.message)
|
||||
func(*args, **kw)
|
||||
return _skipper
|
||||
|
||||
@ -94,7 +95,7 @@ class skip_unless(object):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
if not self.condition:
|
||||
raise nose.SkipTest(self.message)
|
||||
raise testtools.TestCase.skipException(self.message)
|
||||
func(*args, **kw)
|
||||
return _skipper
|
||||
|
||||
@ -104,7 +105,8 @@ def skip_if_fake(func):
|
||||
def _skipper(*args, **kw):
|
||||
"""Wrapped skipper function."""
|
||||
if FLAGS.fake_tests:
|
||||
raise unittest.SkipTest('Test cannot be run in fake mode')
|
||||
raise testtools.TestCase.skipException(
|
||||
'Test cannot be run in fake mode')
|
||||
else:
|
||||
return func(*args, **kw)
|
||||
return _skipper
|
||||
@ -114,13 +116,35 @@ class TestingException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TestCase(unittest.TestCase):
|
||||
class TestCase(testtools.TestCase):
|
||||
"""Test case base class for all unit tests."""
|
||||
|
||||
def setUp(self):
|
||||
"""Run before each test method to initialize test environment."""
|
||||
super(TestCase, self).setUp()
|
||||
|
||||
test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
|
||||
try:
|
||||
test_timeout = int(test_timeout)
|
||||
except ValueError:
|
||||
# If timeout value is invalid do not set a timeout.
|
||||
test_timeout = 0
|
||||
if test_timeout > 0:
|
||||
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
|
||||
self.useFixture(fixtures.NestedTempfile())
|
||||
self.useFixture(fixtures.TempHomeDir())
|
||||
|
||||
if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
|
||||
os.environ.get('OS_STDOUT_CAPTURE') == '1'):
|
||||
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
|
||||
if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
|
||||
os.environ.get('OS_STDERR_CAPTURE') == '1'):
|
||||
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
|
||||
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
|
||||
|
||||
self.log_fixture = self.useFixture(fixtures.FakeLogger())
|
||||
|
||||
fake_flags.set_defaults(FLAGS)
|
||||
flags.parse_args([], default_config_files=[])
|
||||
|
||||
@ -134,41 +158,38 @@ class TestCase(unittest.TestCase):
|
||||
# because it screws with our generators
|
||||
self.mox = mox.Mox()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
self.addCleanup(self.mox.UnsetStubs)
|
||||
self.addCleanup(self.stubs.UnsetAll)
|
||||
self.addCleanup(self.stubs.SmartUnsetAll)
|
||||
self.addCleanup(self.mox.VerifyAll)
|
||||
self.injected = []
|
||||
self._services = []
|
||||
FLAGS.set_override('fatal_exception_format_errors', True)
|
||||
self.addCleanup(FLAGS.reset)
|
||||
|
||||
def tearDown(self):
|
||||
"""Runs after each test method to tear down test environment."""
|
||||
try:
|
||||
self.mox.UnsetStubs()
|
||||
self.stubs.UnsetAll()
|
||||
self.stubs.SmartUnsetAll()
|
||||
self.mox.VerifyAll()
|
||||
super(TestCase, self).tearDown()
|
||||
finally:
|
||||
# Reset any overridden flags
|
||||
FLAGS.reset()
|
||||
|
||||
# Stop any timers
|
||||
for x in self.injected:
|
||||
try:
|
||||
x.stop()
|
||||
except AssertionError:
|
||||
pass
|
||||
# Stop any timers
|
||||
for x in self.injected:
|
||||
try:
|
||||
x.stop()
|
||||
except AssertionError:
|
||||
pass
|
||||
|
||||
# Kill any services
|
||||
for x in self._services:
|
||||
try:
|
||||
x.kill()
|
||||
except Exception:
|
||||
pass
|
||||
# Kill any services
|
||||
for x in self._services:
|
||||
try:
|
||||
x.kill()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Delete attributes that don't start with _ so they don't pin
|
||||
# memory around unnecessarily for the duration of the test
|
||||
# suite
|
||||
for key in [k for k in self.__dict__.keys() if k[0] != '_']:
|
||||
del self.__dict__[key]
|
||||
# Delete attributes that don't start with _ so they don't pin
|
||||
# memory around unnecessarily for the duration of the test
|
||||
# suite
|
||||
for key in [k for k in self.__dict__.keys() if k[0] != '_']:
|
||||
del self.__dict__[key]
|
||||
super(TestCase, self).tearDown()
|
||||
|
||||
def flags(self, **kw):
|
||||
"""Override flag variables for a test."""
|
||||
|
@ -29,9 +29,6 @@ class AdminActionsTest(test.TestCase):
|
||||
self.flags(lock_path=self.tempdir)
|
||||
self.volume_api = volume_api.API()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
|
||||
def test_reset_status_as_admin(self):
|
||||
# admin context
|
||||
ctx = context.RequestContext('admin', 'fake', True)
|
||||
|
@ -749,6 +749,7 @@ class WsgiLimiterProxyTest(BaseLimitTestSuite):
|
||||
def tearDown(self):
|
||||
# restore original HTTPConnection object
|
||||
httplib.HTTPConnection = self.oldHTTPConnection
|
||||
super(WsgiLimiterProxyTest, self).tearDown()
|
||||
|
||||
|
||||
class LimitsViewBuilderTest(test.TestCase):
|
||||
|
@ -748,6 +748,7 @@ class WsgiLimiterProxyTest(BaseLimitTestSuite):
|
||||
def tearDown(self):
|
||||
# restore original HTTPConnection object
|
||||
httplib.HTTPConnection = self.oldHTTPConnection
|
||||
super(WsgiLimiterProxyTest, self).tearDown()
|
||||
|
||||
|
||||
class LimitsViewBuilderTest(test.TestCase):
|
||||
|
@ -16,7 +16,6 @@
|
||||
# under the License.
|
||||
|
||||
import time
|
||||
import unittest
|
||||
|
||||
from cinder.openstack.common import log as logging
|
||||
from cinder import service
|
||||
@ -193,6 +192,3 @@ class VolumesTest(integrated_helpers._IntegratedTestBase):
|
||||
found_volume = self.api.get_volume(created_volume_id)
|
||||
self.assertEqual(created_volume_id, found_volume['id'])
|
||||
self.assertEqual(found_volume['display_name'], 'vol-one')
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -59,6 +59,7 @@ class GlusterFsDriverTestCase(test.TestCase):
|
||||
ONE_GB_IN_BYTES = 1024 * 1024 * 1024
|
||||
|
||||
def setUp(self):
|
||||
super(GlusterFsDriverTestCase, self).setUp()
|
||||
self._mox = mox_lib.Mox()
|
||||
self._configuration = mox_lib.MockObject(conf.Configuration)
|
||||
self._configuration.append_config_values(mox_lib.IgnoreArg())
|
||||
@ -77,6 +78,7 @@ class GlusterFsDriverTestCase(test.TestCase):
|
||||
def tearDown(self):
|
||||
self._mox.UnsetStubs()
|
||||
self.stubs.UnsetAll()
|
||||
super(GlusterFsDriverTestCase, self).tearDown()
|
||||
|
||||
def stub_out_not_replaying(self, obj, attr_name):
|
||||
attr_to_replace = getattr(obj, attr_name)
|
||||
|
@ -78,12 +78,11 @@ class NetappNfsDriverTestCase(test.TestCase):
|
||||
"""Test case for NetApp specific NFS clone driver."""
|
||||
|
||||
def setUp(self):
|
||||
super(NetappNfsDriverTestCase, self).setUp()
|
||||
self._mox = mox.Mox()
|
||||
self._driver = netapp_nfs.NetAppNFSDriver(
|
||||
configuration=create_configuration())
|
||||
|
||||
def tearDown(self):
|
||||
self._mox.UnsetStubs()
|
||||
self.addCleanup(self._mox.UnsetStubs)
|
||||
|
||||
def test_check_for_setup_error(self):
|
||||
mox = self._mox
|
||||
@ -286,16 +285,15 @@ class NetappCmodeNfsDriverTestCase(test.TestCase):
|
||||
"""Test case for NetApp C Mode specific NFS clone driver"""
|
||||
|
||||
def setUp(self):
|
||||
super(NetappCmodeNfsDriverTestCase, self).setUp()
|
||||
self._mox = mox.Mox()
|
||||
self._custom_setup()
|
||||
self.addCleanup(self._mox.UnsetStubs)
|
||||
|
||||
def _custom_setup(self):
|
||||
self._driver = netapp_nfs.NetAppCmodeNfsDriver(
|
||||
configuration=create_configuration())
|
||||
|
||||
def tearDown(self):
|
||||
self._mox.UnsetStubs()
|
||||
|
||||
def test_check_for_setup_error(self):
|
||||
mox = self._mox
|
||||
drv = self._driver
|
||||
|
@ -51,12 +51,10 @@ class RemoteFsDriverTestCase(test.TestCase):
|
||||
TEST_FILE_NAME = 'test.txt'
|
||||
|
||||
def setUp(self):
|
||||
super(RemoteFsDriverTestCase, self).setUp()
|
||||
self._driver = nfs.RemoteFsDriver()
|
||||
self._mox = mox_lib.Mox()
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
self._mox.UnsetStubs()
|
||||
self.addCleanup(self._mox.UnsetStubs)
|
||||
|
||||
def test_create_sparsed_file(self):
|
||||
(mox, drv) = self._mox, self._driver
|
||||
@ -119,6 +117,7 @@ class NfsDriverTestCase(test.TestCase):
|
||||
ONE_GB_IN_BYTES = 1024 * 1024 * 1024
|
||||
|
||||
def setUp(self):
|
||||
super(NfsDriverTestCase, self).setUp()
|
||||
self._mox = mox_lib.Mox()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
self.configuration = mox_lib.MockObject(conf.Configuration)
|
||||
@ -130,10 +129,8 @@ class NfsDriverTestCase(test.TestCase):
|
||||
self.configuration.nfs_sparsed_volumes = True
|
||||
self._driver = nfs.NfsDriver(configuration=self.configuration)
|
||||
self._driver.shares = {}
|
||||
|
||||
def tearDown(self):
|
||||
self._mox.UnsetStubs()
|
||||
self.stubs.UnsetAll()
|
||||
self.addCleanup(self.stubs.UnsetAll)
|
||||
self.addCleanup(self._mox.UnsetStubs)
|
||||
|
||||
def stub_out_not_replaying(self, obj, attr_name):
|
||||
attr_to_replace = getattr(obj, attr_name)
|
||||
|
@ -27,7 +27,6 @@ Tests for the IBM Storwize family and SVC volume driver.
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import unittest
|
||||
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
@ -1908,9 +1907,7 @@ class StorwizeSVCDriverTestCase(test.TestCase):
|
||||
self.assertAlmostEqual(stats['free_capacity_gb'], 3287.5)
|
||||
|
||||
|
||||
# The test case does not rely on Openstack runtime,
|
||||
# so it should inherit from unittest.TestCase.
|
||||
class CLIResponseTestCase(unittest.TestCase):
|
||||
class CLIResponseTestCase(test.TestCase):
|
||||
def test_empty(self):
|
||||
self.assertEqual(0, len(storwize_svc.CLIResponse('')))
|
||||
self.assertEqual(0, len(storwize_svc.CLIResponse(('', 'stderr'))))
|
||||
@ -1973,6 +1970,3 @@ port_speed!8Gb
|
||||
self.assertEqual(list(resp.select('port_id', 'port_status')),
|
||||
[('500507680210C744', 'active'),
|
||||
('500507680240C744', 'inactive')])
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -21,7 +21,6 @@
|
||||
import os.path
|
||||
import ssl
|
||||
import tempfile
|
||||
import unittest
|
||||
import urllib2
|
||||
|
||||
from oslo.config import cfg
|
||||
@ -54,7 +53,7 @@ class TestLoaderNothingExists(test.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class TestLoaderNormalFilesystem(unittest.TestCase):
|
||||
class TestLoaderNormalFilesystem(test.TestCase):
|
||||
"""Loader tests with normal filesystem (unmodified os.path module)."""
|
||||
|
||||
_paste_config = """
|
||||
@ -64,11 +63,13 @@ document_root = /tmp
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestLoaderNormalFilesystem, self).setUp()
|
||||
self.config = tempfile.NamedTemporaryFile(mode="w+t")
|
||||
self.config.write(self._paste_config.lstrip())
|
||||
self.config.seek(0)
|
||||
self.config.flush()
|
||||
self.loader = cinder.wsgi.Loader(self.config.name)
|
||||
self.addCleanup(self.config.close)
|
||||
|
||||
def test_config_found(self):
|
||||
self.assertEquals(self.config.name, self.loader.config_path)
|
||||
@ -84,11 +85,8 @@ document_root = /tmp
|
||||
url_parser = self.loader.load_app("test_app")
|
||||
self.assertEquals("/tmp", url_parser.directory)
|
||||
|
||||
def tearDown(self):
|
||||
self.config.close()
|
||||
|
||||
|
||||
class TestWSGIServer(unittest.TestCase):
|
||||
class TestWSGIServer(test.TestCase):
|
||||
"""WSGI server tests."""
|
||||
def _ipv6_configured():
|
||||
try:
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
import contextlib
|
||||
import StringIO
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import mox
|
||||
@ -26,6 +25,7 @@ from oslo.config import cfg
|
||||
|
||||
from cinder.db import api as db_api
|
||||
from cinder import exception
|
||||
from cinder import test
|
||||
from cinder.volume import configuration as conf
|
||||
from cinder.volume import driver as parent_driver
|
||||
from cinder.volume.drivers.xenapi import lib
|
||||
@ -51,7 +51,7 @@ def get_configured_driver(server='ignore_server', path='ignore_path'):
|
||||
return driver.XenAPINFSDriver(configuration=configuration)
|
||||
|
||||
|
||||
class DriverTestCase(unittest.TestCase):
|
||||
class DriverTestCase(test.TestCase):
|
||||
|
||||
def assert_flag(self, flagname):
|
||||
self.assertTrue(hasattr(driver.FLAGS, flagname))
|
||||
@ -488,7 +488,7 @@ class DriverTestCase(unittest.TestCase):
|
||||
self.assertEquals('xensm', stats['storage_protocol'])
|
||||
|
||||
|
||||
class ToolsTest(unittest.TestCase):
|
||||
class ToolsTest(test.TestCase):
|
||||
@mock.patch('cinder.volume.drivers.xenapi.tools._stripped_first_line_of')
|
||||
def test_get_this_vm_uuid(self, mock_read_first_line):
|
||||
mock_read_first_line.return_value = 'someuuid'
|
||||
|
@ -77,4 +77,3 @@ cover-package = cinder
|
||||
cover-erase = true
|
||||
cover-inclusive = true
|
||||
verbosity=2
|
||||
detailed-errors=1
|
||||
|
@ -6,6 +6,7 @@ hacking>=0.5.3,<0.6
|
||||
|
||||
coverage>=3.6
|
||||
distribute>=0.6.24
|
||||
fixtures>=0.3.12
|
||||
hp3parclient>=1.0.0
|
||||
mock>=0.8.0
|
||||
mox>=0.5.3
|
||||
@ -16,3 +17,4 @@ nosexcover
|
||||
openstack.nose_plugin>=0.7
|
||||
psycopg2
|
||||
sphinx>=1.1.2
|
||||
testtools>=0.9.29
|
||||
|
Loading…
Reference in New Issue
Block a user