9940682bc8
Added support for the vnf package create command and its unit test cases. Please see the results here: http://paste.openstack.org/show/775422/ Change-Id: Idf59847aaf33e360dae696eae819d217ae63570f Implements: bp tosca-csar-mgmt-driver
801 lines
32 KiB
Python
801 lines
32 KiB
Python
# Copyright 2012 OpenStack Foundation.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import urllib
|
|
|
|
import contextlib
|
|
import fixtures
|
|
import mock
|
|
import six
|
|
import six.moves.urllib.parse as urlparse
|
|
import sys
|
|
import testtools
|
|
|
|
from tackerclient.common import constants
|
|
from tackerclient.common import exceptions
|
|
from tackerclient import shell
|
|
from tackerclient.tacker import v1_0 as tackerV1_0
|
|
from tackerclient.tacker.v1_0 import TackerCommand
|
|
from tackerclient.tests.unit import test_utils
|
|
from tackerclient.v1_0 import client
|
|
|
|
# Version segment inserted after "/v" when end_url() builds a request URL.
API_VERSION = "1.0"
# Default format suffix appended to URLs built by end_url().
FORMAT = 'json'
# Fake auth token handed to the client under test and expected back in the
# X-Auth-Token request header.
TOKEN = 'testtoken'
# Fake endpoint URL handed to the client under test; prefix of every URL
# built by end_url().
ENDURL = 'localurl'
|
|
|
|
|
|
@contextlib.contextmanager
def capture_std_streams():
    """Temporarily replace ``sys.stdout``/``sys.stderr`` with text buffers.

    Yields:
        A ``(stdout_buffer, stderr_buffer)`` pair of ``six.StringIO``
        objects that receive everything written while the context is
        active.  The previous streams are put back on exit, even when the
        managed body raises.
    """
    captured_out = six.StringIO()
    captured_err = six.StringIO()
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = captured_out
        sys.stderr = captured_err
        yield captured_out, captured_err
    finally:
        sys.stdout, sys.stderr = saved_streams
|
|
|
|
|
|
class FakeStdout(object):
    """Write-only stand-in for ``sys.stdout`` that records what is written."""

    def __init__(self):
        # Chunks are stored in write order and joined by make_string().
        self.content = []

    def write(self, text):
        """Record one chunk of output."""
        self.content.append(text)

    def make_string(self):
        """Return everything written so far as one string.

        Returns an empty string when nothing has been written.
        """
        # str.join is linear; the previous "result = result + line" loop
        # re-copied the accumulated text on every chunk.
        return ''.join(self.content)
|
|
|
|
|
|
class MyResp(object):
    """Bare-bones fake HTTP response used as the mocked request result."""

    def __init__(self, status_code, headers=None, reason=None):
        """Store the status code, headers and reason phrase.

        A falsy ``headers`` value (``None`` or an empty mapping) is replaced
        by a new empty dict.
        """
        self.status_code = status_code
        self.reason = reason
        self.headers = headers if headers else {}
|
|
|
|
|
|
class MyApp(object):
    """Fake application object exposing only a ``stdout`` attribute."""

    def __init__(self, _stdout):
        """Keep ``_stdout`` as the stream this fake application exposes."""
        self.stdout = _stdout
|
|
|
|
|
|
def end_url(path, query=None, format=FORMAT):
    """Build the URL a request for ``path`` is expected to use.

    :param path: resource path, appended after the "/v<API_VERSION>" prefix.
    :param query: optional query string (without the leading "?"); a falsy
        value produces a URL with no query part.
    :param format: format suffix appended after a "." (defaults to FORMAT).
    :returns: ``ENDURL + "/v" + API_VERSION + path + "." + format``,
        followed by ``"?" + query`` when a query is given.
    """
    _url_str = ENDURL + "/v" + API_VERSION + path + "." + format
    # Explicit conditional instead of the fragile "cond and a or b" idiom.
    return _url_str + "?" + query if query else _url_str
