swift/test/functional/test_slo.py
Alistair Coles 639251d4c5 slo: add more functional tests for copying
Add functional tests for copying from an SLO using PUT requests with
an x-copy-from header.

Add functional tests for copying from an SLO using part-number parameter.

Change-Id: Ic40c4b104b11de9ab09485e82543521b37ea1daa
2024-11-04 11:21:07 +00:00

1712 lines
72 KiB
Python

#!/usr/bin/python -u
# Copyright (c) 2010-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import email.parser
import itertools
import json
import time
from copy import deepcopy
from unittest import SkipTest
import six
from six.moves import urllib
from swift.common.swob import normalize_etag
from swift.common.utils import md5, config_true_value
from test.functional import check_response, retry
import test.functional as tf
from test.functional import cluster_info
from test.functional.tests import Utils, Base, Base2, BaseEnv
from test.functional.swift_test_client import Connection, ResponseError
def setUpModule():
tf.setup_package()
def tearDownModule():
tf.teardown_package()
def group_file_contents(file_contents):
# This looks a little funny, but iterating through a byte string on py3
# yields a sequence of ints, not a sequence of single-byte byte strings
# as it did on py2.
byte_iter = (file_contents[i:i + 1] for i in range(len(file_contents)))
return [
(char, sum(1 for _ in grp))
for char, grp in itertools.groupby(byte_iter)]
def md5hex(s):
if not isinstance(s, bytes):
s = s.encode('ascii')
return md5(s, usedforsecurity=False).hexdigest()
class TestSloEnv(BaseEnv):
slo_enabled = None # tri-state: None initially, then True/False
@classmethod
def create_segments(cls, container):
seg_info = {}
for letter, size in (('a', 1024 * 1024),
('b', 1024 * 1024),
('c', 1024 * 1024),
('d', 1024 * 1024),
('e', 1)):
seg_name = "seg_%s" % letter
file_item = container.file(seg_name)
file_item.write(letter.encode('ascii') * size)
seg_info[seg_name] = {
'size_bytes': size,
'etag': file_item.md5,
'path': '/%s/%s' % (container.name, seg_name)}
return seg_info
@classmethod
def setUp(cls):
if cls.slo_enabled is None:
cls.slo_enabled = 'slo' in cluster_info
if not cls.slo_enabled:
return
super(TestSloEnv, cls).setUp()
if not tf.skip2:
config2 = deepcopy(tf.config)
config2['account'] = tf.config['account2']
config2['username'] = tf.config['username2']
config2['password'] = tf.config['password2']
cls.conn2 = Connection(config2)
cls.conn2.authenticate()
cls.account2 = cls.conn2.get_account()
cls.account2.delete_containers()
if not tf.skip3:
config3 = tf.config.copy()
config3['username'] = tf.config['username3']
config3['password'] = tf.config['password3']
cls.conn3 = Connection(config3)
cls.conn3.authenticate()
cls.container = cls.account.container(Utils.create_name())
cls.container2 = cls.account.container(Utils.create_name())
for cont in (cls.container, cls.container2):
if not cont.create():
raise ResponseError(cls.conn.response)
cls.seg_info = seg_info = cls.create_segments(cls.container)
file_item = cls.container.file("manifest-abcde")
file_item.write(
json.dumps([seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]).encode('ascii'),
parms={'multipart-manifest': 'put'})
cls.container.file('seg_with_%ff_funky_name').write(b'z' * 10)
# Put the same manifest in the container2
file_item = cls.container2.file("manifest-abcde")
file_item.write(
json.dumps([seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]).encode('ascii'),
parms={'multipart-manifest': 'put'})
file_item = cls.container.file('manifest-cd')
cd_json = json.dumps([
seg_info['seg_c'], seg_info['seg_d']]).encode('ascii')
file_item.write(cd_json, parms={'multipart-manifest': 'put'})
cd_etag = md5((
seg_info['seg_c']['etag'] + seg_info['seg_d']['etag']
).encode('ascii'), usedforsecurity=False).hexdigest()
file_item = cls.container.file("manifest-bcd-submanifest")
file_item.write(
json.dumps([seg_info['seg_b'],
{'etag': cd_etag,
'size_bytes': (seg_info['seg_c']['size_bytes'] +
seg_info['seg_d']['size_bytes']),
'path': '/%s/%s' % (cls.container.name,
'manifest-cd')}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
bcd_submanifest_etag = md5((
seg_info['seg_b']['etag'] + cd_etag).encode('ascii'),
usedforsecurity=False).hexdigest()
file_item = cls.container.file("manifest-abcde-submanifest")
file_item.write(
json.dumps([
seg_info['seg_a'],
{'etag': bcd_submanifest_etag,
'size_bytes': (seg_info['seg_b']['size_bytes'] +
seg_info['seg_c']['size_bytes'] +
seg_info['seg_d']['size_bytes']),
'path': '/%s/%s' % (cls.container.name,
'manifest-bcd-submanifest')},
seg_info['seg_e']]).encode('ascii'),
parms={'multipart-manifest': 'put'})
abcde_submanifest_etag = md5((
seg_info['seg_a']['etag'] + bcd_submanifest_etag +
seg_info['seg_e']['etag']).encode('ascii'),
usedforsecurity=False).hexdigest()
abcde_submanifest_size = (seg_info['seg_a']['size_bytes'] +
seg_info['seg_b']['size_bytes'] +
seg_info['seg_c']['size_bytes'] +
seg_info['seg_d']['size_bytes'] +
seg_info['seg_e']['size_bytes'])
file_item = cls.container.file("ranged-manifest")
file_item.write(
json.dumps([
{'etag': abcde_submanifest_etag,
'size_bytes': abcde_submanifest_size,
'path': '/%s/%s' % (cls.container.name,
'manifest-abcde-submanifest'),
'range': '-1048578'}, # 'c' + ('d' * 2**20) + 'e'
{'etag': abcde_submanifest_etag,
'size_bytes': abcde_submanifest_size,
'path': '/%s/%s' % (cls.container.name,
'manifest-abcde-submanifest'),
'range': '524288-1572863'}, # 'a' * 2**19 + 'b' * 2**19
{'etag': abcde_submanifest_etag,
'size_bytes': abcde_submanifest_size,
'path': '/%s/%s' % (cls.container.name,
'manifest-abcde-submanifest'),
'range': '3145727-3145728'}]).encode('ascii'), # 'cd'
parms={'multipart-manifest': 'put'})
ranged_manifest_etag = md5((
abcde_submanifest_etag + ':3145727-4194304;' +
abcde_submanifest_etag + ':524288-1572863;' +
abcde_submanifest_etag + ':3145727-3145728;'
).encode('ascii'), usedforsecurity=False).hexdigest()
ranged_manifest_size = 2 * 1024 * 1024 + 4
file_item = cls.container.file("ranged-submanifest")
file_item.write(
json.dumps([
seg_info['seg_c'],
{'etag': ranged_manifest_etag,
'size_bytes': ranged_manifest_size,
'path': '/%s/%s' % (cls.container.name,
'ranged-manifest')},
{'etag': ranged_manifest_etag,
'size_bytes': ranged_manifest_size,
'path': '/%s/%s' % (cls.container.name,
'ranged-manifest'),
'range': '524289-1572865'},
{'etag': ranged_manifest_etag,
'size_bytes': ranged_manifest_size,
'path': '/%s/%s' % (cls.container.name,
'ranged-manifest'),
'range': '-3'}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
file_item = cls.container.file("manifest-db")
file_item.write(
json.dumps([
{'path': seg_info['seg_d']['path'], 'etag': None,
'size_bytes': None},
{'path': seg_info['seg_b']['path'], 'etag': None,
'size_bytes': None},
]).encode('ascii'), parms={'multipart-manifest': 'put'})
file_item = cls.container.file("ranged-manifest-repeated-segment")
file_item.write(
json.dumps([
{'path': seg_info['seg_a']['path'], 'etag': None,
'size_bytes': None, 'range': '-1048578'},
{'path': seg_info['seg_a']['path'], 'etag': None,
'size_bytes': None},
{'path': seg_info['seg_b']['path'], 'etag': None,
'size_bytes': None, 'range': '-1048578'},
]).encode('ascii'), parms={'multipart-manifest': 'put'})
file_item = cls.container.file("mixed-object-data-manifest")
file_item.write(
json.dumps([
{'data': base64.b64encode(b'APRE' * 8).decode('ascii')},
{'path': seg_info['seg_a']['path']},
{'data': base64.b64encode(b'APOS' * 16).decode('ascii')},
{'path': seg_info['seg_b']['path']},
{'data': base64.b64encode(b'BPOS' * 32).decode('ascii')},
{'data': base64.b64encode(b'CPRE' * 64).decode('ascii')},
{'path': seg_info['seg_c']['path']},
{'data': base64.b64encode(b'CPOS' * 8).decode('ascii')},
]).encode('ascii'), parms={'multipart-manifest': 'put'}
)
file_item = cls.container.file("nested-data-manifest")
file_item.write(
json.dumps([
{'path': '%s/%s' % (cls.container.name,
"mixed-object-data-manifest")}
]).encode('ascii'), parms={'multipart-manifest': 'put'}
)
class TestSlo(Base):
env = TestSloEnv
def setUp(self):
super(TestSlo, self).setUp()
if self.env.slo_enabled is False:
raise SkipTest("SLO not enabled")
elif self.env.slo_enabled is not True:
# just some sanity checking
raise Exception(
"Expected slo_enabled to be True/False, got %r" %
(self.env.slo_enabled,))
manifest_abcde_hash = md5(usedforsecurity=False)
for letter in (b'a', b'b', b'c', b'd'):
manifest_abcde_hash.update(
md5(letter * 1024 * 1024, usedforsecurity=False)
.hexdigest().encode('ascii'))
manifest_abcde_hash.update(
md5(b'e', usedforsecurity=False).hexdigest().encode('ascii'))
self.manifest_abcde_etag = manifest_abcde_hash.hexdigest()
def test_slo_get_simple_manifest(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read()
self.assertEqual(file_item.conn.response.status, 200)
headers = dict(
(h.lower(), v)
for h, v in file_item.conn.response.getheaders())
self.assertIn('etag', headers)
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
self.assertEqual([
(b'a', 1024 * 1024),
(b'b', 1024 * 1024),
(b'c', 1024 * 1024),
(b'd', 1024 * 1024),
(b'e', 1),
], group_file_contents(file_contents))
def test_slo_multipart_delete_part_number_ignored(self):
# create a container just for this test because we're going to delete
# objects that we create
container = self.env.account.container(Utils.create_name())
self.assertTrue(container.create())
# create segments in same container
seg_info = self.env.create_segments(container)
file_item = container.file("manifest-abcde")
self.assertTrue(file_item.write(
json.dumps([seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]).encode('ascii'),
parms={'multipart-manifest': 'put'}))
# sanity check, we have SLO...
file_item.initialize(parms={'part-number': '5'})
self.assertEqual(
file_item.conn.response.getheader('X-Static-Large-Object'), 'True')
self.assertEqual(
file_item.conn.response.getheader('X-Parts-Count'), '5')
self.assertEqual(6, len(container.files()))
# part-number should be ignored
status = file_item.conn.make_request(
'DELETE', file_item.path,
parms={'multipart-manifest': 'delete',
'part-number': '2'})
self.assertEqual(200, status)
# everything is gone
self.assertFalse(container.files())
def test_get_head_part_number_invalid(self):
file_item = self.env.container.file('manifest-abcde')
file_item.initialize()
ok_resp = file_item.conn.response
self.assertEqual(ok_resp.getheader('X-Static-Large-Object'), 'True')
self.assertEqual(ok_resp.getheader('Etag'), file_item.etag)
self.assertEqual(ok_resp.getheader('Content-Length'),
str(file_item.size))
# part-number is 1-indexed
self.assertRaises(ResponseError, file_item.read,
parms={'part-number': '0'})
resp_body = file_item.conn.response.read()
self.assertEqual(400, file_item.conn.response.status, resp_body)
self.assertEqual(b'Part number must be an integer greater than 0',
resp_body)
self.assertRaises(ResponseError, file_item.initialize,
parms={'part-number': '0'})
resp_body = file_item.conn.response.read()
self.assertEqual(400, file_item.conn.response.status)
self.assertEqual(b'', resp_body)
def test_get_head_part_number_out_of_range(self):
file_item = self.env.container.file('manifest-abcde')
file_item.initialize()
ok_resp = file_item.conn.response
self.assertEqual(ok_resp.getheader('X-Static-Large-Object'), 'True')
self.assertEqual(ok_resp.getheader('Etag'), file_item.etag)
self.assertEqual(ok_resp.getheader('Content-Length'),
str(file_item.size))
manifest_etag = ok_resp.getheader('Manifest-Etag')
def check_headers(resp):
self.assertEqual(resp.getheader('X-Static-Large-Object'), 'True')
self.assertEqual(resp.getheader('Etag'), file_item.etag)
self.assertEqual(resp.getheader('Manifest-Etag'), manifest_etag)
self.assertEqual(resp.getheader('X-Parts-Count'), '5')
self.assertEqual(resp.getheader('Content-Range'),
'bytes */%s' % file_item.size)
self.assertRaises(ResponseError, file_item.read,
parms={'part-number': '10001'})
resp_body = file_item.conn.response.read()
self.assertEqual(416, file_item.conn.response.status, resp_body)
self.assertEqual(b'The requested part number is not satisfiable',
resp_body)
check_headers(file_item.conn.response)
self.assertEqual(file_item.conn.response.getheader('Content-Length'),
str(len(resp_body)))
self.assertRaises(ResponseError, file_item.info,
parms={'part-number': '10001'})
resp_body = file_item.conn.response.read()
self.assertEqual(416, file_item.conn.response.status)
self.assertEqual(b'', resp_body)
check_headers(file_item.conn.response)
self.assertEqual(file_item.conn.response.getheader('Content-Length'),
'0')
def test_get_part_number_simple_manifest(self):
file_item = self.env.container.file('manifest-abcde')
seg_info_list = [
self.env.seg_info["seg_%s" % letter]
for letter in ['a', 'b', 'c', 'd', 'e']
]
checksum = md5(usedforsecurity=False)
total_size = 0
for seg_info in seg_info_list:
checksum.update(seg_info['etag'].encode('ascii'))
total_size += seg_info['size_bytes']
slo_etag = checksum.hexdigest()
start = 0
manifest_etag = None
for i, seg_info in enumerate(seg_info_list, start=1):
part_contents = file_item.read(parms={'part-number': i})
self.assertEqual(len(part_contents), seg_info['size_bytes'])
headers = dict(
(h.lower(), v)
for h, v in file_item.conn.response.getheaders())
self.assertEqual(headers['content-length'],
str(seg_info['size_bytes']))
self.assertEqual(headers['etag'], '"%s"' % slo_etag)
if not manifest_etag:
manifest_etag = headers['x-manifest-etag']
else:
self.assertEqual(headers['x-manifest-etag'], manifest_etag)
end = start + seg_info['size_bytes'] - 1
self.assertEqual(headers['content-range'],
'bytes %d-%d/%d' % (start, end, total_size), i)
self.assertEqual(headers['x-parts-count'], '5')
start = end + 1
def test_head_part_number_simple_manifest(self):
file_item = self.env.container.file('manifest-abcde')
seg_info_list = [
self.env.seg_info["seg_%s" % letter]
for letter in ['a', 'b', 'c', 'd', 'e']
]
checksum = md5(usedforsecurity=False)
total_size = 0
for seg_info in seg_info_list:
checksum.update(seg_info['etag'].encode('ascii'))
total_size += seg_info['size_bytes']
slo_etag = checksum.hexdigest()
start = 0
manifest_etag = None
for i, seg_info in enumerate(seg_info_list, start=1):
part_info = file_item.info(parms={'part-number': i},
exp_status=206)
headers = dict(
(h.lower(), v)
for h, v in file_item.conn.response.getheaders())
self.assertEqual(headers['content-length'],
str(seg_info['size_bytes']))
self.assertEqual(headers['etag'], '"%s"' % slo_etag)
self.assertEqual(headers['etag'], part_info['etag'])
if not manifest_etag:
manifest_etag = headers['x-manifest-etag']
else:
self.assertEqual(headers['x-manifest-etag'], manifest_etag)
end = start + seg_info['size_bytes'] - 1
self.assertEqual(headers['content-range'],
'bytes %d-%d/%d' % (start, end, total_size), i)
self.assertEqual(headers['x-parts-count'], '5')
start = end + 1
def test_x_delete_at_with_part_number_and_open_expired(self):
cont_name = self.env.account.container(self.env.container.name)
allow_open_expired = config_true_value(tf.cluster_info['swift'].get(
'allow_open_expired', 'false'))
if not allow_open_expired:
raise SkipTest('allow_open_expired is disabled')
# data for segments
segments = [b'one', b'two', b'three', b'four']
etags = []
for segment in segments:
etags.append(md5hex(segment))
def put_manifest(url, token, parsed, conn, object_segments):
now = int(time.time())
delete_time = now + 2
manifest_data = []
start = 0
for segment_object in range(len(object_segments)):
size = len(object_segments[segment_object])
end = start + size - 1
manifest_data.append({
'path': '/%s/segments/%s' % (cont_name,
str(segment_object)),
'etag': etags[segment_object],
'size_bytes': size,
})
start = end + 1
conn.request(
'PUT',
'%s/%s/manifest?multipart-manifest=put' % (parsed.path,
cont_name),
body=json.dumps(manifest_data),
headers={
'X-Auth-Token': token,
'X-Delete-At': delete_time,
'X-Static-Large-Object': 'true',
'Content-Type': 'application/json'
}
)
resp = check_response(conn)
body = resp.read()
self.assertEqual(resp.status, 201,
"Response status is not 201: %s" % body)
def put_segments(url, token, parsed, conn, object_segments):
now = int(time.time())
delete_time = now + 2
for objnum in range(len(object_segments)):
conn.request('PUT', '%s/%s/segments/%s' % (
parsed.path,
cont_name,
str(objnum)),
body=object_segments[objnum],
headers={
'X-Auth-Token': token,
'X-Delete-At': delete_time})
resp = check_response(conn)
body = resp.read()
self.assertEqual(resp.status, 201,
"Response status is not 201: %s" % body)
retry(put_segments, segments)
retry(put_manifest, segments)
# get the manifest
def get_manifest(url, token, parsed, conn, extra_headers=None):
headers = {'X-Auth-Token': token}
if extra_headers:
headers.update(extra_headers)
conn.request(
'GET',
'%s/%s/manifest?multipart-manifest=get' %
(parsed.path, cont_name),
'', headers)
return check_response(conn)
resp = retry(get_manifest)
self.assertEqual(resp.status, 200)
# wait for the manifest to expire
# the objects will also have expired at the same time
# since their x-delete-at times are the same
time.sleep(3)
resp = retry(get_manifest)
resp.read()
# check to see manifest has expired
self.assertEqual(resp.status, 404, resp.headers.get('x-trans-id'))
def get_or_head_part(url, token, parsed, conn,
extra_headers=None, method='GET',
part_number=None):
headers = {'X-Auth-Token': token}
if extra_headers:
headers.update(extra_headers)
conn.request(method, '%s/%s/manifest?part-number=%s' % (
parsed.path,
cont_name,
part_number
), '', headers)
return check_response(conn)
resp = retry(get_manifest, extra_headers={'X-Open-Expired': True})
resp.read()
# read the expired object with magic x-open-expired header
self.assertEqual(resp.status, 200)
for objnum in range(len(segments)):
part_num = str(objnum + 1)
resp = retry(get_or_head_part,
extra_headers={'X-Open-Expired': True},
part_number=part_num,)
resp.read()
self.assertEqual(resp.status, 206)
for objnum in range(len(segments)):
part_num = str(objnum + 1)
resp = retry(get_or_head_part,
extra_headers={'X-Open-Expired': True},
method='HEAD', part_number=part_num)
resp.read()
self.assertEqual(resp.status, 206)
# no x-open-expired case and it should 404
for objnum in range(len(segments)):
part_num = str(objnum + 1)
resp = retry(get_or_head_part,
method='HEAD', part_number=part_num)
resp.read()
self.assertEqual(resp.status, 404)
# same situation here
for objnum in range(len(segments)):
part_num = str(objnum + 1)
resp = retry(get_or_head_part,
method='GET', part_number=part_num)
resp.read()
self.assertEqual(resp.status, 404)
def head_manifest(url, token, parsed, conn, extra_headers=None):
headers = {'X-Auth-Token': token}
if extra_headers:
headers.update(extra_headers)
conn.request('HEAD', '%s/%s/manifest' % (parsed.path,
cont_name),
'', headers)
return check_response(conn)
resp = retry(head_manifest, extra_headers={'X-Open-Expired': True})
resp.read()
# head expired object with magic x-open-expired header
self.assertEqual(resp.status, 200)
def test_slo_container_listing(self):
# the listing object size should equal the sum of the size of the
# segments, not the size of the manifest body
file_item = self.env.container.file(Utils.create_name())
file_item.write(
json.dumps([self.env.seg_info['seg_a']]).encode('ascii'),
parms={'multipart-manifest': 'put'})
# The container listing exposes BOTH the MD5 of the manifest content
# and the SLO MD5-of-MD5s by splitting the latter out into a separate
# key. These should remain consistent when the object is updated with
# a POST.
file_item.initialize(parms={'multipart-manifest': 'get'})
manifest_etag = file_item.etag
if tf.cluster_info.get('etag_quoter', {}).get('enable_by_default'):
self.assertTrue(manifest_etag.startswith('"'))
self.assertTrue(manifest_etag.endswith('"'))
# ...but in the listing, it'll be stripped
manifest_etag = manifest_etag[1:-1]
else:
self.assertFalse(manifest_etag.startswith('"'))
self.assertFalse(manifest_etag.endswith('"'))
file_item.initialize()
slo_etag = file_item.etag
self.assertTrue(slo_etag.startswith('"'))
self.assertTrue(slo_etag.endswith('"'))
listing = self.env.container.files(parms={'format': 'json'})
for f_dict in listing:
if f_dict['name'] == file_item.name:
self.assertEqual(1024 * 1024, f_dict['bytes'])
self.assertEqual('application/octet-stream',
f_dict['content_type'])
self.assertEqual(manifest_etag, f_dict['hash'])
self.assertEqual(slo_etag, f_dict['slo_etag'])
break
else:
self.fail('Failed to find manifest file in container listing')
# now POST updated content-type file
file_item.content_type = 'image/jpeg'
file_item.sync_metadata({'X-Object-Meta-Test': 'blah'})
file_item.initialize()
self.assertEqual('image/jpeg', file_item.content_type) # sanity
# verify that the container listing is consistent with the file
listing = self.env.container.files(parms={'format': 'json'})
for f_dict in listing:
if f_dict['name'] == file_item.name:
self.assertEqual(1024 * 1024, f_dict['bytes'])
self.assertEqual(file_item.content_type,
f_dict['content_type'])
self.assertEqual(manifest_etag, f_dict['hash'])
self.assertEqual(slo_etag, f_dict['slo_etag'])
break
else:
self.fail('Failed to find manifest file in container listing')
# now POST with no change to content-type
file_item.sync_metadata({'X-Object-Meta-Test': 'blah'},
cfg={'no_content_type': True})
file_item.initialize()
self.assertEqual('image/jpeg', file_item.content_type) # sanity
# verify that the container listing is consistent with the file
listing = self.env.container.files(parms={'format': 'json'})
for f_dict in listing:
if f_dict['name'] == file_item.name:
self.assertEqual(1024 * 1024, f_dict['bytes'])
self.assertEqual(file_item.content_type,
f_dict['content_type'])
self.assertEqual(manifest_etag, f_dict['hash'])
self.assertEqual(slo_etag, f_dict['slo_etag'])
break
else:
self.fail('Failed to find manifest file in container listing')
def test_slo_get_nested_manifest(self):
file_item = self.env.container.file('manifest-abcde-submanifest')
file_contents = file_item.read()
self.assertEqual(4 * 1024 * 1024 + 1, len(file_contents))
self.assertEqual([
(b'a', 1024 * 1024),
(b'b', 1024 * 1024),
(b'c', 1024 * 1024),
(b'd', 1024 * 1024),
(b'e', 1),
], group_file_contents(file_item.read()))
def test_slo_get_ranged_manifest(self):
file_item = self.env.container.file('ranged-manifest')
self.assertEqual([
(b'c', 1),
(b'd', 1024 * 1024),
(b'e', 1),
(b'a', 512 * 1024),
(b'b', 512 * 1024),
(b'c', 1),
(b'd', 1),
], group_file_contents(file_item.read()))
def test_slo_get_ranged_manifest_repeated_segment(self):
file_item = self.env.container.file('ranged-manifest-repeated-segment')
self.assertEqual(
[(b'a', 2097152), (b'b', 1048576)],
group_file_contents(file_item.read()))
def test_slo_get_ranged_submanifest(self):
file_item = self.env.container.file('ranged-submanifest')
self.assertEqual([
(b'c', 1024 * 1024 + 1),
(b'd', 1024 * 1024),
(b'e', 1),
(b'a', 512 * 1024),
(b'b', 512 * 1024),
(b'c', 1),
(b'd', 512 * 1024 + 1),
(b'e', 1),
(b'a', 512 * 1024),
(b'b', 1),
(b'c', 1),
(b'd', 1),
], group_file_contents(file_item.read()))
def test_slo_ranged_get(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read(size=1024 * 1024 + 2,
offset=1024 * 1024 - 1)
self.assertEqual(file_item.conn.response.status, 206)
headers = dict(
(h.lower(), v)
for h, v in file_item.conn.response.getheaders())
self.assertIn('etag', headers)
self.assertEqual(headers['etag'], '"%s"' % self.manifest_abcde_etag)
self.assertEqual([
(b'a', 1),
(b'b', 1048576),
(b'c', 1),
], group_file_contents(file_contents))
def test_slo_ranged_get_half_open_on_right(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read(
hdrs={"Range": "bytes=1048571-"})
self.assertEqual([
(b'a', 5),
(b'b', 1048576),
(b'c', 1048576),
(b'd', 1048576),
(b'e', 1)
], group_file_contents(file_contents))
def test_slo_ranged_get_half_open_on_left(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read(
hdrs={"Range": "bytes=-123456"})
self.assertEqual([
(b'd', 123455),
(b'e', 1),
], group_file_contents(file_contents))
def test_slo_multi_ranged_get(self):
file_item = self.env.container.file('manifest-abcde')
file_contents = file_item.read(
hdrs={"Range": "bytes=1048571-1048580,2097147-2097156"})
# See testMultiRangeGets for explanation
if six.PY2:
parser = email.parser.FeedParser()
else:
parser = email.parser.BytesFeedParser()
parser.feed((
"Content-Type: %s\r\n\r\n" % file_item.content_type).encode())
parser.feed(file_contents)
root_message = parser.close()
self.assertTrue(root_message.is_multipart()) # sanity check
byteranges = root_message.get_payload()
self.assertEqual(len(byteranges), 2)
self.assertEqual(byteranges[0]['Content-Type'],
"application/octet-stream")
self.assertEqual(
byteranges[0]['Content-Range'], "bytes 1048571-1048580/4194305")
self.assertEqual(byteranges[0].get_payload(decode=True), b"aaaaabbbbb")
self.assertEqual(byteranges[1]['Content-Type'],
"application/octet-stream")
self.assertEqual(
byteranges[1]['Content-Range'], "bytes 2097147-2097156/4194305")
self.assertEqual(byteranges[1].get_payload(decode=True), b"bbbbbccccc")
def test_slo_ranged_submanifest(self):
file_item = self.env.container.file('manifest-abcde-submanifest')
file_contents = file_item.read(size=1024 * 1024 + 2,
offset=1024 * 1024 * 2 - 1)
self.assertEqual([
(b'b', 1),
(b'c', 1024 * 1024),
(b'd', 1),
], group_file_contents(file_contents))
def test_slo_etag_is_quote_wrapped_hash_of_etags(self):
# we have this check in test_slo_get_simple_manifest, too,
# but verify that it holds for HEAD requests
file_item = self.env.container.file('manifest-abcde')
self.assertEqual('"%s"' % self.manifest_abcde_etag,
file_item.info()['etag'])
def test_slo_etag_is_quote_wrapped_hash_of_etags_submanifests(self):
def hd(x):
return md5(x, usedforsecurity=False).hexdigest().encode('ascii')
expected_etag = hd(hd(b'a' * 1024 * 1024) +
hd(hd(b'b' * 1024 * 1024) +
hd(hd(b'c' * 1024 * 1024) +
hd(b'd' * 1024 * 1024))) +
hd(b'e'))
file_item = self.env.container.file('manifest-abcde-submanifest')
self.assertEqual('"%s"' % expected_etag.decode('ascii'),
file_item.info()['etag'])
def test_slo_etag_mismatch(self):
file_item = self.env.container.file("manifest-a-bad-etag")
try:
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'etag': 'not it',
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_size_mismatch(self):
file_item = self.env.container.file("manifest-a-bad-size")
try:
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024 - 1,
'etag': md5(
b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_client_etag_mismatch(self):
file_item = self.env.container.file("manifest-a-mismatch-etag")
try:
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'etag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'},
hdrs={'Etag': 'NOTetagofthesegments'})
except ResponseError as err:
self.assertEqual(422, err.status)
def test_slo_client_etag(self):
file_item = self.env.container.file("manifest-a-b-etag")
etag_a = md5(b'a' * 1024 * 1024, usedforsecurity=False).hexdigest()
etag_b = md5(b'b' * 1024 * 1024, usedforsecurity=False).hexdigest()
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'etag': etag_a,
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}, {
'size_bytes': 1024 * 1024,
'etag': etag_b,
'path': '/%s/%s' % (self.env.container.name, 'seg_b'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'},
hdrs={'Etag': md5((etag_a + etag_b).encode(),
usedforsecurity=False).hexdigest()})
self.assert_status(201)
def test_slo_unspecified_etag(self):
file_item = self.env.container.file("manifest-a-unspecified-etag")
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'etag': None,
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_unspecified_size(self):
file_item = self.env.container.file("manifest-a-unspecified-size")
file_item.write(
json.dumps([{
'size_bytes': None,
'etag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_funky_segment(self):
file_item = self.env.container.file("manifest-with-funky-segment")
file_item.write(
json.dumps([{
'path': '/%s/%s' % (self.env.container.name,
'seg_with_%ff_funky_name'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
self.assertEqual(b'z' * 10, file_item.read())
def test_slo_missing_etag(self):
file_item = self.env.container.file("manifest-a-missing-etag")
file_item.write(
json.dumps([{
'size_bytes': 1024 * 1024,
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_missing_size(self):
file_item = self.env.container.file("manifest-a-missing-size")
file_item.write(
json.dumps([{
'etag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_path_only(self):
file_item = self.env.container.file("manifest-a-path-only")
file_item.write(
json.dumps([{
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assert_status(201)
def test_slo_typo_etag(self):
file_item = self.env.container.file("manifest-a-typo-etag")
try:
file_item.write(
json.dumps([{
'teag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'size_bytes': 1024 * 1024,
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_typo_size(self):
file_item = self.env.container.file("manifest-a-typo-size")
try:
file_item.write(
json.dumps([{
'etag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'siz_bytes': 1024 * 1024,
'path': '/%s/%s' % (self.env.container.name, 'seg_a'),
}]).encode('ascii'),
parms={'multipart-manifest': 'put'})
except ResponseError as err:
self.assertEqual(400, err.status)
else:
self.fail("Expected ResponseError but didn't get it")
def test_slo_overwrite_segment_with_manifest(self):
file_item = self.env.container.file("seg_b")
with self.assertRaises(ResponseError) as catcher:
file_item.write(
json.dumps([
{'size_bytes': 1024 * 1024,
'etag': md5(b'a' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_a')},
{'size_bytes': 1024 * 1024,
'etag': md5(b'b' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_b')},
{'size_bytes': 1024 * 1024,
'etag': md5(b'c' * 1024 * 1024,
usedforsecurity=False).hexdigest(),
'path': '/%s/%s' % (self.env.container.name, 'seg_c')},
]).encode('ascii'),
parms={'multipart-manifest': 'put'})
self.assertEqual(400, catcher.exception.status)
def test_slo_copy(self):
file_item = self.env.container.file("manifest-abcde")
file_item.copy(self.env.container.name, "copied-abcde")
copied = self.env.container.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
self.assertEqual(4 * 1024 * 1024 + 1, len(copied_contents))
def test_slo_copy_using_x_copy_from(self):
# as per test_slo_copy but using a PUT with x-copy-from
file_item = self.env.container.file("manifest-abcde")
file_item.copy_using_x_copy_from(
self.env.container.name, "copied-abcde")
copied = self.env.container.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
self.assertEqual(4 * 1024 * 1024 + 1, len(copied_contents))
def test_slo_copy_part_number(self):
file_item = self.env.container.file("manifest-abcde")
file_item.copy(self.env.container.name, "copied-abcde",
parms={'part-number': '1'})
copied = self.env.container.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
# just the first part is copied
self.assertEqual(1024 * 1024, len(copied_contents))
self.assertEqual(b'a' * 10, copied_contents[:10])
def test_slo_copy_part_number_using_x_copy_from(self):
# as per test_slo_copy_part_number but using a PUT with x-copy-from
file_item = self.env.container.file("manifest-abcde")
# part-number on the client PUT target is actually applied to the
# internal GET source request
file_item.copy_using_x_copy_from(
self.env.container.name, "copied-abcde",
parms={'part-number': '1'})
copied = self.env.container.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
# just the first part is copied
self.assertEqual(1024 * 1024, len(copied_contents))
self.assertEqual(b'a' * 10, copied_contents[:10])
def test_slo_copy_account(self):
acct = urllib.parse.unquote(self.env.conn.account_name)
# same account copy
file_item = self.env.container.file("manifest-abcde")
file_item.copy_account(acct, self.env.container.name, "copied-abcde")
copied = self.env.container.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
self.assertEqual(4 * 1024 * 1024 + 1, len(copied_contents))
if not tf.skip2:
# copy to different account
acct = urllib.parse.unquote(self.env.conn2.account_name)
dest_cont = self.env.account2.container(Utils.create_name())
self.assertTrue(dest_cont.create(hdrs={
'X-Container-Write': self.env.conn.user_acl
}))
file_item = self.env.container.file("manifest-abcde")
file_item.copy_account(acct, dest_cont, "copied-abcde")
copied = dest_cont.file("copied-abcde")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
self.assertEqual(4 * 1024 * 1024 + 1, len(copied_contents))
def test_slo_copy_the_manifest(self):
source = self.env.container.file("manifest-abcde")
source.initialize(parms={'multipart-manifest': 'get'})
source_contents = source.read(parms={'multipart-manifest': 'get'})
source_json = json.loads(source_contents)
manifest_etag = md5(source_contents, usedforsecurity=False).hexdigest()
if tf.cluster_info.get('etag_quoter', {}).get('enable_by_default'):
manifest_etag = '"%s"' % manifest_etag
self.assertEqual(manifest_etag, source.etag)
source.initialize()
self.assertEqual('application/octet-stream', source.content_type)
self.assertNotEqual(manifest_etag, source.etag)
slo_etag = source.etag
self.assertTrue(source.copy(self.env.container.name,
"copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'}))
copied = self.env.container.file("copied-abcde-manifest-only")
copied.initialize(parms={'multipart-manifest': 'get'})
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
copied_json = json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
self.assertEqual(source_json, copied_json)
self.assertEqual(manifest_etag, copied.etag)
copied.initialize()
self.assertEqual('application/octet-stream', copied.content_type)
self.assertEqual(slo_etag, copied.etag)
# verify the listing metadata
listing = self.env.container.files(parms={'format': 'json'})
names = {}
for f_dict in listing:
if f_dict['name'] in ('manifest-abcde',
'copied-abcde-manifest-only'):
names[f_dict['name']] = f_dict
self.assertIn('manifest-abcde', names)
actual = names['manifest-abcde']
self.assertEqual(4 * 1024 * 1024 + 1, actual['bytes'])
self.assertEqual('application/octet-stream', actual['content_type'])
self.assertEqual(normalize_etag(manifest_etag), actual['hash'])
self.assertEqual(slo_etag, actual['slo_etag'])
self.assertIn('copied-abcde-manifest-only', names)
actual = names['copied-abcde-manifest-only']
self.assertEqual(4 * 1024 * 1024 + 1, actual['bytes'])
self.assertEqual('application/octet-stream', actual['content_type'])
self.assertEqual(normalize_etag(manifest_etag), actual['hash'])
self.assertEqual(slo_etag, actual['slo_etag'])
# Test copy manifest including data segments
source = self.env.container.file("mixed-object-data-manifest")
source_contents = source.read(parms={'multipart-manifest': 'get'})
source_json = json.loads(source_contents)
source.copy(
self.env.container.name,
"copied-mixed-object-data-manifest",
parms={'multipart-manifest': 'get'})
copied = self.env.container.file("copied-mixed-object-data-manifest")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
copied_json = json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
self.assertEqual(source_contents, copied_contents)
self.assertEqual(copied_json[0], {
'data': base64.b64encode(b'APRE' * 8).decode('ascii')})
def test_slo_copy_the_manifest_using_x_copy_from(self):
# as per test_slo_copy_the_manifest but using a PUT with x-copy-from
source = self.env.container.file("manifest-abcde")
source.initialize(parms={'multipart-manifest': 'get'})
source_contents = source.read(parms={'multipart-manifest': 'get'})
source_json = json.loads(source_contents)
manifest_etag = md5(source_contents, usedforsecurity=False).hexdigest()
if tf.cluster_info.get('etag_quoter', {}).get('enable_by_default'):
manifest_etag = '"%s"' % manifest_etag
self.assertEqual(manifest_etag, source.etag)
source.initialize()
self.assertEqual('application/octet-stream', source.content_type)
self.assertNotEqual(manifest_etag, source.etag)
# multipart-manifest=get on the client PUT target request actually
# applies to the internal GET source request
self.assertTrue(
source.copy_using_x_copy_from(self.env.container.name,
"copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'}))
copied = self.env.container.file("copied-abcde-manifest-only")
copied.initialize(parms={'multipart-manifest': 'get'})
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
copied_json = json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
self.assertEqual(source_json, copied_json)
self.assertEqual(manifest_etag, copied.etag)
def test_slo_copy_the_manifest_updating_metadata(self):
source = self.env.container.file("manifest-abcde")
source.content_type = 'application/octet-stream'
source.sync_metadata({'test': 'original'})
source.initialize(parms={'multipart-manifest': 'get'})
source_contents = source.read(parms={'multipart-manifest': 'get'})
source_json = json.loads(source_contents)
manifest_etag = md5(source_contents, usedforsecurity=False).hexdigest()
if tf.cluster_info.get('etag_quoter', {}).get('enable_by_default'):
manifest_etag = '"%s"' % manifest_etag
self.assertEqual(manifest_etag, source.etag)
source.initialize()
self.assertEqual('application/octet-stream', source.content_type)
self.assertNotEqual(manifest_etag, source.etag)
slo_etag = source.etag
self.assertEqual(source.metadata['test'], 'original')
self.assertTrue(
source.copy(self.env.container.name, "copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'},
hdrs={'Content-Type': 'image/jpeg',
'X-Object-Meta-Test': 'updated'}))
copied = self.env.container.file("copied-abcde-manifest-only")
copied.initialize(parms={'multipart-manifest': 'get'})
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
copied_json = json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
self.assertEqual(source_json, copied_json)
self.assertEqual(manifest_etag, copied.etag)
copied.initialize()
self.assertEqual('image/jpeg', copied.content_type)
self.assertEqual(slo_etag, copied.etag)
self.assertEqual(copied.metadata['test'], 'updated')
# verify the listing metadata
listing = self.env.container.files(parms={'format': 'json'})
names = {}
for f_dict in listing:
if f_dict['name'] in ('manifest-abcde',
'copied-abcde-manifest-only'):
names[f_dict['name']] = f_dict
self.assertIn('manifest-abcde', names)
actual = names['manifest-abcde']
self.assertEqual(4 * 1024 * 1024 + 1, actual['bytes'])
self.assertEqual('application/octet-stream', actual['content_type'])
# the container listing should have the etag of the manifest contents
self.assertEqual(normalize_etag(manifest_etag), actual['hash'])
self.assertEqual(slo_etag, actual['slo_etag'])
self.assertIn('copied-abcde-manifest-only', names)
actual = names['copied-abcde-manifest-only']
self.assertEqual(4 * 1024 * 1024 + 1, actual['bytes'])
self.assertEqual('image/jpeg', actual['content_type'])
self.assertEqual(normalize_etag(manifest_etag), actual['hash'])
self.assertEqual(slo_etag, actual['slo_etag'])
def test_slo_copy_the_manifest_account(self):
if tf.skip2:
raise SkipTest('Account2 not set')
acct = urllib.parse.unquote(self.env.conn.account_name)
# same account
file_item = self.env.container.file("manifest-abcde")
file_item.copy_account(acct,
self.env.container.name,
"copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'})
copied = self.env.container.file("copied-abcde-manifest-only")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
# different account
acct = urllib.parse.unquote(self.env.conn2.account_name)
dest_cont = self.env.account2.container(Utils.create_name())
self.assertTrue(dest_cont.create(hdrs={
'X-Container-Write': self.env.conn.user_acl
}))
# manifest copy will fail because there is no read access to segments
# in destination account
self.assertRaises(ResponseError, file_item.copy_account,
acct, dest_cont, "copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'})
self.assertEqual(400, file_item.conn.response.status)
resp_body = file_item.conn.response.read()
self.assertEqual(5, resp_body.count(b'403 Forbidden'),
'Unexpected response body %r' % resp_body)
# create segments container in account2 with read access for account1
segs_container = self.env.account2.container(self.env.container.name)
self.assertTrue(segs_container.create(hdrs={
'X-Container-Read': self.env.conn.user_acl
}))
# manifest copy will still fail because there are no segments in
# destination account
self.assertRaises(ResponseError, file_item.copy_account,
acct, dest_cont, "copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'})
self.assertEqual(400, file_item.conn.response.status)
resp_body = file_item.conn.response.read()
self.assertEqual(5, resp_body.count(b'404 Not Found'),
'Unexpected response body %r' % resp_body)
# create segments in account2 container with same name as in account1,
# manifest copy now succeeds
self.env.create_segments(segs_container)
self.assertTrue(file_item.copy_account(
acct, dest_cont, "copied-abcde-manifest-only",
parms={'multipart-manifest': 'get'}))
copied = dest_cont.file("copied-abcde-manifest-only")
copied_contents = copied.read(parms={'multipart-manifest': 'get'})
try:
json.loads(copied_contents)
except ValueError:
self.fail("COPY didn't copy the manifest (invalid json on GET)")
def test_slo_put_heartbeating(self):
if 'yield_frequency' not in cluster_info['slo']:
# old swift?
raise SkipTest('Swift does not seem to support heartbeating')
def do_put(headers=None, include_error=False):
file_item = self.env.container.file("manifest-heartbeat")
seg_info = self.env.seg_info
manifest_data = [seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]
if include_error:
manifest_data.append({'path': 'non-existent/segment'})
resp = file_item.write(
json.dumps(manifest_data).encode('ascii'),
parms={'multipart-manifest': 'put', 'heartbeat': 'on'},
hdrs=headers, return_resp=True)
self.assertEqual(resp.status, 202)
self.assertTrue(resp.chunked)
body_lines = resp.body.split(b'\n', 2)
self.assertFalse(body_lines[0].strip()) # all whitespace
self.assertEqual(b'\r', body_lines[1])
return body_lines[2]
body_lines = do_put().decode('utf8').split('\n')
self.assertIn('Response Status: 201 Created', body_lines)
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
self.assertIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
body_lines = do_put(
{'Accept': 'text/plain'}).decode('utf8').split('\n')
self.assertIn('Response Status: 201 Created', body_lines)
self.assertIn('Etag', [line.split(':', 1)[0] for line in body_lines])
self.assertIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
body = do_put({'Accept': 'application/json'})
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertIn('Etag', resp)
del resp['Etag']
self.assertIn('Last Modified', resp)
del resp['Last Modified']
self.assertEqual(resp, {
'Response Status': '201 Created',
'Response Body': '',
'Errors': [],
})
body_lines = do_put(include_error=True).decode('utf8').split('\n')
self.assertIn('Response Status: 400 Bad Request', body_lines)
self.assertIn('Response Body: Bad Request', body_lines)
self.assertNotIn('Etag', [line.split(':', 1)[0]
for line in body_lines])
self.assertNotIn('Last Modified', [line.split(':', 1)[0]
for line in body_lines])
self.assertEqual(body_lines[-3:], [
'Errors:',
'non-existent/segment, 404 Not Found',
'',
])
body = do_put({'Accept': 'application/json'}, include_error=True)
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertNotIn('Etag', resp)
self.assertNotIn('Last Modified', resp)
self.assertEqual(resp, {
'Response Status': '400 Bad Request',
'Response Body': 'Bad Request\nThe server could not comply with '
'the request since it is either malformed or '
'otherwise incorrect.',
'Errors': [
['non-existent/segment', '404 Not Found'],
],
})
body = do_put({'Accept': 'application/json', 'ETag': 'bad etag'})
try:
resp = json.loads(body)
except ValueError:
self.fail('Expected JSON, got %r' % body)
self.assertNotIn('Etag', resp)
self.assertNotIn('Last Modified', resp)
self.assertEqual(resp, {
'Response Status': '422 Unprocessable Entity',
'Response Body': 'Unprocessable Entity\nUnable to process the '
'contained instructions',
'Errors': [],
})
def _make_manifest(self):
file_item = self.env.container.file("manifest-post")
seg_info = self.env.seg_info
file_item.write(
json.dumps([seg_info['seg_a'], seg_info['seg_b'],
seg_info['seg_c'], seg_info['seg_d'],
seg_info['seg_e']]).encode('ascii'),
parms={'multipart-manifest': 'put'})
return file_item
def test_slo_post_the_manifest_metadata_update(self):
file_item = self._make_manifest()
# sanity check, check the object is an SLO manifest
file_item.info()
file_item.header_fields([('slo', 'x-static-large-object')])
# POST a user metadata (i.e. x-object-meta-post)
file_item.sync_metadata({'post': 'update'})
updated = self.env.container.file("manifest-post")
updated.info()
updated.header_fields([('user-meta', 'x-object-meta-post')]) # sanity
updated.header_fields([('slo', 'x-static-large-object')])
updated_contents = updated.read(parms={'multipart-manifest': 'get'})
try:
json.loads(updated_contents)
except ValueError:
self.fail("Unexpected content on GET, expected a json body")
def test_slo_post_the_manifest_metadata_update_with_qs(self):
# multipart-manifest query should be ignored on post
for verb in ('put', 'get', 'delete'):
file_item = self._make_manifest()
# sanity check, check the object is an SLO manifest
file_item.info()
file_item.header_fields([('slo', 'x-static-large-object')])
# POST a user metadata (i.e. x-object-meta-post)
file_item.sync_metadata(metadata={'post': 'update'},
parms={'multipart-manifest': verb})
updated = self.env.container.file("manifest-post")
updated.info()
updated.header_fields(
[('user-meta', 'x-object-meta-post')]) # sanity
updated.header_fields([('slo', 'x-static-large-object')])
updated_contents = updated.read(
parms={'multipart-manifest': 'get'})
try:
json.loads(updated_contents)
except ValueError:
self.fail(
"Unexpected content on GET, expected a json body")
def test_slo_get_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_body = manifest.read(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
manifest.content_type)
try:
json.loads(got_body)
except ValueError:
self.fail("GET with multipart-manifest=get got invalid json")
def test_slo_get_the_manifest_with_details_from_server(self):
manifest = self.env.container.file("manifest-db")
got_body = manifest.read(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
manifest.content_type)
try:
value = json.loads(got_body)
except ValueError:
self.fail("GET with multipart-manifest=get got invalid json")
self.assertEqual(len(value), 2)
self.assertEqual(value[0]['bytes'], 1024 * 1024)
self.assertEqual(
value[0]['hash'],
md5(b'd' * 1024 * 1024, usedforsecurity=False).hexdigest())
expected_name = '/%s/seg_d' % self.env.container.name
if six.PY2:
expected_name = expected_name.decode("utf-8")
self.assertEqual(value[0]['name'], expected_name)
self.assertEqual(value[1]['bytes'], 1024 * 1024)
self.assertEqual(
value[1]['hash'],
md5(b'b' * 1024 * 1024, usedforsecurity=False).hexdigest())
expected_name = '/%s/seg_b' % self.env.container.name
if six.PY2:
expected_name = expected_name.decode("utf-8")
self.assertEqual(value[1]['name'], expected_name)
def test_slo_get_raw_the_manifest_with_details_from_server(self):
manifest = self.env.container.file("manifest-db")
got_body = manifest.read(parms={'multipart-manifest': 'get',
'format': 'raw'})
self.assert_etag(
md5(got_body, usedforsecurity=False).hexdigest())
# raw format should have the actual manifest object content-type
self.assertEqual('application/octet-stream', manifest.content_type)
try:
value = json.loads(got_body)
except ValueError:
msg = "GET with multipart-manifest=get&format=raw got invalid json"
self.fail(msg)
self.assertEqual(
set(value[0].keys()), set(('size_bytes', 'etag', 'path')))
self.assertEqual(len(value), 2)
self.assertEqual(value[0]['size_bytes'], 1024 * 1024)
self.assertEqual(
value[0]['etag'],
md5(b'd' * 1024 * 1024, usedforsecurity=False).hexdigest())
expected_name = '/%s/seg_d' % self.env.container.name
if six.PY2:
expected_name = expected_name.decode("utf-8")
self.assertEqual(value[0]['path'], expected_name)
self.assertEqual(value[1]['size_bytes'], 1024 * 1024)
self.assertEqual(
value[1]['etag'],
md5(b'b' * 1024 * 1024, usedforsecurity=False).hexdigest())
expected_name = '/%s/seg_b' % self.env.container.name
if six.PY2:
expected_name = expected_name.decode("utf-8")
self.assertEqual(value[1]['path'], expected_name)
file_item = self.env.container.file("manifest-from-get-raw")
file_item.write(got_body, parms={'multipart-manifest': 'put'})
file_contents = file_item.read()
self.assertEqual(2 * 1024 * 1024, len(file_contents))
def test_slo_head_the_manifest(self):
manifest = self.env.container.file("manifest-abcde")
got_info = manifest.info(parms={'multipart-manifest': 'get'})
self.assertEqual('application/json; charset=utf-8',
got_info['content_type'])
def test_slo_if_match_get(self):
manifest = self.env.container.file("manifest-abcde")
etag = manifest.info()['etag']
self.assertRaises(ResponseError, manifest.read,
hdrs={'If-Match': 'not-%s' % etag})
self.assert_status(412)
manifest.read(hdrs={'If-Match': etag})
self.assert_status(200)
def test_slo_if_none_match_put(self):
file_item = self.env.container.file("manifest-if-none-match")
manifest = json.dumps([{
'size_bytes': 1024 * 1024,
'etag': None,
'path': '/%s/%s' % (self.env.container.name, 'seg_a')}])
self.assertRaises(ResponseError, file_item.write,
manifest.encode('ascii'),
parms={'multipart-manifest': 'put'},
hdrs={'If-None-Match': '"not-star"'})
self.assert_status(400)
file_item.write(manifest.encode('ascii'),
parms={'multipart-manifest': 'put'},
hdrs={'If-None-Match': '*'})
self.assert_status(201)
self.assertRaises(ResponseError, file_item.write,
manifest.encode('ascii'),
parms={'multipart-manifest': 'put'},
hdrs={'If-None-Match': '*'})
self.assert_status(412)
def test_slo_if_none_match_get(self):
manifest = self.env.container.file("manifest-abcde")
etag = manifest.info()['etag']
self.assertRaises(ResponseError, manifest.read,
hdrs={'If-None-Match': etag})
self.assert_status(304)
manifest.read(hdrs={'If-None-Match': "not-%s" % etag})
self.assert_status(200)
def test_slo_if_match_head(self):
manifest = self.env.container.file("manifest-abcde")
etag = manifest.info()['etag']
self.assertRaises(ResponseError, manifest.info,
hdrs={'If-Match': 'not-%s' % etag})
self.assert_status(412)
manifest.info(hdrs={'If-Match': etag})
self.assert_status(200)
def test_slo_if_none_match_head(self):
manifest = self.env.container.file("manifest-abcde")
etag = manifest.info()['etag']
self.assertRaises(ResponseError, manifest.info,
hdrs={'If-None-Match': etag})
self.assert_status(304)
manifest.info(hdrs={'If-None-Match': "not-%s" % etag})
self.assert_status(200)
def test_slo_referer_on_segment_container(self):
if tf.skip3:
raise SkipTest('Username3 not set')
# First the account2 (test3) should fail
headers = {'X-Auth-Token': self.env.conn3.storage_token,
'Referer': 'http://blah.example.com'}
slo_file = self.env.container2.file('manifest-abcde')
self.assertRaises(ResponseError, slo_file.read,
hdrs=headers)
self.assert_status(403)
# Now set the referer on the slo container only
referer_metadata = {'X-Container-Read': '.r:*.example.com,.rlistings'}
self.env.container2.update_metadata(referer_metadata)
self.assertRaises(ResponseError, slo_file.read,
hdrs=headers)
self.assert_status(409)
# Finally set the referer on the segment container
self.env.container.update_metadata(referer_metadata)
contents = slo_file.read(hdrs=headers)
self.assertEqual(4 * 1024 * 1024 + 1, len(contents))
self.assertEqual(b'a', contents[:1])
self.assertEqual(b'a', contents[1024 * 1024 - 1:1024 * 1024])
self.assertEqual(b'b', contents[1024 * 1024:1024 * 1024 + 1])
self.assertEqual(b'd', contents[-2:-1])
self.assertEqual(b'e', contents[-1:])
def test_slo_data_segments(self):
# len('APRE' * 8) == 32
# len('APOS' * 16) == 64
# len('BPOS' * 32) == 128
# len('CPRE' * 64) == 256
# len(a_pre + seg_a + post_a) == 32 + 1024 ** 2 + 64
# len(seg_b + post_b) == 1024 ** 2 + 128
# len(c_pre + seg_c) == 256 + 1024 ** 2
# len(total) == 3146208
for file_name in ("mixed-object-data-manifest",
"nested-data-manifest"):
file_item = self.env.container.file(file_name)
file_contents = file_item.read(size=3 * 1024 ** 2 + 456,
offset=28)
self.assertEqual([
(b'A', 1),
(b'P', 1),
(b'R', 1),
(b'E', 1),
(b'a', 1024 * 1024),
] + [
(b'A', 1),
(b'P', 1),
(b'O', 1),
(b'S', 1),
] * 16 + [
(b'b', 1024 * 1024),
] + [
(b'B', 1),
(b'P', 1),
(b'O', 1),
(b'S', 1),
] * 32 + [
(b'C', 1),
(b'P', 1),
(b'R', 1),
(b'E', 1),
] * 64 + [
(b'c', 1024 * 1024),
] + [
(b'C', 1),
(b'P', 1),
(b'O', 1),
(b'S', 1),
], group_file_contents(file_contents))
class TestSloUTF8(Base2, TestSlo):
pass