diff --git a/neutron/policy.py b/neutron/policy.py index dae2b8dd205..779334396ea 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -468,7 +468,10 @@ def check(context, action, target, plugin=None, might_not_exist=False, """ # If we already know the context has admin rights do not perform an # additional check and authorize the operation - if context.is_admin: + # TODO(slaweq): Remove that is_admin check and always perform rules checks + # when old, deprecated rules will be removed and only rules with new + # personas will be supported + if not cfg.CONF.oslo_policy.enforce_new_defaults and context.is_admin: return True if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules): return True @@ -502,7 +505,10 @@ def enforce(context, action, target, plugin=None, pluralized=None): """ # If we already know the context has admin rights do not perform an # additional check and authorize the operation - if context.is_admin: + # TODO(slaweq): Remove that is_admin check and always perform rules checks + # when old, deprecated rules will be removed and only rules with new + # personas will be supported + if not cfg.CONF.oslo_policy.enforce_new_defaults and context.is_admin: return True rule, target, context = _prepare_check(context, action, target, pluralized) try: diff --git a/neutron/tests/unit/test_policy.py b/neutron/tests/unit/test_policy.py index aed620440a5..f0fb87cff24 100644 --- a/neutron/tests/unit/test_policy.py +++ b/neutron/tests/unit/test_policy.py @@ -78,6 +78,8 @@ class PolicyTestCase(base.BaseTestCase): "example:early_or_success": "@ or !", "example:lowercase_admin": "role:admin or role:sysadmin", "example:uppercase_admin": "role:ADMIN or role:sysadmin", + "example:only_system_admin_allowed": ( + "role:admin and system_scope:all"), } policy.refresh() # NOTE(vish): then overload underlying rules @@ -85,6 +87,60 @@ class PolicyTestCase(base.BaseTestCase): self.context = context.Context('fake', 'fake', roles=['member']) self.target = {} + def _test_check_system_admin_allowed_action(self, enforce_new_defaults): + action = "example:only_system_admin_allowed" + cfg.CONF.set_override( + 'enforce_new_defaults', enforce_new_defaults, group='oslo_policy') + project_admin_ctx = context.Context( + user="fake", project_id="fake", + roles=['admin', 'member', 'reader']) + system_admin_ctx = context.Context( + user="fake", project_id="fake", + roles=['admin', 'member', 'reader'], + system_scope='all') + self.assertTrue(policy.check(system_admin_ctx, action, self.target)) + if enforce_new_defaults: + self.assertFalse( + policy.check(project_admin_ctx, action, self.target)) + else: + self.assertTrue( + policy.check(project_admin_ctx, action, self.target)) + + def test_check_only_system_admin_new_defaults(self): + self._test_check_system_admin_allowed_action(enforce_new_defaults=True) + + def test_check_only_system_admin_old_defaults(self): + self._test_check_system_admin_allowed_action( + enforce_new_defaults=False) + + def _test_enforce_system_admin_allowed_action(self, enforce_new_defaults): + action = "example:only_system_admin_allowed" + cfg.CONF.set_override( + 'enforce_new_defaults', enforce_new_defaults, group='oslo_policy') + project_admin_ctx = context.Context( + user="fake", project_id="fake", + roles=['admin', 'member', 'reader']) + system_admin_ctx = context.Context( + user="fake", project_id="fake", + roles=['admin', 'member', 'reader'], + system_scope='all') + self.assertTrue(policy.enforce(system_admin_ctx, action, self.target)) + if enforce_new_defaults: + self.assertRaises( + oslo_policy.PolicyNotAuthorized, + policy.enforce, project_admin_ctx, action, self.target) + else: + self.assertTrue( + policy.enforce(project_admin_ctx, action, self.target)) + + def test_enforce_only_system_admin_new_defaults(self): + self._test_enforce_system_admin_allowed_action( + enforce_new_defaults=True) + + def test_enforce_only_system_admin_old_defaults(self): + self._test_enforce_system_admin_allowed_action( + enforce_new_defaults=False) + def test_enforce_nonexistent_action_throws(self): action = "example:noexist" self.assertRaises(oslo_policy.PolicyNotAuthorized, policy.enforce,