swift/test/functionalnosetests/test_container.py
Pete Zaitcev 35e07e2982 Permit functional tests against Swift with Keystone
While poking at such problems with authentication other than swauth
it was useful to allow stock functional tests running back-to-back
at two clusters, with tempauth and Keystone.

When running the legacy tests, this version packs account into user
the way it was down previously. This way we do not need to repack
them before every call to get_auth. The downside is additional work
to be done when setting ACLs.

Change-Id: Ieb1d9227cb88977ecd2c39825039bc4be8afae0b
2013-03-10 18:52:10 -06:00

574 lines
22 KiB
Python
Executable File

#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from nose import SkipTest
from uuid import uuid4
from swift.common.constraints import MAX_META_COUNT, MAX_META_NAME_LENGTH, \
MAX_META_OVERALL_SIZE, MAX_META_VALUE_LENGTH
from swift_testing import check_response, retry, skip, skip2, skip3, \
swift_test_perm, web_front_end
class TestContainer(unittest.TestCase):
def setUp(self):
if skip:
raise SkipTest
self.name = uuid4().hex
def put(url, token, parsed, conn):
conn.request('PUT', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 201)
def tearDown(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name + '?format=json',
'', {'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, obj):
conn.request('DELETE',
'/'.join([parsed.path, self.name, obj['name']]), '',
{'X-Auth-Token': token})
return check_response(conn)
while True:
resp = retry(get)
body = resp.read()
self.assert_(resp.status // 100 == 2, resp.status)
objs = json.loads(body)
if not objs:
break
for obj in objs:
resp = retry(delete, obj)
resp.read()
self.assertEquals(resp.status, 204)
def delete(url, token, parsed, conn):
conn.request('DELETE', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(delete)
resp.read()
self.assertEquals(resp.status, 204)
def test_multi_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, name, value):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, name: value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(post, 'X-Container-Meta-One', '1')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-one'), '1')
resp = retry(post, 'X-Container-Meta-Two', '2')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-one'), '1')
self.assertEquals(resp.getheader('x-container-meta-two'), '2')
def test_PUT_metadata(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn, name, value):
conn.request('PUT', parsed.path + '/' + name, '',
{'X-Auth-Token': token, 'X-Container-Meta-Test': value})
return check_response(conn)
def head(url, token, parsed, conn, name):
conn.request('HEAD', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn, name):
conn.request('GET', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
name = uuid4().hex
resp = retry(put, name, 'Value')
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(put, name, '')
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(head, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(get, name)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
def test_POST_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, value):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Meta-Test': value})
return check_response(conn)
def head(url, token, parsed, conn):
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), None)
resp = retry(post, 'Value')
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(head)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
resp = retry(get)
resp.read()
self.assert_(resp.status in (200, 204), resp.status)
self.assertEquals(resp.getheader('x-container-meta-test'), 'Value')
def test_PUT_bad_metadata(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn, name, extra_headers):
headers = {'X-Auth-Token': token}
headers.update(extra_headers)
conn.request('PUT', parsed.path + '/' + name, '', headers)
return check_response(conn)
def delete(url, token, parsed, conn, name):
conn.request('DELETE', parsed.path + '/' + name, '',
{'X-Auth-Token': token})
return check_response(conn)
name = uuid4().hex
resp = retry(put, name,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(put, name,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
resp = retry(put, name,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
resp = retry(put, name,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
name = uuid4().hex
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
size = 0
x = 0
while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
size += 4 + MAX_META_VALUE_LENGTH
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 201)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 204)
name = uuid4().hex
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(put, name, headers)
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(delete, name)
resp.read()
self.assertEquals(resp.status, 404)
def test_POST_bad_metadata(self):
if skip:
raise SkipTest
def post(url, token, parsed, conn, extra_headers):
headers = {'X-Auth-Token': token}
headers.update(extra_headers)
conn.request('POST', parsed.path + '/' + self.name, '', headers)
return check_response(conn)
resp = retry(post,
{'X-Container-Meta-' + ('k' * MAX_META_NAME_LENGTH): 'v'})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(post,
{'X-Container-Meta-' + ('k' * (MAX_META_NAME_LENGTH + 1)): 'v'})
resp.read()
self.assertEquals(resp.status, 400)
resp = retry(post,
{'X-Container-Meta-Too-Long': 'k' * MAX_META_VALUE_LENGTH})
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(post,
{'X-Container-Meta-Too-Long': 'k' * (MAX_META_VALUE_LENGTH + 1)})
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
for x in xrange(MAX_META_COUNT):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers = {}
for x in xrange(MAX_META_COUNT + 1):
headers['X-Container-Meta-%d' % x] = 'v'
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
headers = {}
header_value = 'k' * MAX_META_VALUE_LENGTH
size = 0
x = 0
while size < MAX_META_OVERALL_SIZE - 4 - MAX_META_VALUE_LENGTH:
size += 4 + MAX_META_VALUE_LENGTH
headers['X-Container-Meta-%04d' % x] = header_value
x += 1
if MAX_META_OVERALL_SIZE - size > 1:
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size - 1)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 204)
headers['X-Container-Meta-k'] = \
'v' * (MAX_META_OVERALL_SIZE - size)
resp = retry(post, headers)
resp.read()
self.assertEquals(resp.status, 400)
def test_public_container(self):
if skip:
raise SkipTest
def get(url, token, parsed, conn):
conn.request('GET', parsed.path + '/' + self.name)
return check_response(conn)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception, err:
self.assert_(str(err).startswith('No result after '), err)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': '.r:*,.rlistings'})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
resp = retry(get)
resp.read()
self.assertEquals(resp.status, 204)
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': ''})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
try:
resp = retry(get)
raise Exception('Should not have been able to GET')
except Exception, err:
self.assert_(str(err).startswith('No result after '), err)
def test_cross_account_container(self):
if skip or skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the second account
def get2(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container accessible by the second account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': swift_test_perm[1],
'X-Container-Write': swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now use the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# Make the container private again
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': '',
'X-Container-Write': ''})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can't access the container with the second account again
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
def test_cross_account_public_container(self):
if skip or skip2:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the second account
def get2(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container completely public
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Read': '.r:*,.rlistings'})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# But we shouldn't be able to write with the second account
def put2(url, token, parsed, conn):
conn.request('PUT', first_account[0] + '/' + self.name + '/object',
'test object', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put2, use_account=2)
resp.read()
self.assertEquals(resp.status, 403)
# Now make the container also writeable by the second account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[1]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can still read the container with the second account
resp = retry(get2, use_account=2)
resp.read()
self.assertEquals(resp.status, 204)
# And that we can now write with the second account
resp = retry(put2, use_account=2)
resp.read()
self.assertEquals(resp.status, 201)
def test_nonadmin_user(self):
if skip or skip3:
raise SkipTest
# Obtain the first account's string
first_account = ['unknown']
def get1(url, token, parsed, conn):
first_account[0] = parsed.path
conn.request('HEAD', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get1)
resp.read()
# Ensure we can't access the container with the third account
def get3(url, token, parsed, conn):
conn.request('GET', first_account[0] + '/' + self.name, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Make the container accessible by the third account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token, 'X-Container-Read': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can now read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 204)
# But we shouldn't be able to write with the third account
def put3(url, token, parsed, conn):
conn.request('PUT', first_account[0] + '/' + self.name + '/object',
'test object', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put3, use_account=3)
resp.read()
self.assertEquals(resp.status, 403)
# Now make the container also writeable by the third account
def post(url, token, parsed, conn):
conn.request('POST', parsed.path + '/' + self.name, '',
{'X-Auth-Token': token,
'X-Container-Write': swift_test_perm[2]})
return check_response(conn)
resp = retry(post)
resp.read()
self.assertEquals(resp.status, 204)
# Ensure we can still read the container with the third account
resp = retry(get3, use_account=3)
resp.read()
self.assertEquals(resp.status, 204)
# And that we can now write with the third account
resp = retry(put3, use_account=3)
resp.read()
self.assertEquals(resp.status, 201)
def test_long_name_content_type(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
container_name = 'X' * 2048
conn.request('PUT', '%s/%s' % (parsed.path,
container_name), 'there', {'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
resp.read()
self.assertEquals(resp.status, 400)
self.assertEquals(resp.getheader('Content-Type'),
'text/html; charset=UTF-8')
def test_null_name(self):
if skip:
raise SkipTest
def put(url, token, parsed, conn):
conn.request('PUT', '%s/abc%%00def' % parsed.path, '',
{'X-Auth-Token': token})
return check_response(conn)
resp = retry(put)
if (web_front_end == 'apache2'):
self.assertEquals(resp.status, 404)
else:
self.assertEquals(resp.read(), 'Invalid UTF8 or contains NULL')
self.assertEquals(resp.status, 412)
if __name__ == '__main__':
unittest.main()