Fix provider sg delete by non admin and non admin rule change

This patch restricts the deleting of an provider security group only
to the admin thus preventing the tenant from deleting it.

It also prevents a non admin user from adding or deleting rules from
this group.

NOTE: we are using the following policy.json entry to prevent the
creation of a provider security group by a normal tenant:

    "create_security_group:provider": "rule:admin_only"

Change-Id: Ie195225654b0c7cd8cfb715691c5a3bb4c8ee13d
This commit is contained in:
Aaron Rosen 2016-08-23 17:17:54 -04:00
parent e64512ed2c
commit bc26f40491
5 changed files with 102 additions and 6 deletions

View File

@ -16,6 +16,7 @@
from oslo_utils import uuidutils from oslo_utils import uuidutils
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from sqlalchemy.orm import exc
from neutron.api.v2 import attributes from neutron.api.v2 import attributes
from neutron.common import utils as n_utils from neutron.common import utils as n_utils
@ -94,9 +95,12 @@ class ExtendedSecurityGroupPropertiesMixin(object):
def _get_security_group_properties(self, context, security_group_id): def _get_security_group_properties(self, context, security_group_id):
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
prop = context.session.query( try:
NsxExtendedSecurityGroupProperties).filter_by( prop = context.session.query(
security_group_id=security_group_id).one() NsxExtendedSecurityGroupProperties).filter_by(
security_group_id=security_group_id).one()
except exc.NoResultFound:
raise ext_sg.SecurityGroupNotFound(id=security_group_id)
return prop return prop
def _process_security_group_properties_update(self, context, def _process_security_group_properties_update(self, context,
@ -238,6 +242,12 @@ class ExtendedSecurityGroupPropertiesMixin(object):
context, updated_port, context, updated_port,
updated_port[provider_sg.PROVIDER_SECURITYGROUPS]) updated_port[provider_sg.PROVIDER_SECURITYGROUPS])
def _prevent_non_admin_delete_provider_sg(self, context, sg_id):
# Only someone who is an admin is allowed to delete this.
if not context.is_admin and self._is_provider_security_group(context,
sg_id):
raise provider_sg.ProviderSecurityGroupDeleteNotAdmin(id=sg_id)
def _extend_security_group_with_properties(self, sg_res, sg_db): def _extend_security_group_with_properties(self, sg_res, sg_db):
if sg_db.ext_properties: if sg_db.ext_properties:
sg_res[sg_logging.LOGGING] = sg_db.ext_properties.logging sg_res[sg_logging.LOGGING] = sg_db.ext_properties.logging

View File

@ -61,6 +61,11 @@ class DefaultSecurityGroupIsNotProvider(nexception.InvalidInput):
"security-group.") "security-group.")
class ProviderSecurityGroupDeleteNotAdmin(nexception.NotAuthorized):
message = _("Security group %(id)s is a provider security group and "
"requires an admin to delete it.")
class Providersecuritygroup(extensions.ExtensionDescriptor): class Providersecuritygroup(extensions.ExtensionDescriptor):
"""Provider security-group extension.""" """Provider security-group extension."""

View File

