Merge "Better scoping for tempurls, especially container tempurls"
This commit is contained in:
commit
59550070f9
@ -152,6 +152,10 @@ DEFAULT_OUTGOING_REMOVE_HEADERS = 'x-object-meta-*'
|
||||
DEFAULT_OUTGOING_ALLOW_HEADERS = 'x-object-meta-public-*'
|
||||
|
||||
|
||||
CONTAINER_SCOPE = 'container'
|
||||
ACCOUNT_SCOPE = 'account'
|
||||
|
||||
|
||||
def get_tempurl_keys_from_metadata(meta):
|
||||
"""
|
||||
Extracts the tempurl keys from metadata.
|
||||
@ -172,6 +176,38 @@ def disposition_format(filename):
|
||||
quote(filename, safe=' /'), quote(filename))
|
||||
|
||||
|
||||
def authorize_same_account(account_to_match):
|
||||
|
||||
def auth_callback_same_account(req):
|
||||
try:
|
||||
_ver, acc, _rest = req.split_path(2, 3, True)
|
||||
except ValueError:
|
||||
return HTTPUnauthorized(request=req)
|
||||
|
||||
if acc == account_to_match:
|
||||
return None
|
||||
else:
|
||||
return HTTPUnauthorized(request=req)
|
||||
|
||||
return auth_callback_same_account
|
||||
|
||||
|
||||
def authorize_same_container(account_to_match, container_to_match):
|
||||
|
||||
def auth_callback_same_container(req):
|
||||
try:
|
||||
_ver, acc, con, _rest = req.split_path(3, 4, True)
|
||||
except ValueError:
|
||||
return HTTPUnauthorized(request=req)
|
||||
|
||||
if acc == account_to_match and con == container_to_match:
|
||||
return None
|
||||
else:
|
||||
return HTTPUnauthorized(request=req)
|
||||
|
||||
return auth_callback_same_container
|
||||
|
||||
|
||||
class TempURL(object):
|
||||
"""
|
||||
WSGI Middleware to grant temporary URLs specific access to Swift
|
||||
@ -304,10 +340,10 @@ class TempURL(object):
|
||||
return self.app(env, start_response)
|
||||
if not temp_url_sig or not temp_url_expires:
|
||||
return self._invalid(env, start_response)
|
||||
account = self._get_account(env)
|
||||
account, container = self._get_account_and_container(env)
|
||||
if not account:
|
||||
return self._invalid(env, start_response)
|
||||
keys = self._get_keys(env, account)
|
||||
keys = self._get_keys(env)
|
||||
if not keys:
|
||||
return self._invalid(env, start_response)
|
||||
if env['REQUEST_METHOD'] == 'HEAD':
|
||||
@ -322,11 +358,16 @@ class TempURL(object):
|
||||
else:
|
||||
hmac_vals = self._get_hmacs(env, temp_url_expires, keys)
|
||||
|
||||
# While it's true that any() will short-circuit, this doesn't affect
|
||||
# the timing-attack resistance since the only way this will
|
||||
# short-circuit is when a valid signature is passed in.
|
||||
is_valid_hmac = any(streq_const_time(temp_url_sig, hmac)
|
||||
for hmac in hmac_vals)
|
||||
is_valid_hmac = False
|
||||
hmac_scope = None
|
||||
for hmac, scope in hmac_vals:
|
||||
# While it's true that we short-circuit, this doesn't affect the
|
||||
# timing-attack resistance since the only way this will
|
||||
# short-circuit is when a valid signature is passed in.
|
||||
if streq_const_time(temp_url_sig, hmac):
|
||||
is_valid_hmac = True
|
||||
hmac_scope = scope
|
||||
break
|
||||
if not is_valid_hmac:
|
||||
return self._invalid(env, start_response)
|
||||
# disallowed headers prevent accidently allowing upload of a pointer
|
||||
@ -337,7 +378,12 @@ class TempURL(object):
|
||||
if resp:
|
||||
return resp
|
||||
self._clean_incoming_headers(env)
|
||||
env['swift.authorize'] = lambda req: None
|
||||
|
||||
if hmac_scope == ACCOUNT_SCOPE:
|
||||
env['swift.authorize'] = authorize_same_account(account)
|
||||
else:
|
||||
env['swift.authorize'] = authorize_same_container(account,
|
||||
container)
|
||||
env['swift.authorize_override'] = True
|
||||
env['REMOTE_USER'] = '.wsgi.tempurl'
|
||||
qs = {'temp_url_sig': temp_url_sig,
|
||||
@ -378,22 +424,23 @@ class TempURL(object):
|
||||
|
||||
return self.app(env, _start_response)
|
||||
|
||||
def _get_account(self, env):
|
||||
def _get_account_and_container(self, env):
|
||||
"""
|
||||
Returns just the account for the request, if it's an object
|
||||
request and one of the configured methods; otherwise, None is
|
||||
Returns just the account and container for the request, if it's an
|
||||
object request and one of the configured methods; otherwise, None is
|
||||
returned.
|
||||
|
||||
:param env: The WSGI environment for the request.
|
||||
:returns: Account str or None.
|
||||
:returns: (Account str, container str) or (None, None).
|
||||
"""
|
||||
if env['REQUEST_METHOD'] in self.methods:
|
||||
try:
|
||||
ver, acc, cont, obj = split_path(env['PATH_INFO'], 4, 4, True)
|
||||
except ValueError:
|
||||
return None
|
||||
return (None, None)
|
||||
if ver == 'v1' and obj.strip('/'):
|
||||
return acc
|
||||
return (acc, cont)
|
||||
return (None, None)
|
||||
|
||||
def _get_temp_url_info(self, env):
|
||||
"""
|
||||
@ -423,18 +470,23 @@ class TempURL(object):
|
||||
inline = True
|
||||
return temp_url_sig, temp_url_expires, filename, inline
|
||||
|
||||
def _get_keys(self, env, account):
|
||||
def _get_keys(self, env):
|
||||
"""
|
||||
Returns the X-[Account|Container]-Meta-Temp-URL-Key[-2] header values
|
||||
for the account or container, or an empty list if none are set.
|
||||
for the account or container, or an empty list if none are set. Each
|
||||
value comes as a 2-tuple (key, scope), where scope is either
|
||||
CONTAINER_SCOPE or ACCOUNT_SCOPE.
|
||||
|
||||
Returns 0-4 elements depending on how many keys are set in the
|
||||
account's or container's metadata.
|
||||
|
||||
:param env: The WSGI environment for the request.
|
||||
:param account: Account str.
|
||||
:returns: [X-Account-Meta-Temp-URL-Key str value if set,
|
||||
X-Account-Meta-Temp-URL-Key-2 str value if set]
|
||||
:returns: [
|
||||
(X-Account-Meta-Temp-URL-Key str value, ACCOUNT_SCOPE) if set,
|
||||
(X-Account-Meta-Temp-URL-Key-2 str value, ACCOUNT_SCOPE if set,
|
||||
(X-Container-Meta-Temp-URL-Key str value, CONTAINER_SCOPE) if set,
|
||||
(X-Container-Meta-Temp-URL-Key-2 str value, CONTAINER_SCOPE if set,
|
||||
]
|
||||
"""
|
||||
account_info = get_account_info(env, self.app, swift_source='TU')
|
||||
account_keys = get_tempurl_keys_from_metadata(account_info['meta'])
|
||||
@ -443,25 +495,28 @@ class TempURL(object):
|
||||
container_keys = get_tempurl_keys_from_metadata(
|
||||
container_info.get('meta', []))
|
||||
|
||||
return account_keys + container_keys
|
||||
return ([(ak, ACCOUNT_SCOPE) for ak in account_keys] +
|
||||
[(ck, CONTAINER_SCOPE) for ck in container_keys])
|
||||
|
||||
def _get_hmacs(self, env, expires, keys, request_method=None):
|
||||
def _get_hmacs(self, env, expires, scoped_keys, request_method=None):
|
||||
"""
|
||||
:param env: The WSGI environment for the request.
|
||||
:param expires: Unix timestamp as an int for when the URL
|
||||
expires.
|
||||
:param keys: Key strings, from the X-Account-Meta-Temp-URL-Key[-2] of
|
||||
the account.
|
||||
:param scoped_keys: (key, scope) tuples like _get_keys() returns
|
||||
:param request_method: Optional override of the request in
|
||||
the WSGI env. For example, if a HEAD
|
||||
does not match, you may wish to
|
||||
override with GET to still allow the
|
||||
HEAD.
|
||||
|
||||
:returns: a list of (hmac, scope) 2-tuples
|
||||
"""
|
||||
if not request_method:
|
||||
request_method = env['REQUEST_METHOD']
|
||||
return [get_hmac(
|
||||
request_method, env['PATH_INFO'], expires, key) for key in keys]
|
||||
return [
|
||||
(get_hmac(request_method, env['PATH_INFO'], expires, key), scope)
|
||||
for (key, scope) in scoped_keys]
|
||||
|
||||
def _invalid(self, env, start_response):
|
||||
"""
|
||||
|
@ -381,6 +381,7 @@ class Application(object):
|
||||
allowed_methods = getattr(controller, 'allowed_methods', set())
|
||||
return HTTPMethodNotAllowed(
|
||||
request=req, headers={'Allow': ', '.join(allowed_methods)})
|
||||
old_authorize = None
|
||||
if 'swift.authorize' in req.environ:
|
||||
# We call authorize before the handler, always. If authorized,
|
||||
# we remove the swift.authorize hook so isn't ever called
|
||||
@ -391,7 +392,7 @@ class Application(object):
|
||||
if not resp and not req.headers.get('X-Copy-From-Account') \
|
||||
and not req.headers.get('Destination-Account'):
|
||||
# No resp means authorized, no delayed recheck required.
|
||||
del req.environ['swift.authorize']
|
||||
old_authorize = req.environ['swift.authorize']
|
||||
else:
|
||||
# Response indicates denial, but we might delay the denial
|
||||
# and recheck later. If not delayed, return the error now.
|
||||
@ -401,7 +402,13 @@ class Application(object):
|
||||
# gets mutated during handling. This way logging can display the
|
||||
# method the client actually sent.
|
||||
req.environ['swift.orig_req_method'] = req.method
|
||||
return handler(req)
|
||||
try:
|
||||
if old_authorize:
|
||||
req.environ.pop('swift.authorize', None)
|
||||
return handler(req)
|
||||
finally:
|
||||
if old_authorize:
|
||||
req.environ['swift.authorize'] = old_authorize
|
||||
except HTTPException as error_response:
|
||||
return error_response
|
||||
except (Exception, Timeout):
|
||||
|
@ -3090,6 +3090,59 @@ class TestTempurl(Base):
|
||||
contents = self.env.obj.read(parms=parms, cfg={'no_auth_token': True})
|
||||
self.assertEqual(contents, "obj contents")
|
||||
|
||||
def test_GET_DLO_inside_container(self):
|
||||
seg1 = self.env.container.file(
|
||||
"get-dlo-inside-seg1" + Utils.create_name())
|
||||
seg2 = self.env.container.file(
|
||||
"get-dlo-inside-seg2" + Utils.create_name())
|
||||
seg1.write("one fish two fish ")
|
||||
seg2.write("red fish blue fish")
|
||||
|
||||
manifest = self.env.container.file("manifest" + Utils.create_name())
|
||||
manifest.write(
|
||||
'',
|
||||
hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
|
||||
(self.env.container.name,)})
|
||||
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
'GET', expires, self.env.conn.make_path(manifest.path),
|
||||
self.env.tempurl_key)
|
||||
parms = {'temp_url_sig': sig,
|
||||
'temp_url_expires': str(expires)}
|
||||
|
||||
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
|
||||
self.assertEqual(contents, "one fish two fish red fish blue fish")
|
||||
|
||||
def test_GET_DLO_outside_container(self):
|
||||
seg1 = self.env.container.file(
|
||||
"get-dlo-outside-seg1" + Utils.create_name())
|
||||
seg2 = self.env.container.file(
|
||||
"get-dlo-outside-seg2" + Utils.create_name())
|
||||
seg1.write("one fish two fish ")
|
||||
seg2.write("red fish blue fish")
|
||||
|
||||
container2 = self.env.account.container(Utils.create_name())
|
||||
container2.create()
|
||||
|
||||
manifest = container2.file("manifest" + Utils.create_name())
|
||||
manifest.write(
|
||||
'',
|
||||
hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
|
||||
(self.env.container.name,)})
|
||||
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
'GET', expires, self.env.conn.make_path(manifest.path),
|
||||
self.env.tempurl_key)
|
||||
parms = {'temp_url_sig': sig,
|
||||
'temp_url_expires': str(expires)}
|
||||
|
||||
# cross container tempurl works fine for account tempurl key
|
||||
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
|
||||
self.assertEqual(contents, "one fish two fish red fish blue fish")
|
||||
self.assert_status([200])
|
||||
|
||||
def test_PUT(self):
|
||||
new_obj = self.env.container.file(Utils.create_name())
|
||||
|
||||
@ -3422,6 +3475,67 @@ class TestContainerTempurl(Base):
|
||||
'Container TempURL key-2 found, should not be visible '
|
||||
'to readonly ACLs')
|
||||
|
||||
def test_GET_DLO_inside_container(self):
|
||||
seg1 = self.env.container.file(
|
||||
"get-dlo-inside-seg1" + Utils.create_name())
|
||||
seg2 = self.env.container.file(
|
||||
"get-dlo-inside-seg2" + Utils.create_name())
|
||||
seg1.write("one fish two fish ")
|
||||
seg2.write("red fish blue fish")
|
||||
|
||||
manifest = self.env.container.file("manifest" + Utils.create_name())
|
||||
manifest.write(
|
||||
'',
|
||||
hdrs={"X-Object-Manifest": "%s/get-dlo-inside-seg" %
|
||||
(self.env.container.name,)})
|
||||
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
'GET', expires, self.env.conn.make_path(manifest.path),
|
||||
self.env.tempurl_key)
|
||||
parms = {'temp_url_sig': sig,
|
||||
'temp_url_expires': str(expires)}
|
||||
|
||||
contents = manifest.read(parms=parms, cfg={'no_auth_token': True})
|
||||
self.assertEqual(contents, "one fish two fish red fish blue fish")
|
||||
|
||||
def test_GET_DLO_outside_container(self):
|
||||
container2 = self.env.account.container(Utils.create_name())
|
||||
container2.create()
|
||||
seg1 = container2.file(
|
||||
"get-dlo-outside-seg1" + Utils.create_name())
|
||||
seg2 = container2.file(
|
||||
"get-dlo-outside-seg2" + Utils.create_name())
|
||||
seg1.write("one fish two fish ")
|
||||
seg2.write("red fish blue fish")
|
||||
|
||||
manifest = self.env.container.file("manifest" + Utils.create_name())
|
||||
manifest.write(
|
||||
'',
|
||||
hdrs={"X-Object-Manifest": "%s/get-dlo-outside-seg" %
|
||||
(container2.name,)})
|
||||
|
||||
expires = int(time.time()) + 86400
|
||||
sig = self.tempurl_sig(
|
||||
'GET', expires, self.env.conn.make_path(manifest.path),
|
||||
self.env.tempurl_key)
|
||||
parms = {'temp_url_sig': sig,
|
||||
'temp_url_expires': str(expires)}
|
||||
|
||||
# cross container tempurl does not work for container tempurl key
|
||||
try:
|
||||
manifest.read(parms=parms, cfg={'no_auth_token': True})
|
||||
except ResponseError as e:
|
||||
self.assertEqual(e.status, 401)
|
||||
else:
|
||||
self.fail('request did not error')
|
||||
try:
|
||||
manifest.info(parms=parms, cfg={'no_auth_token': True})
|
||||
except ResponseError as e:
|
||||
self.assertEqual(e.status, 401)
|
||||
else:
|
||||
self.fail('request did not error')
|
||||
|
||||
|
||||
class TestContainerTempurlUTF8(Base2, TestContainerTempurl):
|
||||
set_up = False
|
||||
|
@ -29,6 +29,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import hmac
|
||||
import itertools
|
||||
import unittest
|
||||
from hashlib import sha1
|
||||
from time import time
|
||||
@ -44,10 +45,13 @@ class FakeApp(object):
|
||||
self.calls = 0
|
||||
self.status_headers_body_iter = status_headers_body_iter
|
||||
if not self.status_headers_body_iter:
|
||||
self.status_headers_body_iter = iter([('404 Not Found', {
|
||||
'x-test-header-one-a': 'value1',
|
||||
'x-test-header-two-a': 'value2',
|
||||
'x-test-header-two-b': 'value3'}, '')])
|
||||
self.status_headers_body_iter = iter(
|
||||
itertools.repeat((
|
||||
'404 Not Found', {
|
||||
'x-test-header-one-a': 'value1',
|
||||
'x-test-header-two-a': 'value2',
|
||||
'x-test-header-two-b': 'value3'},
|
||||
'')))
|
||||
self.request = None
|
||||
|
||||
def __call__(self, env, start_response):
|
||||
@ -69,16 +73,18 @@ class TestTempURL(unittest.TestCase):
|
||||
self.auth = tempauth.filter_factory({'reseller_prefix': ''})(self.app)
|
||||
self.tempurl = tempurl.filter_factory({})(self.auth)
|
||||
|
||||
def _make_request(self, path, environ=None, keys=(), **kwargs):
|
||||
def _make_request(self, path, environ=None, keys=(), container_keys=None,
|
||||
**kwargs):
|
||||
if environ is None:
|
||||
environ = {}
|
||||
|
||||
_junk, account, _junk, _junk = utils.split_path(path, 2, 4)
|
||||
self._fake_cache_environ(environ, account, keys)
|
||||
self._fake_cache_environ(environ, account, keys,
|
||||
container_keys=container_keys)
|
||||
req = Request.blank(path, environ=environ, **kwargs)
|
||||
return req
|
||||
|
||||
def _fake_cache_environ(self, environ, account, keys):
|
||||
def _fake_cache_environ(self, environ, account, keys, container_keys=None):
|
||||
"""
|
||||
Fake out the caching layer for get_account_info(). Injects account data
|
||||
into environ such that keys are the tempurl keys, if set.
|
||||
@ -96,8 +102,13 @@ class TestTempURL(unittest.TestCase):
|
||||
'bytes': '0',
|
||||
'meta': meta}
|
||||
|
||||
meta = {}
|
||||
for i, key in enumerate(container_keys or []):
|
||||
meta_name = 'Temp-URL-key' + (("-%d" % (i + 1) if i else ""))
|
||||
meta[meta_name] = key
|
||||
|
||||
container_cache_key = 'swift.container/' + account + '/c'
|
||||
environ.setdefault(container_cache_key, {'meta': {}})
|
||||
environ.setdefault(container_cache_key, {'meta': meta})
|
||||
|
||||
def test_passthrough(self):
|
||||
resp = self._make_request('/v1/a/c/o').get_response(self.tempurl)
|
||||
@ -581,6 +592,81 @@ class TestTempURL(unittest.TestCase):
|
||||
self.assertTrue('Temp URL invalid' in resp.body)
|
||||
self.assertTrue('Www-Authenticate' in resp.headers)
|
||||
|
||||
def test_authorize_limits_scope(self):
|
||||
req_other_object = Request.blank("/v1/a/c/o2")
|
||||
req_other_container = Request.blank("/v1/a/c2/o2")
|
||||
req_other_account = Request.blank("/v1/a2/c2/o2")
|
||||
|
||||
key_kwargs = {
|
||||
'keys': ['account-key', 'shared-key'],
|
||||
'container_keys': ['container-key', 'shared-key'],
|
||||
}
|
||||
|
||||
# A request with the account key limits the pre-authed scope to the
|
||||
# account level.
|
||||
method = 'GET'
|
||||
expires = int(time() + 86400)
|
||||
path = '/v1/a/c/o'
|
||||
|
||||
hmac_body = '%s\n%s\n%s' % (method, expires, path)
|
||||
sig = hmac.new('account-key', hmac_body, sha1).hexdigest()
|
||||
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
|
||||
|
||||
# make request will setup the environ cache for us
|
||||
req = self._make_request(path + qs, **key_kwargs)
|
||||
resp = req.get_response(self.tempurl)
|
||||
self.assertEquals(resp.status_int, 404) # sanity check
|
||||
|
||||
authorize = req.environ['swift.authorize']
|
||||
# Requests for other objects happen if, for example, you're
|
||||
# downloading a large object or creating a large-object manifest.
|
||||
oo_resp = authorize(req_other_object)
|
||||
self.assertEqual(oo_resp, None)
|
||||
oc_resp = authorize(req_other_container)
|
||||
self.assertEqual(oc_resp, None)
|
||||
oa_resp = authorize(req_other_account)
|
||||
self.assertEqual(oa_resp.status_int, 401)
|
||||
|
||||
# A request with the container key limits the pre-authed scope to
|
||||
# the container level; a different container in the same account is
|
||||
# out of scope and thus forbidden.
|
||||
hmac_body = '%s\n%s\n%s' % (method, expires, path)
|
||||
sig = hmac.new('container-key', hmac_body, sha1).hexdigest()
|
||||
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
|
||||
|
||||
req = self._make_request(path + qs, **key_kwargs)
|
||||
resp = req.get_response(self.tempurl)
|
||||
self.assertEquals(resp.status_int, 404) # sanity check
|
||||
|
||||
authorize = req.environ['swift.authorize']
|
||||
oo_resp = authorize(req_other_object)
|
||||
self.assertEqual(oo_resp, None)
|
||||
oc_resp = authorize(req_other_container)
|
||||
self.assertEqual(oc_resp.status_int, 401)
|
||||
oa_resp = authorize(req_other_account)
|
||||
self.assertEqual(oa_resp.status_int, 401)
|
||||
|
||||
# If account and container share a key (users set these, so this can
|
||||
# happen by accident, stupidity, *or* malice!), limit the scope to
|
||||
# account level. This prevents someone from shrinking the scope of
|
||||
# account-level tempurls by reusing one of the account's keys on a
|
||||
# container.
|
||||
hmac_body = '%s\n%s\n%s' % (method, expires, path)
|
||||
sig = hmac.new('shared-key', hmac_body, sha1).hexdigest()
|
||||
qs = '?temp_url_sig=%s&temp_url_expires=%s' % (sig, expires)
|
||||
|
||||
req = self._make_request(path + qs, **key_kwargs)
|
||||
resp = req.get_response(self.tempurl)
|
||||
self.assertEquals(resp.status_int, 404) # sanity check
|
||||
|
||||
authorize = req.environ['swift.authorize']
|
||||
oo_resp = authorize(req_other_object)
|
||||
self.assertEqual(oo_resp, None)
|
||||
oc_resp = authorize(req_other_container)
|
||||
self.assertEqual(oc_resp, None)
|
||||
oa_resp = authorize(req_other_account)
|
||||
self.assertEqual(oa_resp.status_int, 401)
|
||||
|
||||
def test_changed_path_invalid(self):
|
||||
method = 'GET'
|
||||
expires = int(time() + 86400)
|
||||
@ -828,35 +914,38 @@ class TestTempURL(unittest.TestCase):
|
||||
self.assertTrue('x-conflict-header-test' in resp.headers)
|
||||
self.assertEqual(resp.headers['x-conflict-header-test'], 'value')
|
||||
|
||||
def test_get_account(self):
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'HEAD', 'PATH_INFO': '/v1/a/c/o'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c/o'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'PUT', 'PATH_INFO': '/v1/a/c/o'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'POST', 'PATH_INFO': '/v1/a/c/o'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'DELETE', 'PATH_INFO': '/v1/a/c/o'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'UNKNOWN', 'PATH_INFO': '/v1/a/c/o'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c/'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c//////'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c///o///'}), 'a')
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a//o'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1//c/o'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '//a/c/o'}), None)
|
||||
self.assertEquals(self.tempurl._get_account({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v2/a/c/o'}), None)
|
||||
def test_get_account_and_container(self):
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'HEAD', 'PATH_INFO': '/v1/a/c/o'}), ('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c/o'}), ('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'PUT', 'PATH_INFO': '/v1/a/c/o'}), ('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'POST', 'PATH_INFO': '/v1/a/c/o'}), ('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'DELETE', 'PATH_INFO': '/v1/a/c/o'}), ('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'UNKNOWN', 'PATH_INFO': '/v1/a/c/o'}),
|
||||
(None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c/'}), (None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c//////'}),
|
||||
(None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c///o///'}),
|
||||
('a', 'c'))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c'}), (None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a//o'}), (None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1//c/o'}), (None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '//a/c/o'}), (None, None))
|
||||
self.assertEquals(self.tempurl._get_account_and_container({
|
||||
'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v2/a/c/o'}), (None, None))
|
||||
|
||||
def test_get_temp_url_info(self):
|
||||
s = 'f5d5051bddf5df7e27c628818738334f'
|
||||
@ -908,13 +997,13 @@ class TestTempURL(unittest.TestCase):
|
||||
self.assertEquals(
|
||||
self.tempurl._get_hmacs(
|
||||
{'REQUEST_METHOD': 'GET', 'PATH_INFO': '/v1/a/c/o'},
|
||||
1, ['abc']),
|
||||
['026d7f7cc25256450423c7ad03fc9f5ffc1dab6d'])
|
||||
1, [('abc', 'account')]),
|
||||
[('026d7f7cc25256450423c7ad03fc9f5ffc1dab6d', 'account')])
|
||||
self.assertEquals(
|
||||
self.tempurl._get_hmacs(
|
||||
{'REQUEST_METHOD': 'HEAD', 'PATH_INFO': '/v1/a/c/o'},
|
||||
1, ['abc'], request_method='GET'),
|
||||
['026d7f7cc25256450423c7ad03fc9f5ffc1dab6d'])
|
||||
1, [('abc', 'account')], request_method='GET'),
|
||||
[('026d7f7cc25256450423c7ad03fc9f5ffc1dab6d', 'account')])
|
||||
|
||||
def test_invalid(self):
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user