swift/test/functional/test_object.py
John Dickinson adc568c97f Fix bulk responses when using xml and Expect 100-continue
When we fixed bulk response heartbeating in https://review.openstack.org/#/c/510715/,
code review raised the issue of moving the xml header down to after the
early-exit clauses. At the time, it didn't seem to break anything, so
it was left in place. However, that insight was correct.

The purpose of the earlier patch was to force eventlet to use chunked
transfer encoding on the response in order to prevent eventlet from
buffering the whole response, thus defeating the purpose of the
heartbeat responses.

Moving the first line of the body lower (i.e. after the early-exit
checks) allows other headers in a chunked transfer encoding response
to be appropriately processed before sending the headers. Sending the
xml declaration early causes it to get intermingled in the 100-continue
protocol, thus breaking the chunked transfer encoding semantics.

Closes-Bug: #1819252

Change-Id: I072f4dab21cd7cdb81b9e41072eb504131411dc8
2019-03-18 20:41:29 +00:00

1684 lines
62 KiB
Python

#!/usr/bin/python
# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import unittest2
from uuid import uuid4
import time
from xml.dom import minidom
from six.moves import range
from test.functional import check_response, retry, requires_acls, \
requires_policies, SkipTest, requires_bulk
import test.functional as tf
def setUpModule():
    """Module-level unittest hook: delegate to ``tf.setup_package()``."""
    tf.setup_package()
def tearDownModule():
    """Module-level unittest hook: delegate to ``tf.teardown_package()``."""
    tf.teardown_package()
class TestObject(unittest2.TestCase):
def setUp(self):
    """Create the per-test container and upload one object into it.

    The test is skipped when ``tf.skip`` or ``tf.skip2`` is set.  The
    container is created twice under the same random name, once with
    ``use_account=1`` (the default) and once with ``use_account=2``.  An
    object with body ``'test'`` is then PUT into it.
    """
    if tf.skip or tf.skip2:
        raise SkipTest

    if tf.in_process:
        tf.skip_if_no_xattrs()

    self.container = uuid4().hex
    self.containers = []
    for account in (1, 2):
        self._create_container(self.container, use_account=account)

    self.obj = uuid4().hex

    def upload(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, self.obj)
        conn.request('PUT', target, 'test', {'X-Auth-Token': token})
        return check_response(conn)

    response = retry(upload)
    response.read()
    self.assertEqual(response.status, 201)
def _create_container(self, name=None, headers=None, use_account=1):
    """PUT a container, then POST to the account; return the name.

    :param name: container name; a random hex name is used when falsy
    :param headers: optional extra headers merged into the container PUT
    :param use_account: forwarded to ``retry`` for both requests
    :returns: the container name that was used
    """
    container_name = name if name else uuid4().hex
    # Remember the name so tearDown() can clean it up.
    self.containers.append(container_name)
    extra_headers = headers if headers else {}

    def put_container(url, token, parsed, conn, name):
        request_headers = dict({'X-Auth-Token': token}, **extra_headers)
        conn.request('PUT', parsed.path + '/' + name, '',
                     request_headers)
        return check_response(conn)

    response = retry(put_container, container_name,
                     use_account=use_account)
    response.read()
    self.assertEqual(response.status, 201)

    # With keystoneauth we need the accounts to have had the project
    # domain id persisted as sysmeta prior to testing ACLs. This may
    # not be the case if, for example, the account was created using
    # a request with reseller_admin role, when project domain id may
    # not have been known. So we ensure that the project domain id is
    # in sysmeta by making a POST to the accounts using an admin role.
    def post_account(url, token, parsed, conn):
        conn.request('POST', parsed.path, '', {'X-Auth-Token': token})
        return check_response(conn)

    response = retry(post_account, use_account=use_account)
    response.read()
    self.assertEqual(response.status, 204)

    return container_name
