00221d59b9
1. As mentioned in [1], we should avoid using six.iteritems to obtain iterators. We can use dict.items instead, as it returns an iterable view in PY3 as well, and dict.items/keys is more readable. 2. In PY2, the performance cost of building a list should be negligible; see link [2]. [1] https://wiki.openstack.org/wiki/Python3 [2] http://lists.openstack.org/pipermail/openstack-dev/2015-June/066391.html Change-Id: Ic68ba358d51002e1a1aeac6159248ffb730b5daf
203 lines
8.0 KiB
Python
203 lines
8.0 KiB
Python
# Copyright 2014 Mirantis Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from oslo_log import log
|
|
import six
|
|
from tempest import config
|
|
import testtools
|
|
from testtools import testcase as tc
|
|
|
|
from manila_tempest_tests.tests.api import base
|
|
|
|
# Tempest configuration object; the skipIf decorators in this module read
# CONF.share.multitenancy_enabled from it.
CONF = config.CONF
|
|
# Module-level logger named after this module.
LOG = log.getLogger(__name__)
|
|
|
|
|
|
class SecurityServiceListMixin(object):
    """Shared tests for listing security services.

    The host test class is expected to provide ``self.shares_client``,
    ``self.ss_ldap``, ``self.ss_kerberos``, ``self.create_share_network``
    and the usual unittest-style assertion methods; this mixin only reads
    them.
    """

    def _assert_security_services_have_keys(self, listed, keys):
        """Assert that every listed security service contains all ``keys``.

        :param listed: iterable of security service dicts returned by a
            list call.
        :param keys: iterable of key names each entry must contain.
        """
        # A plain loop is used instead of a throw-away list comprehension,
        # since the assertions are executed purely for their side effects.
        for security_service in listed:
            for key in keys:
                self.assertIn(key, security_service)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_list_security_services(self):
        """List security services and check the brief view of each entry."""
        listed = self.shares_client.list_security_services()
        self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
        self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
                            for ss in listed))

        # verify keys
        keys = ["name", "id", "status", "type", ]
        self._assert_security_services_have_keys(listed, keys)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_list_security_services_with_detail(self):
        """List security services in detail and check all expected keys."""
        listed = self.shares_client.list_security_services(detailed=True)
        self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
        self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
                            for ss in listed))

        # verify keys
        keys = [
            "name", "id", "status", "description",
            "domain", "server", "dns_ip", "user", "password", "type",
            "created_at", "updated_at", "project_id",
        ]
        self._assert_security_services_have_keys(listed, keys)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    @testtools.skipIf(
        not CONF.share.multitenancy_enabled, "Only for multitenancy.")
    def test_list_security_services_filter_by_share_network(self):
        """Filter the list by share network and expect only its service."""
        sn = self.shares_client.get_share_network(
            self.shares_client.share_network_id)
        fresh_sn = []
        # Each new share network is created from the neutron ids of the
        # previously obtained one (the client's default network first).
        for _ in range(2):
            sn = self.create_share_network(
                neutron_net_id=sn["neutron_net_id"],
                neutron_subnet_id=sn["neutron_subnet_id"])
            fresh_sn.append(sn)

        self.shares_client.add_sec_service_to_share_network(
            fresh_sn[0]["id"], self.ss_ldap["id"])
        self.shares_client.add_sec_service_to_share_network(
            fresh_sn[1]["id"], self.ss_kerberos["id"])

        # Only the LDAP service was attached to the first share network.
        listed = self.shares_client.list_security_services(
            params={'share_network_id': fresh_sn[0]['id']})
        self.assertEqual(1, len(listed))
        self.assertEqual(self.ss_ldap['id'], listed[0]['id'])

        keys = ["name", "id", "status", "type", ]
        self._assert_security_services_have_keys(listed, keys)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_list_security_services_detailed_filter_by_ss_attributes(self):
        """Filter the detailed list by several security service attributes."""
        search_opts = {
            'name': 'ss_ldap',
            'type': 'ldap',
            'user': 'fake_user',
            'server': 'fake_server_1',
            'dns_ip': '1.1.1.1',
            'domain': 'fake_domain_1',
        }
        listed = self.shares_client.list_security_services(
            detailed=True,
            params=search_opts)
        self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
        # Every returned entry must match all of the requested filters.
        for ss in listed:
            self.assertTrue(all(ss[key] == value for key, value
                                in search_opts.items()))
