Add doc strings, unit tests etc

This commit is contained in:
Liam Young 2018-07-26 14:29:01 +00:00
parent 6f6883711d
commit 76159459a1
8 changed files with 332 additions and 37 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.tox
.testrepository
.unit-state.db
.stestr/
__pycache__/

8
.testr.conf Normal file
View File

@ -0,0 +1,8 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./unit_tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

View File

@ -1,29 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from charms.reactive import set_flag, clear_flag
from charms.reactive import Endpoint
from charms.reactive import when_any, when_not, when
class CellProvides(Endpoint):
@when_not('endpoint.{endpoint_name}.joined')
def broken(self):
clear_flag(self.expand_name('endpoint.{endpoint_name}.new-request'))
clear_flag(self.expand_name('{endpoint_name}.connected'))
@when('endpoint.{endpoint_name}.joined')
def joined(self):
print("CellProvides")
set_flag(self.expand_name('{endpoint_name}.connected'))

View File

@ -10,59 +10,104 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import socket
from urllib.parse import urlparse
import uuid
from charmhelpers.core import hookenv
from charms.reactive import set_flag, clear_flag from charms.reactive import set_flag, clear_flag
from charms.reactive import Endpoint from charms.reactive import Endpoint
from charms.reactive import when_any, when_not, when from charms.reactive import when_not, when
class CellRequires(Endpoint): class CellRequires(Endpoint):
@when('endpoint.{endpoint_name}.changed') @when('endpoint.{endpoint_name}.changed')
def data_changed(self): def data_changed(self):
"""Set flag to indicate to charm relation data has changed."""
if self.all_joined_units.received.get('network_manager'): if self.all_joined_units.received.get('network_manager'):
set_flag(self.expand_name('{endpoint_name}.available')) set_flag(self.expand_name('{endpoint_name}.available'))
@when_not('endpoint.{endpoint_name}.joined') @when_not('endpoint.{endpoint_name}.joined')
def broken(self): def broken(self):
"""Remove flag to indicate to charm relation has gone.."""
clear_flag(self.expand_name('{endpoint_name}.available')) clear_flag(self.expand_name('{endpoint_name}.available'))
@when('endpoint.{endpoint_name}.joined') @when('endpoint.{endpoint_name}.joined')
def joined(self): def joined(self):
print("CellRequires") """Set flag to indicate to charm relation has been joined."""
set_flag(self.expand_name('{endpoint_name}.connected')) set_flag(self.expand_name('{endpoint_name}.connected'))
def send_cell_data(self, cell_name, amqp_svc_name, db_svc_name): def send_cell_data(self, cell_name, amqp_svc_name, db_svc_name):
"""Send compute nodes data relating to network setup.
:param cell_name: Name of the cell this controller is associated with.
:type cell_name: str
:param amqp_svc_name: URL for this cells nova message broker.
:type amqp_svc_name: str
:param db_svc_name: URL for this cells nova db.
:type db_svc_name: str
"""
for relation in self.relations: for relation in self.relations:
relation.to_publish_raw['amqp-service'] = amqp_svc_name relation.to_publish_raw['amqp-service'] = amqp_svc_name
relation.to_publish_raw['db-service'] = db_svc_name relation.to_publish_raw['db-service'] = db_svc_name
relation.to_publish_raw['cell-name'] = cell_name relation.to_publish_raw['cell-name'] = cell_name
def get_settings(self, keys): def get_settings(self, keys):
"""Retrieve setting(s) from remote units.
:param keys: List of keys and their vaules to retrieve.
:type keys: str
:returns: Requested key value pairs.
:rtype: dict
"""
settings = {} settings = {}
for key in keys: for key in keys:
settings[key] = self.all_joined_units.received.get(key) settings[key] = self.all_joined_units.received.get(key)
return settings return settings
def get_console_data(self): def get_console_data(self):
"""Retrieve console settings from remote application.
:returns: console settings
:rtype: dict
"""
return self.get_settings( return self.get_settings(
['enable_serial_console', 'serial_console_base_url']) ['enable_serial_console', 'serial_console_base_url'])
def get_network_data(self): def get_network_data(self):
"""Retrieve network settings from remote application.
:returns: network settings
:rtype: dict
"""
return self.get_settings( return self.get_settings(
['network_manager', 'quantum_plugin', 'quantum_security_groups', ['network_manager', 'quantum_plugin', 'quantum_security_groups',
'quantum_url']) 'quantum_url'])
def get_region(self): def get_region(self):
"""Retrieve region settings from remote application.
:returns: region settings
:rtype: dict
"""
return self.get_settings(['region']) return self.get_settings(['region'])
def get_volume_data(self): def get_volume_data(self):
"""Retrieve volume settings from remote application.
:returns: volume settings
:rtype: dict
"""
return self.get_settings(['volume_service']) return self.get_settings(['volume_service'])
def get_ec2_data(self): def get_ec2_data(self):
"""Retrieve ec2 settings from remote application.
:returns: ec2 settings
:rtype: dict
"""
return self.get_settings(['ec2_host']) return self.get_settings(['ec2_host'])
def get_restart_trigger(self):
"""Retrieve restart trigger from remote application.
:returns: restart trigger
:rtype: dict
"""
return self.get_settings(['restart_trigger'])