|
|
|
|
|
|
class MyUrlComparator(object):
    """Equality helper that compares URLs ignoring query-parameter order.

    An instance wraps the expected URL (``lhs``) and compares equal to any
    URL string with the same scheme, netloc and path whose query string
    holds the same number of pairs and the same set of (key, value) pairs.
    """

    def __init__(self, lhs, client):
        self.lhs = lhs
        self.client = client

    def equals(self, rhs):
        """Return True when ``rhs`` matches the wrapped URL."""
        expected = urlparse.urlparse(self.lhs)
        actual = urlparse.urlparse(rhs)
        for part in ('scheme', 'netloc', 'path'):
            if getattr(expected, part) != getattr(actual, part):
                return False

        expected_pairs = urlparse.parse_qsl(expected.query)
        actual_pairs = urlparse.parse_qsl(actual.query)
        if len(expected_pairs) != len(actual_pairs):
            return False
        return set(expected_pairs) == set(actual_pairs)

    def __str__(self):
        """Render the expected URL, using the client's format if it differs.

        When a client is attached and its format is not FORMAT, the last
        four characters of the part before the query are replaced by
        "." + the client's format.
        """
        attached = self.client
        if not attached or attached.format == FORMAT:
            return self.lhs

        base, separator, query = self.lhs.partition("?")
        rendered = "%s.%s" % (base[:-4], attached.format)
        if separator:
            rendered = "%s?%s" % (rendered, query)
        return rendered

    def __repr__(self):
        return str(self)

    def __eq__(self, rhs):
        return self.equals(rhs)

    def __ne__(self, rhs):
        return not self.equals(rhs)
|
|
|
|
|
|
class MyComparator(object):
    """Equality helper for request bodies.

    Wraps the expected body (``lhs``).  When a client is attached, the
    value compared against is first passed through the client's
    ``deserialize``.  Dicts are compared key by key (recursively); lists
    and tuples are compared by length and membership, ignoring order.
    """

    def __init__(self, lhs, client):
        self.lhs = lhs
        self.client = client

    def _com_dict(self, lhs, rhs):
        """Compare two dicts: same size, same keys, matching values."""
        if len(lhs) != len(rhs):
            return False
        return all(key in rhs and self._com(value, rhs[key])
                   for key, value in lhs.items())

    def _com_list(self, lhs, rhs):
        """Compare two sequences: same length, every lhs item in rhs."""
        if len(lhs) != len(rhs):
            return False
        return all(item in rhs for item in lhs)

    def _com(self, lhs, rhs):
        """Dispatch on the type of ``lhs``; fall back to ``==``."""
        if lhs is None:
            return rhs is None
        handlers = ((dict, self._com_dict),
                    (list, self._com_list),
                    (tuple, self._com_list))
        for container_type, compare in handlers:
            if isinstance(lhs, container_type):
                # Both sides must be the same kind of container.
                if not isinstance(rhs, container_type):
                    return False
                return compare(lhs, rhs)
        return lhs == rhs

    def equals(self, rhs):
        """Return True when ``rhs`` matches the wrapped body."""
        other = rhs
        if self.client:
            other = self.client.deserialize(other, 200)
        return self._com(self.lhs, other)

    def __repr__(self):
        if not self.client:
            return str(self.lhs)
        return self.client.serialize(self.lhs)

    def __eq__(self, rhs):
        return self.equals(rhs)

    def __ne__(self, rhs):
        return not self.equals(rhs)