|
|
|
|
|
|
class SecurityServicesTest(base.BaseSharesTest,
                           SecurityServiceListMixin):
    """Positive API tests for security service create/get/update/list.

    ``setUp`` creates one LDAP and one Kerberos security service and stores
    them as ``self.ss_ldap`` and ``self.ss_kerberos``; the list tests
    inherited from ``SecurityServiceListMixin`` read those attributes.
    """

    def setUp(self):
        """Create the LDAP and Kerberos security services used by tests."""
        super(SecurityServicesTest, self).setUp()
        ss_ldap_data = {
            'name': 'ss_ldap',
            'dns_ip': '1.1.1.1',
            'server': 'fake_server_1',
            'domain': 'fake_domain_1',
            'user': 'fake_user',
            'password': 'pass',
        }
        ss_kerberos_data = {
            'name': 'ss_kerberos',
            'dns_ip': '2.2.2.2',
            'server': 'fake_server_2',
            'domain': 'fake_domain_2',
            'user': 'test_user',
            'password': 'word',
        }
        self.ss_ldap = self.create_security_service('ldap', **ss_ldap_data)
        self.ss_kerberos = self.create_security_service(
            'kerberos', **ss_kerberos_data)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_create_delete_security_service(self):
        """Create and delete a security service of each supported type."""
        data = self.generate_security_service_data()
        self.service_names = ["ldap", "kerberos", "active_directory"]
        for ss_name in self.service_names:
            ss = self.create_security_service(ss_name, **data)
            self.assertDictContainsSubset(data, ss)
            self.assertEqual(ss_name, ss["type"])
            self.shares_client.delete_security_service(ss["id"])

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_get_security_service(self):
        """Create a security service and fetch it back by id."""
        data = self.generate_security_service_data()
        ss = self.create_security_service(**data)
        self.assertDictContainsSubset(data, ss)

        get = self.shares_client.get_security_service(ss["id"])
        self.assertDictContainsSubset(data, get)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_update_security_service(self):
        """Update a security service and check both responses."""
        data = self.generate_security_service_data()
        ss = self.create_security_service(**data)
        self.assertDictContainsSubset(data, ss)

        upd_data = self.generate_security_service_data()
        updated = self.shares_client.update_security_service(
            ss["id"], **upd_data)

        # The new data must show up in the update response and in a
        # subsequent get.
        get = self.shares_client.get_security_service(ss["id"])
        self.assertDictContainsSubset(upd_data, updated)
        self.assertDictContainsSubset(upd_data, get)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
    @testtools.skipIf(
        not CONF.share.multitenancy_enabled, "Only for multitenancy.")
    def test_try_update_valid_keys_sh_server_exists(self):
        """Update name/description after a share was requested on the network.

        A share is requested on a share network that has the security
        service attached; a failure of that share creation is logged and
        tolerated, and the update is attempted afterwards either way.
        """
        ss_data = self.generate_security_service_data()
        ss = self.create_security_service(**ss_data)

        sn = self.shares_client.get_share_network(
            self.shares_client.share_network_id)
        fresh_sn = self.create_share_network(
            neutron_net_id=sn["neutron_net_id"],
            neutron_subnet_id=sn["neutron_subnet_id"])

        self.shares_client.add_sec_service_to_share_network(
            fresh_sn["id"], ss["id"])

        # Security service with fake data is used, so if we use backend driver
        # that fails on wrong data, we expect error here.
        # We require any share that uses our share-network.
        try:
            self.create_share(
                share_network_id=fresh_sn["id"], cleanup_in_class=False)
        except Exception as e:
            # we do wait for either 'error' or 'available' status because
            # it is the only available statuses for proper deletion.
            # The value is passed as a logging argument so interpolation is
            # left to the logger instead of being done eagerly with "%".
            LOG.warning("Caught exception. It is expected in case backend "
                        "fails having security-service with improper data "
                        "that leads to share-server creation error. "
                        "%s", six.text_type(e))

        update_data = {
            "name": "name",
            "description": "new_description",
        }
        updated = self.shares_client.update_security_service(
            ss["id"], **update_data)
        self.assertDictContainsSubset(update_data, updated)

    @tc.attr(base.TAG_POSITIVE, base.TAG_API)
    def test_list_security_services_filter_by_invalid_opt(self):
        """List with an unknown filter and still expect both services."""
        listed = self.shares_client.list_security_services(
            params={'fake_opt': 'some_value'})
        self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
        self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
                            for ss in listed))
|