6
test-requirements.txt Normal file
View File

@ -0,0 +1,6 @@
# Lint and unit test requirements
flake8
os-testr>=0.4.1
charms.reactive
mock>=1.2
coverage>=3.6

33
tox.ini Normal file
View File

@ -0,0 +1,33 @@
[tox]
skipsdist = True
envlist = pep8,py35
skip_missing_interpreters = True
[testenv]
setenv = VIRTUAL_ENV={envdir}
PYTHONHASHSEED=0
TERM=linux
install_command =
pip install {opts} {packages}
[testenv:py35]
basepython = python3
deps = -r{toxinidir}/test-requirements.txt
commands = ostestr {posargs}
[testenv:py36]
basepython = python3.6
deps = -r{toxinidir}/test-requirements.txt
commands = ostestr {posargs}
[testenv:pep8]
basepython = python2.7
deps = -r{toxinidir}/test-requirements.txt
commands = flake8 {posargs} . unit_tests
[testenv:venv]
commands = {posargs}
[flake8]
# E402 ignore necessary for path append before sys module import in actions
ignore = E402

0
unit_tests/__init__.py Normal file
View File

227
unit_tests/test_requires.py Normal file
View File

@ -0,0 +1,227 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import mock
with mock.patch('charmhelpers.core.hookenv.metadata') as _meta:
_meta.return_Value = 'ss'
import requires
_hook_args = {}
TO_PATCH = [
'clear_flag',
'set_flag',
]
def mock_hook(*args, **kwargs):
def inner(f):
# remember what we were passed. Note that we can't actually determine
# the class we're attached to, as the decorator only gets the function.
_hook_args[f.__name__] = dict(args=args, kwargs=kwargs)
return f
return inner
class _unit_mock:
def __init__(self, unit_name, received=None):
self.unit_name = unit_name
self.received = received or {}
class _relation_mock:
def __init__(self, application_name=None, units=None):
self.to_publish_raw = {}
self.application_name = application_name
self.units = units
class TestCellRequires(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._patched_hook = mock.patch('charms.reactive.when', mock_hook)
cls._patched_hook_started = cls._patched_hook.start()
# force requires to rerun the mock_hook decorator:
# try except is Python2/Python3 compatibility as Python3 has moved
# reload to importlib.
try:
reload(requires)
except NameError:
import importlib
importlib.reload(requires)
@classmethod
def tearDownClass(cls):
cls._patched_hook.stop()
cls._patched_hook_started = None
cls._patched_hook = None
# and fix any breakage we did to the module
try:
reload(requires)
except NameError:
import importlib
importlib.reload(requires)
def patch(self, method):
_m = mock.patch.object(self.obj, method)
_mock = _m.start()
self.addCleanup(_m.stop)
return _mock
def setUp(self):
self.cr = requires.CellRequires('some-relation', [])
self._patches = {}
self._patches_start = {}
self.obj = requires
for method in TO_PATCH:
setattr(self, method, self.patch(method))
def tearDown(self):
self.cr = None
for k, v in self._patches.items():
v.stop()
setattr(self, k, None)
self._patches = None
self._patches_start = None
def patch_kr(self, attr, return_value=None):
mocked = mock.patch.object(self.cr, attr)
self._patches[attr] = mocked
started = mocked.start()
started.return_value = return_value
self._patches_start[attr] = started
setattr(self, attr, started)
def test_registered_hooks(self):
# test that the decorators actually registered the relation
# expressions that are meaningful for this interface: this is to
# handle regressions.
# The keys are the function names that the hook attaches to.
hook_patterns = {
'data_changed': ('endpoint.{endpoint_name}.changed', ),
'joined': ('endpoint.{endpoint_name}.joined', ),
'broken': ('endpoint.{endpoint_name}.joined', ),
}
for k, v in _hook_args.items():
self.assertEqual(hook_patterns[k], v['args'])
def test_date_changed(self):
mock_aju = mock.MagicMock()
mock_aju.received = {'network_manager': 'nm'}
self.cr._all_joined_units = mock_aju
self.cr.data_changed()
self.set_flag.assert_called_once_with('some-relation.available')
def test_date_changed_missing_data(self):
mock_aju = mock.MagicMock()
mock_aju.received = {}
self.cr._all_joined_units = mock_aju
self.cr.data_changed()
self.assertFalse(self.set_flag.called)
def test_broken(self):
self.cr.broken()
self.clear_flag.assert_called_once_with('some-relation.available')
def test_joined(self):
self.cr.joined()
self.set_flag.assert_called_once_with('some-relation.connected')
def test_send_cell_data(self):
mock_rel1 = _relation_mock()
mock_rel2 = _relation_mock()
self.cr._relations = [mock_rel1, mock_rel2]
self.cr.send_cell_data(
'cell2',
'rabbitmq-server-cell2',
'mysql-cell2')
expect = {
'amqp-service': 'rabbitmq-server-cell2',
'db-service': 'mysql-cell2',
'cell-name': 'cell2'}
self.assertEqual(mock_rel1.to_publish_raw, expect)
self.assertEqual(mock_rel2.to_publish_raw, expect)
def test_get_settings(self):
mock_aju = mock.MagicMock()
mock_aju.received = {
'key1': 'value1',
'key2': 'value2',
'key3': 'value3'}
self.cr._all_joined_units = mock_aju
self.assertEqual(
self.cr.get_settings(['key1', 'key3']),
{'key1': 'value1', 'key3': 'value3'})
def setup_rdata(self):
mock_aju = mock.MagicMock()
mock_aju.received = {
'enable_serial_console': True,
'serial_console_base_url': 'http://serialconsole',
'network_manager': 'vTRManager',
'quantum_plugin': 'vTokenRing',
'quantum_security_groups': 'no',
'quantum_url': 'http://bob:345/dddd/',
'region': 'Region12',
'volume_service': 'cinder',
'restart_trigger': 'a-uuid',
'ec2_host': 'http://ec2host'}
self.cr._all_joined_units = mock_aju
def test_get_console_data(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_console_data(),
{
'enable_serial_console': True,
'serial_console_base_url': 'http://serialconsole'})
def test_get_network_data(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_network_data(),
{
'network_manager': 'vTRManager',
'quantum_plugin': 'vTokenRing',
'quantum_security_groups': 'no',
'quantum_url': 'http://bob:345/dddd/'})
def test_get_region(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_region(),
{'region': 'Region12'})
def test_get_volume_data(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_volume_data(),
{'volume_service': 'cinder'})
def test_get_ec2_data(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_ec2_data(),
{'ec2_host': 'http://ec2host'})
def test_get_restart_trigger(self):
self.setup_rdata()
self.assertEqual(
self.cr.get_restart_trigger(),
{'restart_trigger': 'a-uuid'})