Merge "Add new search options for security service"
This commit is contained in:
commit
f67311a166
@ -0,0 +1,68 @@
|
||||
# Copyright 2014 Mirantis Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tempest.api.share import base
|
||||
from tempest.api.share import test_security_services
|
||||
from tempest import test
|
||||
|
||||
|
||||
class SecurityServiceAdminTest(
|
||||
base.BaseSharesAdminTest,
|
||||
test_security_services.SecurityServiceListMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(SecurityServiceAdminTest, self).setUp()
|
||||
ss_ldap_data = {
|
||||
'name': 'ss_ldap',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'server': 'fake_server_1',
|
||||
'domain': 'fake_domain_1',
|
||||
'user': 'fake_user',
|
||||
'password': 'pass',
|
||||
}
|
||||
ss_kerberos_data = {
|
||||
'name': 'ss_kerberos',
|
||||
'dns_ip': '2.2.2.2',
|
||||
'server': 'fake_server_2',
|
||||
'domain': 'fake_domain_2',
|
||||
'user': 'test_user',
|
||||
'password': 'word',
|
||||
}
|
||||
resp, self.ss_ldap = self.create_security_service('ldap',
|
||||
**ss_ldap_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, self.ss_kerberos = self.create_security_service(
|
||||
'kerberos',
|
||||
**ss_kerberos_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
@test.attr(type=["gate", "smoke", ])
|
||||
def test_list_security_services_all_tenants(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'all_tenants': 1})
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke", ])
|
||||
def test_list_security_services_invalid_filters(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'fake_opt': 'some_value'})
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertEqual(0, len(listed))
|
@ -22,7 +22,110 @@ from tempest import test
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SecurityServicesTest(base.BaseSharesTest):
|
||||
class SecurityServiceListMixin(object):
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services(self):
|
||||
resp, listed = self.shares_client.list_security_services()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
# verify keys
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_with_detail(self):
|
||||
resp, listed = self.shares_client.list_security_services(detailed=True)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
||||
# verify keys
|
||||
keys = [
|
||||
"name", "id", "status", "description",
|
||||
"domain", "server", "dns_ip", "user", "password", "type",
|
||||
"created_at", "updated_at", "project_id",
|
||||
]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_filter_by_share_network(self):
|
||||
sn = self.shares_client.get_share_network(
|
||||
self.os.shares_client.share_network_id)[1]
|
||||
fresh_sn = []
|
||||
for i in range(2):
|
||||
resp, sn = self.create_share_network(
|
||||
neutron_net_id=sn["neutron_net_id"],
|
||||
neutron_subnet_id=sn["neutron_subnet_id"])
|
||||
fresh_sn.append(sn)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
resp, body = self.shares_client.add_sec_service_to_share_network(
|
||||
fresh_sn[0]["id"], self.ss_ldap["id"])
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, body = self.shares_client.add_sec_service_to_share_network(
|
||||
fresh_sn[1]["id"], self.ss_kerberos["id"])
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'share_network_id': fresh_sn[0]['id']})
|
||||
self.assertEqual(1, len(listed))
|
||||
self.assertEqual(self.ss_ldap['id'], listed[0]['id'])
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_filter_by_ss_attributes(self):
|
||||
search_opts = {
|
||||
'status': 'NEW',
|
||||
'name': 'ss_ldap',
|
||||
'type': 'ldap',
|
||||
'user': 'fake_user',
|
||||
'server': 'fake_server_1',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'domain': 'fake_domain_1',
|
||||
}
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params=search_opts)
|
||||
self.assertEqual(1, len(listed))
|
||||
self.assertEqual(self.ss_ldap['id'], listed[0]['id'])
|
||||
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
|
||||
class SecurityServicesTest(base.BaseSharesTest,
|
||||
SecurityServiceListMixin):
|
||||
def setUp(self):
|
||||
super(SecurityServicesTest, self).setUp()
|
||||
ss_ldap_data = {
|
||||
'name': 'ss_ldap',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'server': 'fake_server_1',
|
||||
'domain': 'fake_domain_1',
|
||||
'user': 'fake_user',
|
||||
'password': 'pass',
|
||||
}
|
||||
ss_kerberos_data = {
|
||||
'name': 'ss_kerberos',
|
||||
'dns_ip': '2.2.2.2',
|
||||
'server': 'fake_server_2',
|
||||
'domain': 'fake_domain_2',
|
||||
'user': 'test_user',
|
||||
'password': 'word',
|
||||
}
|
||||
resp, self.ss_ldap = self.create_security_service('ldap',
|
||||
**ss_ldap_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
resp, self.ss_kerberos = self.create_security_service(
|
||||
'kerberos',
|
||||
**ss_kerberos_data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_create_delete_security_service(self):
|
||||
@ -105,35 +208,10 @@ class SecurityServicesTest(base.BaseSharesTest):
|
||||
self.assertDictContainsSubset(update_data, updated)
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services(self):
|
||||
data = self.generate_security_service_data()
|
||||
resp, ss = self.create_security_service(**data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertDictContainsSubset(data, ss)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
any(ss["id"] in ss["id"] for ss in listed)
|
||||
|
||||
# verify keys
|
||||
keys = ["name", "id", "status", "type", ]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
|
||||
@test.attr(type=["gate", "smoke"])
|
||||
def test_list_security_services_with_detail(self):
|
||||
data = self.generate_security_service_data()
|
||||
resp, ss = self.create_security_service(**data)
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
self.assertDictContainsSubset(data, ss)
|
||||
|
||||
resp, listed = self.shares_client.list_security_services_with_detail()
|
||||
self.assertIn(int(resp["status"]), test.HTTP_SUCCESS)
|
||||
any(ss["id"] in ss["id"] for ss in listed)
|
||||
|
||||
# verify keys
|
||||
keys = [
|
||||
"name", "id", "status", "description",
|
||||
"domain", "server", "dns_ip", "user", "password", "type",
|
||||
"created_at", "updated_at", "project_id",
|
||||
]
|
||||
[self.assertIn(key, s_s.keys()) for s_s in listed for key in keys]
|
||||
def test_list_security_services_filter_by_invalid_opt(self):
|
||||
resp, listed = self.shares_client.list_security_services(
|
||||
params={'fake_opt': 'some_value'})
|
||||
self.assertIn(int(resp['status']), test.HTTP_SUCCESS)
|
||||
self.assertTrue(any(self.ss_ldap['id'] == ss['id'] for ss in listed))
|
||||
self.assertTrue(any(self.ss_kerberos['id'] == ss['id']
|
||||
for ss in listed))
|
||||
|
@ -120,3 +120,9 @@ class SecurityServicesNegativeTest(base.BaseSharesTest):
|
||||
self.assertRaises(exceptions.NotFound,
|
||||
self.shares_client.get_security_service,
|
||||
ss["id"])
|
||||
|
||||
@test.attr(type=["gate", "smoke", "negative"])
|
||||
def test_try_list_security_services_all_tenants(self):
|
||||
self.assertRaises(exceptions.Unauthorized,
|
||||
self.shares_client.list_security_services,
|
||||
params={'all_tenants': 1})
|
||||
|
@ -436,13 +436,10 @@ class SharesClient(rest_client.RestClient):
|
||||
resp, body = self.get("security-services/%s" % ss_id)
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_security_services(self):
|
||||
resp, body = self.get("security-services")
|
||||
return resp, self._parse_resp(body)
|
||||
|
||||
def list_security_services_with_detail(self, params=None):
|
||||
"""List the details of all shares."""
|
||||
uri = "security-services/detail"
|
||||
def list_security_services(self, detailed=False, params=None):
|
||||
uri = "security-services"
|
||||
if detailed:
|
||||
uri += '/detail'
|
||||
if params:
|
||||
uri += "?%s" % urllib.urlencode(params)
|
||||
resp, body = self.get(uri)
|
||||
|
@ -45,6 +45,7 @@
|
||||
"security_service:show": [["rule:default"]],
|
||||
"security_service:index": [["rule:default"]],
|
||||
"security_service:detail": [["rule:default"]],
|
||||
"security_service:get_all_security_services": [["rule:admin_api"]],
|
||||
|
||||
"share_server:index": [["rule:admin_api"]],
|
||||
"share_server:show": [["rule:admin_api"]],
|
||||
|
@ -103,11 +103,15 @@ class SecurityServiceController(wsgi.Controller):
|
||||
@wsgi.serializers(xml=SecurityServicesTemplate)
|
||||
def index(self, req):
|
||||
"""Returns a summary list of security services."""
|
||||
policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
|
||||
'index')
|
||||
return self._get_security_services(req, is_detail=False)
|
||||
|
||||
@wsgi.serializers(xml=SecurityServicesTemplate)
|
||||
def detail(self, req):
|
||||
"""Returns a detailed list of security services."""
|
||||
policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
|
||||
'detail')
|
||||
return self._get_security_services(req, is_detail=True)
|
||||
|
||||
def _get_security_services(self, req, is_detail):
|
||||
@ -116,8 +120,6 @@ class SecurityServiceController(wsgi.Controller):
|
||||
The list gets transformed through view builder.
|
||||
"""
|
||||
context = req.environ['manila.context']
|
||||
policy.check_policy(context, RESOURCE_NAME,
|
||||
'get_all_security_services')
|
||||
|
||||
search_opts = {}
|
||||
search_opts.update(req.GET)
|
||||
@ -126,26 +128,27 @@ class SecurityServiceController(wsgi.Controller):
|
||||
share_nw = db.share_network_get(context,
|
||||
search_opts['share_network_id'])
|
||||
security_services = share_nw['security_services']
|
||||
del search_opts['share_network_id']
|
||||
else:
|
||||
if 'all_tenants' in search_opts:
|
||||
policy.check_policy(context, RESOURCE_NAME,
|
||||
'get_all_security_services')
|
||||
security_services = db.security_service_get_all(context)
|
||||
else:
|
||||
security_services = db.security_service_get_all_by_project(
|
||||
context, context.project_id)
|
||||
search_opts.pop('all_tenants', None)
|
||||
common.remove_invalid_options(
|
||||
context,
|
||||
search_opts,
|
||||
self._get_security_services_search_options())
|
||||
if 'all_tenants' in search_opts:
|
||||
security_services = db.security_service_get_all(context)
|
||||
del search_opts['all_tenants']
|
||||
else:
|
||||
security_services = db.security_service_get_all_by_project(
|
||||
context, context.project_id)
|
||||
if search_opts:
|
||||
results = []
|
||||
not_found = object()
|
||||
for service in security_services:
|
||||
for opt, value in six.iteritems(search_opts):
|
||||
if service.get(opt, not_found) != value:
|
||||
break
|
||||
else:
|
||||
results.append(service)
|
||||
for ss in security_services:
|
||||
if all(ss.get(opt, not_found) == value for opt, value in
|
||||
six.iteritems(search_opts)):
|
||||
results.append(ss)
|
||||
security_services = results
|
||||
|
||||
limited_list = common.limited(security_services, req)
|
||||
@ -153,13 +156,19 @@ class SecurityServiceController(wsgi.Controller):
|
||||
if is_detail:
|
||||
security_services = self._view_builder.detail_list(
|
||||
req, limited_list)
|
||||
for ss in security_services['security_services']:
|
||||
share_networks = db.share_network_get_all_by_security_service(
|
||||
context,
|
||||
ss['id'])
|
||||
ss['share_networks'] = [sn['id'] for sn in share_networks]
|
||||
else:
|
||||
security_services = self._view_builder.summary_list(
|
||||
req, limited_list)
|
||||
return security_services
|
||||
|
||||
def _get_security_services_search_options(self):
|
||||
return ('status', 'name', 'id', 'type', )
|
||||
return ('status', 'name', 'id', 'type', 'user',
|
||||
'server', 'dns_ip', 'domain', )
|
||||
|
||||
def _share_servers_dependent_on_sn_exist(self, context,
|
||||
security_service_id):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from six.moves.urllib import parse
|
||||
import webob
|
||||
|
||||
from manila.api.v1 import security_service
|
||||
@ -30,7 +31,7 @@ class ShareApiTest(test.TestCase):
|
||||
super(ShareApiTest, self).setUp()
|
||||
self.controller = security_service.SecurityServiceController()
|
||||
self.maxDiff = None
|
||||
self.security_service = {
|
||||
self.ss_active_directory = {
|
||||
"created_at": "fake-time",
|
||||
"updated_at": "fake-time-2",
|
||||
"id": 1,
|
||||
@ -45,15 +46,55 @@ class ShareApiTest(test.TestCase):
|
||||
"status": "new",
|
||||
"project_id": "fake",
|
||||
}
|
||||
security_service.policy.check_policy = mock.Mock()
|
||||
self.ss_ldap = {
|
||||
"created_at": "fake-time",
|
||||
"updated_at": "fake-time-2",
|
||||
"id": 2,
|
||||
"name": "ss-ldap",
|
||||
"description": "Fake Security Service Desc",
|
||||
"type": constants.SECURITY_SERVICES_ALLOWED_TYPES[1],
|
||||
"dns_ip": "2.2.2.2",
|
||||
"server": "test-server",
|
||||
"domain": "test-domain",
|
||||
"user": "test-user",
|
||||
"password": "test-password",
|
||||
"status": "active",
|
||||
"project_id": "fake",
|
||||
}
|
||||
self.valid_search_opts = {
|
||||
'user': 'fake-user',
|
||||
'server': 'fake-server',
|
||||
'dns_ip': '1.1.1.1',
|
||||
'domain': 'fake-domain',
|
||||
'status': 'new',
|
||||
'type': constants.SECURITY_SERVICES_ALLOWED_TYPES[0],
|
||||
}
|
||||
self.check_policy_patcher = mock.patch(
|
||||
'manila.api.v1.security_service.policy.check_policy')
|
||||
self.check_policy_patcher.start()
|
||||
self.addCleanup(self._stop_started_patcher, self.check_policy_patcher)
|
||||
self.security_service_list_expected_resp = {
|
||||
'security_services': [{
|
||||
'id': self.ss_active_directory['id'],
|
||||
'name': self.ss_active_directory['name'],
|
||||
'type': self.ss_active_directory['type'],
|
||||
'status': self.ss_active_directory['status']
|
||||
}, ]
|
||||
}
|
||||
|
||||
def _stop_started_patcher(self, patcher):
|
||||
if hasattr(patcher, 'is_local'):
|
||||
patcher.stop()
|
||||
|
||||
def test_security_service_show(self):
|
||||
db.security_service_get = mock.Mock(return_value=self.security_service)
|
||||
db.security_service_get = mock.Mock(
|
||||
return_value=self.ss_active_directory)
|
||||
req = fakes.HTTPRequest.blank('/security-services/1')
|
||||
res_dict = self.controller.show(req, '1')
|
||||
expected = self.security_service.copy()
|
||||
expected = self.ss_active_directory.copy()
|
||||
expected.update()
|
||||
self.assertEqual(res_dict, {'security_service': self.security_service})
|
||||
self.assertEqual(res_dict,
|
||||
{'security_service': self.ss_active_directory})
|
||||
|
||||
def test_security_service_show_not_found(self):
|
||||
db.security_service_get = mock.Mock(side_effect=exception.NotFound)
|
||||
@ -63,7 +104,7 @@ class ShareApiTest(test.TestCase):
|
||||
req, '1')
|
||||
|
||||
def test_security_service_create(self):
|
||||
sec_service = self.security_service.copy()
|
||||
sec_service = self.ss_active_directory.copy()
|
||||
create_stub = mock.Mock(
|
||||
return_value=sec_service)
|
||||
self.stubs.Set(db, 'security_service_create', create_stub)
|
||||
@ -71,11 +112,11 @@ class ShareApiTest(test.TestCase):
|
||||
req = fakes.HTTPRequest.blank('/security-services')
|
||||
res_dict = self.controller.create(
|
||||
req, {"security_service": sec_service})
|
||||
expected = self.security_service.copy()
|
||||
expected = self.ss_active_directory.copy()
|
||||
self.assertEqual(res_dict, {'security_service': expected})
|
||||
|
||||
def test_security_service_create_invalid_types(self):
|
||||
sec_service = self.security_service.copy()
|
||||
sec_service = self.ss_active_directory.copy()
|
||||
sec_service['type'] = 'invalid'
|
||||
req = fakes.HTTPRequest.blank('/security-services')
|
||||
self.assertRaises(exception.InvalidInput, self.controller.create, req,
|
||||
@ -117,8 +158,8 @@ class ShareApiTest(test.TestCase):
|
||||
req, 1)
|
||||
|
||||
def test_security_service_update_name(self):
|
||||
new = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
new = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['name'] = 'new'
|
||||
db.security_service_get = mock.Mock(return_value=new)
|
||||
db.security_service_update = mock.Mock(return_value=updated)
|
||||
@ -135,8 +176,8 @@ class ShareApiTest(test.TestCase):
|
||||
req.environ['manila.context'], 1)
|
||||
|
||||
def test_security_service_update_description(self):
|
||||
new = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
new = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['description'] = 'new'
|
||||
db.security_service_get = mock.Mock(return_value=new)
|
||||
db.security_service_update = mock.Mock(return_value=updated)
|
||||
@ -159,7 +200,7 @@ class ShareApiTest(test.TestCase):
|
||||
db.share_network_get_all_by_security_service.return_value = [
|
||||
{'id': 'fake_id', 'share_servers': 'fake_share_servers'},
|
||||
]
|
||||
security_service = self.security_service.copy()
|
||||
security_service = self.ss_active_directory.copy()
|
||||
db.security_service_get.return_value = security_service
|
||||
body = {'security_service': {'user_id': 'new_user'}}
|
||||
req = fakes.HTTPRequest.blank('/security_services/1')
|
||||
@ -178,8 +219,8 @@ class ShareApiTest(test.TestCase):
|
||||
db.share_network_get_all_by_security_service.return_value = [
|
||||
{'id': 'fake_id', 'share_servers': 'fake_share_servers'},
|
||||
]
|
||||
old = self.security_service.copy()
|
||||
updated = self.security_service.copy()
|
||||
old = self.ss_active_directory.copy()
|
||||
updated = self.ss_active_directory.copy()
|
||||
updated['name'] = 'new name'
|
||||
updated['description'] = 'new description'
|
||||
db.security_service_get.return_value = old
|
||||
@ -203,14 +244,101 @@ class ShareApiTest(test.TestCase):
|
||||
|
||||
def test_security_service_list(self):
|
||||
db.security_service_get_all_by_project = mock.Mock(
|
||||
return_value=[self.security_service.copy()])
|
||||
return_value=[self.ss_active_directory.copy()])
|
||||
req = fakes.HTTPRequest.blank('/security_services')
|
||||
res_dict = self.controller.index(req)
|
||||
expected = {'security_services': [
|
||||
{'id': self.security_service['id'],
|
||||
'name': self.security_service['name'],
|
||||
'type': self.security_service['type'],
|
||||
'status': self.security_service['status']
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
|
||||
@mock.patch.object(db, 'share_network_get', mock.Mock())
|
||||
def test_security_service_list_filter_by_sn(self):
|
||||
sn = {
|
||||
'id': 'fake_sn_id',
|
||||
'security_services': [self.ss_active_directory, ],
|
||||
}
|
||||
]}
|
||||
self.assertEqual(res_dict, expected)
|
||||
db.share_network_get.return_value = sn
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?share_network_id=fake_sn_id')
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
db.share_network_get.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
sn['id'])
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all', mock.Mock())
|
||||
def test_security_services_list_all_tenants_admin_context(self):
|
||||
self.check_policy_patcher.stop()
|
||||
db.security_service_get_all.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?all_tenants=1&name=fake-name',
|
||||
use_admin_context=True)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp, res_dict)
|
||||
db.security_service_get_all.assert_called_once_with(
|
||||
req.environ['manila.context'])
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all', mock.Mock())
|
||||
def test_security_services_list_all_tenants_non_admin_context(self):
|
||||
self.check_policy_patcher.stop()
|
||||
db.security_service_get_all.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?all_tenants=1')
|
||||
self.assertRaises(exception.PolicyNotAuthorized, self.controller.index,
|
||||
req)
|
||||
self.assertFalse(db.security_service_get_all.called)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_services_list_admin_context_invalid_opts(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?fake_opt=fake_value',
|
||||
use_admin_context=True)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual({'security_services': []}, res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_once_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_service_list_all_filter_opts_separately(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
for opt, val in self.valid_search_opts.items():
|
||||
for use_admin_context in [True, False]:
|
||||
req = fakes.HTTPRequest.blank(
|
||||
'/security-services?' + opt + '=' + val,
|
||||
use_admin_context=use_admin_context)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp,
|
||||
res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
||||
@mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock())
|
||||
def test_security_service_list_all_filter_opts(self):
|
||||
db.security_service_get_all_by_project.return_value = [
|
||||
self.ss_active_directory,
|
||||
self.ss_ldap,
|
||||
]
|
||||
query_string = '/security-services?' + parse.urlencode(sorted(
|
||||
[(k, v) for (k, v) in list(self.valid_search_opts.items())]))
|
||||
for use_admin_context in [True, False]:
|
||||
req = fakes.HTTPRequest.blank(query_string,
|
||||
use_admin_context=use_admin_context)
|
||||
res_dict = self.controller.index(req)
|
||||
self.assertEqual(self.security_service_list_expected_resp,
|
||||
res_dict)
|
||||
db.security_service_get_all_by_project.assert_called_with(
|
||||
req.environ['manila.context'],
|
||||
req.environ['manila.context'].project_id)
|
||||
|
@ -35,5 +35,8 @@
|
||||
"share_extension:types_manage": [],
|
||||
"share_extension:types_extra_specs": [],
|
||||
|
||||
"security_service:index": [],
|
||||
"security_service:get_all_security_services": [["rule:admin_api"]],
|
||||
|
||||
"limits_extension:used_limits": []
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user