Enforce policy on service api

This patch enforces policy on service api,
and adds corresponding unit tests.

Change-Id: I96b200575e9e998b986e5b47b1961769ee8f1969
This commit is contained in:
Pradeep Kumar Singh 2016-11-04 14:37:45 +00:00
parent 96b5fc1870
commit 5c0935c056
4 changed files with 69 additions and 50 deletions

View File

@ -22,5 +22,5 @@
"image:create": "rule:default", "image:create": "rule:default",
"image:get_all": "rule:default", "image:get_all": "rule:default",
"magnum-service:get_all": "rule:admin_api" "zun-service:get_all": "rule:admin_api"
} }

View File

@ -11,12 +11,14 @@
# under the License. # under the License.
import pecan import pecan
from pecan import rest
from zun.api.controllers import base from zun.api.controllers import base
from zun.api.controllers import types from zun.api.controllers import types
from zun.api.controllers.v1 import collection from zun.api.controllers.v1 import collection
from zun.api import servicegroup as svcgrp_api from zun.api import servicegroup as svcgrp_api
from zun.common import exception from zun.common import exception
from zun.common import policy
from zun import objects from zun import objects
@ -110,21 +112,22 @@ class ZunServiceCollection(collection.Collection):
return collection return collection
class ZunServiceController(object): class ZunServiceController(rest.RestController):
"""REST controller for zun-services.""" """REST controller for zun-services."""
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(ZunServiceController, self).__init__() super(ZunServiceController, self).__init__()
self.servicegroup_api = svcgrp_api.ServiceGroup() self.servicegroup_api = svcgrp_api.ServiceGroup()
# TODO(hongbin): uncomment this once policy is ported @pecan.expose('json')
# @policy.enforce_wsgi("zun-service", "get_all")
@pecan.expose(generic=True, template='json')
@exception.wrap_pecan_controller_exception @exception.wrap_pecan_controller_exception
def index(self, **kwargs): def get_all(self, **kwargs):
"""Retrieve a list of zun-services. """Retrieve a list of zun-services.
""" """
context = pecan.request.context
policy.enforce(context, "zun-service:get_all",
action="zun-service:get_all")
hsvcs = objects.ZunService.list(pecan.request.context, hsvcs = objects.ZunService.list(pecan.request.context,
limit=None, limit=None,
marker=None, marker=None,

View File

@ -11,6 +11,7 @@
# under the License. # under the License.
from tempest.lib import decorators from tempest.lib import decorators
from tempest.lib import exceptions
from zun.tests.tempest.api import clients from zun.tests.tempest.api import clients
from zun.tests.tempest import base from zun.tests.tempest import base
@ -40,13 +41,10 @@ class TestService(base.BaseZunTest):
super(TestService, cls).resource_setup() super(TestService, cls).resource_setup()
# TODO(pksingh): currently functional test doesnt support
# policy, will write another test after
# implementing policy in functional tests
@decorators.idempotent_id('a04f61f2-15ae-4200-83b7-1f311b101f36') @decorators.idempotent_id('a04f61f2-15ae-4200-83b7-1f311b101f36')
def test_service_list(self): def test_service_list(self):
resp, model = self.container_client.list_services() self.assertRaises(exceptions.Forbidden,
self.assertEqual(200, resp.status) self.container_client.list_services)
self.assertEqual(len(model.services), 1)
zun_comp = model.services[0]
self.assertEqual(zun_comp['id'], 1)
self.assertEqual('up', zun_comp['state'])
self.assertEqual('zun-compute', zun_comp['binary'])
self.assertGreater(zun_comp['report_count'], 0)

View File

@ -10,16 +10,21 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import mock
from zun.api.controllers.v1 import zun_services as zservice from zun.api.controllers.v1 import zun_services as zservice
from zun.api import servicegroup
from zun import objects
from zun.tests import base from zun.tests import base
from zun.tests.unit.api import utils as apiutils from zun.tests.unit.api import base as api_base
from zun.tests.unit.api import utils as api_utils
class TestZunServiceObject(base.BaseTestCase): class TestZunServiceObject(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestZunServiceObject, self).setUp() super(TestZunServiceObject, self).setUp()
self.rpc_dict = apiutils.zservice_get_data() self.rpc_dict = api_utils.zservice_get_data()
def test_msvc_obj_fields_filtering(self): def test_msvc_obj_fields_filtering(self):
"""Test that it does filtering fields """ """Test that it does filtering fields """
@ -37,44 +42,57 @@ class db_rec(object):
return self.rec_as_dict return self.rec_as_dict
# TODO(hongbin): Enable the tests below class TestZunServiceController(api_base.FunctionalTest):
# class TestZunServiceController(api_base.FunctionalTest):
# def setUp(self): def test_empty(self):
# super(TestZunServiceController, self).setUp() response = self.get_json('/services')
self.assertEqual([], response['services'])
# def test_empty(self): def _rpc_api_reply(self, count=1):
# response = self.get_json('/hservices') reclist = []
# self.assertEqual([], response['hservices']) for i in range(count):
elem = api_utils.zservice_get_data()
elem['id'] = i + 1
rec = db_rec(elem)
reclist.append(rec)
return reclist
# def _rpc_api_reply(self, count=1): @mock.patch.object(objects.ZunService, 'list')
# reclist = [] @mock.patch.object(servicegroup.ServiceGroup, 'service_is_up')
# for i in range(count): def test_get_one(self, svc_up, mock_list):
# elem = apiutils.zservice_get_data() mock_list.return_value = self._rpc_api_reply()
# elem['id'] = i + 1 svc_up.return_value = "up"
# rec = db_rec(elem)
# reclist.append(rec)
# return reclist
# @mock.patch.object(objects.ZunService, 'list') response = self.get_json('/services')
# @mock.patch.object(servicegroup.ServiceGroup, 'service_is_up') self.assertEqual(len(response['services']), 1)
# def test_get_one(self, svc_up, mock_list): self.assertEqual(response['services'][0]['id'], 1)
# mock_list.return_value = self._rpc_api_reply()
# svc_up.return_value = "up"
# response = self.get_json('/hservices') @mock.patch.object(objects.ZunService, 'list')
# self.assertEqual(len(response['hservices']), 1) @mock.patch.object(servicegroup.ServiceGroup, 'service_is_up')
# self.assertEqual(response['hservices'][0]['id'], 1) def test_get_many(self, svc_up, mock_list):
svc_num = 5
mock_list.return_value = self._rpc_api_reply(svc_num)
svc_up.return_value = "up"
# @mock.patch.object(objects.ZunService, 'list') response = self.get_json('/services')
# @mock.patch.object(servicegroup.ServiceGroup, 'service_is_up') self.assertEqual(len(response['services']), svc_num)
# def test_get_many(self, svc_up, mock_list): for i in range(svc_num):
# svc_num = 5 elem = response['services'][i]
# mock_list.return_value = self._rpc_api_reply(svc_num) self.assertEqual(elem['id'], i + 1)
# svc_up.return_value = "up"
# response = self.get_json('/hservices')
# self.assertEqual(len(response['hservices']), svc_num) class TestZunServiceEnforcement(api_base.FunctionalTest):
# for i in range(svc_num):
# elem = response['hservices'][i] def _common_policy_check(self, rule, func, *arg, **kwarg):
# self.assertEqual(elem['id'], i + 1) self.policy.set_rules({rule: 'project:non_fake'})
response = func(*arg, **kwarg)
self.assertEqual(403, response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertTrue(
"Policy doesn't allow %s to be performed." % rule,
response.json['errors'][0]['detail'])
def test_policy_disallow_get_all(self):
self._common_policy_check(
'zun-service:get_all', self.get_json,
'/services', expect_errors=True)