Use RBAC policy to determine if context is admin.
Fixes bug 1152716 If the context roles do not match the configured admin_role, fall back to determining if admin via the "context_is_admin" RBAC policy rule (for consistency with the approach used by the other projects). Note this requires that the "context_is_admin" rule *must* be set in the policy.json if the out-of-the-box default rule is used (as this default is so open, the net effect of omitting the "context_is_admin" rule is for every request to acquire admin status). Change-Id: Ide2cf604b48f24bd759ce2d65091ff546cd9d22e
This commit is contained in:
parent
2781f41fbd
commit
cc938e25f3
@ -1,4 +1,5 @@
|
||||
{
|
||||
"context_is_admin": "role:admin",
|
||||
"default": "",
|
||||
"manage_image_cache": "role:admin"
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import json
|
||||
from oslo.config import cfg
|
||||
import webob.exc
|
||||
|
||||
from glance.api import policy
|
||||
from glance.common import wsgi
|
||||
import glance.context
|
||||
import glance.openstack.common.log as logging
|
||||
@ -58,6 +59,10 @@ class BaseContextMiddleware(wsgi.Middleware):
|
||||
|
||||
|
||||
class ContextMiddleware(BaseContextMiddleware):
|
||||
def __init__(self, app):
|
||||
self.policy_enforcer = policy.Enforcer()
|
||||
super(ContextMiddleware, self).__init__(app)
|
||||
|
||||
def process_request(self, req):
|
||||
"""Convert authentication information into a request context
|
||||
|
||||
@ -84,6 +89,7 @@ class ContextMiddleware(BaseContextMiddleware):
|
||||
'roles': [],
|
||||
'is_admin': False,
|
||||
'read_only': True,
|
||||
'policy_enforcer': self.policy_enforcer,
|
||||
}
|
||||
return glance.context.RequestContext(**kwargs)
|
||||
|
||||
@ -113,6 +119,7 @@ class ContextMiddleware(BaseContextMiddleware):
|
||||
'auth_tok': req.headers.get('X-Auth-Token', deprecated_token),
|
||||
'owner_is_tenant': CONF.owner_is_tenant,
|
||||
'service_catalog': service_catalog,
|
||||
'policy_enforcer': self.policy_enforcer,
|
||||
}
|
||||
|
||||
return glance.context.RequestContext(**kwargs)
|
||||
|
@ -41,6 +41,7 @@ CONF.register_opts(policy_opts)
|
||||
|
||||
|
||||
DEFAULT_RULES = {
|
||||
'context_is_admin': policy.RoleCheck('role', 'admin'),
|
||||
'default': policy.TrueCheck(),
|
||||
'manage_image_cache': policy.RoleCheck('role', 'admin'),
|
||||
}
|
||||
@ -143,6 +144,16 @@ class Enforcer(object):
|
||||
"""
|
||||
return self._check(context, action, target)
|
||||
|
||||
def check_is_admin(self, context):
|
||||
"""Check if the given context is associated with an admin role,
|
||||
as defined via the 'context_is_admin' RBAC rule.
|
||||
|
||||
:param context: Glance request context
|
||||
:returns: A non-False value if context role is admin.
|
||||
"""
|
||||
target = context.to_dict()
|
||||
return self.check(context, 'context_is_admin', target)
|
||||
|
||||
|
||||
class ImageRepoProxy(glance.domain.proxy.Repo):
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from glance.api import policy
|
||||
from glance.openstack.common import local
|
||||
from glance.openstack.common import uuidutils
|
||||
|
||||
@ -27,17 +28,22 @@ class RequestContext(object):
|
||||
|
||||
def __init__(self, auth_tok=None, user=None, tenant=None, roles=None,
|
||||
is_admin=False, read_only=False, show_deleted=False,
|
||||
owner_is_tenant=True, service_catalog=None):
|
||||
owner_is_tenant=True, service_catalog=None,
|
||||
policy_enforcer=None):
|
||||
self.auth_tok = auth_tok
|
||||
self.user = user
|
||||
self.tenant = tenant
|
||||
self.roles = roles or []
|
||||
self.is_admin = is_admin
|
||||
self.read_only = read_only
|
||||
self._show_deleted = show_deleted
|
||||
self.owner_is_tenant = owner_is_tenant
|
||||
self.request_id = uuidutils.generate_uuid()
|
||||
self.service_catalog = service_catalog
|
||||
self.policy_enforcer = policy_enforcer or policy.Enforcer()
|
||||
self.is_admin = is_admin
|
||||
if not self.is_admin:
|
||||
self.is_admin = \
|
||||
self.policy_enforcer.check_is_admin(self)
|
||||
|
||||
if not hasattr(local.store, 'context'):
|
||||
self.update_store()
|
||||
|
@ -1,3 +1,4 @@
|
||||
{
|
||||
"context_is_admin": "role:admin",
|
||||
"default": ""
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import glance.api.middleware.cache
|
||||
from glance.common import exception
|
||||
from glance import context
|
||||
import glance.registry.client.v1.api as registry
|
||||
from glance.tests import utils
|
||||
|
||||
|
||||
class TestCacheMiddlewareURLMatching(testtools.TestCase):
|
||||
@ -140,7 +141,7 @@ class ProcessRequestTestCacheFilter(glance.api.middleware.cache.CacheFilter):
|
||||
self.cache = DummyCache()
|
||||
|
||||
|
||||
class TestCacheMiddlewareProcessRequest(testtools.TestCase):
|
||||
class TestCacheMiddlewareProcessRequest(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestCacheMiddlewareProcessRequest, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
@ -204,7 +205,7 @@ class TestCacheMiddlewareProcessRequest(testtools.TestCase):
|
||||
self.assertEqual(True, actual)
|
||||
|
||||
|
||||
class TestProcessResponse(testtools.TestCase):
|
||||
class TestProcessResponse(utils.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestProcessResponse, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
|
@ -75,6 +75,7 @@ def _db_image_member_fixture(image_id, member_id, **kwargs):
|
||||
class TestImageRepo(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageRepo, self).setUp()
|
||||
self.db = unit_test_utils.FakeDB()
|
||||
self.db.reset()
|
||||
self.context = glance.context.RequestContext(
|
||||
@ -83,7 +84,6 @@ class TestImageRepo(test_utils.BaseTestCase):
|
||||
self.image_factory = glance.domain.ImageFactory()
|
||||
self._create_images()
|
||||
self._create_image_members()
|
||||
super(TestImageRepo, self).setUp()
|
||||
|
||||
def _create_images(self):
|
||||
self.db.reset()
|
||||
@ -275,6 +275,7 @@ class TestEncryptedLocations(test_utils.BaseTestCase):
|
||||
class TestImageMemberRepo(test_utils.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageMemberRepo, self).setUp()
|
||||
self.db = unit_test_utils.FakeDB()
|
||||
self.db.reset()
|
||||
self.context = glance.context.RequestContext(
|
||||
@ -286,7 +287,6 @@ class TestImageMemberRepo(test_utils.BaseTestCase):
|
||||
image = self.image_repo.get(UUID1)
|
||||
self.image_member_repo = glance.db.ImageMemberRepo(self.context,
|
||||
self.db, image)
|
||||
super(TestImageMemberRepo, self).setUp()
|
||||
|
||||
def _create_images(self):
|
||||
self.images = [
|
||||
|
@ -464,6 +464,7 @@ class TestImageNotifications(utils.BaseTestCase):
|
||||
"""Test Image Notifications work"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestImageNotifications, self).setUp()
|
||||
self.image = ImageStub(
|
||||
image_id=UUID1, name='image-1', status='active', size=1024,
|
||||
created_at=DATETIME, updated_at=DATETIME, owner=TENANT1,
|
||||
@ -479,7 +480,6 @@ class TestImageNotifications(utils.BaseTestCase):
|
||||
self.image_repo_stub, self.context, self.notifier)
|
||||
self.image_proxy = glance.notifier.ImageProxy(
|
||||
self.image, self.context, self.notifier)
|
||||
super(TestImageNotifications, self).setUp()
|
||||
|
||||
def test_image_save_notification(self):
|
||||
self.image_repo_proxy.save(self.image_proxy)
|
||||
|
@ -258,3 +258,46 @@ class TestImagePolicy(test_utils.BaseTestCase):
|
||||
|
||||
image = glance.api.policy.ImageProxy(self.image_stub, {}, self.policy)
|
||||
self.assertRaises(exception.Forbidden, image.get_data)
|
||||
|
||||
|
||||
class TestContextPolicyEnforcer(base.IsolatedUnitTest):
|
||||
def _do_test_policy_influence_context_admin(self,
|
||||
policy_admin_role,
|
||||
context_role,
|
||||
context_is_admin,
|
||||
admin_expected):
|
||||
self.config(policy_file=os.path.join(self.test_dir, 'gobble.gobble'))
|
||||
|
||||
rules = {'context_is_admin': 'role:%s' % policy_admin_role}
|
||||
self.set_policy_rules(rules)
|
||||
|
||||
enforcer = glance.api.policy.Enforcer()
|
||||
|
||||
context = glance.context.RequestContext(roles=[context_role],
|
||||
is_admin=context_is_admin,
|
||||
policy_enforcer=enforcer)
|
||||
self.assertEquals(context.is_admin, admin_expected)
|
||||
|
||||
def test_context_admin_policy_admin(self):
|
||||
self._do_test_policy_influence_context_admin('test_admin',
|
||||
'test_admin',
|
||||
True,
|
||||
True)
|
||||
|
||||
def test_context_nonadmin_policy_admin(self):
|
||||
self._do_test_policy_influence_context_admin('test_admin',
|
||||
'test_admin',
|
||||
False,
|
||||
True)
|
||||
|
||||
def test_context_admin_policy_nonadmin(self):
|
||||
self._do_test_policy_influence_context_admin('test_admin',
|
||||
'demo',
|
||||
True,
|
||||
True)
|
||||
|
||||
def test_context_nonadmin_policy_nonadmin(self):
|
||||
self._do_test_policy_influence_context_admin('test_admin',
|
||||
'demo',
|
||||
False,
|
||||
False)
|
||||
|
Loading…
x
Reference in New Issue
Block a user