def tearDown(self):
    """Empty and then delete every container recorded in self.containers.

    For each container the JSON listing is fetched repeatedly and every
    listed object deleted, until the listing is empty or the container
    returns 404.  Afterwards each container itself is deleted.  A 404 on
    any delete is tolerated.
    """
    if tf.skip:
        raise SkipTest

    def list_objects(url, token, parsed, conn, container):
        # get list of objects in container
        listing_path = parsed.path + '/' + container + '?format=json'
        conn.request('GET', listing_path, '', {'X-Auth-Token': token})
        return check_response(conn)

    def delete_object(url, token, parsed, conn, container, obj):
        # delete an object
        segments = [parsed.path, container, obj['name'].encode('utf8')]
        conn.request('DELETE', '/'.join(segments), '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    def delete_container(url, token, parsed, conn, name):
        # delete the container
        conn.request('DELETE', parsed.path + '/' + name, '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    for container in self.containers:
        while True:
            listing = retry(list_objects, container)
            payload = listing.read()
            if listing.status == 404:
                break
            self.assertEqual(listing.status // 100, 2, listing.status)
            entries = json.loads(payload)
            if not entries:
                break
            for entry in entries:
                outcome = retry(delete_object, container, entry)
                outcome.read()
                self.assertIn(outcome.status, (204, 404))

    for container in self.containers:
        outcome = retry(delete_container, container)
        outcome.read()
        self.assertIn(outcome.status, (204, 404))
def test_metadata(self):
    """Check which metadata headers survive object PUT and POST.

    Each scenario sends ``req_metadata`` as request headers with a PUT
    or POST, then GETs the object and compares every response header
    whose name contains ``meta`` with the expected dictionary.
    """
    obj = 'test_metadata'
    req_metadata = {}

    def send(verb, token, parsed, conn):
        # Shared by put/post: auth token plus the current req_metadata.
        headers = {'X-Auth-Token': token}
        headers.update(req_metadata)
        target = '%s/%s/%s' % (parsed.path, self.container, obj)
        conn.request(verb, target, '', headers)
        return check_response(conn)

    def put(url, token, parsed, conn):
        return send('PUT', token, parsed, conn)

    def post(url, token, parsed, conn):
        return send('POST', token, parsed, conn)

    def get(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj)
        conn.request('GET', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def metadata(resp):
        return dict((name, value)
                    for name, value in resp.headers.items()
                    if 'meta' in name.lower())

    # (request function, headers to send, expected status of that
    #  request, metadata expected on the following GET)
    scenarios = [
        # empty put
        (put, {}, 201, {}),
        # empty post
        (post, {}, 202, {}),
        # metadata put
        (put,
         {'x-object-meta-Color': 'blUe',
          'X-Object-Meta-food': 'PizZa'},
         201,
         {'X-Object-Meta-Color': 'blUe',
          'X-Object-Meta-Food': 'PizZa'}),
        # metadata post
        (post,
         {'X-Object-Meta-color': 'oraNge'},
         202,
         {'X-Object-Meta-Color': 'oraNge'}),
        # sysmeta put
        (put,
         {'X-Object-Meta-Color': 'Red',
          'X-Object-Sysmeta-Color': 'Green',
          'X-Object-Transient-Sysmeta-Color': 'Blue'},
         201,
         {'X-Object-Meta-Color': 'Red'}),
        # sysmeta post
        (post,
         {'X-Object-Meta-Food': 'Burger',
          'X-Object-Meta-Animal': 'Cat',
          'X-Object-Sysmeta-Animal': 'Cow',
          'X-Object-Transient-Sysmeta-Food': 'Burger'},
         202,
         {'X-Object-Meta-Food': 'Burger',
          'X-Object-Meta-Animal': 'Cat'}),
        # non-ascii put
        (put,
         {'X-Object-Meta-Foo': u'B\u00e2r'},
         201,
         {'X-Object-Meta-Foo': 'B\xc3\xa2r'}),
        # non-ascii post
        (post,
         {'X-Object-Meta-Foo': u'B\u00e5z'},
         202,
         {'X-Object-Meta-Foo': 'B\xc3\xa5z'}),
    ]

    for request, sent, expected_status, expected_metadata in scenarios:
        # The closures above read req_metadata at call time.
        req_metadata = sent
        resp = retry(request)
        resp.read()
        self.assertEqual(resp.status, expected_status)

        resp = retry(get)
        self.assertEqual('', resp.read())
        self.assertEqual(resp.status, 200)
        self.assertEqual(metadata(resp), expected_metadata)
def test_if_none_match(self):
    """Check PUT behaviour with the If-None-Match request header.

    With ``If-None-Match: *`` the first PUT returns 201, a repeat
    returns 412, and after a DELETE the PUT returns 201 again.  Any
    other header value is answered with 400.
    """
    obj_name = 'if_none_match_test'

    def delete(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj_name)
        conn.request('DELETE', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def make_put(if_none_match):
        # Build a retry-able PUT carrying the given If-None-Match value.
        def put(url, token, parsed, conn):
            target = '%s/%s/%s' % (
                parsed.path, self.container, obj_name)
            conn.request('PUT', target, '',
                         {'X-Auth-Token': token,
                          'Content-Length': '0',
                          'If-None-Match': if_none_match})
            return check_response(conn)
        return put

    put_if_absent = make_put('*')
    sequence = (
        (put_if_absent, 201),
        (put_if_absent, 412),
        (delete, 204),
        (put_if_absent, 201),
        (make_put('somethingelse'), 400),
    )
    for request, expected_status in sequence:
        resp = retry(request)
        resp.read()
        self.assertEqual(resp.status, expected_status)
def test_too_small_x_timestamp(self):
    """PUT with ``X-Timestamp: -1`` is rejected or the value is replaced.

    Either the PUT returns 400 with an explanatory body, or it returns
    201 and a following HEAD reports an x-timestamp that lies strictly
    between the times sampled just before and just after the PUT.
    """
    obj_name = 'too_small_x_timestamp'

    def put(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj_name)
        request_headers = {'X-Auth-Token': token,
                           'Content-Length': '0',
                           'X-Timestamp': '-1'}
        conn.request('PUT', target, '', request_headers)
        return check_response(conn)

    def head(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj_name)
        request_headers = {'X-Auth-Token': token,
                           'Content-Length': '0'}
        conn.request('HEAD', target, '', request_headers)
        return check_response(conn)

    ts_before = time.time()
    time.sleep(0.05)
    resp = retry(put)
    body = resp.read()
    time.sleep(0.05)
    ts_after = time.time()

    if resp.status == 400:
        # shunt_inbound_x_timestamp must be false
        self.assertIn(
            'X-Timestamp should be a UNIX timestamp float value', body)
        return

    self.assertEqual(resp.status, 201)
    self.assertEqual(body, '')
    resp = retry(head)
    resp.read()
    stored_timestamp = float(resp.headers['x-timestamp'])
    self.assertGreater(stored_timestamp, ts_before)
    self.assertLess(stored_timestamp, ts_after)
def test_too_big_x_timestamp(self):
    """PUT with an oversized X-Timestamp is rejected or overridden.

    Either the PUT returns 400 with an explanatory body, or it returns
    201 and a following HEAD reports an x-timestamp that lies strictly
    between the times sampled just before and just after the PUT.
    """
    obj_name = 'too_big_x_timestamp'

    def put(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj_name)
        request_headers = {'X-Auth-Token': token,
                           'Content-Length': '0',
                           'X-Timestamp': '99999999999.9999999999'}
        conn.request('PUT', target, '', request_headers)
        return check_response(conn)

    def head(url, token, parsed, conn):
        target = '%s/%s/%s' % (parsed.path, self.container, obj_name)
        request_headers = {'X-Auth-Token': token,
                           'Content-Length': '0'}
        conn.request('HEAD', target, '', request_headers)
        return check_response(conn)

    ts_before = time.time()
    time.sleep(0.05)
    resp = retry(put)
    body = resp.read()
    time.sleep(0.05)
    ts_after = time.time()

    if resp.status == 400:
        # shunt_inbound_x_timestamp must be false
        self.assertIn(
            'X-Timestamp should be a UNIX timestamp float value', body)
        return

    self.assertEqual(resp.status, 201)
    self.assertEqual(body, '')
    resp = retry(head)
    resp.read()
    stored_timestamp = float(resp.headers['x-timestamp'])
    self.assertGreater(stored_timestamp, ts_before)
    self.assertLess(stored_timestamp, ts_after)
def test_x_delete_after(self):
    """An object PUT with ``X-Delete-After: 2`` eventually returns 404.

    The object is polled with GET until it stops returning 200 or the
    polling budget (10 re-checks, 0.5s apart) is exhausted.  The final
    status must be 404.
    """
    def put(url, token, parsed, conn):
        conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container,
                                          'x_delete_after'),
                     '', {'X-Auth-Token': token,
                          'Content-Length': '0',
                          'X-Delete-After': '2'})
        return check_response(conn)

    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)

    def get(url, token, parsed, conn):
        conn.request(
            'GET',
            '%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'),
            '',
            {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(get)
    resp.read()
    # Sleep *before* each re-check rather than after it, so no time is
    # wasted sleeping once the object has already been seen to expire.
    for _ in range(10):
        if resp.status != 200:
            break
        time.sleep(0.5)
        resp = retry(get)
        resp.read()

    self.assertEqual(resp.status, 404)

    # To avoid an error when the object deletion in tearDown(),
    # the object is added again.
    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)
def test_x_delete_at(self):
    """An object PUT with X-Delete-At three seconds ahead expires.

    The object is polled with GET until it stops returning 200 or the
    polling budget (10 re-checks, 1s apart) is exhausted.  The final
    status must be 404.
    """
    def put(url, token, parsed, conn):
        # Take the epoch straight from time.time(): converting a naive
        # local datetime through time.mktime() is ambiguous around a
        # daylight-saving change and could be off by an hour.
        delete_time = str(int(time.time()) + 3)
        conn.request(
            'PUT',
            '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
            '',
            {'X-Auth-Token': token,
             'Content-Length': '0',
             'X-Delete-At': delete_time})
        return check_response(conn)

    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)

    def get(url, token, parsed, conn):
        conn.request(
            'GET',
            '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'),
            '',
            {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(get)
    resp.read()
    # Sleep *before* each re-check rather than after it, so no time is
    # wasted sleeping once the object has already been seen to expire.
    for _ in range(10):
        if resp.status != 200:
            break
        time.sleep(1)
        resp = retry(get)
        resp.read()

    self.assertEqual(resp.status, 404)

    # To avoid an error when the object deletion in tearDown(),
    # the object is added again.
    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)
def test_non_integer_x_delete_after(self):
    """A PUT with ``X-Delete-After: *`` returns 400 and a fixed body."""
    def put(url, token, parsed, conn):
        target = '%s/%s/%s' % (
            parsed.path, self.container, 'non_integer_x_delete_after')
        request_headers = {
            'X-Auth-Token': token,
            'Content-Length': '0',
            'X-Delete-After': '*',
        }
        conn.request('PUT', target, '', request_headers)
        return check_response(conn)

    response = retry(put)
    message = response.read()
    self.assertEqual(response.status, 400)
    self.assertEqual(message, 'Non-integer X-Delete-After')
def test_non_integer_x_delete_at(self):
    """A PUT with ``X-Delete-At: *`` returns 400 and a fixed body."""
    def put(url, token, parsed, conn):
        target = '%s/%s/%s' % (
            parsed.path, self.container, 'non_integer_x_delete_at')
        request_headers = {
            'X-Auth-Token': token,
            'Content-Length': '0',
            'X-Delete-At': '*',
        }
        conn.request('PUT', target, '', request_headers)
        return check_response(conn)

    response = retry(put)
    message = response.read()
    self.assertEqual(response.status, 400)
    self.assertEqual(message, 'Non-integer X-Delete-At')
def test_x_delete_at_in_the_past(self):
    """A PUT with ``X-Delete-At: 0`` returns 400 and a fixed body."""
    def put(url, token, parsed, conn):
        target = '%s/%s/%s' % (
            parsed.path, self.container, 'x_delete_at_in_the_past')
        request_headers = {
            'X-Auth-Token': token,
            'Content-Length': '0',
            'X-Delete-At': '0',
        }
        conn.request('PUT', target, '', request_headers)
        return check_response(conn)

    response = retry(put)
    message = response.read()
    self.assertEqual(response.status, 400)
    self.assertEqual(message, 'X-Delete-At in past')
def test_copy_object(self):
    """Copy the setUp object within its container in three ways.

    The copy is made with PUT + X-Copy-From, with COPY + Destination,
    and with COPY + Destination + Range.  After each the destination
    body is compared with the source (or the requested slice of it).
    """
    if tf.skip:
        raise SkipTest

    source = '%s/%s' % (self.container, self.obj)
    dest = '%s/%s' % (self.container, 'test_copy')

    def make_get(relative_path):
        # Build a retry-able GET of <account path>/<relative_path>.
        def get(url, token, parsed, conn):
            conn.request('GET',
                         '%s/%s' % (parsed.path, relative_path),
                         '', {'X-Auth-Token': token})
            return check_response(conn)
        return get

    def make_copy(extra_headers):
        # Build a retry-able COPY of source to dest with extra headers.
        def copy(url, token, parsed, conn):
            request_headers = {'X-Auth-Token': token,
                               'Destination': dest}
            request_headers.update(extra_headers)
            conn.request('COPY', '%s/%s' % (parsed.path, source), '',
                         request_headers)
            return check_response(conn)
        return copy

    def put(url, token, parsed, conn):
        conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
                     {'X-Auth-Token': token,
                      'Content-Length': '0',
                      'X-Copy-From': source})
        return check_response(conn)

    def delete(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    get_source = make_get(source)
    get_dest = make_get(dest)

    # get contents of source
    resp = retry(get_source)
    source_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(source_contents, 'test')

    # copy source to dest with X-Copy-From
    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)

    # contents of dest should be the same as source
    resp = retry(get_dest)
    dest_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(dest_contents, source_contents)

    # delete the copy
    resp = retry(delete)
    resp.read()
    self.assertIn(resp.status, (204, 404))

    # verify dest does not exist
    resp = retry(get_dest)
    resp.read()
    self.assertEqual(resp.status, 404)

    # copy source to dest with COPY
    resp = retry(make_copy({}))
    resp.read()
    self.assertEqual(resp.status, 201)

    # contents of dest should be the same as source
    resp = retry(get_dest)
    dest_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(dest_contents, source_contents)

    # copy source to dest with COPY and range
    resp = retry(make_copy({'Range': 'bytes=1-2'}))
    resp.read()
    self.assertEqual(resp.status, 201)

    # contents of dest should be the requested slice of source
    resp = retry(get_dest)
    dest_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(dest_contents, source_contents[1:3])

    # delete the copy
    resp = retry(delete)
    resp.read()
    self.assertIn(resp.status, (204, 404))
