python-zunclient/zunclient/tests/functional/base.py
AleptNmarata 6b5ef0dab2 Add OSC commands functional test
This patch adds the basic files and, for now, only one test.
Subsequent patches will add more test cases.

Change-Id: Iebc28e9781357c11a664c6b0d203a4c20a63a6ca
2017-02-16 09:53:45 +00:00

194 lines
8.0 KiB
Python

# Copyright (c) 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import six
import six.moves.configparser as config_parser
from tempest.lib.cli import base
# Fallback settings file for the functional tests: ``test.conf`` located in
# the same directory as this module. It is used when the
# ZUNCLIENT_TEST_CONFIG environment variable is not set.
DEFAULT_CONFIG_FILE = os.path.join(
    os.path.dirname(__file__),
    'test.conf')
class FunctionalTestBase(base.ClientTestBase):
    """Container base class, calls to zunclient.

    Settings are read from the ``[functional]`` section of the file named by
    the ``ZUNCLIENT_TEST_CONFIG`` environment variable (falling back to
    ``DEFAULT_CONFIG_FILE``). Two auth strategies are supported:
    ``keystone`` (the default) and ``noauth``.
    """

    def setUp(self):
        super(FunctionalTestBase, self).setUp()
        self.client = self._get_clients()

    def _get_clients(self):
        """Build the CLI client matching the configured auth strategy.

        In keystone mode any configured ``os_user_domain_id`` /
        ``os_project_domain_id`` values are stored as attributes on ``self``;
        in noauth mode ``self.zun_url`` and ``self.os_auth_token`` are set.

        :returns: a ``base.CLIClient`` instance
        """
        # NOTE(aarefiev): {toxinidir} is a current working directory, so
        # the tox env path is {toxinidir}/.tox
        cli_dir = os.path.join(os.path.abspath('.'), '.tox/functional/bin')

        config = self._get_config()
        if config.get('os_auth_url'):
            client = base.CLIClient(cli_dir=cli_dir,
                                    username=config['os_username'],
                                    password=config['os_password'],
                                    tenant_name=config['os_project_name'],
                                    uri=config['os_auth_url'])
            # Remember the optional keystone v3 domain ids so that the
            # command helpers can append them as flags later on.
            for keystone_object in 'user', 'project':
                domain_attr = 'os_%s_domain_id' % keystone_object
                if config.get(domain_attr):
                    setattr(self, domain_attr, config[domain_attr])
        else:
            self.zun_url = config['zun_url']
            self.os_auth_token = config['os_auth_token']
            client = base.CLIClient(cli_dir=cli_dir,
                                    zun_url=self.zun_url,
                                    os_auth_token=self.os_auth_token)
        return client

    def _get_config(self):
        """Read the functional test settings from the config file.

        The test is skipped when the config file cannot be read and failed
        when the auth strategy is invalid or a required setting is absent.

        :returns: dict mapping setting names to their values. It holds the
                  required settings of the selected auth strategy plus any
                  optional ones that are present (keystone v3 domain ids,
                  ``os_identity_api_version``).
        """
        config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                     DEFAULT_CONFIG_FILE)
        # SafeConfigParser was deprecated in Python 3.2
        if six.PY3:
            config = config_parser.ConfigParser()
        else:
            config = config_parser.SafeConfigParser()
        if not config.read(config_file):
            self.skipTest('Skipping, no test config found @ %s' % config_file)

        # A config file without a [functional] section raises NoSectionError
        # rather than NoOptionError; treat both as "setting not provided".
        lookup_errors = (config_parser.NoSectionError,
                         config_parser.NoOptionError)

        try:
            auth_strategy = config.get('functional', 'auth_strategy')
        except lookup_errors:
            auth_strategy = 'keystone'
        if auth_strategy not in ['keystone', 'noauth']:
            # fail() raises by itself, no explicit ``raise`` is needed.
            self.fail(
                'Invalid auth type specified: %s in functional must be '
                'one of: [keystone, noauth]' % auth_strategy)

        conf_settings = []
        keystone_v3_conf_settings = []
        if auth_strategy == 'keystone':
            conf_settings += ['os_auth_url', 'os_username',
                              'os_password', 'os_project_name']
            keystone_v3_conf_settings += ['os_user_domain_id',
                                          'os_project_domain_id']
        else:
            conf_settings += ['os_auth_token', 'zun_url']

        # Settings that may legitimately be absent from the config file.
        # os_identity_api_version is only consumed by _zun_osc().
        optional_settings = (keystone_v3_conf_settings +
                             ['os_identity_api_version'])

        cli_flags = {}
        missing = []
        for c in conf_settings + optional_settings:
            try:
                cli_flags[c] = config.get('functional', c)
            except lookup_errors:
                # NOTE(vdrok): Here we ignore the absence of KS v3 options as
                # v2 may be used. Keystone client will do the actual check of
                # the parameters' correctness.
                if c not in optional_settings:
                    missing.append(c)
        if missing:
            self.fail('Missing required setting in test.conf (%(conf)s) for '
                      'auth_strategy=%(auth)s: %(missing)s' %
                      {'conf': config_file,
                       'auth': auth_strategy,
                       'missing': ','.join(missing)})
        return cli_flags

    def _domain_flags(self):
        """Return the keystone v3 domain cli flags stored on this test.

        :returns: string with one ``--os-<object>-domain-id <value>`` flag
                  (each preceded by a space) for every domain id attribute
                  set by ``_get_clients``; empty string when none is set
        """
        flags = ''
        for keystone_object in 'user', 'project':
            domain_attr = 'os_%s_domain_id' % keystone_object
            if hasattr(self, domain_attr):
                flags += ' --os-%(ks_obj)s-domain-id %(value)s' % {
                    'ks_obj': keystone_object,
                    'value': getattr(self, domain_attr)
                }
        return flags

    def _cmd_no_auth(self, cmd, action, flags='', params=''):
        """Execute given command with noauth attributes.

        :param cmd: command to be executed
        :type cmd: string
        :param action: command on cli to run
        :type action: string
        :param flags: optional cli flags to use
        :type flags: string
        :param params: optional positional args to use
        :type params: string
        :returns: whatever ``base.execute`` returns for the command
        """
        flags = ('--os_auth_token %(token)s --zun_url %(url)s %(flags)s'
                 %
                 {'token': self.os_auth_token,
                  'url': self.zun_url,
                  'flags': flags})
        return base.execute(cmd, action, flags, params,
                            cli_dir=self.client.cli_dir)

    def _zun(self, action, flags='', params=''):
        """Execute zun command for the given action.

        :param action: the cli command to run using Container
        :type action: string
        :param flags: any optional cli flags to use
        :type flags: string
        :param params: any optional positional args to use
        :type params: string
        :returns: raw output of the command
        """
        flags += ' --os-endpoint-type publicURL'
        # os_auth_token is only set by _get_clients() in noauth mode.
        if hasattr(self, 'os_auth_token'):
            # NOTE(review): this previously ran the 'appcontainer'
            # executable while the keystone branch runs 'zun'; both now use
            # 'zun'. Assumes 'zun' is the installed console script -
            # confirm against setup.cfg.
            return self._cmd_no_auth('zun', action, flags, params)
        flags += self._domain_flags()
        return self.client.cmd_with_auth('zun',
                                         action, flags, params)

    def _zun_osc(self, action, flags='', params=''):
        """Execute container commands via OpenStack Client.

        :param action: the openstack cli command to run
        :type action: string
        :param flags: any optional cli flags to use
        :type flags: string
        :param params: any optional positional args to use
        :type params: string
        :returns: raw output of the command
        """
        # _get_config() returns a plain dict of settings, so the value has
        # to be looked up by its key (not by section and option).
        config = self._get_config()
        identity_api_version = config.get('os_identity_api_version')
        if identity_api_version:
            flags += ' --os-identity-api-version {0}'.format(
                identity_api_version)
        flags += self._domain_flags()
        return self.client.cmd_with_auth(
            'openstack', action, flags, params)

    def zun(self, action, flags='', params='', parse=True):
        """Return parsed list of dicts with basic item info.

        :param action: the cli command to run using Container
        :type action: string
        :param flags: any optional cli flags to use
        :type flags: string
        :param params: any optional positional args to use
        :type params: string
        :param parse: return parsed list or raw output
        :type parse: bool
        """
        output = self._zun(action=action, flags=flags, params=params)
        # NOTE(review): self.parser is not set in this class; presumably it
        # is provided by the base class - confirm.
        return self.parser.listing(output) if parse else output

    def get_table_headers(self, action, flags='', params=''):
        """Return the headers of the table printed by a zun command.

        :param action: the cli command to run using Container
        :type action: string
        :param flags: any optional cli flags to use
        :type flags: string
        :param params: any optional positional args to use
        :type params: string
        """
        output = self._zun(action=action, flags=flags, params=params)
        table = self.parser.table(output)
        return table['headers']

    def assertTableHeaders(self, field_names, table_headers):
        """Assert that field_names and table_headers are equal.

        The comparison ignores ordering: both sequences are sorted first.

        :param field_names: field names from the output table of the cmd
        :param table_headers: table headers output from cmd
        """
        self.assertEqual(sorted(field_names), sorted(table_headers))

    def list_containers(self, params=''):
        """Run ``container-list`` and return its parsed output.

        :param params: any optional positional args to use
        :type params: string
        """
        return self.zun('container-list', params=params)