|
|
|
|
|
|
class CLITestV10Base(testtools.TestCase):
    """Shared fixtures and request assertions for v1.0 CLI unit tests.

    ``setUp`` replaces ``sys.stdout`` with a :class:`FakeStdout`, stubs the
    name/ID lookup helpers with :meth:`_find_resourceid` and builds a
    ``LegacyClient``.  Each ``_test_*`` helper runs a CLI command with
    ``httpclient.request`` mocked, then asserts on the request that was
    issued and on the captured output.
    """

    format = 'json'
    test_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
    id_field = 'id'

    def _find_resourceid(self, client, resource, name_or_id):
        """Stub for the name/ID lookup helpers: return the value as given."""
        return name_or_id

    def setUp(self, plurals=None):
        """Prepare the test environment.

        :param plurals: optional extra plural mappings merged into
            ``LegacyClient.EXTED_PLURALS``.
        """
        super(CLITestV10Base, self).setUp()
        client.LegacyClient.EXTED_PLURALS.update(constants.PLURALS)
        # None instead of a shared ``{}`` default; same effect when omitted.
        client.LegacyClient.EXTED_PLURALS.update(plurals or {})
        self.metadata = {'plurals': client.LegacyClient.EXTED_PLURALS,
                         'xmlns': constants.XML_NS_V10,
                         constants.EXT_NS: {'prefix':
                                            'http://xxxx.yy.com'}}
        self.endurl = ENDURL
        self.fake_stdout = FakeStdout()
        self.useFixture(fixtures.MonkeyPatch('sys.stdout', self.fake_stdout))
        self.useFixture(fixtures.MonkeyPatch(
            'tackerclient.tacker.v1_0.find_resourceid_by_name_or_id',
            self._find_resourceid))
        self.useFixture(fixtures.MonkeyPatch(
            'tackerclient.tacker.v1_0.find_resourceid_by_id',
            self._find_resourceid))
        self.client = client.LegacyClient(token=TOKEN,
                                          endpoint_url=self.endurl)

    def _expected_body(self, body):
        """Return the matcher used for the ``body`` keyword of a request.

        Work around for LP #1217791: the XML deserializer called from
        MyComparator does not decode XML strings correctly, so only JSON
        bodies are compared structurally; other formats are compared as
        serialized text.  ``self.client.format`` must already be set.
        """
        if self.format == 'json':
            return MyComparator(body, self.client)
        return self.client.serialize(body)

    def _build_list_request(self, base_args, detail, tags, fields_1,
                            fields_2, page_size, sort_key, sort_dir,
                            template_source=None):
        """Build the CLI arguments and expected query for a list command.

        ``base_args`` (when given) is extended in place, as the list
        helpers have always done; none of the other sequences is mutated.

        :returns: ``(args, query)`` where ``args`` is the argument list to
            hand to the command parser and ``query`` the expected query
            string ("" when there are no parameters).
        """
        args = base_args if base_args is not None else []
        query_parts = []

        if detail:
            args.append('-D')
        args.extend(['--request-format', self.format])
        for field in fields_1:
            args.extend(['--fields', field])
        if template_source is not None:
            args.extend(['--template-source', template_source])
            query_parts.append('template_source=' + template_source)

        if tags:
            args.extend(['--', '--tag'])
            for tag in tags:
                args.append(tag)
                if isinstance(tag, six.string_types):
                    # six.moves.urllib.parse.quote works on Python 2 and 3;
                    # plain ``urllib.quote`` only exists on Python 2.
                    tag = urlparse.quote(tag.encode('utf-8'))
                query_parts.append('tag=' + tag)
        if fields_2:
            if not tags:
                args.append('--')
            args.append('--fields')
            args.extend(fields_2)

        if detail:
            query_parts.append('verbose=True')
        # Concatenate into a new list so the caller's (or a default)
        # fields_1 is never extended in place.
        for field in list(fields_1) + list(fields_2):
            query_parts.append('fields=' + field)

        if page_size:
            args.extend(['--page-size', str(page_size)])
            query_parts.append('limit=%s' % page_size)

        for key in sort_key:
            args.extend(['--sort-key', key])
            query_parts.append('sort_key=%s' % key)

        if sort_dir:
            # Pad with 'asc' or truncate so there is one direction per key.
            missing = len(sort_key) - len(sort_dir)
            if missing > 0:
                sort_dir = list(sort_dir) + ['asc'] * missing
            elif missing < 0:
                sort_dir = sort_dir[:len(sort_key)]
            for direction in sort_dir:
                args.extend(['--sort-dir', direction])
                query_parts.append('sort_dir=%s' % direction)

        return args, '&'.join(query_parts)

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_create_resource(self, resource, cmd,
                              name, myid, args,
                              position_names, position_values, mock_get,
                              tenant_id=None, get_client_called_count=1,
                              tags=None, admin_state_up=True, extra_body=None,
                              **kwargs):
        """Run a create command and check the POST request and the output.

        The expected request body is assembled from ``tenant_id``,
        ``tags``, ``extra_body``, ``kwargs`` and the positional
        name/value pairs.  ``admin_state_up`` is included unless the
        resource is one of vnfd/vnf/vim/vnffgd/vnffg.  ``args`` is
        extended in place with the request-format option.  ``mock_get``
        is injected by the ``mock.patch.object`` decorator.
        """
        mock_get.return_value = self.client
        non_admin_status_resources = ['vnfd', 'vnf', 'vim', 'vnffgd', 'vnffg']
        if resource in non_admin_status_resources:
            resource_body = {}
        else:
            resource_body = {'admin_state_up': admin_state_up}
        if tenant_id:
            resource_body['tenant_id'] = tenant_id
        if tags:
            resource_body['tags'] = tags
        if extra_body:
            resource_body.update(extra_body)
        resource_body.update(kwargs)
        for index, field in enumerate(position_names):
            resource_body[field] = position_values[index]
        body = {resource: resource_body}

        ress = {resource: {self.id_field: myid}}
        if name:
            ress[resource]['name'] = name
        self.client.format = self.format
        resstr = self.client.serialize(ress)
        # url method body
        resource_plural = tackerV1_0._get_resource_plural(resource,
                                                          self.client)
        path = getattr(self.client, resource_plural + "_path")
        _body = self._expected_body(body)
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(200), resstr)
            args.extend(['--request-format', self.format])
            cmd_parser = cmd.get_parser('create_' + resource)
            shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                end_url(path, format=self.format), 'POST',
                body=_body,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
            self.assertEqual(get_client_called_count, mock_get.call_count)
        _str = self.fake_stdout.make_string()
        self.assertIn(myid, _str)
        if name:
            self.assertIn(name, _str)

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_list_columns(self, cmd, resources_collection,
                           resources_out, mock_get,
                           args=None):
        """Run a list command and check the GET request that was issued.

        :param args: CLI arguments; defaults to ``['-f', 'json']``.  A new
            list is created per call when omitted, so the appended
            request-format option no longer accumulates across calls.
        """
        if args is None:
            args = ['-f', 'json']
        mock_get.return_value = self.client
        self.client.format = self.format
        resstr = self.client.serialize(resources_out)

        path = getattr(self.client, resources_collection + "_path")
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(200), resstr)
            args.extend(['--request-format', self.format])
            cmd_parser = cmd.get_parser("list_" + resources_collection)
            shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                end_url(path, format=self.format), 'GET',
                body=None,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
            mock_get.assert_called_once_with()

    def _test_list_resources(self, resources, cmd, detail=False, tags=(),
                             fields_1=(), fields_2=(), page_size=None,
                             sort_key=(), sort_dir=(), response_contents=None,
                             base_args=None, path=None,
                             template_source=None):
        """Run a list command and check the GET URL, query and output.

        When ``response_contents`` is None a two-item response with ids
        'myid1' and 'myid2' is served and 'myid1' is expected in the
        output.

        :returns: everything the command wrote to the fake stdout.
        """
        if response_contents is None:
            contents = [{self.id_field: 'myid1', },
                        {self.id_field: 'myid2', }, ]
        else:
            contents = response_contents
        reses = {resources: contents}
        self.client.format = self.format
        resstr = self.client.serialize(reses)
        # url method body
        args, query = self._build_list_request(
            base_args, detail, tags, fields_1, fields_2, page_size,
            sort_key, sort_dir, template_source=template_source)
        if path is None:
            path = getattr(self.client, resources + "_path")
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(200), resstr)
            with mock.patch.object(TackerCommand, 'get_client') as mock_get:
                mock_get.return_value = self.client
                cmd_parser = cmd.get_parser("list_" + resources)
                shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                MyUrlComparator(end_url(path, query, format=self.format),
                                self.client),
                'GET',
                body=None,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
        _str = self.fake_stdout.make_string()
        if response_contents is None:
            self.assertIn('myid1', _str)
        return _str

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_list_sub_resources(self, resources, api_resource, cmd, myid,
                                 mock_get, detail=False,
                                 tags=(), fields_1=(), fields_2=(),
                                 page_size=None, sort_key=(), sort_dir=(),
                                 response_contents=None, base_args=None,
                                 path=None):
        """Run a list command for a sub-resource of ``myid``.

        Works like :meth:`_test_list_resources`, except that the path
        template is filled with ``myid`` and the response is keyed by
        ``api_resource``.

        :returns: everything the command wrote to the fake stdout.
        """
        mock_get.return_value = self.client
        if response_contents is None:
            contents = [{self.id_field: 'myid1', },
                        {self.id_field: 'myid2', }, ]
        else:
            contents = response_contents
        reses = {api_resource: contents}
        self.client.format = self.format
        resstr = self.client.serialize(reses)
        # url method body
        # The request-format option is added once, by the helper.  The
        # former second copy appended just before running the command was
        # redundant, and ended up after "--" when tags/fields_2 were used.
        args, query = self._build_list_request(
            base_args, detail, tags, fields_1, fields_2, page_size,
            sort_key, sort_dir)
        if path is None:
            path = getattr(self.client, resources + "_path")
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(200), resstr)
            comparator = MyUrlComparator(
                end_url(path % myid, query=query, format=self.format),
                self.client)
            cmd_parser = cmd.get_parser("list_" + resources)
            shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                comparator, 'GET',
                body=None,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
        _str = self.fake_stdout.make_string()
        if response_contents is None:
            self.assertIn('myid1', _str)
        return _str

    # TODO(gongysh) add pagination unit test BUG 1633255
    # def _test_list_sub_resources_with_pagination(
    #    self, resources, api_resource, cmd, myid):
    #    self.mox.StubOutWithMock(cmd, "get_client")
    #    self.mox.StubOutWithMock(self.client.httpclient, "request")
    #    cmd.get_client().MultipleTimes().AndReturn(self.client)
    #    path = getattr(self.client, resources + "_path")
    #    fake_query = "marker=myid2&limit=2"
    #    reses1 = {api_resource: [{'id': 'myid1', },
    #                             {'id': 'myid2', }],
    #              '%s_links' % api_resource: [
    #                  {'href': end_url(path % myid, fake_query),
    #                   'rel': 'next'}]
    #              }
    #    reses2 = {api_resource: [{'id': 'myid3', },
    #                             {'id': 'myid4', }]}
    #    self.client.format = self.format
    #    resstr1 = self.client.serialize(reses1)
    #    resstr2 = self.client.serialize(reses2)
    #    self.client.httpclient.request(
    #        end_url(path % myid, "", format=self.format), 'GET',
    #        body=None,
    #        headers=mox.ContainsKeyValue(
    #            'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr1))
    #    self.client.httpclient.request(
    #        MyUrlComparator(end_url(path % myid, fake_query,
    #                                format=self.format), self.client), 'GET',
    #        body=None, headers=mox.ContainsKeyValue(
    #            'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr2))
    #    self.mox.ReplayAll()
    #    cmd_parser = cmd.get_parser("list_" + resources)
    #    args = [myid, '--request-format', self.format]
    #    shell.run_command(cmd, cmd_parser, args)
    #    self.mox.VerifyAll()
    #    self.mox.UnsetStubs()

    # def _test_list_resources_with_pagination(self, resources, cmd):
    #    self.mox.StubOutWithMock(cmd, "get_client")
    #    self.mox.StubOutWithMock(self.client.httpclient, "request")
    #    cmd.get_client().MultipleTimes().AndReturn(self.client)
    #    path = getattr(self.client, resources + "_path")
    #    fake_query = "marker=myid2&limit=2"
    #    reses1 = {resources: [{'id': 'myid1', },
    #                          {'id': 'myid2', }],
    #              '%s_links' % resources: [
    #                  {'href': end_url(path, fake_query),
    #                   'rel': 'next'}]}
    #    reses2 = {resources: [{'id': 'myid3', },
    #                          {'id': 'myid4', }]}
    #    self.client.format = self.format
    #    resstr1 = self.client.serialize(reses1)
    #    resstr2 = self.client.serialize(reses2)
    #    self.client.httpclient.request(
    #        end_url(path, "", format=self.format), 'GET',
    #        body=None,
    #        headers=mox.ContainsKeyValue(
    #            'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr1))
    #    self.client.httpclient.request(
    #        MyUrlComparator(end_url(path, fake_query, format=self.format),
    #                        self.client), 'GET', body=None,
    #        headers=mox.ContainsKeyValue(
    #            'X-Auth-Token', TOKEN)).AndReturn((MyResp(200), resstr2))
    #    self.mox.ReplayAll()
    #    cmd_parser = cmd.get_parser("list_" + resources)
    #    args = ['--request-format', self.format]
    #    shell.run_command(cmd, cmd_parser, args)
    #    self.mox.VerifyAll()
    #    self.mox.UnsetStubs()

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_update_resource(self, resource, cmd, myid, args, extrafields,
                              mock_get, get_client_called_count=1):
        """Run an update command and check the PUT request and the output.

        ``extrafields`` is the expected content of the request body under
        the ``resource`` key.  ``args`` is extended in place with the
        request-format option.
        """
        mock_get.return_value = self.client
        body = {resource: extrafields}
        path = getattr(self.client, resource + "_path")
        self.client.format = self.format
        _body = self._expected_body(body)
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            comparator = MyUrlComparator(
                end_url(path % myid, format=self.format), self.client)
            mock_req.return_value = (MyResp(204), None)
            args.extend(['--request-format', self.format])
            cmd_parser = cmd.get_parser("update_" + resource)
            shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                comparator,
                'PUT',
                body=_body,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
            self.assertEqual(get_client_called_count, mock_get.call_count)
        _str = self.fake_stdout.make_string()
        self.assertIn(myid, _str)

    def _test_show_resource(self, resource, cmd, myid, args, fields=()):
        """Run a show command and check the GET request and the output.

        The mocked response holds ``myid`` and the name 'myname'; both
        are expected in the output.  ``fields`` become ``fields=...``
        entries of the expected query string.
        """
        with mock.patch.object(cmd, 'get_client') as mock_get:
            mock_get.return_value = self.client
            query = "&".join(["fields=%s" % field for field in fields])
            expected_res = {resource:
                            {self.id_field: myid,
                             'name': 'myname', }, }
            self.client.format = self.format
            resstr = self.client.serialize(expected_res)
            path = getattr(self.client, resource + "_path")
            with mock.patch.object(self.client.httpclient,
                                   'request') as mock_req:
                mock_req.return_value = (MyResp(200), resstr)
                args.extend(['--request-format', self.format])
                cmd_parser = cmd.get_parser("show_" + resource)
                shell.run_command(cmd, cmd_parser, args)
                mock_req.assert_called_once_with(
                    end_url(path % myid, query, format=self.format), 'GET',
                    body=None,
                    headers=test_utils.ContainsKeyValue('X-Auth-Token',
                                                        TOKEN))
            _str = self.fake_stdout.make_string()
            mock_get.assert_called_once_with()
            self.assertIn(myid, _str)
            self.assertIn('myname', _str)

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_delete_resource(self, resource, cmd, myid, args, mock_get):
        """Run a delete command and check the DELETE request and message.

        When '--force' is among ``args`` the request is expected to carry
        a JSON body with the force attribute; otherwise no body.
        """
        deleted_msg = {'vnf': 'delete initiated'}
        mock_get.return_value = self.client
        path = getattr(self.client, resource + "_path")
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(204), None)
            args.extend(['--request-format', self.format])
            cmd_parser = cmd.get_parser("delete_" + resource)
            shell.run_command(cmd, cmd_parser, args)
            if '--force' in args:
                expected_body = '{"' + resource + \
                                '": {"attributes": {"force": true}}}'
            else:
                expected_body = None
            mock_req.assert_called_once_with(
                end_url(path % myid, format=self.format), 'DELETE',
                body=expected_body,
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
            mock_get.assert_called_once_with()
        _str = self.fake_stdout.make_string()
        msg = 'All specified %(resource)s(s) %(msg)s successfully\n' % {
            'msg': deleted_msg.get(resource, 'deleted'),
            'resource': resource}
        self.assertEqual(msg, _str)

    @mock.patch.object(TackerCommand, 'get_client')
    def _test_update_resource_action(self, resource, cmd, myid, action, args,
                                     body, mock_get, retval=None):
        """Run an action command and check the PUT to ``<myid>/<action>``.

        ``retval`` is served as the mocked response body.
        """
        mock_get.return_value = self.client
        path = getattr(self.client, resource + "_path")
        path_action = '%s/%s' % (myid, action)
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(204), retval)
            args.extend(['--request-format', self.format])
            # NOTE(review): the parser name says "delete_" although this is
            # an update action -- kept as is; confirm whether intentional.
            cmd_parser = cmd.get_parser("delete_" + resource)
            shell.run_command(cmd, cmd_parser, args)
            mock_req.assert_called_once_with(
                end_url(path % path_action, format=self.format), 'PUT',
                body=MyComparator(body, self.client),
                headers=test_utils.ContainsKeyValue('X-Auth-Token', TOKEN))
        _str = self.fake_stdout.make_string()
        self.assertIn(myid, _str)