def test_copy_between_accounts(self):
    """Copy an object between two accounts, gated by container ACLs.

    A PUT with X-Copy-From-Account issued with ``use_account=2`` is
    refused (403) until X-Container-Read is set on the source
    container.  A COPY with Destination-Account is refused (403) until
    X-Container-Write is set on the destination container.

    Every response body is drained with ``read()`` before the next
    request is issued, as elsewhere in this file, so no unread data is
    left on the connection.
    """
    if tf.skip2:
        raise SkipTest

    source = '%s/%s' % (self.container, self.obj)
    dest = '%s/%s' % (self.container, 'test_copy')

    # get contents of source
    def get_source(url, token, parsed, conn):
        conn.request('GET',
                     '%s/%s' % (parsed.path, source),
                     '', {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(get_source)
    source_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(source_contents, 'test')

    # Third component of the first account's parsed URL path.
    acct = tf.parsed[0].path.split('/', 2)[2]

    # copy source to dest with X-Copy-From-Account
    def put(url, token, parsed, conn):
        conn.request('PUT', '%s/%s' % (parsed.path, dest), '',
                     {'X-Auth-Token': token,
                      'Content-Length': '0',
                      'X-Copy-From-Account': acct,
                      'X-Copy-From': source})
        return check_response(conn)

    # try to put, will not succeed
    # user does not have permissions to read from source
    resp = retry(put, use_account=2)
    resp.read()
    self.assertEqual(resp.status, 403)

    # add acl to allow reading from source
    def post(url, token, parsed, conn):
        conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
                     {'X-Auth-Token': token,
                      'X-Container-Read': tf.swift_test_perm[1]})
        return check_response(conn)

    resp = retry(post)
    resp.read()
    self.assertEqual(resp.status, 204)

    # retry previous put, now should succeed
    resp = retry(put, use_account=2)
    resp.read()
    self.assertEqual(resp.status, 201)

    # contents of dest should be the same as source
    def get_dest(url, token, parsed, conn):
        conn.request('GET',
                     '%s/%s' % (parsed.path, dest),
                     '', {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(get_dest, use_account=2)
    dest_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(dest_contents, source_contents)

    # delete the copy
    def delete(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s' % (parsed.path, dest), '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(delete, use_account=2)
    resp.read()
    self.assertIn(resp.status, (204, 404))

    # verify dest does not exist
    resp = retry(get_dest, use_account=2)
    resp.read()
    self.assertEqual(resp.status, 404)

    # Third component of the second account's parsed URL path.
    acct_dest = tf.parsed[1].path.split('/', 2)[2]

    # copy source to dest with COPY
    def copy(url, token, parsed, conn):
        conn.request('COPY', '%s/%s' % (parsed.path, source), '',
                     {'X-Auth-Token': token,
                      'Destination-Account': acct_dest,
                      'Destination': dest})
        return check_response(conn)

    # try to copy, will not succeed
    # user does not have permissions to write to destination
    resp = retry(copy)
    resp.read()
    self.assertEqual(resp.status, 403)

    # add acl to allow write to destination
    def post(url, token, parsed, conn):
        conn.request('POST', '%s/%s' % (parsed.path, self.container), '',
                     {'X-Auth-Token': token,
                      'X-Container-Write': tf.swift_test_perm[0]})
        return check_response(conn)

    resp = retry(post, use_account=2)
    resp.read()
    self.assertEqual(resp.status, 204)

    # now copy will succeed
    resp = retry(copy)
    resp.read()
    self.assertEqual(resp.status, 201)

    # contents of dest should be the same as source
    resp = retry(get_dest, use_account=2)
    dest_contents = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(dest_contents, source_contents)

    # delete the copy
    resp = retry(delete, use_account=2)
    resp.read()
    self.assertIn(resp.status, (204, 404))
def test_public_object(self):
    """A token-less GET only works while the container ACL is ``.r:*``.

    Before the ACL is set, and after it is cleared again, ``retry`` is
    expected to give up with an exception whose message starts with
    ``'No result after '``.  While the ACL is set the GET returns 200.
    """
    if tf.skip:
        raise SkipTest

    def get(url, token, parsed, conn):
        # Deliberately sent without an X-Auth-Token header.
        conn.request('GET',
                     '%s/%s/%s' % (parsed.path, self.container, self.obj))
        return check_response(conn)

    def make_post(read_acl):
        # Build a retry-able POST setting X-Container-Read to read_acl.
        def post(url, token, parsed, conn):
            conn.request('POST', parsed.path + '/' + self.container, '',
                         {'X-Auth-Token': token,
                          'X-Container-Read': read_acl})
            return check_response(conn)
        return post

    def assert_get_gives_up():
        # assertRaises reports a clear failure if the GET unexpectedly
        # succeeds, rather than a bare "False is not true".
        with self.assertRaises(Exception) as caught:
            retry(get)
        self.assertTrue(
            str(caught.exception).startswith('No result after '))

    assert_get_gives_up()

    resp = retry(make_post('.r:*'))
    resp.read()
    self.assertEqual(resp.status, 204)

    resp = retry(get)
    resp.read()
    self.assertEqual(resp.status, 200)

    resp = retry(make_post(''))
    resp.read()
    self.assertEqual(resp.status, 204)

    assert_get_gives_up()
def test_private_object(self):
    """Account 3 cannot read or copy out of the private container.

    A second container is created with X-Container-Read and
    X-Container-Write set to ``tf.swift_test_perm[2]``.  Requests made
    with ``use_account=3`` may write and copy inside that container,
    but every attempt to read or copy the setUp object gets 403.
    """
    if tf.skip or tf.skip3:
        raise SkipTest

    shared_container = uuid4().hex

    def status_of(request, **retry_kwargs):
        # Issue the request, drain the body, and hand back the status.
        resp = retry(request, **retry_kwargs)
        resp.read()
        return resp.status

    def get_private(url, token, parsed, conn):
        conn.request('GET', '%s/%s/%s' % (
            parsed.path, self.container, self.obj), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    def put_shared_container(url, token, parsed, conn):
        conn.request('PUT', '%s/%s' % (
            parsed.path, shared_container), '',
            {'X-Auth-Token': token,
             'X-Container-Read': tf.swift_test_perm[2],
             'X-Container-Write': tf.swift_test_perm[2]})
        return check_response(conn)

    def put_copy_of_private(url, token, parsed, conn):
        conn.request('PUT', '%s/%s/%s' % (
            parsed.path, shared_container, 'private_object'), '',
            {'X-Auth-Token': token,
             'Content-Length': '0',
             'X-Copy-From': '%s/%s' % (self.container, self.obj)})
        return check_response(conn)

    def put_obj1(url, token, parsed, conn):
        conn.request('PUT', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), 'test',
            {'X-Auth-Token': token})
        return check_response(conn)

    def copy_obj1(url, token, parsed, conn):
        conn.request('COPY', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), '',
            {'X-Auth-Token': token,
             'Destination': '%s/%s' % (shared_container, 'obj1')})
        return check_response(conn)

    def copy_private(url, token, parsed, conn):
        conn.request('COPY', '%s/%s/%s' % (
            parsed.path, self.container, self.obj), '',
            {'X-Auth-Token': token,
             'Destination': '%s/%s' % (shared_container,
                                       'private_object')})
        return check_response(conn)

    def delete_obj1(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    def delete_shared_container(url, token, parsed, conn):
        conn.request('DELETE',
                     parsed.path + '/' + shared_container, '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    # Ensure we can't access the object with the third account
    self.assertEqual(status_of(get_private, use_account=3), 403)

    # create a shared container writable by account3
    self.assertEqual(status_of(put_shared_container), 201)

    # verify third account can not copy from private container
    self.assertEqual(status_of(put_copy_of_private, use_account=3), 403)

    # verify third account can write "obj1" to shared container
    self.assertEqual(status_of(put_obj1, use_account=3), 201)

    # verify third account can copy "obj1" to shared container
    self.assertEqual(status_of(copy_obj1, use_account=3), 201)

    # verify third account STILL can not copy from private container
    self.assertEqual(status_of(copy_private, use_account=3), 403)

    # clean up "obj1"
    self.assertIn(status_of(delete_obj1), (204, 404))

    # clean up shared_container
    self.assertIn(status_of(delete_shared_container), (204, 404))
def test_container_write_only(self):
    """Check what account 3 can do with write-only container access.

    A second container is created with only X-Container-Write set to
    ``tf.swift_test_perm[2]``.  With ``use_account=3`` a PUT, POST and
    DELETE of ``obj1`` are accepted, while a COPY of it gets 403.
    """
    if tf.skip or tf.skip3:
        raise SkipTest

    shared_container = uuid4().hex

    def status_of(request, **retry_kwargs):
        # Issue the request, drain the body, and hand back the status.
        resp = retry(request, **retry_kwargs)
        resp.read()
        return resp.status

    def get_private(url, token, parsed, conn):
        conn.request('GET', '%s/%s/%s' % (
            parsed.path, self.container, self.obj), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    def put_shared_container(url, token, parsed, conn):
        conn.request('PUT', '%s/%s' % (
            parsed.path, shared_container), '',
            {'X-Auth-Token': token,
             'X-Container-Write': tf.swift_test_perm[2]})
        return check_response(conn)

    def put_obj1(url, token, parsed, conn):
        conn.request('PUT', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), 'test',
            {'X-Auth-Token': token})
        return check_response(conn)

    def copy_obj1(url, token, parsed, conn):
        conn.request('COPY', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), '',
            {'X-Auth-Token': token,
             'Destination': '%s/%s' % (shared_container, 'obj2')})
        return check_response(conn)

    def post_obj1(url, token, parsed, conn):
        conn.request('POST', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), '',
            {'X-Auth-Token': token,
             'X-Object-Meta-Color': 'blue'})
        return check_response(conn)

    def delete_obj1(url, token, parsed, conn):
        conn.request('DELETE', '%s/%s/%s' % (
            parsed.path, shared_container, 'obj1'), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    def delete_shared_container(url, token, parsed, conn):
        conn.request('DELETE',
                     parsed.path + '/' + shared_container, '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    # Ensure we can't access the object with the third account
    self.assertEqual(status_of(get_private, use_account=3), 403)

    # create a shared container writable (but not readable) by account3
    self.assertEqual(status_of(put_shared_container), 201)

    # verify third account can write "obj1" to shared container
    self.assertEqual(status_of(put_obj1, use_account=3), 201)

    # verify third account cannot copy "obj1" to shared container
    self.assertEqual(status_of(copy_obj1, use_account=3), 403)

    # verify third account can POST to "obj1" in shared container
    self.assertEqual(status_of(post_obj1, use_account=3), 202)

    # verify third account can DELETE from shared container
    self.assertIn(status_of(delete_obj1, use_account=3), (204, 404))

    # clean up shared_container
    self.assertIn(status_of(delete_shared_container), (204, 404))
@requires_acls
def test_read_only(self):
    """A 'read-only' account ACL allows listing and GET, nothing more.

    Before the ACL is granted, listing and GET with ``use_account=3``
    return 403.  After ``tf.swift_test_user[2]`` is granted
    ``read-only`` through ``x-account-access-control``, listing and GET
    return 200 while PUT and DELETE still return 403.
    """
    if tf.skip3:
        raise SkipTest

    def get_listing(url, token, parsed, conn):
        conn.request('GET', '%s/%s' % (parsed.path, self.container), '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    def post_account(url, token, parsed, conn, headers):
        new_headers = dict({'X-Auth-Token': token}, **headers)
        conn.request('POST', parsed.path, '', new_headers)
        return check_response(conn)

    def get(url, token, parsed, conn, name):
        conn.request('GET', '%s/%s/%s' % (
            parsed.path, self.container, name), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    def put(url, token, parsed, conn, name):
        conn.request('PUT', '%s/%s/%s' % (
            parsed.path, self.container, name), 'test',
            {'X-Auth-Token': token})
        return check_response(conn)

    def delete(url, token, parsed, conn, name):
        # Must really be a DELETE: sending PUT here would make the
        # "can not delete" check below repeat the PUT check instead.
        conn.request('DELETE', '%s/%s/%s' % (
            parsed.path, self.container, name), '',
            {'X-Auth-Token': token})
        return check_response(conn)

    # cannot list objects
    resp = retry(get_listing, use_account=3)
    resp.read()
    self.assertEqual(resp.status, 403)

    # cannot get object
    resp = retry(get, self.obj, use_account=3)
    resp.read()
    self.assertEqual(resp.status, 403)

    # grant read-only access
    acl_user = tf.swift_test_user[2]
    acl = {'read-only': [acl_user]}
    headers = {'x-account-access-control': json.dumps(acl)}
    resp = retry(post_account, headers=headers, use_account=1)
    resp.read()
    self.assertEqual(resp.status, 204)

    # can list objects
    resp = retry(get_listing, use_account=3)
    listing = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertIn(self.obj, listing)

    # can get object
    resp = retry(get, self.obj, use_account=3)
    body = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertEqual(body, 'test')

    # can not put an object
    obj_name = str(uuid4())
    resp = retry(put, obj_name, use_account=3)
    resp.read()
    self.assertEqual(resp.status, 403)

    # can not delete an object
    resp = retry(delete, self.obj, use_account=3)
    resp.read()
    self.assertEqual(resp.status, 403)

    # sanity: the listing is unchanged by the refused requests
    # NOTE(review): the original comment said "account1", but the
    # request has always been made with use_account=3 -- confirm intent.
    resp = retry(get_listing, use_account=3)
    listing = resp.read()
    self.assertEqual(resp.status, 200)
    self.assertNotIn(obj_name, listing)
    self.assertIn(self.obj, listing)
@requires_acls
def test_read_write(self):
    """A 'read-write' account ACL allows listing, GET, PUT and DELETE.

    Before the ACL is granted, listing and GET with ``use_account=3``
    return 403.  After ``tf.swift_test_user[2]`` is granted
    ``read-write`` through ``x-account-access-control``, all four
    operations are accepted.
    """
    if tf.skip3:
        raise SkipTest

    def get_listing(url, token, parsed, conn):
        listing_path = '%s/%s' % (parsed.path, self.container)
        conn.request('GET', listing_path, '', {'X-Auth-Token': token})
        return check_response(conn)

    def post_account(url, token, parsed, conn, headers):
        new_headers = dict({'X-Auth-Token': token}, **headers)
        conn.request('POST', parsed.path, '', new_headers)
        return check_response(conn)

    def get(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('GET', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def put(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('PUT', target, 'test', {'X-Auth-Token': token})
        return check_response(conn)

    def delete(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('DELETE', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def call(request, *args, **kwargs):
        # Run the request through retry(); return (status, body).
        resp = retry(request, *args, **kwargs)
        payload = resp.read()
        return resp.status, payload

    # cannot list objects
    status, _ = call(get_listing, use_account=3)
    self.assertEqual(status, 403)

    # cannot get object
    status, _ = call(get, self.obj, use_account=3)
    self.assertEqual(status, 403)

    # grant read-write access
    acl = {'read-write': [tf.swift_test_user[2]]}
    headers = {'x-account-access-control': json.dumps(acl)}
    status, _ = call(post_account, headers=headers, use_account=1)
    self.assertEqual(status, 204)

    # can list objects
    status, listing = call(get_listing, use_account=3)
    self.assertEqual(status, 200)
    self.assertIn(self.obj, listing)

    # can get object
    status, body = call(get, self.obj, use_account=3)
    self.assertEqual(status, 200)
    self.assertEqual(body, 'test')

    # can put an object
    obj_name = str(uuid4())
    status, _ = call(put, obj_name, use_account=3)
    self.assertEqual(status, 201)

    # can delete an object
    status, _ = call(delete, self.obj, use_account=3)
    self.assertIn(status, (204, 404))

    # sanity with account1
    # NOTE(review): the request is actually made with use_account=3.
    status, listing = call(get_listing, use_account=3)
    self.assertEqual(status, 200)
    self.assertIn(obj_name, listing)
    self.assertNotIn(self.obj, listing)
@requires_acls
def test_admin(self):
    """An 'admin' account ACL allows listing, GET, PUT and DELETE.

    Before the ACL is granted, listing and GET with ``use_account=3``
    return 403.  After ``tf.swift_test_user[2]`` is granted ``admin``
    through ``x-account-access-control``, all four operations are
    accepted.
    """
    if tf.skip3:
        raise SkipTest

    def get_listing(url, token, parsed, conn):
        listing_path = '%s/%s' % (parsed.path, self.container)
        conn.request('GET', listing_path, '', {'X-Auth-Token': token})
        return check_response(conn)

    def post_account(url, token, parsed, conn, headers):
        new_headers = dict({'X-Auth-Token': token}, **headers)
        conn.request('POST', parsed.path, '', new_headers)
        return check_response(conn)

    def get(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('GET', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def put(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('PUT', target, 'test', {'X-Auth-Token': token})
        return check_response(conn)

    def delete(url, token, parsed, conn, name):
        target = '%s/%s/%s' % (parsed.path, self.container, name)
        conn.request('DELETE', target, '', {'X-Auth-Token': token})
        return check_response(conn)

    def call(request, *args, **kwargs):
        # Run the request through retry(); return (status, body).
        resp = retry(request, *args, **kwargs)
        payload = resp.read()
        return resp.status, payload

    # cannot list objects
    status, _ = call(get_listing, use_account=3)
    self.assertEqual(status, 403)

    # cannot get object
    status, _ = call(get, self.obj, use_account=3)
    self.assertEqual(status, 403)

    # grant admin access
    acl = {'admin': [tf.swift_test_user[2]]}
    headers = {'x-account-access-control': json.dumps(acl)}
    status, _ = call(post_account, headers=headers, use_account=1)
    self.assertEqual(status, 204)

    # can list objects
    status, listing = call(get_listing, use_account=3)
    self.assertEqual(status, 200)
    self.assertIn(self.obj, listing)

    # can get object
    status, body = call(get, self.obj, use_account=3)
    self.assertEqual(status, 200)
    self.assertEqual(body, 'test')

    # can put an object
    obj_name = str(uuid4())
    status, _ = call(put, obj_name, use_account=3)
    self.assertEqual(status, 201)

    # can delete an object
    status, _ = call(delete, self.obj, use_account=3)
    self.assertIn(status, (204, 404))

    # sanity with account1
    # NOTE(review): the request is actually made with use_account=3.
    status, listing = call(get_listing, use_account=3)
    self.assertEqual(status, 200)
    self.assertIn(obj_name, listing)
    self.assertNotIn(self.obj, listing)
def test_manifest(self):
    """
    Exercise an X-Object-Manifest object through its whole life cycle.

    Three sets of segments are uploaded (the third one into a separate
    container).  The manifest is pointed at each set in turn and read
    back, including ranged reads.  Unless ``tf.skip3`` is set, the third
    test account is also checked: it must be refused (403) until it is
    granted read access on the container holding the segments.  Finally
    everything created here is deleted again.
    """
    if tf.skip:
        raise SkipTest

    # Data for the object segments
    segments1 = ['one', 'two', 'three', 'four', 'five']
    segments2 = ['six', 'seven', 'eight']
    segments3 = ['nine', 'ten', 'eleven']

    # ---- request callables handed to retry() --------------------------

    def put_segment(url, token, parsed, conn, container, prefix, objnum,
                    body):
        # Upload one segment as <container>/<prefix>/<objnum>
        conn.request('PUT', '%s/%s/%s/%s' % (
            parsed.path, container, prefix, str(objnum)), body,
            {'X-Auth-Token': token})
        return check_response(conn)

    def put_manifest(url, token, parsed, conn, manifest, content_type):
        # (Re)write the zero-byte manifest object in self.container
        headers = {
            'X-Auth-Token': token,
            'X-Object-Manifest': manifest,
            'Content-Length': '0'}
        if content_type is not None:
            headers['Content-Type'] = content_type
        conn.request('PUT', '%s/%s/manifest' % (
            parsed.path, self.container), '', headers)
        return check_response(conn)

    def get_manifest(url, token, parsed, conn, byte_range):
        # Fetch the manifest, optionally restricted by a Range header
        headers = {'X-Auth-Token': token}
        if byte_range is not None:
            headers['Range'] = byte_range
        conn.request('GET', '%s/%s/manifest' % (
            parsed.path, self.container), '', headers)
        return check_response(conn)

    def put_container(url, token, parsed, conn, container):
        conn.request('PUT', parsed.path + '/' + container, '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    def grant_read(url, token, parsed, conn, container):
        # Give the third test account read access on the container
        conn.request('POST', '%s/%s' % (parsed.path, container),
                     '', {'X-Auth-Token': token,
                          'X-Container-Read': tf.swift_test_perm[2]})
        return check_response(conn)

    def delete_path(url, token, parsed, conn, path):
        # path is relative to the account, e.g. "<container>/<object>"
        conn.request('DELETE', '%s/%s' % (parsed.path, path), '',
                     {'X-Auth-Token': token})
        return check_response(conn)

    # ---- assertion helpers ------------------------------------------

    def upload_segments(container, prefix, segments):
        for objnum, body in enumerate(segments):
            resp = retry(put_segment, container, prefix, objnum, body)
            resp.read()
            self.assertEqual(resp.status, 201)

    def point_manifest_at(container, prefix, content_type=None):
        resp = retry(put_manifest, '%s/%s/' % (container, prefix),
                     content_type)
        resp.read()
        self.assertEqual(resp.status, 201)

    def assert_manifest(expected_body, expected_status=200,
                        byte_range=None, **retry_kwargs):
        # retry_kwargs (e.g. use_account=3) are forwarded to retry() only
        # when the caller supplies them.
        resp = retry(get_manifest, byte_range, **retry_kwargs)
        self.assertEqual(resp.read(), expected_body)
        self.assertEqual(resp.status, expected_status)
        return resp

    def assert_third_account_needs_grant(segment_container,
                                         expected_body):
        # Refused before the grant ...
        resp = retry(get_manifest, None, use_account=3)
        resp.read()
        self.assertEqual(resp.status, 403)
        # ... grant read access on the container holding the segments ...
        resp = retry(grant_read, segment_container)
        resp.read()
        self.assertEqual(resp.status, 204)
        # ... and readable afterwards.
        assert_manifest(expected_body, use_account=3)

    def delete_and_check(path):
        resp = retry(delete_path, path)
        resp.read()
        self.assertIn(resp.status, (204, 404))

    # ---- first set of segments -------------------------------------

    upload_segments(self.container, 'segments1', segments1)
    point_manifest_at(self.container, 'segments1',
                      content_type='text/jibberish')

    # Get the manifest (should get all the segments as the body)
    resp = assert_manifest(''.join(segments1))
    self.assertEqual(resp.getheader('content-type'), 'text/jibberish')

    # Get with a range at the start of the second segment
    assert_manifest(''.join(segments1[1:]), 206, 'bytes=3-')
    # Get with a range in the middle of the second segment
    assert_manifest(''.join(segments1)[5:], 206, 'bytes=5-')
    # Get with a full start and stop range
    assert_manifest(''.join(segments1)[5:11], 206, 'bytes=5-10')

    # ---- second set of segments (same container) -------------------

    upload_segments(self.container, 'segments2', segments2)
    # The manifest has not been updated yet, so it still serves set one
    assert_manifest(''.join(segments1))

    point_manifest_at(self.container, 'segments2')
    assert_manifest(''.join(segments2))

    if not tf.skip3:
        assert_third_account_needs_grant(self.container,
                                         ''.join(segments2))

    # ---- third set of segments (separate container) ----------------

    acontainer = uuid4().hex
    resp = retry(put_container, acontainer)
    resp.read()
    self.assertEqual(resp.status, 201)

    upload_segments(acontainer, 'segments3', segments3)
    point_manifest_at(acontainer, 'segments3')
    assert_manifest(''.join(segments3))

    if not tf.skip3:
        # The segments now live in a protected container, so the third
        # account is refused again even though it can read the manifest's
        # own container.
        assert_third_account_needs_grant(acontainer, ''.join(segments3))

    # ---- clean up --------------------------------------------------

    # The manifest is a single object, so it is deleted by path alone
    # rather than through a per-segment index.
    delete_and_check('%s/manifest' % self.container)

    for objnum in range(len(segments3)):
        delete_and_check('%s/segments3/%s' % (acontainer, objnum))
    for objnum in range(len(segments2)):
        delete_and_check('%s/segments2/%s' % (self.container, objnum))
    for objnum in range(len(segments1)):
        delete_and_check('%s/segments1/%s' % (self.container, objnum))

    # Delete the extra container
    delete_and_check(acontainer)
def test_delete_content_type(self):
    """
    Check the Content-Type header returned by an object DELETE.

    An object named ``hi`` is created in ``self.container`` and deleted
    again; the DELETE response must carry
    ``text/html; charset=UTF-8`` as its Content-Type.
    """
    if tf.skip:
        raise SkipTest

    def create_object(url, token, parsed, conn):
        obj_path = '%s/%s/hi' % (parsed.path, self.container)
        conn.request('PUT', obj_path, 'there', {'X-Auth-Token': token})
        return check_response(conn)

    def remove_object(url, token, parsed, conn):
        obj_path = '%s/%s/hi' % (parsed.path, self.container)
        conn.request('DELETE', obj_path, '', {'X-Auth-Token': token})
        return check_response(conn)

    put_resp = retry(create_object)
    put_resp.read()
    self.assertEqual(put_resp.status, 201)

    delete_resp = retry(remove_object)
    delete_resp.read()
    self.assertIn(delete_resp.status, (204, 404))
    content_type = delete_resp.getheader('Content-Type')
    self.assertEqual(content_type, 'text/html; charset=UTF-8')
def test_delete_if_delete_at_bad(self):
    """
    A DELETE with a malformed X-If-Delete-At header must be rejected.

    An object is created first so that the DELETE addresses something
    that exists; sending ``X-If-Delete-At: bad`` for it is expected to
    produce a 400 response.
    """
    if tf.skip:
        raise SkipTest

    # One name shared by both requests, so the DELETE is aimed at the
    # object this test actually created.
    obj_name = 'hi-delete-bad'

    def put(url, token, parsed, conn):
        conn.request('PUT',
                     '%s/%s/%s' % (parsed.path, self.container, obj_name),
                     'there', {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(put)
    resp.read()
    self.assertEqual(resp.status, 201)

    def delete(url, token, parsed, conn):
        conn.request('DELETE',
                     '%s/%s/%s' % (parsed.path, self.container, obj_name),
                     '', {'X-Auth-Token': token,
                          'X-If-Delete-At': 'bad'})
        return check_response(conn)

    resp = retry(delete)
    resp.read()
    self.assertEqual(resp.status, 400)
def test_null_name(self):
    """
    PUT an object whose name contains a URL-encoded NUL byte.

    Behind an ``apache2`` front end the expected status is 404;
    otherwise the expected response is 412 with the body
    ``Invalid UTF8 or contains NULL``.
    """
    if tf.skip:
        raise SkipTest

    def put_null_named_object(url, token, parsed, conn):
        # '%%00' in the format string yields a literal '%00' in the path
        obj_path = '%s/%s/abc%%00def' % (parsed.path, self.container)
        conn.request('PUT', obj_path, 'test', {'X-Auth-Token': token})
        return check_response(conn)

    resp = retry(put_null_named_object)
    if tf.web_front_end != 'apache2':
        self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL')
        self.assertEqual(resp.status, 412)
    else:
        self.assertEqual(resp.status, 404)
def test_cors(self):
    """
    Check CORS handling for object requests.

    First the container allows every origin (``*``): a preflight
    without Access-Control-Request-Method is refused (401), a complete
    preflight and plain GETs succeed and report ``*`` as the allowed
    origin.  Then the container is restricted to ``http://secret.com``:
    a preflight from ``http://m.com`` is refused, and the
    Access-Control-Allow-Origin header on GET depends on the cluster's
    ``strict_cors_mode`` setting.  The test is skipped when that setting
    is not advertised in ``tf.cluster_info``.
    """
    if tf.skip:
        raise SkipTest

    try:
        strict_cors = tf.cluster_info['swift']['strict_cors_mode']
    except KeyError:
        raise SkipTest("cors mode is unknown")

    def set_container_origin(url, token, parsed, conn, orig):
        container_path = '%s/%s' % (parsed.path, self.container)
        req_headers = {
            'X-Auth-Token': token,
            'X-Container-Meta-Access-Control-Allow-Origin': orig}
        conn.request('PUT', container_path, '', req_headers)
        return check_response(conn)

    def upload_object(url, token, parsed, conn, obj):
        obj_path = '%s/%s/%s' % (parsed.path, self.container, obj)
        conn.request('PUT', obj_path, 'test', {'X-Auth-Token': token})
        return check_response(conn)

    def cors_request(url, token, parsed, conn,
                     method, obj, headers):
        # OPTIONS requests are sent without an auth token
        if method != 'OPTIONS':
            headers['X-Auth-Token'] = token
        obj_path = '%s/%s/%s' % (parsed.path, self.container, obj)
        conn.request(method, obj_path, '', headers)
        # The raw response is returned here; check_response() is not used
        return conn.getresponse()

    def lowered_headers(response):
        # Response headers keyed by lower-cased name
        return dict((name.lower(), value)
                    for name, value in response.getheaders())

    def assert_success(response):
        response.read()
        self.assertEqual(response.status // 100, 2)

    allow_origin = 'access-control-allow-origin'

    # ---- container open to every origin ----------------------------
    assert_success(retry(set_container_origin, '*'))
    assert_success(retry(upload_object, 'cat'))

    # Preflight without Access-Control-Request-Method
    resp = retry(cors_request,
                 'OPTIONS', 'cat', {'Origin': 'http://m.com'})
    self.assertEqual(resp.status, 401)

    # Complete preflight
    resp = retry(cors_request,
                 'OPTIONS', 'cat',
                 {'Origin': 'http://m.com',
                  'Access-Control-Request-Method': 'GET'})
    self.assertEqual(resp.status, 200)
    resp.read()
    self.assertEqual(lowered_headers(resp).get(allow_origin), '*')

    # Plain GET, then GET with X-Web-Mode
    for request_headers in ({'Origin': 'http://m.com'},
                            {'Origin': 'http://m.com',
                             'X-Web-Mode': 'True'}):
        resp = retry(cors_request, 'GET', 'cat', request_headers)
        self.assertEqual(resp.status, 200)
        self.assertEqual(lowered_headers(resp).get(allow_origin), '*')

    ####################

    # ---- container restricted to a single origin -------------------
    assert_success(retry(set_container_origin, 'http://secret.com'))

    resp = retry(cors_request,
                 'OPTIONS', 'cat',
                 {'Origin': 'http://m.com',
                  'Access-Control-Request-Method': 'GET'})
    resp.read()
    self.assertEqual(resp.status, 401)

    # A GET from the non-allowed origin succeeds in both modes; only
    # the Access-Control-Allow-Origin header differs.
    resp = retry(cors_request,
                 'GET', 'cat', {'Origin': 'http://m.com'})
    resp.read()
    self.assertEqual(resp.status, 200)
    headers = lowered_headers(resp)

    if not strict_cors:
        self.assertEqual(headers.get(allow_origin), 'http://m.com')
    else:
        self.assertNotIn(allow_origin, headers)

        resp = retry(cors_request,
                     'GET', 'cat', {'Origin': 'http://secret.com'})
        resp.read()
        self.assertEqual(resp.status, 200)
        headers = lowered_headers(resp)
        self.assertEqual(headers.get(allow_origin),
                         'http://secret.com')
@requires_policies
def test_cross_policy_copy(self):
    """
    Copy objects between containers that use different storage policies.

    One container is created per policy and an object is uploaded to
    each, using its own ``container/object`` path as the body.  Each
    object is then copied into the other container with X-Copy-From and
    read back; the body must equal the path of the copy source.
    """
    # create container in first policy
    first_policy = self.policies.select()
    container = self._create_container(
        headers={'X-Storage-Policy': first_policy['name']})
    obj = uuid4().hex

    # create a container in second policy
    second_policy = self.policies.exclude(
        name=first_policy['name']).select()
    other_container = self._create_container(
        headers={'X-Storage-Policy': second_policy['name']})
    other_obj = uuid4().hex

    def upload(url, token, parsed, conn, container, obj):
        # to keep track of things, use the original path as the body
        body = '%s/%s' % (container, obj)
        conn.request('PUT', '%s/%s' % (parsed.path, body), body,
                     {'X-Auth-Token': token})
        return check_response(conn)

    def copy_into(url, token, parsed, conn, container, obj, source):
        req_headers = {'X-Auth-Token': token,
                       'Content-Length': '0',
                       'X-Copy-From': source}
        conn.request('PUT', '%s/%s/%s' % (parsed.path, container, obj),
                     '', req_headers)
        return check_response(conn)

    def download(url, token, parsed, conn, container, obj):
        conn.request('GET', '%s/%s/%s' % (parsed.path, container, obj),
                     '', {'X-Auth-Token': token})
        return check_response(conn)

    # create objects
    originals = ((container, obj), (other_container, other_obj))
    for cont_name, obj_name in originals:
        resp = retry(upload, cont_name, obj_name)
        resp.read()
        self.assertEqual(resp.status, 201)

    # (destination container, destination object, copy source)
    copies = (
        (container, other_obj, '%s/%s' % (other_container, other_obj)),
        (other_container, obj, '%s/%s' % (container, obj)),
    )

    # copy objects
    for dest_container, dest_obj, source in copies:
        resp = retry(copy_into, dest_container, dest_obj, source)
        resp.read()
        self.assertEqual(resp.status, 201)

    # validate contents, contents should be source
    for dest_container, dest_obj, expected_body in copies:
        resp = retry(download, dest_container, dest_obj)
        self.assertEqual(resp.status, 200)
        self.assertEqual(expected_body, resp.read())
@requires_bulk
def test_bulk_delete(self):
    """
    Bulk-delete one object, asking for XML and sending Expect.

    The request carries ``Accept: application/xml`` and
    ``Expect: 100-continue``.  The response must be a 200 whose body
    parses as XML with a ``delete`` root element, exactly one ``errors``
    element containing no errors, and exactly one ``response_status``
    element whose text is ``200 OK``.
    """
    def send_bulk_delete(url, token, parsed, conn):
        # try to bulk delete the object that was created during test setup
        request_path = '%s/%s/%s?bulk-delete' % (
            parsed.path, self.container, self.obj)
        request_body = '%s/%s' % (self.container, self.obj)
        request_headers = {'X-Auth-Token': token,
                           'Accept': 'application/xml',
                           'Expect': '100-continue',
                           'Content-Type': 'text/plain'}
        conn.request('DELETE', request_path, request_body,
                     request_headers)
        return check_response(conn)

    resp = retry(send_bulk_delete)
    self.assertEqual(resp.status, 200)

    tree = minidom.parseString(resp.read())
    self.assertEqual(tree.documentElement.tagName, 'delete')

    error_elements = tree.getElementsByTagName('errors')
    self.assertEqual(len(error_elements), 1)

    # Collect every child of <errors> except whitespace-only text nodes:
    # text nodes contribute their own data, any other node contributes
    # the data of its first child.
    reported_errors = []
    for node in error_elements[0].childNodes:
        if node.nodeType == node.TEXT_NODE:
            if node.data.strip():
                reported_errors.append(node.data)
        else:
            reported_errors.append(node.childNodes[0].data)
    self.assertEqual(reported_errors, [])

    status_elements = tree.getElementsByTagName('response_status')
    self.assertEqual(len(status_elements), 1)
    status_children = status_elements[0].childNodes
    self.assertEqual(len(status_children), 1)
    self.assertEqual(status_children[0].data, '200 OK')
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest2.main()