f581fccf71
With the X-Timestamp validation added in commit e619411, end users could upload objects with X-Timestamp: 9999999999.99999_ffffffffffffffff (the maximum value) and Swift would be unable to delete them. Now, inbound X-Timestamp headers will be moved to X-Backend-Inbound-X-Timestamp, effectively rendering them harmless. The primary reason to allow X-Timestamp before was to prevent Last-Modified changes for objects coming from either: * container_sync or * a migration from another storage system. To enable the former use-case, the container_sync middleware will now translate X-Backend-Inbound-X-Timestamp headers back to X-Timestamp after verifying the request. Additionally, a new option is added to the gatekeeper filter config: # shunt_inbound_x_timestamp = true To enable the latter use-case (or any other use-case not mentioned), set this to false. Upgrade Consideration ===================== If your cluster workload requires that clients be allowed to specify objects' X-Timestamp values, disable the shunt_inbound_x_timestamp option before upgrading. UpgradeImpact Change-Id: I8799d5eb2ae9d795ba358bb422f69c70ee8ebd2c
1494 lines
56 KiB
Python
Executable File
1494 lines
56 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import datetime
|
|
import json
|
|
import unittest2
|
|
from unittest2 import SkipTest
|
|
from uuid import uuid4
|
|
import time
|
|
|
|
from six.moves import range
|
|
|
|
from test.functional import check_response, retry, requires_acls, \
|
|
requires_policies
|
|
import test.functional as tf
|
|
|
|
|
|
def setUpModule():
|
|
tf.setup_package()
|
|
|
|
|
|
def tearDownModule():
|
|
tf.teardown_package()
|
|
|
|
|
|
class TestObject(unittest2.TestCase):
|
|
|
|
def setUp(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
self.container = uuid4().hex
|
|
|
|
self.containers = []
|
|
self._create_container(self.container)
|
|
self._create_container(self.container, use_account=2)
|
|
|
|
self.obj = uuid4().hex
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, self.obj), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def _create_container(self, name=None, headers=None, use_account=1):
|
|
if not name:
|
|
name = uuid4().hex
|
|
self.containers.append(name)
|
|
headers = headers or {}
|
|
|
|
def put(url, token, parsed, conn, name):
|
|
new_headers = dict({'X-Auth-Token': token}, **headers)
|
|
conn.request('PUT', parsed.path + '/' + name, '',
|
|
new_headers)
|
|
return check_response(conn)
|
|
resp = retry(put, name, use_account=use_account)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# With keystoneauth we need the accounts to have had the project
|
|
# domain id persisted as sysmeta prior to testing ACLs. This may
|
|
# not be the case if, for example, the account was created using
|
|
# a request with reseller_admin role, when project domain id may
|
|
# not have been known. So we ensure that the project domain id is
|
|
# in sysmeta by making a POST to the accounts using an admin role.
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', parsed.path, '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(post, use_account=use_account)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
return name
|
|
|
|
def tearDown(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
# get list of objects in container
|
|
def get(url, token, parsed, conn, container):
|
|
conn.request(
|
|
'GET', parsed.path + '/' + container + '?format=json', '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# delete an object
|
|
def delete(url, token, parsed, conn, container, obj):
|
|
conn.request(
|
|
'DELETE', '/'.join([parsed.path, container, obj['name']]), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
for container in self.containers:
|
|
while True:
|
|
resp = retry(get, container)
|
|
body = resp.read()
|
|
if resp.status == 404:
|
|
break
|
|
self.assertTrue(resp.status // 100 == 2, resp.status)
|
|
objs = json.loads(body)
|
|
if not objs:
|
|
break
|
|
for obj in objs:
|
|
resp = retry(delete, container, obj)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# delete the container
|
|
def delete(url, token, parsed, conn, name):
|
|
conn.request('DELETE', parsed.path + '/' + name, '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
for container in self.containers:
|
|
resp = retry(delete, container)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
def test_if_none_match(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, 'if_none_match_test'), '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'If-None-Match': '*'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 412)
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, 'if_none_match_test'), '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'If-None-Match': 'somethingelse'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 400)
|
|
|
|
def test_too_small_x_timestamp(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'too_small_x_timestamp'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Timestamp': '-1'})
|
|
return check_response(conn)
|
|
|
|
def head(url, token, parsed, conn):
|
|
conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container,
|
|
'too_small_x_timestamp'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0'})
|
|
return check_response(conn)
|
|
ts_before = time.time()
|
|
resp = retry(put)
|
|
body = resp.read()
|
|
ts_after = time.time()
|
|
if resp.status == 400:
|
|
# shunt_inbound_x_timestamp must be false
|
|
self.assertIn(
|
|
'X-Timestamp should be a UNIX timestamp float value', body)
|
|
else:
|
|
self.assertEqual(resp.status, 201)
|
|
self.assertEqual(body, '')
|
|
resp = retry(head)
|
|
resp.read()
|
|
self.assertGreater(float(resp.headers['x-timestamp']), ts_before)
|
|
self.assertLess(float(resp.headers['x-timestamp']), ts_after)
|
|
|
|
def test_too_big_x_timestamp(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'too_big_x_timestamp'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Timestamp': '99999999999.9999999999'})
|
|
return check_response(conn)
|
|
|
|
def head(url, token, parsed, conn):
|
|
conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container,
|
|
'too_big_x_timestamp'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0'})
|
|
return check_response(conn)
|
|
ts_before = time.time()
|
|
resp = retry(put)
|
|
body = resp.read()
|
|
ts_after = time.time()
|
|
if resp.status == 400:
|
|
# shunt_inbound_x_timestamp must be false
|
|
self.assertIn(
|
|
'X-Timestamp should be a UNIX timestamp float value', body)
|
|
else:
|
|
self.assertEqual(resp.status, 201)
|
|
self.assertEqual(body, '')
|
|
resp = retry(head)
|
|
resp.read()
|
|
self.assertGreater(float(resp.headers['x-timestamp']), ts_before)
|
|
self.assertLess(float(resp.headers['x-timestamp']), ts_after)
|
|
|
|
def test_x_delete_after(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'x_delete_after'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Delete-After': '1'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def get(url, token, parsed, conn):
|
|
conn.request(
|
|
'GET',
|
|
'%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'),
|
|
'',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
resp = retry(get)
|
|
resp.read()
|
|
count = 0
|
|
while resp.status == 200 and count < 10:
|
|
resp = retry(get)
|
|
resp.read()
|
|
count += 1
|
|
time.sleep(1)
|
|
|
|
self.assertEqual(resp.status, 404)
|
|
|
|
# To avoid an error when the object deletion in tearDown(),
|
|
# the object is added again.
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def test_x_delete_at(self):
|
|
def put(url, token, parsed, conn):
|
|
dt = datetime.datetime.now()
|
|
epoch = time.mktime(dt.timetuple())
|
|
delete_time = str(int(epoch) + 3)
|
|
conn.request(
|
|
'PUT',
|
|
'%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
|
|
'',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Delete-At': delete_time})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def get(url, token, parsed, conn):
|
|
conn.request(
|
|
'GET',
|
|
'%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
|
|
'',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
resp = retry(get)
|
|
resp.read()
|
|
count = 0
|
|
while resp.status == 200 and count < 10:
|
|
resp = retry(get)
|
|
resp.read()
|
|
count += 1
|
|
time.sleep(1)
|
|
|
|
self.assertEqual(resp.status, 404)
|
|
|
|
# To avoid an error when the object deletion in tearDown(),
|
|
# the object is added again.
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def test_non_integer_x_delete_after(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'non_integer_x_delete_after'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Delete-After': '*'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 400)
|
|
self.assertEqual(body, 'Non-integer X-Delete-After')
|
|
|
|
def test_non_integer_x_delete_at(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'non_integer_x_delete_at'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Delete-At': '*'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 400)
|
|
self.assertEqual(body, 'Non-integer X-Delete-At')
|
|
|
|
def test_x_delete_at_in_the_past(self):
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
|
|
'x_delete_at_in_the_past'),
|
|
'', {'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Delete-At': '0'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 400)
|
|
self.assertEqual(body, 'X-Delete-At in past')
|
|
|
|
def test_copy_object(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
source = '%s/%s' % (self.container, self.obj)
|
|
dest = '%s/%s' % (self.container, 'test_copy')
|
|
|
|
# get contents of source
|
|
def get_source(url, token, parsed, conn):
|
|
conn.request('GET',
|
|
'%s/%s' % (parsed.path, source),
|
|
'', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get_source)
|
|
source_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(source_contents, 'test')
|
|
|
|
# copy source to dest with X-Copy-From
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Copy-From': source})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# contents of dest should be the same as source
|
|
def get_dest(url, token, parsed, conn):
|
|
conn.request('GET',
|
|
'%s/%s' % (parsed.path, dest),
|
|
'', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get_dest)
|
|
dest_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(dest_contents, source_contents)
|
|
|
|
# delete the copy
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
# verify dest does not exist
|
|
resp = retry(get_dest)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 404)
|
|
|
|
# copy source to dest with COPY
|
|
def copy(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s' % (parsed.path, source), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination': dest})
|
|
return check_response(conn)
|
|
resp = retry(copy)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# contents of dest should be the same as source
|
|
resp = retry(get_dest)
|
|
dest_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(dest_contents, source_contents)
|
|
|
|
# copy source to dest with COPY and range
|
|
def copy(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s' % (parsed.path, source), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination': dest,
|
|
'Range': 'bytes=1-2'})
|
|
return check_response(conn)
|
|
resp = retry(copy)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# contents of dest should be the same as source
|
|
resp = retry(get_dest)
|
|
dest_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(dest_contents, source_contents[1:3])
|
|
|
|
# delete the copy
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
def test_copy_between_accounts(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
source = '%s/%s' % (self.container, self.obj)
|
|
dest = '%s/%s' % (self.container, 'test_copy')
|
|
|
|
# get contents of source
|
|
def get_source(url, token, parsed, conn):
|
|
conn.request('GET',
|
|
'%s/%s' % (parsed.path, source),
|
|
'', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get_source)
|
|
source_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(source_contents, 'test')
|
|
|
|
acct = tf.parsed[0].path.split('/', 2)[2]
|
|
|
|
# copy source to dest with X-Copy-From-Account
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Copy-From-Account': acct,
|
|
'X-Copy-From': source})
|
|
return check_response(conn)
|
|
# try to put, will not succeed
|
|
# user does not have permissions to read from source
|
|
resp = retry(put, use_account=2)
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# add acl to allow reading from source
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Container-Read': tf.swift_test_perm[1]})
|
|
return check_response(conn)
|
|
resp = retry(post)
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# retry previous put, now should succeed
|
|
resp = retry(put, use_account=2)
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# contents of dest should be the same as source
|
|
def get_dest(url, token, parsed, conn):
|
|
conn.request('GET',
|
|
'%s/%s' % (parsed.path, dest),
|
|
'', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get_dest, use_account=2)
|
|
dest_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(dest_contents, source_contents)
|
|
|
|
# delete the copy
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete, use_account=2)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
# verify dest does not exist
|
|
resp = retry(get_dest, use_account=2)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 404)
|
|
|
|
acct_dest = tf.parsed[1].path.split('/', 2)[2]
|
|
|
|
# copy source to dest with COPY
|
|
def copy(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s' % (parsed.path, source), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination-Account': acct_dest,
|
|
'Destination': dest})
|
|
return check_response(conn)
|
|
# try to copy, will not succeed
|
|
# user does not have permissions to write to destination
|
|
resp = retry(copy)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# add acl to allow write to destination
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Container-Write': tf.swift_test_perm[0]})
|
|
return check_response(conn)
|
|
resp = retry(post, use_account=2)
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# now copy will succeed
|
|
resp = retry(copy)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# contents of dest should be the same as source
|
|
resp = retry(get_dest, use_account=2)
|
|
dest_contents = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(dest_contents, source_contents)
|
|
|
|
# delete the copy
|
|
resp = retry(delete, use_account=2)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
def test_public_object(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET',
|
|
'%s/%s/%s' % (parsed.path, self.container, self.obj))
|
|
return check_response(conn)
|
|
try:
|
|
resp = retry(get)
|
|
raise Exception('Should not have been able to GET')
|
|
except Exception as err:
|
|
self.assertTrue(str(err).startswith('No result after '))
|
|
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', parsed.path + '/' + self.container, '',
|
|
{'X-Auth-Token': token,
|
|
'X-Container-Read': '.r:*'})
|
|
return check_response(conn)
|
|
resp = retry(post)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
resp = retry(get)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', parsed.path + '/' + self.container, '',
|
|
{'X-Auth-Token': token, 'X-Container-Read': ''})
|
|
return check_response(conn)
|
|
resp = retry(post)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
try:
|
|
resp = retry(get)
|
|
raise Exception('Should not have been able to GET')
|
|
except Exception as err:
|
|
self.assertTrue(str(err).startswith('No result after '))
|
|
|
|
def test_private_object(self):
|
|
if tf.skip or tf.skip3:
|
|
raise SkipTest
|
|
|
|
# Ensure we can't access the object with the third account
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/%s' % (
|
|
parsed.path, self.container, self.obj), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# create a shared container writable by account3
|
|
shared_container = uuid4().hex
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s' % (
|
|
parsed.path, shared_container), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Container-Read': tf.swift_test_perm[2],
|
|
'X-Container-Write': tf.swift_test_perm[2]})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# verify third account can not copy from private container
|
|
def copy(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'private_object'), '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Copy-From': '%s/%s' % (self.container, self.obj)})
|
|
return check_response(conn)
|
|
resp = retry(copy, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# verify third account can write "obj1" to shared container
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# verify third account can copy "obj1" to shared container
|
|
def copy2(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination': '%s/%s' % (shared_container, 'obj1')})
|
|
return check_response(conn)
|
|
resp = retry(copy2, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# verify third account STILL can not copy from private container
|
|
def copy3(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s/%s' % (
|
|
parsed.path, self.container, self.obj), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination': '%s/%s' % (shared_container,
|
|
'private_object')})
|
|
return check_response(conn)
|
|
resp = retry(copy3, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# clean up "obj1"
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# clean up shared_container
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE',
|
|
parsed.path + '/' + shared_container, '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
def test_container_write_only(self):
|
|
if tf.skip or tf.skip3:
|
|
raise SkipTest
|
|
|
|
# Ensure we can't access the object with the third account
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/%s' % (
|
|
parsed.path, self.container, self.obj), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# create a shared container writable (but not readable) by account3
|
|
shared_container = uuid4().hex
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s' % (
|
|
parsed.path, shared_container), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Container-Write': tf.swift_test_perm[2]})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# verify third account can write "obj1" to shared container
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# verify third account cannot copy "obj1" to shared container
|
|
def copy(url, token, parsed, conn):
|
|
conn.request('COPY', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), '',
|
|
{'X-Auth-Token': token,
|
|
'Destination': '%s/%s' % (shared_container, 'obj2')})
|
|
return check_response(conn)
|
|
resp = retry(copy, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# verify third account can POST to "obj1" in shared container
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Object-Meta-Color': 'blue'})
|
|
return check_response(conn)
|
|
resp = retry(post, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 202)
|
|
|
|
# verify third account can DELETE from shared container
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s/%s' % (
|
|
parsed.path, shared_container, 'obj1'), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete, use_account=3)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# clean up shared_container
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE',
|
|
parsed.path + '/' + shared_container, '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
@requires_acls
|
|
def test_read_only(self):
|
|
if tf.skip3:
|
|
raise tf.SkipTest
|
|
|
|
def get_listing(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def post_account(url, token, parsed, conn, headers):
|
|
new_headers = dict({'X-Auth-Token': token}, **headers)
|
|
conn.request('POST', parsed.path, '', new_headers)
|
|
return check_response(conn)
|
|
|
|
def get(url, token, parsed, conn, name):
|
|
conn.request('GET', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def put(url, token, parsed, conn, name):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def delete(url, token, parsed, conn, name):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# cannot list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# cannot get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# grant read-only access
|
|
acl_user = tf.swift_test_user[2]
|
|
acl = {'read-only': [acl_user]}
|
|
headers = {'x-account-access-control': json.dumps(acl)}
|
|
resp = retry(post_account, headers=headers, use_account=1)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# can list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertIn(self.obj, listing)
|
|
|
|
# can get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(body, 'test')
|
|
|
|
# can not put an object
|
|
obj_name = str(uuid4())
|
|
resp = retry(put, obj_name, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# can not delete an object
|
|
resp = retry(delete, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# sanity with account1
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertNotIn(obj_name, listing)
|
|
self.assertIn(self.obj, listing)
|
|
|
|
@requires_acls
|
|
def test_read_write(self):
|
|
if tf.skip3:
|
|
raise SkipTest
|
|
|
|
def get_listing(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def post_account(url, token, parsed, conn, headers):
|
|
new_headers = dict({'X-Auth-Token': token}, **headers)
|
|
conn.request('POST', parsed.path, '', new_headers)
|
|
return check_response(conn)
|
|
|
|
def get(url, token, parsed, conn, name):
|
|
conn.request('GET', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def put(url, token, parsed, conn, name):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def delete(url, token, parsed, conn, name):
|
|
conn.request('DELETE', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# cannot list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# cannot get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# grant read-write access
|
|
acl_user = tf.swift_test_user[2]
|
|
acl = {'read-write': [acl_user]}
|
|
headers = {'x-account-access-control': json.dumps(acl)}
|
|
resp = retry(post_account, headers=headers, use_account=1)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# can list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertIn(self.obj, listing)
|
|
|
|
# can get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(body, 'test')
|
|
|
|
# can put an object
|
|
obj_name = str(uuid4())
|
|
resp = retry(put, obj_name, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# can delete an object
|
|
resp = retry(delete, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# sanity with account1
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertIn(obj_name, listing)
|
|
self.assertNotIn(self.obj, listing)
|
|
|
|
@requires_acls
|
|
def test_admin(self):
|
|
if tf.skip3:
|
|
raise SkipTest
|
|
|
|
def get_listing(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def post_account(url, token, parsed, conn, headers):
|
|
new_headers = dict({'X-Auth-Token': token}, **headers)
|
|
conn.request('POST', parsed.path, '', new_headers)
|
|
return check_response(conn)
|
|
|
|
def get(url, token, parsed, conn, name):
|
|
conn.request('GET', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def put(url, token, parsed, conn, name):
|
|
conn.request('PUT', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), 'test',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def delete(url, token, parsed, conn, name):
|
|
conn.request('DELETE', '%s/%s/%s' % (
|
|
parsed.path, self.container, name), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# cannot list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# cannot get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# grant admin access
|
|
acl_user = tf.swift_test_user[2]
|
|
acl = {'admin': [acl_user]}
|
|
headers = {'x-account-access-control': json.dumps(acl)}
|
|
resp = retry(post_account, headers=headers, use_account=1)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# can list objects
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertIn(self.obj, listing)
|
|
|
|
# can get object
|
|
resp = retry(get, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(body, 'test')
|
|
|
|
# can put an object
|
|
obj_name = str(uuid4())
|
|
resp = retry(put, obj_name, use_account=3)
|
|
body = resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# can delete an object
|
|
resp = retry(delete, self.obj, use_account=3)
|
|
body = resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# sanity with account1
|
|
resp = retry(get_listing, use_account=3)
|
|
listing = resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertIn(obj_name, listing)
|
|
self.assertNotIn(self.obj, listing)
|
|
|
|
def test_manifest(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
# Data for the object segments
|
|
segments1 = ['one', 'two', 'three', 'four', 'five']
|
|
segments2 = ['six', 'seven', 'eight']
|
|
segments3 = ['nine', 'ten', 'eleven']
|
|
|
|
# Upload the first set of segments
|
|
def put(url, token, parsed, conn, objnum):
|
|
conn.request('PUT', '%s/%s/segments1/%s' % (
|
|
parsed.path, self.container, str(objnum)), segments1[objnum],
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments1)):
|
|
resp = retry(put, objnum)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Upload the manifest
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {
|
|
'X-Auth-Token': token,
|
|
'X-Object-Manifest': '%s/segments1/' % self.container,
|
|
'Content-Type': 'text/jibberish', 'Content-Length': '0'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Get the manifest (should get all the segments as the body)
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments1))
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(resp.getheader('content-type'), 'text/jibberish')
|
|
|
|
# Get with a range at the start of the second segment
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {
|
|
'X-Auth-Token': token, 'Range': 'bytes=3-'})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments1[1:]))
|
|
self.assertEqual(resp.status, 206)
|
|
|
|
# Get with a range in the middle of the second segment
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {
|
|
'X-Auth-Token': token, 'Range': 'bytes=5-'})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments1)[5:])
|
|
self.assertEqual(resp.status, 206)
|
|
|
|
# Get with a full start and stop range
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {
|
|
'X-Auth-Token': token, 'Range': 'bytes=5-10'})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments1)[5:11])
|
|
self.assertEqual(resp.status, 206)
|
|
|
|
# Upload the second set of segments
|
|
def put(url, token, parsed, conn, objnum):
|
|
conn.request('PUT', '%s/%s/segments2/%s' % (
|
|
parsed.path, self.container, str(objnum)), segments2[objnum],
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments2)):
|
|
resp = retry(put, objnum)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Get the manifest (should still be the first segments of course)
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments1))
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
# Update the manifest
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {
|
|
'X-Auth-Token': token,
|
|
'X-Object-Manifest': '%s/segments2/' % self.container,
|
|
'Content-Length': '0'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Get the manifest (should be the second set of segments now)
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments2))
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
if not tf.skip3:
|
|
|
|
# Ensure we can't access the manifest with the third account
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# Grant access to the third account
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', '%s/%s' % (parsed.path, self.container),
|
|
'', {'X-Auth-Token': token,
|
|
'X-Container-Read': tf.swift_test_perm[2]})
|
|
return check_response(conn)
|
|
resp = retry(post)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# The third account should be able to get the manifest now
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
self.assertEqual(resp.read(), ''.join(segments2))
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
# Create another container for the third set of segments
|
|
acontainer = uuid4().hex
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', parsed.path + '/' + acontainer, '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Upload the third set of segments in the other container
|
|
def put(url, token, parsed, conn, objnum):
|
|
conn.request('PUT', '%s/%s/segments3/%s' % (
|
|
parsed.path, acontainer, str(objnum)), segments3[objnum],
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments3)):
|
|
resp = retry(put, objnum)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Update the manifest
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '',
|
|
{'X-Auth-Token': token,
|
|
'X-Object-Manifest': '%s/segments3/' % acontainer,
|
|
'Content-Length': '0'})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
# Get the manifest to ensure it's the third set of segments
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get)
|
|
self.assertEqual(resp.read(), ''.join(segments3))
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
if not tf.skip3:
|
|
|
|
# Ensure we can't access the manifest with the third account
|
|
# (because the segments are in a protected container even if the
|
|
# manifest itself is not).
|
|
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 403)
|
|
|
|
# Grant access to the third account
|
|
def post(url, token, parsed, conn):
|
|
conn.request('POST', '%s/%s' % (parsed.path, acontainer),
|
|
'', {'X-Auth-Token': token,
|
|
'X-Container-Read': tf.swift_test_perm[2]})
|
|
return check_response(conn)
|
|
resp = retry(post)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 204)
|
|
|
|
# The third account should be able to get the manifest now
|
|
def get(url, token, parsed, conn):
|
|
conn.request('GET', '%s/%s/manifest' % (
|
|
parsed.path, self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(get, use_account=3)
|
|
self.assertEqual(resp.read(), ''.join(segments3))
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
# Delete the manifest
|
|
def delete(url, token, parsed, conn, objnum):
|
|
conn.request('DELETE', '%s/%s/manifest' % (
|
|
parsed.path,
|
|
self.container), '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete, objnum)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# Delete the third set of segments
|
|
def delete(url, token, parsed, conn, objnum):
|
|
conn.request('DELETE', '%s/%s/segments3/%s' % (
|
|
parsed.path, acontainer, str(objnum)), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments3)):
|
|
resp = retry(delete, objnum)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# Delete the second set of segments
|
|
def delete(url, token, parsed, conn, objnum):
|
|
conn.request('DELETE', '%s/%s/segments2/%s' % (
|
|
parsed.path, self.container, str(objnum)), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments2)):
|
|
resp = retry(delete, objnum)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# Delete the first set of segments
|
|
def delete(url, token, parsed, conn, objnum):
|
|
conn.request('DELETE', '%s/%s/segments1/%s' % (
|
|
parsed.path, self.container, str(objnum)), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
for objnum in range(len(segments1)):
|
|
resp = retry(delete, objnum)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
# Delete the extra container
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '',
|
|
{'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
|
|
def test_delete_content_type(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/hi' % (parsed.path, self.container),
|
|
'there', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
|
|
'', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertIn(resp.status, (204, 404))
|
|
self.assertEqual(resp.getheader('Content-Type'),
|
|
'text/html; charset=UTF-8')
|
|
|
|
def test_delete_if_delete_at_bad(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT',
|
|
'%s/%s/hi-delete-bad' % (parsed.path, self.container),
|
|
'there', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def delete(url, token, parsed, conn):
|
|
conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container),
|
|
'', {'X-Auth-Token': token,
|
|
'X-If-Delete-At': 'bad'})
|
|
return check_response(conn)
|
|
resp = retry(delete)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 400)
|
|
|
|
def test_null_name(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
def put(url, token, parsed, conn):
|
|
conn.request('PUT', '%s/%s/abc%%00def' % (
|
|
parsed.path,
|
|
self.container), 'test', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
resp = retry(put)
|
|
if (tf.web_front_end == 'apache2'):
|
|
self.assertEqual(resp.status, 404)
|
|
else:
|
|
self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
|
|
self.assertEqual(resp.status, 412)
|
|
|
|
def test_cors(self):
|
|
if tf.skip:
|
|
raise SkipTest
|
|
|
|
try:
|
|
strict_cors = tf.cluster_info['swift']['strict_cors_mode']
|
|
except KeyError:
|
|
raise SkipTest("cors mode is unknown")
|
|
|
|
def put_cors_cont(url, token, parsed, conn, orig):
|
|
conn.request(
|
|
'PUT', '%s/%s' % (parsed.path, self.container),
|
|
'', {'X-Auth-Token': token,
|
|
'X-Container-Meta-Access-Control-Allow-Origin': orig})
|
|
return check_response(conn)
|
|
|
|
def put_obj(url, token, parsed, conn, obj):
|
|
conn.request(
|
|
'PUT', '%s/%s/%s' % (parsed.path, self.container, obj),
|
|
'test', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
def check_cors(url, token, parsed, conn,
|
|
method, obj, headers):
|
|
if method != 'OPTIONS':
|
|
headers['X-Auth-Token'] = token
|
|
conn.request(
|
|
method, '%s/%s/%s' % (parsed.path, self.container, obj),
|
|
'', headers)
|
|
return conn.getresponse()
|
|
|
|
resp = retry(put_cors_cont, '*')
|
|
resp.read()
|
|
self.assertEqual(resp.status // 100, 2)
|
|
|
|
resp = retry(put_obj, 'cat')
|
|
resp.read()
|
|
self.assertEqual(resp.status // 100, 2)
|
|
|
|
resp = retry(check_cors,
|
|
'OPTIONS', 'cat', {'Origin': 'http://m.com'})
|
|
self.assertEqual(resp.status, 401)
|
|
|
|
resp = retry(check_cors,
|
|
'OPTIONS', 'cat',
|
|
{'Origin': 'http://m.com',
|
|
'Access-Control-Request-Method': 'GET'})
|
|
|
|
self.assertEqual(resp.status, 200)
|
|
resp.read()
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertEqual(headers.get('access-control-allow-origin'),
|
|
'*')
|
|
|
|
resp = retry(check_cors,
|
|
'GET', 'cat', {'Origin': 'http://m.com'})
|
|
self.assertEqual(resp.status, 200)
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertEqual(headers.get('access-control-allow-origin'),
|
|
'*')
|
|
|
|
resp = retry(check_cors,
|
|
'GET', 'cat', {'Origin': 'http://m.com',
|
|
'X-Web-Mode': 'True'})
|
|
self.assertEqual(resp.status, 200)
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertEqual(headers.get('access-control-allow-origin'),
|
|
'*')
|
|
|
|
####################
|
|
|
|
resp = retry(put_cors_cont, 'http://secret.com')
|
|
resp.read()
|
|
self.assertEqual(resp.status // 100, 2)
|
|
|
|
resp = retry(check_cors,
|
|
'OPTIONS', 'cat',
|
|
{'Origin': 'http://m.com',
|
|
'Access-Control-Request-Method': 'GET'})
|
|
resp.read()
|
|
self.assertEqual(resp.status, 401)
|
|
|
|
if strict_cors:
|
|
resp = retry(check_cors,
|
|
'GET', 'cat', {'Origin': 'http://m.com'})
|
|
resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertNotIn('access-control-allow-origin', headers)
|
|
|
|
resp = retry(check_cors,
|
|
'GET', 'cat', {'Origin': 'http://secret.com'})
|
|
resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertEqual(headers.get('access-control-allow-origin'),
|
|
'http://secret.com')
|
|
else:
|
|
resp = retry(check_cors,
|
|
'GET', 'cat', {'Origin': 'http://m.com'})
|
|
resp.read()
|
|
self.assertEqual(resp.status, 200)
|
|
headers = dict((k.lower(), v) for k, v in resp.getheaders())
|
|
self.assertEqual(headers.get('access-control-allow-origin'),
|
|
'http://m.com')
|
|
|
|
@requires_policies
|
|
def test_cross_policy_copy(self):
|
|
# create container in first policy
|
|
policy = self.policies.select()
|
|
container = self._create_container(
|
|
headers={'X-Storage-Policy': policy['name']})
|
|
obj = uuid4().hex
|
|
|
|
# create a container in second policy
|
|
other_policy = self.policies.exclude(name=policy['name']).select()
|
|
other_container = self._create_container(
|
|
headers={'X-Storage-Policy': other_policy['name']})
|
|
other_obj = uuid4().hex
|
|
|
|
def put_obj(url, token, parsed, conn, container, obj):
|
|
# to keep track of things, use the original path as the body
|
|
content = '%s/%s' % (container, obj)
|
|
path = '%s/%s' % (parsed.path, content)
|
|
conn.request('PUT', path, content, {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# create objects
|
|
for c, o in zip((container, other_container), (obj, other_obj)):
|
|
resp = retry(put_obj, c, o)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def put_copy_from(url, token, parsed, conn, container, obj, source):
|
|
dest_path = '%s/%s/%s' % (parsed.path, container, obj)
|
|
conn.request('PUT', dest_path, '',
|
|
{'X-Auth-Token': token,
|
|
'Content-Length': '0',
|
|
'X-Copy-From': source})
|
|
return check_response(conn)
|
|
|
|
copy_requests = (
|
|
(container, other_obj, '%s/%s' % (other_container, other_obj)),
|
|
(other_container, obj, '%s/%s' % (container, obj)),
|
|
)
|
|
|
|
# copy objects
|
|
for c, o, source in copy_requests:
|
|
resp = retry(put_copy_from, c, o, source)
|
|
resp.read()
|
|
self.assertEqual(resp.status, 201)
|
|
|
|
def get_obj(url, token, parsed, conn, container, obj):
|
|
path = '%s/%s/%s' % (parsed.path, container, obj)
|
|
conn.request('GET', path, '', {'X-Auth-Token': token})
|
|
return check_response(conn)
|
|
|
|
# validate contents, contents should be source
|
|
validate_requests = copy_requests
|
|
for c, o, body in validate_requests:
|
|
resp = retry(get_obj, c, o)
|
|
self.assertEqual(resp.status, 200)
|
|
self.assertEqual(body, resp.read())
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest2.main()
|