@ -2943,6 +2943,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def delete_security_group(self, context, id): def delete_security_group(self, context, id):
"""Delete a security group.""" """Delete a security group."""
self._prevent_non_admin_delete_provider_sg(context, id)
try: try:
# Find nsx rule sections # Find nsx rule sections
section_uri = self._get_section_uri(context.session, id) section_uri = self._get_section_uri(context.session, id)
@ -3042,6 +3043,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
""" """
sg_rules = security_group_rules['security_group_rules'] sg_rules = security_group_rules['security_group_rules']
sg_id = sg_rules[0]['security_group_rule']['security_group_id'] sg_id = sg_rules[0]['security_group_rule']['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, sg_id)
ruleids = set() ruleids = set()
nsx_rules = [] nsx_rules = []
@ -3102,6 +3105,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
"""Delete a security group rule.""" """Delete a security group rule."""
rule_db = self._get_security_group_rule(context, id) rule_db = self._get_security_group_rule(context, id)
security_group_id = rule_db['security_group_id'] security_group_id = rule_db['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, security_group_id)
# Get the nsx rule from neutron DB and delete it # Get the nsx rule from neutron DB and delete it
nsx_rule_id = nsxv_db.get_nsx_rule_id(context.session, id) nsx_rule_id = nsxv_db.get_nsx_rule_id(context.session, id)

View File

@ -2884,6 +2884,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
return secgroup_res return secgroup_res
def delete_security_group(self, context, id): def delete_security_group(self, context, id):
self._prevent_non_admin_delete_provider_sg(context, id)
nsgroup_id, section_id = self.nsxlib.get_sg_mappings( nsgroup_id, section_id = self.nsxlib.get_sg_mappings(
context.session, id) context.session, id)
super(NsxV3Plugin, self).delete_security_group(context, id) super(NsxV3Plugin, self).delete_security_group(context, id)
@ -2918,8 +2919,9 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
# NOTE(arosen): here are are assuming that all of the security # NOTE(arosen): here are are assuming that all of the security
# group rules being added are part of the same security # group rules being added are part of the same security
# group. We should be validating that this is the case though... # group. We should be validating that this is the case though...
sg_id = sg_rules[0]['security_group_rule']['security_group_id'] sg_id = sg_rules[0]['security_group_rule']['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, sg_id)
security_group = self.get_security_group( security_group = self.get_security_group(
context, sg_id) context, sg_id)
action = firewall.ALLOW action = firewall.ALLOW
@ -2947,6 +2949,7 @@ class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def delete_security_group_rule(self, context, id): def delete_security_group_rule(self, context, id):
rule_db = self._get_security_group_rule(context, id) rule_db = self._get_security_group_rule(context, id)
sg_id = rule_db['security_group_id'] sg_id = rule_db['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, sg_id)
_, section_id = self.nsxlib.get_sg_mappings(context.session, sg_id) _, section_id = self.nsxlib.get_sg_mappings(context.session, sg_id)
fw_rule_id = nsx_db.get_sg_rule_mapping(context.session, id) fw_rule_id = nsx_db.get_sg_rule_mapping(context.session, id)
self.nsxlib.delete_rule(section_id, fw_rule_id) self.nsxlib.delete_rule(section_id, fw_rule_id)

View File

@ -96,6 +96,25 @@ class ProviderSecurityGroupTestPlugin(
context, port, original_port, updated_port) context, port, original_port, updated_port)
return self.get_port(context, id) return self.get_port(context, id)
def delete_security_group(self, context, id):
self._prevent_non_admin_delete_provider_sg(context, id)
super(ProviderSecurityGroupTestPlugin,
self).delete_security_group(context, id)
def delete_security_group_rule(self, context, id):
rule_db = self._get_security_group_rule(context, id)
sg_id = rule_db['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, sg_id)
return super(ProviderSecurityGroupTestPlugin,
self).delete_security_group_rule(context, id)
def create_security_group_rule(self, context, security_group_rule):
id = security_group_rule['security_group_rule']['security_group_id']
self._prevent_non_admin_delete_provider_sg(context, id)
return super(ProviderSecurityGroupTestPlugin,
self).create_security_group_rule(context,
security_group_rule)
class ProviderSecurityGroupExtTestCase( class ProviderSecurityGroupExtTestCase(
test_securitygroup.SecurityGroupDBTestCase): test_securitygroup.SecurityGroupDBTestCase):
@ -243,9 +262,64 @@ class ProviderSecurityGroupExtTestCase(
port['port']['provider_security_groups']) port['port']['provider_security_groups'])
self.assertEqual([sg_id], port['port']['security_groups']) self.assertEqual([sg_id], port['port']['security_groups'])
def test_non_admin_cannot_delete_provider_sg_and_admin_can(self):
provider_secgroup = self._create_provider_security_group()
pvd_sg_id = provider_secgroup['security_group']['id']
class TestNSXv3ProviderSecurityGrp(ProviderSecurityGroupExtTestCase, # Try deleting the request as the normal tenant returns forbidden
test_nsxv3_plugin.NsxV3PluginTestCaseMixin): # as a tenant is not allowed to delete this.
ctx = context.Context('', self._tenant_id)
self._delete('security-groups', pvd_sg_id,
expected_code=webob.exc.HTTPForbidden.code,
neutron_context=ctx)
# can be deleted though as admin
self._delete('security-groups', pvd_sg_id,
expected_code=webob.exc.HTTPNoContent.code)
def test_non_admin_cannot_delete_provider_sg_rule(self):
provider_secgroup = self._create_provider_security_group()
pvd_sg_id = provider_secgroup['security_group']['id']
data = {'security_group_rule': {'security_group_id': pvd_sg_id,
'direction': 'ingress',
'protocol': 'tcp',
'ethertype': 'IPv4',
'tenant_id': self._tenant_id}}
req = self.new_create_request('security-group-rules', data)
res = self.deserialize(self.fmt, req.get_response(self.ext_api))
sg_rule_id = res['security_group_rule']['id']
# Try deleting the request as the normal tenant returns forbidden
# as a tenant is not allowed to delete this.
ctx = context.Context('', self._tenant_id)
self._delete('security-group-rules', sg_rule_id,
expected_code=webob.exc.HTTPForbidden.code,
neutron_context=ctx)
# can be deleted though as admin
self._delete('security-group-rules', sg_rule_id,
expected_code=webob.exc.HTTPNoContent.code)
def test_non_admin_cannot_add_provider_sg_rule(self):
provider_secgroup = self._create_provider_security_group()
pvd_sg_id = provider_secgroup['security_group']['id']
data = {'security_group_rule': {'security_group_id': pvd_sg_id,
'direction': 'ingress',
'protocol': 'tcp',
'ethertype': 'IPv4',
'tenant_id': self._tenant_id}}
req = self.new_create_request(
'security-group-rules', data)
req.environ['neutron.context'] = context.Context('', self._tenant_id)
res = req.get_response(self.ext_api)
self.assertEqual(webob.exc.HTTPForbidden.code, res.status_int)
class TestNSXv3ProviderSecurityGrp(test_nsxv3_plugin.NsxV3PluginTestCaseMixin,
ProviderSecurityGroupExtTestCase):
pass pass