Richard (Rick) Hawkins 2c4bf81464 Added discoverable capabilities.
Swift can now optionally be configured to allow requests to '/info',
providing information about the swift cluster.  Additionally, HMAC
signed requests to
'/info?swiftinfo_sig=<sign>&swiftinfo_expires=<expires>' can be
configured, allowing privileged access to more sensitive information
not meant to be public.

DocImpact
Change-Id: I2379360fbfe3d9e9e8b25f1dc34517d199574495
Implements: blueprint capabilities
Closes-Bug: #1245694
2013-11-22 15:54:13 -06:00

294 lines
12 KiB
Python

# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import time
from mock import Mock
from swift.proxy.controllers import InfoController
from swift.proxy.server import Application as ProxyApp
from swift.common import utils
from swift.common.utils import json
from swift.common.swob import Request, HTTPException
class TestInfoController(unittest.TestCase):
    """Tests for ``InfoController`` handling of ``/info`` requests.

    Covers plain (unsigned) GET/HEAD/OPTIONS requests as well as requests
    carrying the ``swiftinfo_sig`` / ``swiftinfo_expires`` query parameters,
    whose signature is produced with ``utils.get_hmac``.
    """

    def setUp(self):
        """Reset the registered info and the ``start_response`` recorders."""
        utils._swift_info = {}
        utils._swift_admin_info = {}
        # start_response() appends to these; create them up front so the
        # callback does not fail with AttributeError when it is used.
        self.got_statuses = []
        self.got_headers = []

    def get_controller(self, expose_info=None, disallowed_sections=None,
                       admin_key=None):
        """Return an ``InfoController`` wired to a mocked proxy application.

        :param expose_info: passed through to the controller unchanged
        :param disallowed_sections: list of section names; ``None`` (or any
                                    falsy value) is replaced by ``[]``
        :param admin_key: passed through to the controller unchanged
        """
        disallowed_sections = disallowed_sections or []

        app = Mock(spec=ProxyApp)
        return InfoController(app, None, expose_info,
                              disallowed_sections, admin_key)

    def start_response(self, status, headers):
        """Record a status and its headers (one single-entry dict each).

        :param status: value appended to ``self.got_statuses``
        :param headers: iterable of ``(name, value)`` pairs
        """
        self.got_statuses.append(status)
        for h in headers:
            self.got_headers.append({h[0]: h[1]})

    # -- private helpers -------------------------------------------------

    def _populate_info(self, **extra_public):
        """Install the standard public/admin info fixtures.

        The public info always contains ``{'foo': {'bar': 'baz'}}``; any
        keyword arguments are added as further public sections.
        """
        public = {'foo': {'bar': 'baz'}}
        public.update(extra_public)
        utils._swift_info = public
        utils._swift_admin_info = {'qux': {'quux': 'corge'}}

    def _signed_request(self, key, sign_method='GET', sign_path='/info',
                        expires=None, request_method='GET'):
        """Build a request for ``/info`` with signature query parameters.

        :param key: key handed to ``utils.get_hmac``
        :param sign_method: HTTP method the signature is computed for
        :param sign_path: path the signature is computed for
        :param expires: value used both in the signature and in the
                        ``swiftinfo_expires`` parameter; defaults to one
                        day (86400 seconds) from now
        :param request_method: ``REQUEST_METHOD`` of the built request
        """
        if expires is None:
            expires = int(time.time() + 86400)
        sig = utils.get_hmac(sign_method, sign_path, expires, key)
        path = '/info?swiftinfo_sig={sig}&swiftinfo_expires={expires}'.format(
            sig=sig, expires=expires)
        return Request.blank(
            path, environ={'REQUEST_METHOD': request_method})

    def _assert_status(self, resp, expected):
        """Check ``resp`` is an ``HTTPException`` whose str() is expected."""
        self.assertTrue(isinstance(resp, HTTPException))
        self.assertEqual(expected, str(resp))

    def _assert_public_foo(self, info):
        """Check the decoded body exposes the public ``foo`` fixture."""
        self.assertTrue('foo' in info)
        self.assertTrue('bar' in info['foo'])
        self.assertEqual(info['foo']['bar'], 'baz')

    def _assert_admin_qux(self, info):
        """Check the decoded body exposes the admin ``qux`` fixture."""
        self.assertTrue('admin' in info)
        self.assertTrue('qux' in info['admin'])
        self.assertTrue('quux' in info['admin']['qux'])
        self.assertEqual(info['admin']['qux']['quux'], 'corge')

    # -- unsigned requests -----------------------------------------------

    def test_disabled_info(self):
        """GET is answered with 403 when ``expose_info`` is False."""
        controller = self.get_controller(expose_info=False)

        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'GET'})
        resp = controller.GET(req)
        self._assert_status(resp, '403 Forbidden')

    def test_get_info(self):
        """Unsigned GET returns the public info and no ``admin`` key."""
        controller = self.get_controller(expose_info=True)
        self._populate_info()

        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'GET'})
        resp = controller.GET(req)
        self._assert_status(resp, '200 OK')
        info = json.loads(resp.body)
        self.assertTrue('admin' not in info)
        self._assert_public_foo(info)

    def test_options_info(self):
        """OPTIONS returns 200 with an ``Allow`` header."""
        controller = self.get_controller(expose_info=True)

        # Use the OPTIONS method so the request matches the handler invoked.
        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'OPTIONS'})
        resp = controller.OPTIONS(req)
        self._assert_status(resp, '200 OK')
        self.assertTrue('Allow' in resp.headers)

    def test_get_info_cors(self):
        """GET with an ``Origin`` header yields the CORS response headers."""
        controller = self.get_controller(expose_info=True)
        self._populate_info()

        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'GET'},
            headers={'Origin': 'http://example.com'})
        resp = controller.GET(req)
        self._assert_status(resp, '200 OK')
        info = json.loads(resp.body)
        self.assertTrue('admin' not in info)
        self._assert_public_foo(info)
        self.assertTrue('Access-Control-Allow-Origin' in resp.headers)
        self.assertTrue('Access-Control-Expose-Headers' in resp.headers)

    def test_head_info(self):
        """Unsigned HEAD returns 200."""
        controller = self.get_controller(expose_info=True)
        self._populate_info()

        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'HEAD'})
        resp = controller.HEAD(req)
        self._assert_status(resp, '200 OK')

    def test_disallow_info(self):
        """Sections named in ``disallowed_sections`` are left out."""
        controller = self.get_controller(expose_info=True,
                                         disallowed_sections=['foo2'])
        self._populate_info(foo2={'bar2': 'baz2'})

        req = Request.blank(
            '/info', environ={'REQUEST_METHOD': 'GET'})
        resp = controller.GET(req)
        self._assert_status(resp, '200 OK')
        info = json.loads(resp.body)
        self._assert_public_foo(info)
        self.assertTrue('foo2' not in info)

    # -- signed (admin) requests -----------------------------------------

    def test_disabled_admin_info(self):
        """A signed request gets 403 when the admin key is empty."""
        controller = self.get_controller(expose_info=True, admin_key='')
        self._populate_info()

        req = self._signed_request('')
        resp = controller.GET(req)
        self._assert_status(resp, '403 Forbidden')

    def test_get_admin_info(self):
        """A correctly signed GET includes the ``admin`` section."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        req = self._signed_request('secret-admin-key')
        resp = controller.GET(req)
        self._assert_status(resp, '200 OK')
        info = json.loads(resp.body)
        self._assert_admin_qux(info)

    def test_head_admin_info(self):
        """A HEAD-method request is accepted when signed for GET or HEAD."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        # NOTE(review): the handler invoked is GET while the request method
        # is HEAD; presumably HEAD shares GET's signature validation --
        # confirm against InfoController before switching to HEAD().
        for sign_method in ('GET', 'HEAD'):
            req = self._signed_request('secret-admin-key',
                                       sign_method=sign_method,
                                       request_method='HEAD')
            resp = controller.GET(req)
            self._assert_status(resp, '200 OK')

    def test_get_admin_info_invalid_method(self):
        """A signature computed for HEAD is rejected on a GET request."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        req = self._signed_request('secret-admin-key', sign_method='HEAD')
        resp = controller.GET(req)
        self._assert_status(resp, '401 Unauthorized')

    def test_get_admin_info_invalid_expires(self):
        """Past (``1``) and non-numeric (``'abc'``) expiries give 401."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        for expires in (1, 'abc'):
            req = self._signed_request('secret-admin-key', expires=expires)
            resp = controller.GET(req)
            self._assert_status(resp, '401 Unauthorized')

    def test_get_admin_info_invalid_path(self):
        """A signature computed for another path is rejected with 401."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        req = self._signed_request('secret-admin-key', sign_path='/foo')
        resp = controller.GET(req)
        self._assert_status(resp, '401 Unauthorized')

    def test_get_admin_info_invalid_key(self):
        """A signature made with the wrong key is rejected with 401."""
        controller = self.get_controller(expose_info=True,
                                         admin_key='secret-admin-key')
        self._populate_info()

        # Sign the correct path so the key is the only thing that is wrong;
        # otherwise the path mismatch alone would produce the 401.
        req = self._signed_request('invalid-admin-key')
        resp = controller.GET(req)
        self._assert_status(resp, '401 Unauthorized')

    def test_admin_disallow_info(self):
        """Signed GET hides disallowed sections but lists them for admin."""
        controller = self.get_controller(expose_info=True,
                                         disallowed_sections=['foo2'],
                                         admin_key='secret-admin-key')
        self._populate_info(foo2={'bar2': 'baz2'})

        req = self._signed_request('secret-admin-key')
        resp = controller.GET(req)
        self._assert_status(resp, '200 OK')
        info = json.loads(resp.body)
        self.assertTrue('foo2' not in info)
        self.assertTrue('admin' in info)
        self.assertTrue('disallowed_sections' in info['admin'])
        self.assertTrue('foo2' in info['admin']['disallowed_sections'])
        self._assert_admin_qux(info)
# Run this module's tests when it is executed directly as a script.
if '__main__' == __name__:
    unittest.main()