#!/usr/bin/python # Copyright (c) 2010-2012 OpenStack Foundation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import json import unittest2 from unittest2 import SkipTest from uuid import uuid4 import time from six.moves import range from test.functional import check_response, retry, requires_acls, \ requires_policies import test.functional as tf def setUpModule(): tf.setup_package() def tearDownModule(): tf.teardown_package() class TestObject(unittest2.TestCase): def setUp(self): if tf.skip: raise SkipTest self.container = uuid4().hex self.containers = [] self._create_container(self.container) self._create_container(self.container, use_account=2) self.obj = uuid4().hex def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, self.obj), 'test', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def _create_container(self, name=None, headers=None, use_account=1): if not name: name = uuid4().hex self.containers.append(name) headers = headers or {} def put(url, token, parsed, conn, name): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('PUT', parsed.path + '/' + name, '', new_headers) return check_response(conn) resp = retry(put, name, use_account=use_account) resp.read() self.assertEqual(resp.status, 201) # With keystoneauth we need the accounts to have had the project # domain id persisted as sysmeta prior to testing ACLs. This may # not be the case if, for example, the account was created using # a request with reseller_admin role, when project domain id may # not have been known. So we ensure that the project domain id is # in sysmeta by making a POST to the accounts using an admin role. def post(url, token, parsed, conn): conn.request('POST', parsed.path, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(post, use_account=use_account) resp.read() self.assertEqual(resp.status, 204) return name def tearDown(self): if tf.skip: raise SkipTest # get list of objects in container def get(url, token, parsed, conn, container): conn.request( 'GET', parsed.path + '/' + container + '?format=json', '', {'X-Auth-Token': token}) return check_response(conn) # delete an object def delete(url, token, parsed, conn, container, obj): conn.request( 'DELETE', '/'.join([parsed.path, container, obj['name']]), '', {'X-Auth-Token': token}) return check_response(conn) for container in self.containers: while True: resp = retry(get, container) body = resp.read() if resp.status == 404: break self.assertTrue(resp.status // 100 == 2, resp.status) objs = json.loads(body) if not objs: break for obj in objs: resp = retry(delete, container, obj) resp.read() self.assertIn(resp.status, (204, 404)) # delete the container def delete(url, token, parsed, conn, name): conn.request('DELETE', parsed.path + '/' + name, '', {'X-Auth-Token': token}) return check_response(conn) for container in self.containers: resp = retry(delete, container) resp.read() self.assertIn(resp.status, (204, 404)) def test_if_none_match(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, 'if_none_match_test'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'If-None-Match': '*'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) resp = retry(put) resp.read() self.assertEqual(resp.status, 412) def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, 'if_none_match_test'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'If-None-Match': 'somethingelse'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 400) def test_too_small_x_timestamp(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'too_small_x_timestamp'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Timestamp': '-1'}) return check_response(conn) def head(url, token, parsed, conn): conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container, 'too_small_x_timestamp'), '', {'X-Auth-Token': token, 'Content-Length': '0'}) return check_response(conn) ts_before = time.time() resp = retry(put) body = resp.read() ts_after = time.time() if resp.status == 400: # shunt_inbound_x_timestamp must be false self.assertIn( 'X-Timestamp should be a UNIX timestamp float value', body) else: self.assertEqual(resp.status, 201) self.assertEqual(body, '') resp = retry(head) resp.read() self.assertGreater(float(resp.headers['x-timestamp']), ts_before) self.assertLess(float(resp.headers['x-timestamp']), ts_after) def test_too_big_x_timestamp(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'too_big_x_timestamp'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Timestamp': '99999999999.9999999999'}) return check_response(conn) def head(url, token, parsed, conn): conn.request('HEAD', '%s/%s/%s' % (parsed.path, self.container, 'too_big_x_timestamp'), '', {'X-Auth-Token': token, 'Content-Length': '0'}) return check_response(conn) ts_before = time.time() resp = retry(put) body = resp.read() ts_after = time.time() if resp.status == 400: # shunt_inbound_x_timestamp must be false self.assertIn( 'X-Timestamp should be a UNIX timestamp float value', body) else: self.assertEqual(resp.status, 201) self.assertEqual(body, '') resp = retry(head) resp.read() self.assertGreater(float(resp.headers['x-timestamp']), ts_before) self.assertLess(float(resp.headers['x-timestamp']), ts_after) def test_x_delete_after(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Delete-After': '1'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def get(url, token, parsed, conn): conn.request( 'GET', '%s/%s/%s' % (parsed.path, self.container, 'x_delete_after'), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) resp.read() count = 0 while resp.status == 200 and count < 10: resp = retry(get) resp.read() count += 1 time.sleep(1) self.assertEqual(resp.status, 404) # To avoid an error when the object deletion in tearDown(), # the object is added again. resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def test_x_delete_at(self): def put(url, token, parsed, conn): dt = datetime.datetime.now() epoch = time.mktime(dt.timetuple()) delete_time = str(int(epoch) + 3) conn.request( 'PUT', '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Delete-At': delete_time}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def get(url, token, parsed, conn): conn.request( 'GET', '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at'), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) resp.read() count = 0 while resp.status == 200 and count < 10: resp = retry(get) resp.read() count += 1 time.sleep(1) self.assertEqual(resp.status, 404) # To avoid an error when the object deletion in tearDown(), # the object is added again. resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def test_non_integer_x_delete_after(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'non_integer_x_delete_after'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Delete-After': '*'}) return check_response(conn) resp = retry(put) body = resp.read() self.assertEqual(resp.status, 400) self.assertEqual(body, 'Non-integer X-Delete-After') def test_non_integer_x_delete_at(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'non_integer_x_delete_at'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Delete-At': '*'}) return check_response(conn) resp = retry(put) body = resp.read() self.assertEqual(resp.status, 400) self.assertEqual(body, 'Non-integer X-Delete-At') def test_x_delete_at_in_the_past(self): def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % (parsed.path, self.container, 'x_delete_at_in_the_past'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Delete-At': '0'}) return check_response(conn) resp = retry(put) body = resp.read() self.assertEqual(resp.status, 400) self.assertEqual(body, 'X-Delete-At in past') def test_copy_object(self): if tf.skip: raise SkipTest source = '%s/%s' % (self.container, self.obj) dest = '%s/%s' % (self.container, 'test_copy') # get contents of source def get_source(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, source), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get_source) source_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(source_contents, 'test') # copy source to dest with X-Copy-From def put(url, token, parsed, conn): conn.request('PUT', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Copy-From': source}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # contents of dest should be the same as source def get_dest(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get_dest) dest_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents) # delete the copy def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) # verify dest does not exist resp = retry(get_dest) resp.read() self.assertEqual(resp.status, 404) # copy source to dest with COPY def copy(url, token, parsed, conn): conn.request('COPY', '%s/%s' % (parsed.path, source), '', {'X-Auth-Token': token, 'Destination': dest}) return check_response(conn) resp = retry(copy) resp.read() self.assertEqual(resp.status, 201) # contents of dest should be the same as source resp = retry(get_dest) dest_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents) # copy source to dest with COPY and range def copy(url, token, parsed, conn): conn.request('COPY', '%s/%s' % (parsed.path, source), '', {'X-Auth-Token': token, 'Destination': dest, 'Range': 'bytes=1-2'}) return check_response(conn) resp = retry(copy) resp.read() self.assertEqual(resp.status, 201) # contents of dest should be the same as source resp = retry(get_dest) dest_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents[1:3]) # delete the copy resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) def test_copy_between_accounts(self): if tf.skip: raise SkipTest source = '%s/%s' % (self.container, self.obj) dest = '%s/%s' % (self.container, 'test_copy') # get contents of source def get_source(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, source), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get_source) source_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(source_contents, 'test') acct = tf.parsed[0].path.split('/', 2)[2] # copy source to dest with X-Copy-From-Account def put(url, token, parsed, conn): conn.request('PUT', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Copy-From-Account': acct, 'X-Copy-From': source}) return check_response(conn) # try to put, will not succeed # user does not have permissions to read from source resp = retry(put, use_account=2) self.assertEqual(resp.status, 403) # add acl to allow reading from source def post(url, token, parsed, conn): conn.request('POST', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[1]}) return check_response(conn) resp = retry(post) self.assertEqual(resp.status, 204) # retry previous put, now should succeed resp = retry(put, use_account=2) self.assertEqual(resp.status, 201) # contents of dest should be the same as source def get_dest(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get_dest, use_account=2) dest_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents) # delete the copy def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s' % (parsed.path, dest), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete, use_account=2) resp.read() self.assertIn(resp.status, (204, 404)) # verify dest does not exist resp = retry(get_dest, use_account=2) resp.read() self.assertEqual(resp.status, 404) acct_dest = tf.parsed[1].path.split('/', 2)[2] # copy source to dest with COPY def copy(url, token, parsed, conn): conn.request('COPY', '%s/%s' % (parsed.path, source), '', {'X-Auth-Token': token, 'Destination-Account': acct_dest, 'Destination': dest}) return check_response(conn) # try to copy, will not succeed # user does not have permissions to write to destination resp = retry(copy) resp.read() self.assertEqual(resp.status, 403) # add acl to allow write to destination def post(url, token, parsed, conn): conn.request('POST', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token, 'X-Container-Write': tf.swift_test_perm[0]}) return check_response(conn) resp = retry(post, use_account=2) self.assertEqual(resp.status, 204) # now copy will succeed resp = retry(copy) resp.read() self.assertEqual(resp.status, 201) # contents of dest should be the same as source resp = retry(get_dest, use_account=2) dest_contents = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(dest_contents, source_contents) # delete the copy resp = retry(delete, use_account=2) resp.read() self.assertIn(resp.status, (204, 404)) def test_public_object(self): if tf.skip: raise SkipTest def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % (parsed.path, self.container, self.obj)) return check_response(conn) try: resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: self.assertTrue(str(err).startswith('No result after ')) def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, 'X-Container-Read': '.r:*'}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) resp = retry(get) resp.read() self.assertEqual(resp.status, 200) def post(url, token, parsed, conn): conn.request('POST', parsed.path + '/' + self.container, '', {'X-Auth-Token': token, 'X-Container-Read': ''}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) try: resp = retry(get) raise Exception('Should not have been able to GET') except Exception as err: self.assertTrue(str(err).startswith('No result after ')) def test_private_object(self): if tf.skip or tf.skip3: raise SkipTest # Ensure we can't access the object with the third account def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % ( parsed.path, self.container, self.obj), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) resp.read() self.assertEqual(resp.status, 403) # create a shared container writable by account3 shared_container = uuid4().hex def put(url, token, parsed, conn): conn.request('PUT', '%s/%s' % ( parsed.path, shared_container), '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[2], 'X-Container-Write': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # verify third account can not copy from private container def copy(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, shared_container, 'private_object'), '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Copy-From': '%s/%s' % (self.container, self.obj)}) return check_response(conn) resp = retry(copy, use_account=3) resp.read() self.assertEqual(resp.status, 403) # verify third account can write "obj1" to shared container def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), 'test', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put, use_account=3) resp.read() self.assertEqual(resp.status, 201) # verify third account can copy "obj1" to shared container def copy2(url, token, parsed, conn): conn.request('COPY', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), '', {'X-Auth-Token': token, 'Destination': '%s/%s' % (shared_container, 'obj1')}) return check_response(conn) resp = retry(copy2, use_account=3) resp.read() self.assertEqual(resp.status, 201) # verify third account STILL can not copy from private container def copy3(url, token, parsed, conn): conn.request('COPY', '%s/%s/%s' % ( parsed.path, self.container, self.obj), '', {'X-Auth-Token': token, 'Destination': '%s/%s' % (shared_container, 'private_object')}) return check_response(conn) resp = retry(copy3, use_account=3) resp.read() self.assertEqual(resp.status, 403) # clean up "obj1" def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) # clean up shared_container def delete(url, token, parsed, conn): conn.request('DELETE', parsed.path + '/' + shared_container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) def test_container_write_only(self): if tf.skip or tf.skip3: raise SkipTest # Ensure we can't access the object with the third account def get(url, token, parsed, conn): conn.request('GET', '%s/%s/%s' % ( parsed.path, self.container, self.obj), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) resp.read() self.assertEqual(resp.status, 403) # create a shared container writable (but not readable) by account3 shared_container = uuid4().hex def put(url, token, parsed, conn): conn.request('PUT', '%s/%s' % ( parsed.path, shared_container), '', {'X-Auth-Token': token, 'X-Container-Write': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # verify third account can write "obj1" to shared container def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), 'test', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put, use_account=3) resp.read() self.assertEqual(resp.status, 201) # verify third account cannot copy "obj1" to shared container def copy(url, token, parsed, conn): conn.request('COPY', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), '', {'X-Auth-Token': token, 'Destination': '%s/%s' % (shared_container, 'obj2')}) return check_response(conn) resp = retry(copy, use_account=3) resp.read() self.assertEqual(resp.status, 403) # verify third account can POST to "obj1" in shared container def post(url, token, parsed, conn): conn.request('POST', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), '', {'X-Auth-Token': token, 'X-Object-Meta-Color': 'blue'}) return check_response(conn) resp = retry(post, use_account=3) resp.read() self.assertEqual(resp.status, 202) # verify third account can DELETE from shared container def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/%s' % ( parsed.path, shared_container, 'obj1'), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete, use_account=3) resp.read() self.assertIn(resp.status, (204, 404)) # clean up shared_container def delete(url, token, parsed, conn): conn.request('DELETE', parsed.path + '/' + shared_container, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) @requires_acls def test_read_only(self): if tf.skip3: raise tf.SkipTest def get_listing(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def get(url, token, parsed, conn, name): conn.request('GET', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, name), 'test', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) # cannot list objects resp = retry(get_listing, use_account=3) resp.read() self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() self.assertEqual(resp.status, 403) # grant read-only access acl_user = tf.swift_test_user[2] acl = {'read-only': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(body, 'test') # can not put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() self.assertEqual(resp.status, 403) # can not delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() self.assertEqual(resp.status, 403) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertNotIn(obj_name, listing) self.assertIn(self.obj, listing) @requires_acls def test_read_write(self): if tf.skip3: raise SkipTest def get_listing(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def get(url, token, parsed, conn, name): conn.request('GET', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, name), 'test', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) # cannot list objects resp = retry(get_listing, use_account=3) resp.read() self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() self.assertEqual(resp.status, 403) # grant read-write access acl_user = tf.swift_test_user[2] acl = {'read-write': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(body, 'test') # can put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() self.assertEqual(resp.status, 201) # can delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() self.assertIn(resp.status, (204, 404)) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertIn(obj_name, listing) self.assertNotIn(self.obj, listing) @requires_acls def test_admin(self): if tf.skip3: raise SkipTest def get_listing(url, token, parsed, conn): conn.request('GET', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) def post_account(url, token, parsed, conn, headers): new_headers = dict({'X-Auth-Token': token}, **headers) conn.request('POST', parsed.path, '', new_headers) return check_response(conn) def get(url, token, parsed, conn, name): conn.request('GET', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) def put(url, token, parsed, conn, name): conn.request('PUT', '%s/%s/%s' % ( parsed.path, self.container, name), 'test', {'X-Auth-Token': token}) return check_response(conn) def delete(url, token, parsed, conn, name): conn.request('DELETE', '%s/%s/%s' % ( parsed.path, self.container, name), '', {'X-Auth-Token': token}) return check_response(conn) # cannot list objects resp = retry(get_listing, use_account=3) resp.read() self.assertEqual(resp.status, 403) # cannot get object resp = retry(get, self.obj, use_account=3) resp.read() self.assertEqual(resp.status, 403) # grant admin access acl_user = tf.swift_test_user[2] acl = {'admin': [acl_user]} headers = {'x-account-access-control': json.dumps(acl)} resp = retry(post_account, headers=headers, use_account=1) resp.read() self.assertEqual(resp.status, 204) # can list objects resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertIn(self.obj, listing) # can get object resp = retry(get, self.obj, use_account=3) body = resp.read() self.assertEqual(resp.status, 200) self.assertEqual(body, 'test') # can put an object obj_name = str(uuid4()) resp = retry(put, obj_name, use_account=3) body = resp.read() self.assertEqual(resp.status, 201) # can delete an object resp = retry(delete, self.obj, use_account=3) body = resp.read() self.assertIn(resp.status, (204, 404)) # sanity with account1 resp = retry(get_listing, use_account=3) listing = resp.read() self.assertEqual(resp.status, 200) self.assertIn(obj_name, listing) self.assertNotIn(self.obj, listing) def test_manifest(self): if tf.skip: raise SkipTest # Data for the object segments segments1 = ['one', 'two', 'three', 'four', 'five'] segments2 = ['six', 'seven', 'eight'] segments3 = ['nine', 'ten', 'eleven'] # Upload the first set of segments def put(url, token, parsed, conn, objnum): conn.request('PUT', '%s/%s/segments1/%s' % ( parsed.path, self.container, str(objnum)), segments1[objnum], {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments1)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) # Upload the manifest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/manifest' % ( parsed.path, self.container), '', { 'X-Auth-Token': token, 'X-Object-Manifest': '%s/segments1/' % self.container, 'Content-Type': 'text/jibberish', 'Content-Length': '0'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # Get the manifest (should get all the segments as the body) def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments1)) self.assertEqual(resp.status, 200) self.assertEqual(resp.getheader('content-type'), 'text/jibberish') # Get with a range at the start of the second segment def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', { 'X-Auth-Token': token, 'Range': 'bytes=3-'}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments1[1:])) self.assertEqual(resp.status, 206) # Get with a range in the middle of the second segment def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', { 'X-Auth-Token': token, 'Range': 'bytes=5-'}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments1)[5:]) self.assertEqual(resp.status, 206) # Get with a full start and stop range def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', { 'X-Auth-Token': token, 'Range': 'bytes=5-10'}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments1)[5:11]) self.assertEqual(resp.status, 206) # Upload the second set of segments def put(url, token, parsed, conn, objnum): conn.request('PUT', '%s/%s/segments2/%s' % ( parsed.path, self.container, str(objnum)), segments2[objnum], {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments2)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) # Get the manifest (should still be the first segments of course) def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments1)) self.assertEqual(resp.status, 200) # Update the manifest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/manifest' % ( parsed.path, self.container), '', { 'X-Auth-Token': token, 'X-Object-Manifest': '%s/segments2/' % self.container, 'Content-Length': '0'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # Get the manifest (should be the second set of segments now) def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments2)) self.assertEqual(resp.status, 200) if not tf.skip3: # Ensure we can't access the manifest with the third account def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) resp.read() self.assertEqual(resp.status, 403) # Grant access to the third account def post(url, token, parsed, conn): conn.request('POST', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # The third account should be able to get the manifest now def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) self.assertEqual(resp.read(), ''.join(segments2)) self.assertEqual(resp.status, 200) # Create another container for the third set of segments acontainer = uuid4().hex def put(url, token, parsed, conn): conn.request('PUT', parsed.path + '/' + acontainer, '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # Upload the third set of segments in the other container def put(url, token, parsed, conn, objnum): conn.request('PUT', '%s/%s/segments3/%s' % ( parsed.path, acontainer, str(objnum)), segments3[objnum], {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments3)): resp = retry(put, objnum) resp.read() self.assertEqual(resp.status, 201) # Update the manifest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token, 'X-Object-Manifest': '%s/segments3/' % acontainer, 'Content-Length': '0'}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) # Get the manifest to ensure it's the third set of segments def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get) self.assertEqual(resp.read(), ''.join(segments3)) self.assertEqual(resp.status, 200) if not tf.skip3: # Ensure we can't access the manifest with the third account # (because the segments are in a protected container even if the # manifest itself is not). def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) resp.read() self.assertEqual(resp.status, 403) # Grant access to the third account def post(url, token, parsed, conn): conn.request('POST', '%s/%s' % (parsed.path, acontainer), '', {'X-Auth-Token': token, 'X-Container-Read': tf.swift_test_perm[2]}) return check_response(conn) resp = retry(post) resp.read() self.assertEqual(resp.status, 204) # The third account should be able to get the manifest now def get(url, token, parsed, conn): conn.request('GET', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(get, use_account=3) self.assertEqual(resp.read(), ''.join(segments3)) self.assertEqual(resp.status, 200) # Delete the manifest def delete(url, token, parsed, conn, objnum): conn.request('DELETE', '%s/%s/manifest' % ( parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete, objnum) resp.read() self.assertIn(resp.status, (204, 404)) # Delete the third set of segments def delete(url, token, parsed, conn, objnum): conn.request('DELETE', '%s/%s/segments3/%s' % ( parsed.path, acontainer, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments3)): resp = retry(delete, objnum) resp.read() self.assertIn(resp.status, (204, 404)) # Delete the second set of segments def delete(url, token, parsed, conn, objnum): conn.request('DELETE', '%s/%s/segments2/%s' % ( parsed.path, self.container, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments2)): resp = retry(delete, objnum) resp.read() self.assertIn(resp.status, (204, 404)) # Delete the first set of segments def delete(url, token, parsed, conn, objnum): conn.request('DELETE', '%s/%s/segments1/%s' % ( parsed.path, self.container, str(objnum)), '', {'X-Auth-Token': token}) return check_response(conn) for objnum in range(len(segments1)): resp = retry(delete, objnum) resp.read() self.assertIn(resp.status, (204, 404)) # Delete the extra container def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s' % (parsed.path, acontainer), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) def test_delete_content_type(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/hi' % (parsed.path, self.container), 'there', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container), '', {'X-Auth-Token': token}) return check_response(conn) resp = retry(delete) resp.read() self.assertIn(resp.status, (204, 404)) self.assertEqual(resp.getheader('Content-Type'), 'text/html; charset=UTF-8') def test_delete_if_delete_at_bad(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/hi-delete-bad' % (parsed.path, self.container), 'there', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) resp.read() self.assertEqual(resp.status, 201) def delete(url, token, parsed, conn): conn.request('DELETE', '%s/%s/hi' % (parsed.path, self.container), '', {'X-Auth-Token': token, 'X-If-Delete-At': 'bad'}) return check_response(conn) resp = retry(delete) resp.read() self.assertEqual(resp.status, 400) def test_null_name(self): if tf.skip: raise SkipTest def put(url, token, parsed, conn): conn.request('PUT', '%s/%s/abc%%00def' % ( parsed.path, self.container), 'test', {'X-Auth-Token': token}) return check_response(conn) resp = retry(put) if (tf.web_front_end == 'apache2'): self.assertEqual(resp.status, 404) else: self.assertEqual(resp.read(), 'Invalid UTF8 or contains NULL') self.assertEqual(resp.status, 412) def test_cors(self): if tf.skip: raise SkipTest try: strict_cors = tf.cluster_info['swift']['strict_cors_mode'] except KeyError: raise SkipTest("cors mode is unknown") def put_cors_cont(url, token, parsed, conn, orig): conn.request( 'PUT', '%s/%s' % (parsed.path, self.container), '', {'X-Auth-Token': token, 'X-Container-Meta-Access-Control-Allow-Origin': orig}) return check_response(conn) def put_obj(url, token, parsed, conn, obj): conn.request( 'PUT', '%s/%s/%s' % (parsed.path, self.container, obj), 'test', {'X-Auth-Token': token}) return check_response(conn) def check_cors(url, token, parsed, conn, method, obj, headers): if method != 'OPTIONS': headers['X-Auth-Token'] = token conn.request( method, '%s/%s/%s' % (parsed.path, self.container, obj), '', headers) return conn.getresponse() resp = retry(put_cors_cont, '*') resp.read() self.assertEqual(resp.status // 100, 2) resp = retry(put_obj, 'cat') resp.read() self.assertEqual(resp.status // 100, 2) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com'}) self.assertEqual(resp.status, 401) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com', 'Access-Control-Request-Method': 'GET'}) self.assertEqual(resp.status, 200) resp.read() headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEqual(headers.get('access-control-allow-origin'), '*') resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEqual(headers.get('access-control-allow-origin'), '*') resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com', 'X-Web-Mode': 'True'}) self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEqual(headers.get('access-control-allow-origin'), '*') #################### resp = retry(put_cors_cont, 'http://secret.com') resp.read() self.assertEqual(resp.status // 100, 2) resp = retry(check_cors, 'OPTIONS', 'cat', {'Origin': 'http://m.com', 'Access-Control-Request-Method': 'GET'}) resp.read() self.assertEqual(resp.status, 401) if strict_cors: resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) resp.read() self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertNotIn('access-control-allow-origin', headers) resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://secret.com'}) resp.read() self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEqual(headers.get('access-control-allow-origin'), 'http://secret.com') else: resp = retry(check_cors, 'GET', 'cat', {'Origin': 'http://m.com'}) resp.read() self.assertEqual(resp.status, 200) headers = dict((k.lower(), v) for k, v in resp.getheaders()) self.assertEqual(headers.get('access-control-allow-origin'), 'http://m.com') @requires_policies def test_cross_policy_copy(self): # create container in first policy policy = self.policies.select() container = self._create_container( headers={'X-Storage-Policy': policy['name']}) obj = uuid4().hex # create a container in second policy other_policy = self.policies.exclude(name=policy['name']).select() other_container = self._create_container( headers={'X-Storage-Policy': other_policy['name']}) other_obj = uuid4().hex def put_obj(url, token, parsed, conn, container, obj): # to keep track of things, use the original path as the body content = '%s/%s' % (container, obj) path = '%s/%s' % (parsed.path, content) conn.request('PUT', path, content, {'X-Auth-Token': token}) return check_response(conn) # create objects for c, o in zip((container, other_container), (obj, other_obj)): resp = retry(put_obj, c, o) resp.read() self.assertEqual(resp.status, 201) def put_copy_from(url, token, parsed, conn, container, obj, source): dest_path = '%s/%s/%s' % (parsed.path, container, obj) conn.request('PUT', dest_path, '', {'X-Auth-Token': token, 'Content-Length': '0', 'X-Copy-From': source}) return check_response(conn) copy_requests = ( (container, other_obj, '%s/%s' % (other_container, other_obj)), (other_container, obj, '%s/%s' % (container, obj)), ) # copy objects for c, o, source in copy_requests: resp = retry(put_copy_from, c, o, source) resp.read() self.assertEqual(resp.status, 201) def get_obj(url, token, parsed, conn, container, obj): path = '%s/%s/%s' % (parsed.path, container, obj) conn.request('GET', path, '', {'X-Auth-Token': token}) return check_response(conn) # validate contents, contents should be source validate_requests = copy_requests for c, o, body in validate_requests: resp = retry(get_obj, c, o) self.assertEqual(resp.status, 200) self.assertEqual(body, resp.read()) if __name__ == '__main__': unittest2.main()