
Fixes or improve various details with concurrency mode. When running in parallel mode, iperf results were not properly aggregated. Display supplementary informations in the output, including a readable output for bandwidth usage and iperf parallel related informations Also allow to handle single value for concurrency-progression parameter within the action Closes-bug: #2015173 Closes-bug: #2015174 Change-Id: Ia1358fc5f966cdc3bfafda6bd8320282986d9f5e
90 lines
2.6 KiB
Python
90 lines
2.6 KiB
Python
import asyncio
|
|
import contextlib
|
|
import io
|
|
import mock
|
|
import unittest
|
|
import unittest.mock
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def patch_open():
|
|
'''Patch open() to allow mocking both open() itself and the file that is
|
|
yielded.
|
|
Yields the mock for "open" and "file", respectively.'''
|
|
mock_open = mock.MagicMock(spec=open)
|
|
mock_file = mock.MagicMock(spec=io.FileIO)
|
|
|
|
@contextlib.contextmanager
|
|
def stub_open(*args, **kwargs):
|
|
mock_open(*args, **kwargs)
|
|
yield mock_file
|
|
|
|
with mock.patch('builtins.open', stub_open):
|
|
yield mock_open, mock_file
|
|
|
|
|
|
def async_test(f):
|
|
"""
|
|
A decorator to test async functions within a synchronous environment.
|
|
|
|
see https://stackoverflow.com/questions/23033939/
|
|
"""
|
|
def wrapper(*args, **kwargs):
|
|
coro = asyncio.coroutine(f)
|
|
future = coro(*args, **kwargs)
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_until_complete(future)
|
|
return wrapper
|
|
|
|
|
|
class CharmTestCase(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
self._patches = {}
|
|
self._patches_start = {}
|
|
|
|
def tearDown(self):
|
|
for k, v in self._patches.items():
|
|
v.stop()
|
|
setattr(self, k, None)
|
|
self._patches = None
|
|
self._patches_start = None
|
|
|
|
def _patch(self, method):
|
|
_m = unittest.mock.patch.object(self.obj, method)
|
|
mock = _m.start()
|
|
self.addCleanup(_m.stop)
|
|
return mock
|
|
|
|
def patch_all(self):
|
|
for method in self.patches:
|
|
setattr(self, method, self._patch(method))
|
|
|
|
def patch_object(self, obj, attr, return_value=None, name=None, new=None,
|
|
**kwargs):
|
|
if name is None:
|
|
name = attr
|
|
if new is not None:
|
|
mocked = mock.patch.object(obj, attr, new=new, **kwargs)
|
|
else:
|
|
mocked = mock.patch.object(obj, attr, **kwargs)
|
|
self._patches[name] = mocked
|
|
started = mocked.start()
|
|
if new is None:
|
|
started.return_value = return_value
|
|
self._patches_start[name] = started
|
|
setattr(self, name, started)
|
|
|
|
def patch(self, item, return_value=None, name=None, new=None, **kwargs):
|
|
if name is None:
|
|
raise RuntimeError("Must pass 'name' to .patch()")
|
|
if new is not None:
|
|
mocked = mock.patch(item, new=new, **kwargs)
|
|
else:
|
|
mocked = mock.patch(item, **kwargs)
|
|
self._patches[name] = mocked
|
|
started = mocked.start()
|
|
if new is None:
|
|
started.return_value = return_value
|
|
self._patches_start[name] = started
|