|
|
|
|
|
|
class ClientV1TestJson(CLITestV10Base):
    """Tests for ``LegacyClient.do_request`` with the JSON format."""

    def test_do_request_unicode(self):
        """Non-ASCII params, body and token survive a request round trip."""
        self.client.format = self.format
        unicode_text = u'\u7f51\u7edc'
        action = u'/test'
        params = {'test': unicode_text}
        body = params
        expect_body = self.client.serialize(body)
        self.client.httpclient.auth_token = unicode_text
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(200), expect_body)
            res_body = self.client.do_request('PUT', action, body=body,
                                              params=params)
            # The query value is expected UTF-8 percent-encoded.
            expected_uri = u'localurl/v1.0/test.json?test=%E7%BD%91%E7%BB%9C'
            mock_req.assert_called_with(
                expected_uri, 'PUT', body=expect_body,
                headers={'X-Auth-Token': unicode_text,
                         'User-Agent': 'python-tackerclient'})
            # test response with unicode
            self.assertEqual(res_body, body)

    def test_do_request_error_without_response_body(self):
        """A 400 response with an empty body raises TackerClientException."""
        self.client.format = self.format
        params = {'test': 'value'}
        self.client.httpclient.auth_token = 'token'
        with mock.patch.object(self.client.httpclient, 'request') as mock_req:
            mock_req.return_value = (MyResp(400, reason='An error'), '')
            # The former direct call to the mocked request() here was a
            # leftover "record" step: it only invoked the mock and asserted
            # nothing, so it has been removed.
            error = self.assertRaises(exceptions.TackerClientException,
                                      self.client.do_request, 'PUT', '/test',
                                      body='', params=params)
            self.assertEqual("400-tackerFault", str(error))
