Merge "tests: initialize policy in BaseTestCase"
This commit is contained in:
commit
06721af86d
@ -33,6 +33,7 @@ from oslo_messaging import conffixture as messaging_conffixture
|
|||||||
|
|
||||||
from neutron.common import config
|
from neutron.common import config
|
||||||
from neutron.common import rpc as n_rpc
|
from neutron.common import rpc as n_rpc
|
||||||
|
from neutron import policy
|
||||||
from neutron.tests import fake_notifier
|
from neutron.tests import fake_notifier
|
||||||
from neutron.tests import sub_base
|
from neutron.tests import sub_base
|
||||||
|
|
||||||
@ -104,6 +105,9 @@ class BaseTestCase(sub_base.SubBaseTestCase):
|
|||||||
self.setup_rpc_mocks()
|
self.setup_rpc_mocks()
|
||||||
self.setup_config()
|
self.setup_config()
|
||||||
|
|
||||||
|
policy.init()
|
||||||
|
self.addCleanup(policy.reset)
|
||||||
|
|
||||||
def get_new_temp_dir(self):
|
def get_new_temp_dir(self):
|
||||||
"""Create a new temporary directory.
|
"""Create a new temporary directory.
|
||||||
|
|
||||||
|
@ -94,7 +94,6 @@ class APIPolicyTestCase(base.BaseTestCase):
|
|||||||
True)
|
True)
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
policy.reset()
|
|
||||||
if self.ATTRIBUTE_MAP_COPY:
|
if self.ATTRIBUTE_MAP_COPY:
|
||||||
attributes.RESOURCE_ATTRIBUTE_MAP = self.ATTRIBUTE_MAP_COPY
|
attributes.RESOURCE_ATTRIBUTE_MAP = self.ATTRIBUTE_MAP_COPY
|
||||||
super(APIPolicyTestCase, self).tearDown()
|
super(APIPolicyTestCase, self).tearDown()
|
||||||
|
@ -113,6 +113,9 @@ class APIv2TestBase(base.BaseTestCase, testlib_plugin.PluginSetupHelper):
|
|||||||
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
|
cfg.CONF.set_override('quota_driver', 'neutron.quota.ConfDriver',
|
||||||
group='QUOTAS')
|
group='QUOTAS')
|
||||||
|
|
||||||
|
# APIRouter initialization resets policy module, re-initializing it
|
||||||
|
policy.init()
|
||||||
|
|
||||||
|
|
||||||
class _ArgMatcher(object):
|
class _ArgMatcher(object):
|
||||||
"""An adapter to assist mock assertions, used to custom compare."""
|
"""An adapter to assist mock assertions, used to custom compare."""
|
||||||
@ -518,8 +521,6 @@ class APIv2TestCase(APIv2TestBase):
|
|||||||
# Note: since all resources use the same controller and validation
|
# Note: since all resources use the same controller and validation
|
||||||
# logic, we actually get really good coverage from testing just networks.
|
# logic, we actually get really good coverage from testing just networks.
|
||||||
class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
||||||
def setUp(self):
|
|
||||||
super(JSONV2TestCase, self).setUp()
|
|
||||||
|
|
||||||
def _test_list(self, req_tenant_id, real_tenant_id):
|
def _test_list(self, req_tenant_id, real_tenant_id):
|
||||||
env = {}
|
env = {}
|
||||||
@ -1027,8 +1028,6 @@ class JSONV2TestCase(APIv2TestBase, testlib_api.WebTestCase):
|
|||||||
def test_get_keystone_strip_admin_only_attribute(self):
|
def test_get_keystone_strip_admin_only_attribute(self):
|
||||||
tenant_id = _uuid()
|
tenant_id = _uuid()
|
||||||
# Inject rule in policy engine
|
# Inject rule in policy engine
|
||||||
policy.init()
|
|
||||||
self.addCleanup(policy.reset)
|
|
||||||
rules = {'get_network:name': common_policy.parse_rule(
|
rules = {'get_network:name': common_policy.parse_rule(
|
||||||
"rule:admin_only")}
|
"rule:admin_only")}
|
||||||
policy.set_rules(rules, overwrite=False)
|
policy.set_rules(rules, overwrite=False)
|
||||||
|
@ -22,10 +22,6 @@ from neutron.tests import base
|
|||||||
|
|
||||||
class ConfigurationTest(base.BaseTestCase):
|
class ConfigurationTest(base.BaseTestCase):
|
||||||
|
|
||||||
def setup_config(self):
|
|
||||||
# don't use default config
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_load_paste_app_not_found(self):
|
def test_load_paste_app_not_found(self):
|
||||||
self.config(api_paste_config='no_such_file.conf')
|
self.config(api_paste_config='no_such_file.conf')
|
||||||
with mock.patch.object(cfg.CONF, 'find_file', return_value=None) as ff:
|
with mock.patch.object(cfg.CONF, 'find_file', return_value=None) as ff:
|
||||||
|
@ -15,22 +15,12 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from neutron.agent.linux import interface
|
|
||||||
from neutron.cmd import netns_cleanup as util
|
from neutron.cmd import netns_cleanup as util
|
||||||
from neutron.tests import base
|
from neutron.tests import base
|
||||||
|
|
||||||
|
|
||||||
class TestNetnsCleanup(base.BaseTestCase):
|
class TestNetnsCleanup(base.BaseTestCase):
|
||||||
|
|
||||||
def setup_config(self):
|
|
||||||
# don't use default config
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_setup_conf(self):
|
|
||||||
expected_opts = interface.OPTS
|
|
||||||
conf = util.setup_conf()
|
|
||||||
self.assertTrue(all([opt.name in conf for opt in expected_opts]))
|
|
||||||
|
|
||||||
def test_kill_dhcp(self, dhcp_active=True):
|
def test_kill_dhcp(self, dhcp_active=True):
|
||||||
conf = mock.Mock()
|
conf = mock.Mock()
|
||||||
conf.dhcp_driver = 'driver'
|
conf.dhcp_driver = 'driver'
|
||||||
|
@ -26,16 +26,6 @@ from neutron.tests import base
|
|||||||
|
|
||||||
class TestOVSCleanup(base.BaseTestCase):
|
class TestOVSCleanup(base.BaseTestCase):
|
||||||
|
|
||||||
def setup_config(self):
|
|
||||||
# don't use default config
|
|
||||||
pass
|
|
||||||
|
|
||||||
def test_setup_conf(self):
|
|
||||||
conf = util.setup_conf()
|
|
||||||
self.assertEqual(conf.external_network_bridge, 'br-ex')
|
|
||||||
self.assertEqual(conf.ovs_integration_bridge, 'br-int')
|
|
||||||
self.assertFalse(conf.ovs_all_ports)
|
|
||||||
|
|
||||||
def test_main(self):
|
def test_main(self):
|
||||||
bridges = ['br-int', 'br-ex']
|
bridges = ['br-int', 'br-ex']
|
||||||
ports = ['p1', 'p2', 'p3']
|
ports = ['p1', 'p2', 'p3']
|
||||||
|
@ -40,7 +40,6 @@ from neutron.tests import base
|
|||||||
class PolicyFileTestCase(base.BaseTestCase):
|
class PolicyFileTestCase(base.BaseTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(PolicyFileTestCase, self).setUp()
|
super(PolicyFileTestCase, self).setUp()
|
||||||
self.addCleanup(policy.reset)
|
|
||||||
self.context = context.Context('fake', 'fake', is_admin=False)
|
self.context = context.Context('fake', 'fake', is_admin=False)
|
||||||
self.target = {'tenant_id': 'fake'}
|
self.target = {'tenant_id': 'fake'}
|
||||||
|
|
||||||
@ -66,7 +65,6 @@ class PolicyFileTestCase(base.BaseTestCase):
|
|||||||
class PolicyTestCase(base.BaseTestCase):
|
class PolicyTestCase(base.BaseTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(PolicyTestCase, self).setUp()
|
super(PolicyTestCase, self).setUp()
|
||||||
self.addCleanup(policy.reset)
|
|
||||||
# NOTE(vish): preload rules to circumvent reloading from file
|
# NOTE(vish): preload rules to circumvent reloading from file
|
||||||
rules = {
|
rules = {
|
||||||
"true": '@',
|
"true": '@',
|
||||||
@ -174,7 +172,6 @@ class DefaultPolicyTestCase(base.BaseTestCase):
|
|||||||
jsonutils.dump(self.rules, policyfile)
|
jsonutils.dump(self.rules, policyfile)
|
||||||
cfg.CONF.set_override('policy_file', tmpfilename)
|
cfg.CONF.set_override('policy_file', tmpfilename)
|
||||||
policy.refresh()
|
policy.refresh()
|
||||||
self.addCleanup(policy.reset)
|
|
||||||
|
|
||||||
self.context = context.Context('fake', 'fake')
|
self.context = context.Context('fake', 'fake')
|
||||||
|
|
||||||
@ -201,10 +198,13 @@ FAKE_RESOURCE = {"%ss" % FAKE_RESOURCE_NAME:
|
|||||||
|
|
||||||
class NeutronPolicyTestCase(base.BaseTestCase):
|
class NeutronPolicyTestCase(base.BaseTestCase):
|
||||||
|
|
||||||
|
def fakepolicyinit(self, **kwargs):
|
||||||
|
enf = policy._ENFORCER
|
||||||
|
enf.set_rules(common_policy.Rules(self.rules))
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(NeutronPolicyTestCase, self).setUp()
|
super(NeutronPolicyTestCase, self).setUp()
|
||||||
policy.refresh()
|
policy.refresh()
|
||||||
self.addCleanup(policy.reset)
|
|
||||||
self.admin_only_legacy = "role:admin"
|
self.admin_only_legacy = "role:admin"
|
||||||
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
|
self.admin_or_owner_legacy = "role:admin or tenant_id:%(tenant_id)s"
|
||||||
# Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
|
# Add a Fake 'something' resource to RESOURCE_ATTRIBUTE_MAP
|
||||||
@ -245,16 +245,12 @@ class NeutronPolicyTestCase(base.BaseTestCase):
|
|||||||
"rule:shared"
|
"rule:shared"
|
||||||
}.items())
|
}.items())
|
||||||
|
|
||||||
def fakepolicyinit(**kwargs):
|
|
||||||
enf = policy._ENFORCER
|
|
||||||
enf.set_rules(common_policy.Rules(self.rules))
|
|
||||||
|
|
||||||
def remove_fake_resource():
|
def remove_fake_resource():
|
||||||
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
|
del attributes.RESOURCE_ATTRIBUTE_MAP["%ss" % FAKE_RESOURCE_NAME]
|
||||||
|
|
||||||
self.patcher = mock.patch.object(neutron.policy,
|
self.patcher = mock.patch.object(neutron.policy,
|
||||||
'init',
|
'init',
|
||||||
new=fakepolicyinit)
|
new=self.fakepolicyinit)
|
||||||
self.patcher.start()
|
self.patcher.start()
|
||||||
self.addCleanup(remove_fake_resource)
|
self.addCleanup(remove_fake_resource)
|
||||||
self.context = context.Context('fake', 'fake', roles=['user'])
|
self.context = context.Context('fake', 'fake', roles=['user'])
|
||||||
@ -500,7 +496,7 @@ class NeutronPolicyTestCase(base.BaseTestCase):
|
|||||||
# Trigger a policy with rule admin_or_owner
|
# Trigger a policy with rule admin_or_owner
|
||||||
action = "create_network"
|
action = "create_network"
|
||||||
target = {'tenant_id': 'fake'}
|
target = {'tenant_id': 'fake'}
|
||||||
policy.init()
|
self.fakepolicyinit()
|
||||||
self.assertRaises(exceptions.PolicyCheckError,
|
self.assertRaises(exceptions.PolicyCheckError,
|
||||||
policy.enforce,
|
policy.enforce,
|
||||||
self.context, action, target)
|
self.context, action, target)
|
||||||
|
Loading…
Reference in New Issue
Block a user