Adding manager role support

This patch adds support for the MANAGER role in the Neutron API's
policies.
It also adds unit tests to cover MANAGER role privileges in all APIs.

This is implementation of the phase 3 of the Secure RBAC community goal
[1]

[1] https://governance.openstack.org/tc/goals/selected/consistent-and-secure-rbac.html#phase-3

Change-Id: I377449050cb8aba56e167eba0981213831b8d7f2
This commit is contained in:
Slawek Kaplonski 2024-07-04 15:14:47 +02:00
parent cfab008eef
commit 7c7dc26a01
40 changed files with 1241 additions and 250 deletions

View File

@ -22,6 +22,10 @@ SERVICE = 'rule:service_api'
# there is now ADMIN role
ADMIN = "rule:admin_only"
# This check string is the primary use case for the project's manager who is
# more privileged user then typical MEMBER of the project.
PROJECT_MANAGER = 'role:manager and project_id:%(project_id)s'
# This check string is the primary use case for typical end-users, who are
# working with resources that belong to a project (e.g., creating ports and
# routers).
@ -38,6 +42,8 @@ PROJECT_READER = 'role:reader and project_id:%(project_id)s'
# project member should only be able to delete routers in their project).
ADMIN_OR_SERVICE = (
'(' + ADMIN + ') or (' + SERVICE + ')')
ADMIN_OR_PROJECT_MANAGER = (
'(' + ADMIN + ') or (' + PROJECT_MANAGER + ')')
ADMIN_OR_PROJECT_MEMBER = (
'(' + ADMIN + ') or (' + PROJECT_MEMBER + ')')
ADMIN_OR_PROJECT_READER = (
@ -54,8 +60,11 @@ RULE_SG_OWNER = 'rule:sg_owner'
# that becasue those resources (QOS rules, FIP PFs) don't have project_id
# attribute at all and they belongs to the same project as parent resource (QoS
# policy, FIP).
PARENT_OWNER_MANAGER = 'role:manager and ' + RULE_PARENT_OWNER
PARENT_OWNER_MEMBER = 'role:member and ' + RULE_PARENT_OWNER
PARENT_OWNER_READER = 'role:reader and ' + RULE_PARENT_OWNER
ADMIN_OR_PARENT_OWNER_MANAGER = (
'(' + ADMIN + ') or (' + PARENT_OWNER_MANAGER + ')')
ADMIN_OR_PARENT_OWNER_MEMBER = (
'(' + ADMIN + ') or (' + PARENT_OWNER_MEMBER + ')')
ADMIN_OR_PARENT_OWNER_READER = (

View File

@ -58,7 +58,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_floatingip:floating_ip_address',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
description='Create a floating IP with a specific IP address',
operations=[
{

View File

@ -28,7 +28,7 @@ RESOURCE_PATH = '/log/logs/{id}'
rules = [
policy.DocumentedRuleDefault(
name='get_loggable_resource',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Get loggable resources',
operations=[
@ -45,7 +45,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_log',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Create a network log',
operations=[
@ -62,7 +62,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='get_log',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Get a network log',
operations=[
@ -83,7 +83,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_log',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Update a network log',
operations=[
@ -100,7 +100,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_log',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Delete a network log',
operations=[

View File

@ -30,7 +30,7 @@ RULE_RESOURCE_PATH = '/metering/metering-label-rules/{id}'
rules = [
policy.DocumentedRuleDefault(
name='create_metering_label',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Create a metering label',
operations=[
@ -68,7 +68,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_metering_label',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Delete a metering label',
operations=[
@ -85,7 +85,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_metering_label_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Create a metering label rule',
operations=[
@ -123,7 +123,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_metering_label_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Delete a metering label rule',
operations=[

View File

@ -83,6 +83,7 @@ rules = [
check_str=neutron_policy.policy_or(
'not rule:network_device',
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
@ -101,6 +102,7 @@ rules = [
name='create_port:mac_address',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description='Specify ``mac_address`` attribute when creating a port',
@ -117,6 +119,7 @@ rules = [
name='create_port:fixed_ips',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER,
'rule:shared'),
scope_types=['project'],
@ -135,6 +138,7 @@ rules = [
name='create_port:fixed_ips:ip_address',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description='Specify IP address in ``fixed_ips`` when creating a port',
@ -151,6 +155,7 @@ rules = [
name='create_port:fixed_ips:subnet_id',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER,
'rule:shared'),
scope_types=['project'],
@ -169,6 +174,7 @@ rules = [
name='create_port:port_security_enabled',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description=(
@ -233,7 +239,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:allowed_address_pairs',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description=(
'Specify ``allowed_address_pairs`` '
@ -248,7 +256,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:allowed_address_pairs:mac_address',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description=(
'Specify ``mac_address` of `allowed_address_pairs`` '
@ -263,7 +273,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port:allowed_address_pairs:ip_address',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description=(
'Specify ``ip_address`` of ``allowed_address_pairs`` '
@ -407,6 +419,7 @@ rules = [
check_str=neutron_policy.policy_or(
'not rule:network_device',
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER,
),
scope_types=['project'],
@ -425,6 +438,7 @@ rules = [
name='update_port:mac_address',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER
),
scope_types=['project'],
description='Update ``mac_address`` attribute of a port',
@ -441,6 +455,7 @@ rules = [
name='update_port:fixed_ips',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
@ -458,6 +473,7 @@ rules = [
name='update_port:fixed_ips:ip_address',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
@ -478,6 +494,7 @@ rules = [
name='update_port:fixed_ips:subnet_id',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER,
'rule:shared'
),
@ -500,6 +517,7 @@ rules = [
name='update_port:port_security_enabled',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_SERVICE,
base.PROJECT_MANAGER,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
@ -556,7 +574,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:allowed_address_pairs',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description='Update ``allowed_address_pairs`` attribute of a port',
operations=ACTION_PUT,
@ -568,7 +588,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:allowed_address_pairs:mac_address',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description=(
'Update ``mac_address`` of ``allowed_address_pairs`` '
@ -583,7 +605,9 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_port:allowed_address_pairs:ip_address',
check_str=base.ADMIN_OR_NET_OWNER_MEMBER,
check_str=neutron_policy.policy_or(
base.ADMIN_OR_NET_OWNER_MEMBER,
base.PROJECT_MANAGER),
scope_types=['project'],
description=(
'Update ``ip_address`` of ``allowed_address_pairs`` '

View File

@ -52,7 +52,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Create a QoS policy',
operations=[
@ -69,7 +69,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Update a QoS policy',
operations=[
@ -86,7 +86,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Delete a QoS policy',
operations=[
@ -152,7 +152,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy_bandwidth_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Create a QoS bandwidth limit rule',
operations=[
@ -169,7 +169,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy_bandwidth_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS bandwidth limit rule',
operations=[
@ -187,7 +187,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy_bandwidth_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS bandwidth limit rule',
operations=[
@ -223,7 +223,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy_packet_rate_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Create a QoS packet rate limit rule',
operations=[
@ -235,7 +235,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy_packet_rate_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS packet rate limit rule',
operations=[
@ -248,7 +248,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy_packet_rate_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS packet rate limit rule',
operations=[
@ -284,7 +284,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy_dscp_marking_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Create a QoS DSCP marking rule',
operations=[
@ -301,7 +301,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy_dscp_marking_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS DSCP marking rule',
operations=[
@ -319,7 +319,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy_dscp_marking_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS DSCP marking rule',
operations=[
@ -360,7 +360,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy_minimum_bandwidth_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Create a QoS minimum bandwidth rule',
operations=[
@ -377,7 +377,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy_minimum_bandwidth_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS minimum bandwidth rule',
operations=[
@ -395,7 +395,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy_minimum_bandwidth_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS minimum bandwidth rule',
operations=[
@ -430,7 +430,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_policy_minimum_packet_rate_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Create a QoS minimum packet rate rule',
operations=[
@ -442,7 +442,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_policy_minimum_packet_rate_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS minimum packet rate rule',
operations=[
@ -455,7 +455,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_policy_minimum_packet_rate_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS minimum packet rate rule',
operations=[
@ -485,7 +485,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_alias_bandwidth_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS bandwidth limit rule through alias',
operations=[
@ -502,7 +502,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_alias_bandwidth_limit_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS bandwidth limit rule through alias',
operations=[
@ -536,7 +536,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_alias_dscp_marking_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS DSCP marking rule through alias',
operations=[
@ -553,7 +553,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_alias_dscp_marking_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS DSCP marking rule through alias',
operations=[
@ -587,7 +587,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='update_alias_minimum_bandwidth_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Update a QoS minimum bandwidth rule through alias',
operations=[
@ -604,7 +604,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_alias_minimum_bandwidth_rule',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PARENT_OWNER_MANAGER,
scope_types=['project'],
description='Delete a QoS minimum bandwidth rule through alias',
operations=[

View File

@ -28,7 +28,7 @@ RESOURCE_PATH = '/quota/{id}'
rules = [
policy.DocumentedRuleDefault(
name='get_quota',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_PROJECT_MANAGER,
scope_types=['project'],
description='Get a resource quota',
operations=[

View File

@ -71,6 +71,21 @@ class AdminTests(AddressGroupAPITestCase):
policy.enforce(self.context, "get_address_group", self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_address_group(self):
self.assertTrue(
policy.enforce(self.context, "get_address_group", self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_address_group", self.alt_target)
class ProjectMemberTests(AdminTests):
def setUp(self):

View File

@ -158,6 +158,65 @@ class AdminTests(AddressScopeAPITestCase):
self.context, 'delete_address_scope', self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_address_scope(self):
self.assertTrue(
policy.enforce(self.context, 'create_address_scope', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_address_scope', self.alt_target)
def test_create_address_scope_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_address_scope:shared', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_address_scope:shared', self.alt_target)
def test_get_address_scope(self):
self.assertTrue(
policy.enforce(self.context, 'get_address_scope', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_address_scope', self.alt_target)
def test_update_address_scope(self):
self.assertTrue(
policy.enforce(self.context, 'update_address_scope', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_address_scope', self.alt_target)
def test_update_address_scope_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_address_scope:shared', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_address_scope:shared', self.alt_target)
def test_delete_address_scope(self):
self.assertTrue(
policy.enforce(self.context, 'delete_address_scope', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_address_scope', self.alt_target)
class ProjectMemberTests(AdminTests):
def setUp(self):

View File

@ -186,11 +186,11 @@ class AdminTests(AgentAPITestCase):
"get_l3-agents", self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_agent(self):
self.assertRaises(
@ -265,6 +265,12 @@ class ProjectMemberTests(AdminTests):
self.context, "get_l3-agents", self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -107,11 +107,11 @@ class AdminTests(AutoAllocatedTopologyAPITestCase):
policy.enforce(self.context, DELETE_POLICY, self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_topology(self):
self.assertTrue(policy.enforce(self.context, GET_POLICY, self.target))
@ -134,6 +134,13 @@ class ProjectMemberTests(AdminTests):
)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -64,7 +64,14 @@ class AdminTests(AvailabilityZoneAPITestCase):
policy.enforce(self.context, "get_availability_zone", self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()

View File

@ -89,7 +89,11 @@ class PolicyBaseTestCase(tests_base.BaseTestCase):
def _prepare_project_scope_personas(self):
self.project_admin_ctx = context.Context(
user_id=self.user_id,
roles=['admin', 'member', 'reader'],
roles=['admin', 'manager', 'member', 'reader'],
project_id=self.project_id)
self.project_manager_ctx = context.Context(
user_id=self.user_id,
roles=['manager', 'member', 'reader'],
project_id=self.project_id)
self.project_member_ctx = context.Context(
user_id=self.user_id,

View File

@ -88,12 +88,12 @@ class AdminDefaultSecurityGroupRuleTests(DefaultSecurityGroupRuleAPITestCase):
'delete_default_security_group_rule', self.target))
class ProjectMemberDefaultSecurityGroupRuleTests(
class ProjectManagerDefaultSecurityGroupRuleTests(
AdminDefaultSecurityGroupRuleTests):
def setUp(self):
super(ProjectMemberDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_default_security_group_rule(self):
self.assertRaises(
@ -113,6 +113,14 @@ class ProjectMemberDefaultSecurityGroupRuleTests(
self.context, 'delete_default_security_group_rule', self.target)
class ProjectMemberDefaultSecurityGroupRuleTests(
ProjectManagerDefaultSecurityGroupRuleTests):
def setUp(self):
super(ProjectMemberDefaultSecurityGroupRuleTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderDefaultSecurityGroupRuleTests(
ProjectMemberDefaultSecurityGroupRuleTests):

View File

@ -152,11 +152,11 @@ class AdminTests(FlavorAPITestCase):
'delete_flavor_service_profile', self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_flavor(self):
self.assertRaises(
@ -215,6 +215,13 @@ class ProjectMemberTests(AdminTests):
self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -180,11 +180,11 @@ class AdminTests(FloatingIPAPITestCase):
policy.enforce(self.context, "delete_floatingip", self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_floatingip(self):
self.assertTrue(
@ -195,11 +195,8 @@ class ProjectMemberTests(AdminTests):
self.context, "create_floatingip", self.alt_target)
def test_create_floatingip_with_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_floatingip:floating_ip_address",
self.target)
self.assertTrue(
policy.enforce(self.context, "create_floatingip", self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
@ -245,6 +242,25 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, "delete_floatingip", self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_create_floatingip_with_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_floatingip:floating_ip_address",
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_floatingip:floating_ip_address",
self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -69,11 +69,11 @@ class AdminTests(FloatingipPoolsAPITestCase):
self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_floatingip_pool(self):
self.assertTrue(
@ -85,6 +85,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'get_floatingip_pool', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -197,11 +197,11 @@ class AdminTests(FloatingipPortForwardingAPITestCase):
self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_fip_pf(self):
with mock.patch.object(self.plugin_mock, 'get_floatingip',
@ -264,6 +264,13 @@ class ProjectMemberTests(AdminTests):
self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -146,11 +146,11 @@ class AdminTests(L3ConntrackHelperAPITestCase):
'delete_router_conntrack_helper', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_router_conntrack_helper(self):
self.assertTrue(
@ -189,6 +189,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_router_conntrack_helper', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -103,11 +103,11 @@ class AdminTests(LocalIPAPITestCase):
policy.enforce(self.context, "delete_local_ip", self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_local_ip(self):
self.assertTrue(
@ -138,6 +138,13 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, "delete_local_ip", self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(LocalIPAPITestCase):
def setUp(self):

View File

@ -140,11 +140,11 @@ class AdminTests(LocalIPAssociationAPITestCase):
self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_local_ip_port_association(self):
self.assertTrue(
@ -180,6 +180,13 @@ class ProjectMemberTests(AdminTests):
self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -99,7 +99,34 @@ class AdminTests(LoggingAPITestCase):
policy.enforce(self.context, 'delete_log', self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_loggable_resource(self):
self.assertTrue(
policy.enforce(self.context, 'get_loggable_resource', self.target))
def test_create_log(self):
self.assertTrue(
policy.enforce(self.context, 'create_log', self.target))
def test_get_log(self):
self.assertTrue(
policy.enforce(self.context, 'get_log', self.target))
def test_update_log(self):
self.assertTrue(
policy.enforce(self.context, 'update_log', self.target))
def test_delete_log(self):
self.assertTrue(
policy.enforce(self.context, 'delete_log', self.target))
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()

View File

@ -160,7 +160,65 @@ class AdminTests(MeteringAPITestCase):
self.context, 'delete_metering_label_rule', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_metering_label(self):
self.assertTrue(
policy.enforce(self.context, 'create_metering_label', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_metering_label', self.alt_target)
def test_get_metering_label(self):
self.assertTrue(
policy.enforce(self.context, 'get_metering_label', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_metering_label', self.alt_target)
def test_delete_metering_label(self):
self.assertTrue(
policy.enforce(self.context, 'delete_metering_label', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_metering_label', self.alt_target)
def test_create_metering_label_rule(self):
self.assertTrue(
policy.enforce(
self.context, 'create_metering_label_rule', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_metering_label_rule', self.alt_target)
def test_get_metering_label_rule(self):
self.assertTrue(
policy.enforce(
self.context, 'get_metering_label_rule', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_metering_label_rule', self.alt_target)
def test_delete_metering_label_rule(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_metering_label_rule', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_metering_label_rule', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()

View File

@ -119,11 +119,11 @@ class AdminTests(NDPProxyAPITestCase):
policy.enforce(self.context, "delete_ndp_proxy", self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_ndp_proxy(self):
self.assertTrue(
@ -155,6 +155,13 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, "delete_ndp_proxy", self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -555,11 +555,11 @@ class AdminTests(NetworkAPITestCase):
self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_network(self):
self.assertTrue(
@ -834,6 +834,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_networks_tags', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -65,11 +65,11 @@ class AdminTests(NetworkIPAvailabilityAPITestCase):
self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_network_ip_availability(self):
self.assertRaises(
@ -78,6 +78,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'get_network_ip_availability', self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -131,11 +131,11 @@ class AdminTests(NetworkSegmentRangeAPITestCase):
'delete_network_segment_ranges_tags', self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_network_segment_range(self):
self.assertRaises(
@ -180,6 +180,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_network_segment_ranges_tags', self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -754,11 +754,11 @@ class AdminTests(PortAPITestCase):
policy.enforce(self.context, 'delete_port', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_port(self):
self.assertTrue(
@ -782,50 +782,45 @@ class ProjectMemberTests(AdminTests):
alt_target)
def test_create_port_with_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:mac_address',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:mac_address', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:mac_address',
self.alt_target)
def test_create_port_with_fixed_ips(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips',
self.alt_target)
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:fixed_ips:ip_address', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.alt_target)
def test_create_port_with_fixed_ips_and_subnet_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:subnet_id',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:fixed_ips:subnet_id', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:subnet_id',
self.alt_target)
def test_create_port_with_port_security_enabled(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:port_security_enabled',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:port_security_enabled', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:port_security_enabled',
@ -861,11 +856,9 @@ class ProjectMemberTests(AdminTests):
self.alt_target)
def test_create_port_with_allowed_address_pairs(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:allowed_address_pairs', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
@ -873,11 +866,10 @@ class ProjectMemberTests(AdminTests):
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:mac_address',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:allowed_address_pairs:mac_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
@ -885,11 +877,10 @@ class ProjectMemberTests(AdminTests):
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:ip_address',
self.target)
self.assertTrue(
policy.enforce(self.context,
'create_port:allowed_address_pairs:ip_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
@ -1003,6 +994,267 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, 'update_port:device_owner',
alt_target)
def test_update_port_with_mac_address(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:mac_address', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:mac_address',
self.alt_target)
def test_update_port_with_fixed_ips(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:fixed_ips', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips',
self.alt_target)
def test_update_port_with_fixed_ips_and_ip_address(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:fixed_ips:ip_address', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:ip_address',
self.alt_target)
def test_update_port_with_fixed_ips_and_subnet_id(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:fixed_ips:subnet_id', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:fixed_ips:subnet_id',
self.alt_target)
def test_update_port_with_port_security_enabled(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:port_security_enabled', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:port_security_enabled',
self.alt_target)
def test_update_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.alt_target)
def test_update_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.alt_target)
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:vnic_type', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:vnic_type',
self.alt_target)
def test_update_port_with_allowed_address_pairs(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:allowed_address_pairs', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs',
self.alt_target)
def test_update_port_with_allowed_address_pairs_and_mac_address(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:allowed_address_pairs:mac_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:mac_address',
self.alt_target)
def test_update_port_with_allowed_address_pairs_and_ip_address(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:allowed_address_pairs:ip_address',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:ip_address',
self.alt_target)
def test_update_port_data_plane_status(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:data_plane_status', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:data_plane_status', self.alt_target)
def test_update_port_hints(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:hints', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:hints', self.alt_target)
def test_update_ports_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_ports_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ports_tags', self.alt_target)
def test_delete_port(self):
self.assertTrue(
policy.enforce(self.context, 'delete_port', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_port', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_create_port_with_device_owner(self):
target = self.target.copy()
target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test'
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:device_owner',
target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:device_owner',
alt_target)
def test_create_port_with_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:mac_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:mac_address',
self.alt_target)
def test_create_port_with_fixed_ips(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips',
self.alt_target)
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:ip_address',
self.alt_target)
def test_create_port_with_fixed_ips_and_subnet_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:subnet_id',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:fixed_ips:subnet_id',
self.alt_target)
def test_create_port_with_port_security_enabled(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:port_security_enabled',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:port_security_enabled',
self.alt_target)
def test_create_port_with_allowed_address_pairs(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs',
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:mac_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:mac_address',
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:ip_address',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:ip_address',
self.alt_target)
def test_update_port_with_device_owner(self):
target = self.target.copy()
target['device_owner'] = 'network:test'
alt_target = self.alt_target.copy()
alt_target['device_owner'] = 'network:test'
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:device_owner',
target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:device_owner',
alt_target)
def test_update_port_with_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -1053,35 +1305,6 @@ class ProjectMemberTests(AdminTests):
policy.enforce, self.context, 'update_port:port_security_enabled',
self.alt_target)
def test_update_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.alt_target)
def test_update_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.alt_target)
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(
policy.enforce(self.context,
'update_port:binding:vnic_type', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:vnic_type',
self.alt_target)
def test_update_port_with_allowed_address_pairs(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -1118,40 +1341,6 @@ class ProjectMemberTests(AdminTests):
self.context, 'update_port:allowed_address_pairs:ip_address',
self.alt_target)
def test_update_port_data_plane_status(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:data_plane_status', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:data_plane_status', self.alt_target)
def test_update_port_hints(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:hints', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:hints', self.alt_target)
def test_update_ports_tags(self):
self.assertTrue(
policy.enforce(self.context, 'update_ports_tags', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ports_tags', self.alt_target)
def test_delete_port(self):
self.assertTrue(
policy.enforce(self.context, 'delete_port', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_port', self.alt_target)
class ProjectReaderTests(ProjectMemberTests):

View File

@ -100,11 +100,11 @@ class AdminTests(PortBindingsAPITestCase):
self.context, "activate", self.target)
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_port_binding(self):
self.assertRaises(
@ -113,6 +113,13 @@ class ProjectMemberTests(AdminTests):
self.context, "get_port_binding", self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -114,7 +114,42 @@ class AdminQosPolicyTests(QosPolicyAPITestCase):
policy.enforce(self.context, 'delete_policy', self.alt_target))
class ProjectMemberQosPolicyTests(AdminQosPolicyTests):
class ProjectManagerQosPolicyTests(AdminQosPolicyTests):
def setUp(self):
super(ProjectManagerQosPolicyTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy(self):
self.assertTrue(
policy.enforce(self.context, 'get_policy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_policy', self.alt_target)
def test_create_policy(self):
self.assertTrue(
policy.enforce(self.context, 'create_policy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_policy', self.alt_target)
def test_update_policy(self):
self.assertTrue(
policy.enforce(self.context, 'update_policy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_policy', self.alt_target)
def test_delete_policy(self):
self.assertTrue(
policy.enforce(self.context, 'delete_policy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_policy', self.alt_target)
class ProjectMemberQosPolicyTests(ProjectManagerQosPolicyTests):
def setUp(self):
super(ProjectMemberQosPolicyTests, self).setUp()
@ -231,7 +266,14 @@ class AdminQosRuleTypeTests(QosRuleTypeAPITestCase):
policy.enforce(self.context, 'get_rule_type', self.target))
class ProjectMemberQosRuleTypeTests(AdminQosRuleTypeTests):
class ProjectManagerQosRuleTypeTests(AdminQosRuleTypeTests):
def setUp(self):
super(ProjectManagerQosRuleTypeTests, self).setUp()
self.context = self.project_manager_ctx
class ProjectMemberQosRuleTypeTests(ProjectManagerQosRuleTypeTests):
def setUp(self):
super(ProjectMemberQosRuleTypeTests, self).setUp()
@ -477,12 +519,11 @@ class AdminQosBandwidthLimitRuleTests(QosRulesAPITestCase):
self.alt_target))
class ProjectMemberQosBandwidthLimitRuleTests(
AdminQosBandwidthLimitRuleTests):
class ProjectManagerQosBandwidthLimitRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ProjectMemberQosBandwidthLimitRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerQosBandwidthLimitRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy_bandwidth_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
@ -512,6 +553,85 @@ class ProjectMemberQosBandwidthLimitRuleTests(
self.context, 'get_alias_bandwidth_limit_rule',
self.alt_target)
def test_create_policy_bandwidth_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'create_policy_bandwidth_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_bandwidth_limit_rule',
self.alt_target)
def test_update_policy_bandwidth_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'update_policy_bandwidth_limit_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'update_alias_bandwidth_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_bandwidth_limit_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_bandwidth_limit_rule',
self.alt_target)
def test_delete_policy_bandwidth_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'delete_policy_bandwidth_limit_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'delete_alias_bandwidth_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_bandwidth_limit_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_bandwidth_limit_rule',
self.alt_target)
class ProjectMemberQosBandwidthLimitRuleTests(
ProjectManagerQosBandwidthLimitRuleTests):
def setUp(self):
super(ProjectMemberQosBandwidthLimitRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_policy_bandwidth_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -745,12 +865,11 @@ class AdminQosPacketRateLimitRuleTests(QosRulesAPITestCase):
self.alt_target))
class ProjectMemberQosPacketRateLimitRuleTests(
AdminQosPacketRateLimitRuleTests):
class ProjectManagerQosPacketRateLimitRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ProjectMemberQosPacketRateLimitRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerQosPacketRateLimitRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy_packet_rate_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
@ -768,6 +887,60 @@ class ProjectMemberQosPacketRateLimitRuleTests(
self.context, 'get_policy_packet_rate_limit_rule',
self.alt_target)
def test_create_policy_packet_rate_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'create_policy_packet_rate_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_packet_rate_limit_rule',
self.alt_target)
def test_update_policy_packet_rate_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'update_policy_packet_rate_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_packet_rate_limit_rule',
self.alt_target)
def test_delete_policy_packet_rate_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'delete_policy_packet_rate_limit_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_packet_rate_limit_rule',
self.alt_target)
class ProjectMemberQosPacketRateLimitRuleTests(
ProjectManagerQosPacketRateLimitRuleTests):
def setUp(self):
super(ProjectMemberQosPacketRateLimitRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_policy_packet_rate_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -1042,12 +1215,11 @@ class AdminQosDSCPMarkingRuleTests(QosRulesAPITestCase):
self.alt_target))
class ProjectMemberQosDSCPMarkingRuleTests(
AdminQosDSCPMarkingRuleTests):
class ProjectManagerQosDSCPMarkingRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ProjectMemberQosDSCPMarkingRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerQosDSCPMarkingRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy_dscp_marking_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
@ -1076,6 +1248,81 @@ class ProjectMemberQosDSCPMarkingRuleTests(
self.context, 'get_alias_dscp_marking_rule',
self.alt_target)
def test_create_policy_dscp_marking_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'create_policy_dscp_marking_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_dscp_marking_rule',
self.alt_target)
def test_update_policy_dscp_marking_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'update_policy_dscp_marking_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'update_alias_dscp_marking_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_dscp_marking_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_dscp_marking_rule',
self.alt_target)
def test_delete_policy_dscp_marking_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'delete_policy_dscp_marking_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'delete_alias_dscp_marking_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_dscp_marking_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_dscp_marking_rule',
self.alt_target)
class ProjectMemberQosDSCPMarkingRuleTests(
ProjectManagerQosDSCPMarkingRuleTests):
def setUp(self):
super(ProjectMemberQosDSCPMarkingRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_policy_dscp_marking_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -1397,12 +1644,11 @@ class AdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
self.alt_target))
class ProjectMemberQosMinimumBandwidthRuleTests(
AdminQosMinimumBandwidthRuleTests):
class ProjectManagerQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ProjectMemberQosMinimumBandwidthRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerQosMinimumBandwidthRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy_minimum_bandwidth_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
@ -1431,6 +1677,83 @@ class ProjectMemberQosMinimumBandwidthRuleTests(
self.context, 'get_alias_minimum_bandwidth_rule',
self.alt_target)
def test_create_policy_minimum_bandwidth_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(
self.context, 'create_policy_minimum_bandwidth_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_minimum_bandwidth_rule',
self.alt_target)
def test_update_policy_minimum_bandwidth_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(
self.context, 'update_policy_minimum_bandwidth_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(
self.context, 'update_alias_minimum_bandwidth_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_minimum_bandwidth_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_minimum_bandwidth_rule',
self.alt_target)
def test_delete_policy_minimum_bandwidth_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(
self.context, 'delete_policy_minimum_bandwidth_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(
self.context, 'delete_alias_minimum_bandwidth_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_minimum_bandwidth_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_minimum_bandwidth_rule',
self.alt_target)
class ProjectMemberQosMinimumBandwidthRuleTests(
ProjectManagerQosMinimumBandwidthRuleTests):
def setUp(self):
super(ProjectMemberQosMinimumBandwidthRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_policy_minimum_bandwidth_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -1741,12 +2064,11 @@ class AdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
self.alt_target))
class ProjectMemberQosMinimumPacketRateRuleTests(
AdminQosMinimumPacketRateRuleTests):
class ProjectManagerQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ProjectMemberQosMinimumPacketRateRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerQosMinimumPacketRateRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_policy_minimum_packet_rate_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
@ -1775,6 +2097,83 @@ class ProjectMemberQosMinimumPacketRateRuleTests(
self.context, 'get_alias_minimum_packet_rate_rule',
self.alt_target)
def test_create_policy_minimum_packet_rate_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'create_policy_minimum_packet_rate_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_minimum_packet_rate_rule',
self.alt_target)
def test_update_policy_minimum_packet_rate_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'update_policy_minimum_packet_rate_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'update_alias_minimum_packet_rate_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_minimum_packet_rate_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_minimum_packet_rate_rule',
self.alt_target)
def test_delete_policy_minimum_packet_rate_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertTrue(
policy.enforce(self.context,
'delete_policy_minimum_packet_rate_rule',
self.target))
# And the same for aliases
self.assertTrue(
policy.enforce(self.context,
'delete_alias_minimum_packet_rate_rule',
self.target))
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.alt_qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_minimum_packet_rate_rule',
self.alt_target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_minimum_packet_rate_rule',
self.alt_target)
class ProjectMemberQosMinimumPacketRateRuleTests(
ProjectManagerQosMinimumPacketRateRuleTests):
def setUp(self):
super(ProjectMemberQosMinimumPacketRateRuleTests, self).setUp()
self.context = self.project_member_ctx
def test_create_policy_minimum_packet_rate_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,

View File

@ -103,17 +103,15 @@ class AdminTests(QuoatsAPITestCase):
policy.enforce(self.context, 'delete_quota', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_get_quota(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_quota', self.target)
self.assertTrue(
policy.enforce(self.context, 'get_quota', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
@ -140,6 +138,23 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_quota', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_get_quota(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_quota', self.target)
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_quota', self.alt_target)
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -170,11 +170,11 @@ class AdminTests(RbacAPITestCase):
self.context, 'delete_rbac_policy', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_rbac_policy(self):
self.assertTrue(
@ -233,6 +233,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_rbac_policy', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -564,11 +564,11 @@ class AdminTests(RouterAPITestCase):
'remove_router_interface', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_router(self):
self.assertTrue(
@ -829,6 +829,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'remove_router_interface', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -180,11 +180,11 @@ class AdminSecurityGroupTests(SecurityGroupAPITestCase):
self.alt_target))
class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
class ProjectManagerSecurityGroupTests(AdminSecurityGroupTests):
def setUp(self):
super(ProjectMemberSecurityGroupTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerSecurityGroupTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_security_group(self):
self.assertTrue(
@ -244,6 +244,13 @@ class ProjectMemberSecurityGroupTests(AdminSecurityGroupTests):
self.context, 'delete_security_groups_tags', self.alt_target)
class ProjectMemberSecurityGroupTests(ProjectManagerSecurityGroupTests):
def setUp(self):
super(ProjectMemberSecurityGroupTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderSecurityGroupTests(ProjectMemberSecurityGroupTests):
def setUp(self):
@ -474,11 +481,11 @@ class AdminSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):
'delete_security_group_rule', self.alt_target))
class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
class ProjectManagerSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
def setUp(self):
super(ProjectMemberSecurityGroupRuleTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerSecurityGroupRuleTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_security_group_rule(self):
self.assertTrue(
@ -530,6 +537,14 @@ class ProjectMemberSecurityGroupRuleTests(AdminSecurityGroupRuleTests):
self.context, 'delete_security_group_rule', self.alt_target)
class ProjectMemberSecurityGroupRuleTests(
ProjectManagerSecurityGroupRuleTests):
def setUp(self):
super(ProjectMemberSecurityGroupRuleTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests):
def setUp(self):

View File

@ -124,11 +124,11 @@ class AdminTests(SegmentAPITestCase):
policy.enforce(self.context, 'delete_segments_tags', self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_segment(self):
self.assertRaises(
@ -173,6 +173,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_segments_tags', self.target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -64,7 +64,14 @@ class AdminTests(ServiceTypeAPITestCase):
policy.enforce(self.context, 'get_service_provider', self.target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()

View File

@ -391,11 +391,11 @@ class AdminTests(SubnetAPITestCase):
self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_subnet(self):
self.assertTrue(
@ -550,6 +550,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'delete_subnets_tags', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -276,11 +276,11 @@ class AdminTests(SubnetpoolAPITestCase):
policy.enforce(self.context, 'remove_prefixes', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_subnetpool(self):
self.assertTrue(
@ -396,6 +396,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'remove_prefixes', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -197,11 +197,11 @@ class AdminTests(TrunkAPITestCase):
policy.enforce(self.context, 'remove_subports', self.alt_target))
class ProjectMemberTests(AdminTests):
class ProjectManagerTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
super(ProjectManagerTests, self).setUp()
self.context = self.project_manager_ctx
def test_create_trunk(self):
self.assertTrue(
@ -260,6 +260,13 @@ class ProjectMemberTests(AdminTests):
self.context, 'remove_subports', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):

View File

@ -0,0 +1,7 @@
---
features:
- |
Neutron API RBAC policies now support by default the project MANAGER role.
Please refer to the `community goal
<https://governance.openstack.org/tc/goals/selected/consistent-and-secure-rbac.html#phase-3>`_
for more information.