|
|
|
|
|
|
class CLITestV10ExceptionHandler(CLITestV10Base):
    """Tests for ``client.exception_handler_v10``."""

    def _test_exception_handler_v10(
            self, expected_exception, status_code, expected_msg,
            error_type=None, error_msg=None, error_detail=None,
            error_content=None):
        """Call the handler and check the exception it raises.

        When ``error_content`` is omitted, a ``{'TackerError': ...}``
        payload is built from ``error_type``/``error_msg``/``error_detail``.
        When ``expected_msg`` is None, the expected message is
        ``error_msg``, followed by a newline and ``error_detail`` if the
        detail is non-empty.
        """
        if error_content is None:
            tacker_error = {'type': error_type,
                            'message': error_msg,
                            'detail': error_detail}
            error_content = {'TackerError': tacker_error}

        raised = self.assertRaises(expected_exception,
                                   client.exception_handler_v10,
                                   status_code, error_content)
        self.assertEqual(status_code, raised.status_code)
        self.assertEqual(expected_exception.__name__,
                         raised.__class__.__name__)

        if expected_msg is None:
            expected_msg = ('\n'.join([error_msg, error_detail])
                            if error_detail else error_msg)
        self.assertEqual(expected_msg, raised.message)

    def test_exception_handler_v10_ip_address_in_use(self):
        message = ('Unable to complete operation for network '
                   'fake-network-uuid. The IP address fake-ip is in use.')
        self._test_exception_handler_v10(
            exceptions.IpAddressInUseClient, 409, message,
            error_type='IpAddressInUse', error_msg=message, error_detail='')

    def test_exception_handler_v10_tacker_known_error(self):
        # (server-side error type, client exception class, HTTP status)
        known_error_map = [
            ('NetworkNotFound', exceptions.NetworkNotFoundClient, 404),
            ('PortNotFound', exceptions.PortNotFoundClient, 404),
            ('NetworkInUse', exceptions.NetworkInUseClient, 409),
            ('PortInUse', exceptions.PortInUseClient, 409),
            ('StateInvalid', exceptions.StateInvalidClient, 400),
            ('IpAddressInUse', exceptions.IpAddressInUseClient, 409),
            ('IpAddressGenerationFailure',
             exceptions.IpAddressGenerationFailureClient, 409),
            ('ExternalIpAddressExhausted',
             exceptions.ExternalIpAddressExhaustedClient, 400),
            ('OverQuota', exceptions.OverQuotaClient, 409),
        ]

        message = 'dummy exception message'
        detail = 'sample detail'
        combined = message + '\n' + detail
        for server_exc, client_exc, status_code in known_error_map:
            self._test_exception_handler_v10(
                client_exc, status_code, combined,
                error_type=server_exc, error_msg=message,
                error_detail=detail)

    def test_exception_handler_v10_tacker_known_error_without_detail(self):
        message = 'Network not found'
        self._test_exception_handler_v10(
            exceptions.NetworkNotFoundClient, 404, message,
            error_type='NetworkNotFound', error_msg=message,
            error_detail='')

    def test_exception_handler_v10_unknown_error_to_per_code_exception(self):
        message = 'Unknown error'
        detail = 'This is detail'
        for status_code, client_exc in exceptions.HTTP_EXCEPTION_MAP.items():
            self._test_exception_handler_v10(
                client_exc, status_code,
                message + '\n' + detail,
                error_type='UnknownError', error_msg=message,
                error_detail=detail)

    def test_exception_handler_v10_tacker_unknown_status_code(self):
        message = 'Unknown error'
        detail = 'This is detail'
        self._test_exception_handler_v10(
            exceptions.TackerClientException, 501,
            message + '\n' + detail,
            error_type='UnknownError', error_msg=message,
            error_detail=detail)

    def test_exception_handler_v10_bad_tacker_error(self):
        payload = {'TackerError': {'unknown_key': 'UNKNOWN'}}
        self._test_exception_handler_v10(
            exceptions.TackerClientException, 500,
            expected_msg={'unknown_key': 'UNKNOWN'},
            error_content=payload)

    def test_exception_handler_v10_error_dict_contains_message(self):
        payload = {'message': 'This is an error message'}
        self._test_exception_handler_v10(
            exceptions.TackerClientException, 500,
            expected_msg='500-tackerFault',
            error_content=payload)

    def test_exception_handler_v10_error_dict_not_contain_message(self):
        payload = 'tackerFault'
        self._test_exception_handler_v10(
            exceptions.TackerClientException, 500,
            expected_msg='%s-%s' % (500, payload),
            error_content=payload)

    def test_exception_handler_v10_default_fallback(self):
        payload = 'This is an error message'
        self._test_exception_handler_v10(
            exceptions.TackerClientException, 500,
            expected_msg='%s-%s' % (500, payload),
            error_content=payload)
|