py3: port encryption

This got away from me a bit with the functional tests masquerading as
unit tests.

Change-Id: I1237c02eff96e53fff8f9661a2d85c4695b73371
This commit is contained in:
Tim Burke 2018-09-10 16:34:50 -06:00 committed by Pete Zaitcev
parent 3465d639e3
commit 37b814657e
22 changed files with 508 additions and 426 deletions

View File

@ -40,7 +40,7 @@ class Crypto(object):
cipher = 'AES_CTR_256'
# AES will accept several key sizes - we are using 256 bits i.e. 32 bytes
key_length = 32
iv_length = algorithms.AES.block_size / 8
iv_length = algorithms.AES.block_size // 8
def __init__(self, conf=None):
self.logger = get_logger(conf, log_route="crypto")
@ -80,7 +80,7 @@ class Crypto(object):
offset_blocks, offset_in_block = divmod(offset, self.iv_length)
ivl = int(binascii.hexlify(iv), 16) + offset_blocks
ivl %= 1 << algorithms.AES.block_size
iv = str(bytearray.fromhex(format(
iv = bytes(bytearray.fromhex(format(
ivl, '0%dx' % (2 * self.iv_length))))
else:
offset_in_block = 0
@ -89,7 +89,7 @@ class Crypto(object):
backend=self.backend)
dec = engine.decryptor()
# Adjust decryption boundary within current AES block
dec.update('*' * offset_in_block)
dec.update(b'*' * offset_in_block)
return dec
def create_iv(self):
@ -274,6 +274,8 @@ def append_crypto_meta(value, crypto_meta):
:param crypto_meta: a dict of crypto meta
:return: a string of the form <value>; swift_meta=<serialized crypto meta>
"""
if not isinstance(value, str):
raise ValueError
return '%s; swift_meta=%s' % (value, dump_crypto_meta(crypto_meta))

View File

@ -24,7 +24,8 @@ from swift.common.middleware.crypto.crypto_utils import CryptoWSGIContext, \
from swift.common.exceptions import EncryptionException, UnknownSecretIdError
from swift.common.request_helpers import get_object_transient_sysmeta, \
get_sys_meta_prefix, get_user_meta_prefix
from swift.common.swob import Request, HTTPException, HTTPInternalServerError
from swift.common.swob import Request, HTTPException, \
HTTPInternalServerError, wsgi_to_bytes, bytes_to_wsgi
from swift.common.utils import get_logger, config_true_value, \
parse_content_range, closing_if_possible, parse_content_type, \
FileLikeIter, multipart_byteranges_to_document_iters
@ -84,7 +85,7 @@ class BaseDecrypterContext(CryptoWSGIContext):
body='Error decrypting %s' % self.server_type,
content_type='text/plain')
def decrypt_value_with_meta(self, value, key, required=False):
def decrypt_value_with_meta(self, value, key, required, encoder):
"""
Base64-decode and decrypt a value if crypto meta can be extracted from
the value itself, otherwise return the value unmodified.
@ -109,14 +110,15 @@ class BaseDecrypterContext(CryptoWSGIContext):
extracted_value, crypto_meta = extract_crypto_meta(value)
if crypto_meta:
self.crypto.check_crypto_meta(crypto_meta)
value = self.decrypt_value(extracted_value, key, crypto_meta)
value = self.decrypt_value(
extracted_value, key, crypto_meta, encoder)
elif required:
raise EncryptionException(
"Missing crypto meta in value %s" % value)
return value
def decrypt_value(self, value, key, crypto_meta):
def decrypt_value(self, value, key, crypto_meta, encoder):
"""
Base64-decode and decrypt a value using the crypto_meta provided.
@ -127,10 +129,10 @@ class BaseDecrypterContext(CryptoWSGIContext):
:returns: decrypted value
"""
if not value:
return ''
return encoder(b'')
crypto_ctxt = self.crypto.create_decryption_ctxt(
key, crypto_meta['iv'], 0)
return crypto_ctxt.update(base64.b64decode(value))
return encoder(crypto_ctxt.update(base64.b64decode(value)))
def get_decryption_keys(self, req, crypto_meta=None):
"""
@ -169,7 +171,8 @@ class DecrypterObjContext(BaseDecrypterContext):
found.
"""
try:
return self.decrypt_value_with_meta(value, key, required)
return self.decrypt_value_with_meta(
value, key, required, bytes_to_wsgi)
except EncryptionException as err:
self.logger.error(
_("Error decrypting header %(header)s: %(error)s"),
@ -251,21 +254,22 @@ class DecrypterObjContext(BaseDecrypterContext):
parts_iter = multipart_byteranges_to_document_iters(
FileLikeIter(resp), boundary)
for first_byte, last_byte, length, headers, body in parts_iter:
yield "--" + boundary + "\r\n"
yield b"--" + boundary + b"\r\n"
for header_pair in headers:
yield "%s: %s\r\n" % header_pair
for header, value in headers:
yield b"%s: %s\r\n" % (wsgi_to_bytes(header),
wsgi_to_bytes(value))
yield "\r\n"
yield b"\r\n"
decrypt_ctxt = self.crypto.create_decryption_ctxt(
body_key, crypto_meta['iv'], first_byte)
for chunk in iter(lambda: body.read(DECRYPT_CHUNK_SIZE), ''):
for chunk in iter(lambda: body.read(DECRYPT_CHUNK_SIZE), b''):
yield decrypt_ctxt.update(chunk)
yield "\r\n"
yield b"\r\n"
yield "--" + boundary + "--"
yield b"--" + boundary + b"--"
def response_iter(self, resp, body_key, crypto_meta, offset):
"""
@ -331,7 +335,7 @@ class DecrypterObjContext(BaseDecrypterContext):
if (self._get_status_int() == 206 and
content_type == 'multipart/byteranges'):
boundary = dict(content_type_attrs)["boundary"]
boundary = wsgi_to_bytes(dict(content_type_attrs)["boundary"])
resp_iter = self.multipart_response_iter(
app_resp, boundary, body_key, put_crypto_meta)
else:
@ -380,10 +384,10 @@ class DecrypterContContext(BaseDecrypterContext):
Content-Length header with new body length and return a body iter.
"""
with closing_if_possible(resp_iter):
resp_body = ''.join(resp_iter)
resp_body = b''.join(resp_iter)
body_json = json.loads(resp_body)
new_body = json.dumps([self.decrypt_obj_dict(req, obj_dict)
for obj_dict in body_json])
for obj_dict in body_json]).encode('ascii')
self.update_content_length(len(new_body))
return [new_body]
@ -397,8 +401,11 @@ class DecrypterContContext(BaseDecrypterContext):
try:
self.crypto.check_crypto_meta(crypto_meta)
keys = self.get_decryption_keys(req, crypto_meta)
# Note that symlinks (for example) may put swift paths in
# the listing ETag, so we can't just use ASCII.
obj_dict['hash'] = self.decrypt_value(
ciphertext, keys['container'], crypto_meta)
ciphertext, keys['container'], crypto_meta,
encoder=lambda x: x.decode('utf-8'))
except EncryptionException as err:
if not isinstance(err, UnknownSecretIdError) or \
err.args[0] not in bad_keys:

View File

@ -24,7 +24,7 @@ from swift.common.middleware.crypto.crypto_utils import CryptoWSGIContext, \
from swift.common.request_helpers import get_object_transient_sysmeta, \
strip_user_meta_prefix, is_user_meta, update_etag_is_at_header
from swift.common.swob import Request, Match, HTTPException, \
HTTPUnprocessableEntity
HTTPUnprocessableEntity, wsgi_to_bytes, bytes_to_wsgi
from swift.common.utils import get_logger, config_true_value, \
MD5_OF_EMPTY_STRING
@ -46,7 +46,8 @@ def encrypt_header_val(crypto, value, key):
crypto_meta = crypto.create_crypto_meta()
crypto_ctxt = crypto.create_encryption_ctxt(key, crypto_meta['iv'])
enc_val = base64.b64encode(crypto_ctxt.update(value))
enc_val = bytes_to_wsgi(base64.b64encode(
crypto_ctxt.update(wsgi_to_bytes(value))))
return enc_val, crypto_meta
@ -58,6 +59,8 @@ def _hmac_etag(key, etag):
:param etag: The etag to hash.
:returns: a Base64-encoded representation of the HMAC
"""
if not isinstance(etag, bytes):
etag = wsgi_to_bytes(etag)
result = hmac.new(key, etag, digestmod=hashlib.sha256).digest()
return base64.b64encode(result).decode()

View File

@ -18,7 +18,7 @@ import os
from swift.common.exceptions import UnknownSecretIdError
from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK
from swift.common.swob import Request, HTTPException
from swift.common.swob import Request, HTTPException, wsgi_to_bytes
from swift.common.utils import readconf, strict_b64decode, get_logger
from swift.common.wsgi import WSGIContext
@ -188,7 +188,8 @@ class BaseKeyMaster(object):
@property
def root_secret_ids(self):
return sorted(self._root_secrets.keys())
# Only sorted to simplify testing
return sorted(self._root_secrets.keys(), key=lambda x: x or '')
def _load_keymaster_config_file(self, conf):
if not self.keymaster_config_path:
@ -256,7 +257,8 @@ class BaseKeyMaster(object):
self.logger.warning('Unrecognised secret id: %s' % secret_id)
raise UnknownSecretIdError(secret_id)
else:
return hmac.new(key, path, digestmod=hashlib.sha256).digest()
return hmac.new(key, wsgi_to_bytes(path),
digestmod=hashlib.sha256).digest()
class KeyMaster(BaseKeyMaster):

View File

@ -1005,7 +1005,7 @@ class Request(object):
"""
_ver, entity_path = self.split_path(1, 2, rest_with_last=True)
if entity_path is not None:
return '/' + entity_path
return '/' + wsgi_to_str(entity_path)
@property
def is_chunked(self):

View File

@ -22,6 +22,8 @@ from swift import gettext_ as _
from eventlet import Timeout
import six
import swift.common.db
from swift.container.sync_store import ContainerSyncStore
from swift.container.backend import ContainerBroker, DATADIR, \
@ -553,10 +555,11 @@ class ContainerController(BaseStorageServer):
response = dict(record)
else:
(name, created, size, content_type, etag) = record[:5]
name_ = name.decode('utf8') if six.PY2 else name
if content_type is None:
return {'subdir': name.decode('utf8')}
return {'subdir': name_}
response = {
'bytes': size, 'hash': etag, 'name': name.decode('utf8'),
'bytes': size, 'hash': etag, 'name': name_,
'content_type': content_type}
override_bytes_from_content_type(response, logger=self.logger)
response['last_modified'] = Timestamp(created).isoformat
@ -712,7 +715,7 @@ class ContainerController(BaseStorageServer):
if out_content_type.endswith('/xml'):
body = listing_formats.container_to_xml(listing, container)
elif out_content_type.endswith('/json'):
body = json.dumps(listing)
body = json.dumps(listing).encode('ascii')
else:
body = listing_formats.listing_to_text(listing)

View File

@ -2913,7 +2913,7 @@ class ECDiskFileReader(BaseDiskFileReader):
# TODO: reset frag buf to '' if tell() shows that start is on a frag
# boundary so that we check frags selected by a range not starting at 0
if self._started_at_0:
self.frag_buf = ''
self.frag_buf = b''
else:
self.frag_buf = None

View File

@ -55,7 +55,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPCreated, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPUnprocessableEntity, \
HTTPClientDisconnect, HTTPMethodNotAllowed, Request, Response, \
HTTPInsufficientStorage, HTTPForbidden, HTTPException, HTTPConflict, \
HTTPServerError
HTTPServerError, wsgi_to_bytes
from swift.obj.diskfile import RESERVED_DATAFILE_META, DiskFileRouter
@ -537,7 +537,7 @@ class ObjectController(BaseStorageServer):
"""
timeout_reader = self._make_timeout_reader(footer_iter)
try:
footer_body = ''.join(iter(timeout_reader, ''))
footer_body = b''.join(iter(timeout_reader, b''))
except ChunkReadError:
raise HTTPClientDisconnect()
except ChunkReadTimeout:
@ -826,8 +826,8 @@ class ObjectController(BaseStorageServer):
if have_metadata_footer or use_multiphase_commit:
obj_input.set_hundred_continue_response_headers(
hundred_continue_headers)
mime_boundary = request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary')
mime_boundary = wsgi_to_bytes(request.headers.get(
'X-Backend-Obj-Multipart-Mime-Boundary'))
if not mime_boundary:
raise HTTPBadRequest("no MIME boundary")
@ -855,7 +855,7 @@ class ObjectController(BaseStorageServer):
elapsed_time = 0
upload_expiration = time.time() + self.max_upload_time
timeout_reader = self._make_timeout_reader(obj_input)
for chunk in iter(timeout_reader, ''):
for chunk in iter(timeout_reader, b''):
start_time = time.time()
if start_time > upload_expiration:
self.logger.increment('PUT.timeouts')
@ -1342,7 +1342,7 @@ class ObjectController(BaseStorageServer):
except Exception:
self.logger.exception("zero_copy_send() blew up")
raise
yield ''
yield b''
# Get headers ready to go out
res(env, start_response)

View File

@ -79,7 +79,7 @@ def update_headers(response, headers):
if hasattr(headers, 'items'):
headers = headers.items()
for name, value in headers:
if name == 'etag':
if name.lower() == 'etag':
response.headers[name] = value.replace('"', '')
elif name.lower() not in (
'date', 'content-length', 'content-type',

View File

@ -461,7 +461,7 @@ class BaseObjectController(Controller):
if final_phase:
body = response.read()
else:
body = ''
body = b''
bodies.append(body)
if response.status == HTTP_INSUFFICIENT_STORAGE:
putter.failed = True
@ -497,7 +497,7 @@ class BaseObjectController(Controller):
while len(statuses) < num_nodes:
statuses.append(HTTP_SERVICE_UNAVAILABLE)
reasons.append('')
bodies.append('')
bodies.append(b'')
return statuses, reasons, bodies, etags
@ -811,7 +811,7 @@ class BaseObjectController(Controller):
self.app.client_chunk_size)
except (ValueError, IOError) as e:
raise ChunkReadError(str(e))
data_source = iter(reader, '')
data_source = iter(reader, b'')
# check if object is set to be automatically deleted (i.e. expired)
req, delete_at_container, delete_at_part, \
@ -1071,7 +1071,7 @@ class ECAppIter(object):
self.range_specs = range_specs
self.fa_length = fa_length
self.obj_length = obj_length if obj_length is not None else 0
self.boundary = ''
self.boundary = b''
self.logger = logger
self.mime_boundary = None
@ -1418,7 +1418,7 @@ class ECAppIter(object):
def put_fragments_in_queue(frag_iter, queue):
try:
for fragment in frag_iter:
if fragment.startswith(' '):
if fragment.startswith(b' '):
raise Exception('Leading whitespace on fragment.')
queue.put(fragment)
except GreenletExit:
@ -1687,7 +1687,7 @@ class Putter(object):
if self.state == DATA_SENT:
raise ValueError("called end_of_object_data twice")
self.queue.put('')
self.queue.put(b'')
self.state = DATA_SENT
def _send_file(self, write_timeout, exception_handler):
@ -1702,7 +1702,7 @@ class Putter(object):
chunk = self.queue.get()
if not self.failed:
if self.chunked:
to_send = "%x\r\n%s\r\n" % (len(chunk), chunk)
to_send = b"%x\r\n%s\r\n" % (len(chunk), chunk)
else:
to_send = chunk
try:
@ -1791,7 +1791,7 @@ class MIMEPutter(Putter):
# We're sending the object plus other stuff in the same request
# body, all wrapped up in multipart MIME, so we'd better start
# off the MIME document before sending any object data.
self.queue.put("--%s\r\nX-Document: object body\r\n\r\n" %
self.queue.put(b"--%s\r\nX-Document: object body\r\n\r\n" %
(self.mime_boundary,))
def end_of_object_data(self, footer_metadata=None):
@ -1809,25 +1809,25 @@ class MIMEPutter(Putter):
elif self.state == NO_DATA_SENT and self.mime_boundary:
self._start_object_data()
footer_body = json.dumps(footer_metadata)
footer_md5 = md5(footer_body).hexdigest()
footer_body = json.dumps(footer_metadata).encode('ascii')
footer_md5 = md5(footer_body).hexdigest().encode('ascii')
tail_boundary = ("--%s" % (self.mime_boundary,))
tail_boundary = (b"--%s" % (self.mime_boundary,))
if not self.multiphase:
# this will be the last part sent
tail_boundary = tail_boundary + "--"
tail_boundary = tail_boundary + b"--"
message_parts = [
("\r\n--%s\r\n" % self.mime_boundary),
"X-Document: object metadata\r\n",
"Content-MD5: %s\r\n" % footer_md5,
"\r\n",
footer_body, "\r\n",
tail_boundary, "\r\n",
(b"\r\n--%s\r\n" % self.mime_boundary),
b"X-Document: object metadata\r\n",
b"Content-MD5: %s\r\n" % footer_md5,
b"\r\n",
footer_body, b"\r\n",
tail_boundary, b"\r\n",
]
self.queue.put("".join(message_parts))
self.queue.put(b"".join(message_parts))
self.queue.put('')
self.queue.put(b'')
self.state = DATA_SENT
def send_commit_confirmation(self):
@ -1844,17 +1844,17 @@ class MIMEPutter(Putter):
self.state = DATA_ACKED
if self.mime_boundary:
body = "put_commit_confirmation"
tail_boundary = ("--%s--" % (self.mime_boundary,))
body = b"put_commit_confirmation"
tail_boundary = (b"--%s--" % (self.mime_boundary,))
message_parts = [
"X-Document: put commit\r\n",
"\r\n",
body, "\r\n",
b"X-Document: put commit\r\n",
b"\r\n",
body, b"\r\n",
tail_boundary,
]
self.queue.put("".join(message_parts))
self.queue.put(b"".join(message_parts))
self.queue.put('')
self.queue.put(b'')
self.state = COMMIT_SENT
@classmethod
@ -1874,7 +1874,7 @@ class MIMEPutter(Putter):
:raises MultiphasePUTNotSupported: if need_multiphase is set but
backend node can't handle multiphase PUT
"""
mime_boundary = "%.64x" % random.randint(0, 16 ** 64)
mime_boundary = b"%.64x" % random.randint(0, 16 ** 64)
headers = HeaderKeyDict(headers)
# when using a multipart mime request to backend the actual
# content-length is not equal to the object content size, so move the
@ -1944,7 +1944,7 @@ def chunk_transformer(policy):
pieces.append(piece)
to_take -= len(piece)
total_buf_len -= len(piece)
chunks_to_encode.append(''.join(pieces))
chunks_to_encode.append(b''.join(pieces))
frags_by_byte_order = []
for chunk_to_encode in chunks_to_encode:
@ -1962,7 +1962,7 @@ def chunk_transformer(policy):
# (frag_B1 + frag_B2 + ...), # destined for node B
# (frag_C1 + frag_C2 + ...), # destined for node C
# ...]
obj_data = [''.join(frags)
obj_data = [b''.join(frags)
for frags in zip(*frags_by_byte_order)]
chunk = yield obj_data
else:
@ -1971,12 +1971,12 @@ def chunk_transformer(policy):
# Now we've gotten an empty chunk, which indicates end-of-input.
# Take any leftover bytes and encode them.
last_bytes = ''.join(buf)
last_bytes = b''.join(buf)
if last_bytes:
last_frags = policy.pyeclib_driver.encode(last_bytes)
yield last_frags
else:
yield [''] * policy.ec_n_unique_fragments
yield [b''] * policy.ec_n_unique_fragments
def trailing_metadata(policy, client_obj_hasher,
@ -2542,7 +2542,7 @@ class ECObjectController(BaseObjectController):
# (i.e. 1:[0] at first)
# Grouping all missing fragment indexes for each frag_index
available_indexes = putter_to_frag_index.values()
available_indexes = list(putter_to_frag_index.values())
lack_list = collections.defaultdict(list)
for frag_index in range(policy.ec_n_unique_fragments):
# Set the missing index to lack_list
@ -2643,7 +2643,7 @@ class ECObjectController(BaseObjectController):
self.app.logger.increment('client_disconnects')
raise HTTPClientDisconnect(request=req)
send_chunk('') # flush out any buffered data
send_chunk(b'') # flush out any buffered data
computed_etag = (etag_hasher.hexdigest()
if etag_hasher else None)

View File

@ -402,17 +402,17 @@ class FakeMemcache(object):
def readuntil2crlfs(fd):
rv = ''
lc = ''
rv = b''
lc = b''
crlfs = 0
while crlfs < 2:
c = fd.read(1)
if not c:
raise ValueError("didn't get two CRLFs; just got %r" % rv)
rv = rv + c
if c == '\r' and lc != '\n':
if c == b'\r' and lc != b'\n':
crlfs = 0
if lc == '\r' and c == '\n':
if lc == b'\r' and c == b'\n':
crlfs += 1
lc = c
return rv

View File

@ -20,12 +20,12 @@ from swift.common.middleware.crypto.crypto_utils import Crypto
def fetch_crypto_keys(key_id=None):
id_to_keys = {None: {'account': 'This is an account key 012345678',
'container': 'This is a container key 01234567',
'object': 'This is an object key 0123456789'},
'myid': {'account': 'This is an account key 123456789',
'container': 'This is a container key 12345678',
'object': 'This is an object key 1234567890'}}
id_to_keys = {None: {'account': b'This is an account key 012345678',
'container': b'This is a container key 01234567',
'object': b'This is an object key 0123456789'},
'myid': {'account': b'This is an account key 123456789',
'container': b'This is a container key 12345678',
'object': b'This is an object key 1234567890'}}
key_id = key_id or {}
secret_id = key_id.get('secret_id') or None
try:
@ -57,7 +57,7 @@ def decrypt(key, iv, enc_val):
return dec_val
FAKE_IV = "This is an IV123"
FAKE_IV = b"This is an IV123"
# do not use this example encryption_root_secret in production, use a randomly
# generated value with high entropy
TEST_KEYMASTER_CONF = {

View File

@ -84,20 +84,22 @@ class TestCryptoWsgiContext(unittest.TestCase):
def test_get_keys_missing_callback(self):
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys({})
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn('missing callback',
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_keys_callback_exception(self):
def callback(*args, **kwargs):
raise Exception('boom')
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys({CRYPTO_KEY_CALLBACK: callback})
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn('from callback: boom',
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_keys_missing_key_for_default_required_list(self):
bad_keys = dict(fetch_crypto_keys())
@ -105,10 +107,11 @@ class TestCryptoWsgiContext(unittest.TestCase):
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys})
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn("Missing key for 'object'",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_keys_missing_object_key_for_specified_required_list(self):
bad_keys = dict(fetch_crypto_keys())
@ -117,10 +120,11 @@ class TestCryptoWsgiContext(unittest.TestCase):
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
required=['object', 'container'])
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn("Missing key for 'object'",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_keys_missing_container_key_for_specified_required_list(self):
bad_keys = dict(fetch_crypto_keys())
@ -129,43 +133,47 @@ class TestCryptoWsgiContext(unittest.TestCase):
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
required=['object', 'container'])
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn("Missing key for 'container'",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_bad_object_key_for_default_required_list(self):
bad_keys = dict(fetch_crypto_keys())
bad_keys['object'] = 'the minor key'
bad_keys['object'] = b'the minor key'
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys})
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn("Bad key for 'object'",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_bad_container_key_for_default_required_list(self):
bad_keys = dict(fetch_crypto_keys())
bad_keys['container'] = 'the major key'
bad_keys['container'] = b'the major key'
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK: lambda *args, **kwargs: bad_keys},
required=['object', 'container'])
self.assertIn('500 Internal Error', cm.exception.message)
self.assertIn('500 Internal Error', cm.exception.status)
self.assertIn("Bad key for 'container'",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_keys_not_a_dict(self):
with self.assertRaises(HTTPException) as cm:
self.crypto_context.get_keys(
{CRYPTO_KEY_CALLBACK:
lambda *args, **kwargs: ['key', 'quay', 'qui']})
self.assertIn('500 Internal Error', cm.exception.message)
self.assertEqual('500 Internal Error', cm.exception.status)
self.assertIn("Did not get a keys dict",
self.fake_logger.get_lines_for_level('error')[0])
self.assertIn('Unable to retrieve encryption keys.', cm.exception.body)
self.assertIn(b'Unable to retrieve encryption keys.',
cm.exception.body)
def test_get_multiple_keys(self):
env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
@ -177,13 +185,13 @@ class TestCryptoWsgiContext(unittest.TestCase):
class TestModuleMethods(unittest.TestCase):
meta = {'iv': '0123456789abcdef', 'cipher': 'AES_CTR_256'}
meta = {'iv': b'0123456789abcdef', 'cipher': 'AES_CTR_256'}
serialized_meta = '%7B%22cipher%22%3A+%22AES_CTR_256%22%2C+%22' \
'iv%22%3A+%22MDEyMzQ1Njc4OWFiY2RlZg%3D%3D%22%7D'
meta_with_key = {'iv': '0123456789abcdef', 'cipher': 'AES_CTR_256',
'body_key': {'key': 'fedcba9876543210fedcba9876543210',
'iv': 'fedcba9876543210'}}
meta_with_key = {'iv': b'0123456789abcdef', 'cipher': 'AES_CTR_256',
'body_key': {'key': b'fedcba9876543210fedcba9876543210',
'iv': b'fedcba9876543210'}}
serialized_meta_with_key = '%7B%22body_key%22%3A+%7B%22iv%22%3A+%22ZmVkY' \
'2JhOTg3NjU0MzIxMA%3D%3D%22%2C+%22key%22%3A+%' \
'22ZmVkY2JhOTg3NjU0MzIxMGZlZGNiYTk4NzY1NDMyMT' \
@ -208,30 +216,44 @@ class TestModuleMethods(unittest.TestCase):
def assert_raises(value, message):
with self.assertRaises(EncryptionException) as cm:
crypto_utils.load_crypto_meta(value)
self.assertIn('Bad crypto meta %r' % value, cm.exception.message)
self.assertIn(message, cm.exception.message)
self.assertIn('Bad crypto meta %r' % value, cm.exception.args[0])
if isinstance(message, (tuple, list)):
for opt in message:
if opt in cm.exception.args[0]:
break
else:
self.fail('Expected to find one of %r in %r' % (
message, cm.exception.args[0]))
else:
self.assertIn(message, cm.exception.args[0])
assert_raises(None, 'crypto meta not a string')
assert_raises(99, 'crypto meta not a string')
assert_raises('', 'No JSON object could be decoded')
assert_raises('abc', 'No JSON object could be decoded')
assert_raises('', ('No JSON object could be decoded',
'Expecting value: line 1 column 1'))
assert_raises('abc', ('No JSON object could be decoded',
'Expecting value: line 1 column 1'))
assert_raises('[]', 'crypto meta not a Mapping')
bad_type_messages = [
'must be string or buffer',
'argument should be a bytes-like object or ASCII string',
]
assert_raises('{"iv": "abcdef"}', 'Incorrect padding')
assert_raises('{"iv": []}', 'must be string or buffer')
assert_raises('{"iv": {}}', 'must be string or buffer')
assert_raises('{"iv": 99}', 'must be string or buffer')
assert_raises('{"iv": []}', bad_type_messages)
assert_raises('{"iv": {}}', bad_type_messages)
assert_raises('{"iv": 99}', bad_type_messages)
assert_raises('{"key": "abcdef"}', 'Incorrect padding')
assert_raises('{"key": []}', 'must be string or buffer')
assert_raises('{"key": {}}', 'must be string or buffer')
assert_raises('{"key": 99}', 'must be string or buffer')
assert_raises('{"key": []}', bad_type_messages)
assert_raises('{"key": {}}', bad_type_messages)
assert_raises('{"key": 99}', bad_type_messages)
assert_raises('{"body_key": {"iv": "abcdef"}}', 'Incorrect padding')
assert_raises('{"body_key": {"iv": []}}', 'must be string or buffer')
assert_raises('{"body_key": {"iv": {}}}', 'must be string or buffer')
assert_raises('{"body_key": {"iv": 99}}', 'must be string or buffer')
assert_raises('{"body_key": {"iv": []}}', bad_type_messages)
assert_raises('{"body_key": {"iv": {}}}', bad_type_messages)
assert_raises('{"body_key": {"iv": 99}}', bad_type_messages)
assert_raises('{"body_key": {"key": "abcdef"}}', 'Incorrect padding')
assert_raises('{"body_key": {"key": []}}', 'must be string or buffer')
assert_raises('{"body_key": {"key": {}}}', 'must be string or buffer')
assert_raises('{"body_key": {"key": 99}}', 'must be string or buffer')
assert_raises('{"body_key": {"key": []}}', bad_type_messages)
assert_raises('{"body_key": {"key": {}}}', bad_type_messages)
assert_raises('{"body_key": {"key": 99}}', bad_type_messages)
def test_dump_then_load_crypto_meta(self):
actual = crypto_utils.load_crypto_meta(
@ -289,7 +311,7 @@ class TestCrypto(unittest.TestCase):
self.crypto = Crypto({})
def test_create_encryption_context(self):
value = 'encrypt me' * 100 # more than one cipher block
value = b'encrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_encryption_ctxt(key, iv)
@ -298,16 +320,16 @@ class TestCrypto(unittest.TestCase):
backend=default_backend()).encryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
for bad_iv in (b'a little too long', b'too short'):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, key, bad_iv)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
for bad_key in (b'objKey', b'a' * 31, b'a' * 33, b'a' * 16, b'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_encryption_ctxt, bad_key, iv)
def test_create_decryption_context(self):
value = 'decrypt me' * 100 # more than one cipher block
value = b'decrypt me' * 100 # more than one cipher block
key = os.urandom(32)
iv = os.urandom(16)
ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
@ -316,51 +338,50 @@ class TestCrypto(unittest.TestCase):
backend=default_backend()).decryptor().update(value)
self.assertEqual(expected, ctxt.update(value))
for bad_iv in ('a little too long', 'too short'):
for bad_iv in (b'a little too long', b'too short'):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, key, bad_iv, 0)
for bad_key in ('objKey', 'a' * 31, 'a' * 33, 'a' * 16, 'a' * 24):
for bad_key in (b'objKey', b'a' * 31, b'a' * 33, b'a' * 16, b'a' * 24):
self.assertRaises(
ValueError, self.crypto.create_decryption_ctxt, bad_key, iv, 0)
with self.assertRaises(ValueError) as cm:
self.crypto.create_decryption_ctxt(key, iv, -1)
self.assertEqual("Offset must not be negative", cm.exception.message)
self.assertEqual("Offset must not be negative", cm.exception.args[0])
def test_enc_dec_small_chunks(self):
self.enc_dec_chunks(['encrypt me', 'because I', 'am sensitive'])
self.enc_dec_chunks([b'encrypt me', b'because I', b'am sensitive'])
def test_enc_dec_large_chunks(self):
self.enc_dec_chunks([os.urandom(65536), os.urandom(65536)])
def enc_dec_chunks(self, chunks):
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
self.assertTrue(b''.join(enc_val) != chunks)
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 0)
dec_val = [dec_ctxt.update(chunk) for chunk in enc_val]
self.assertEqual(''.join(chunks), ''.join(dec_val),
self.assertEqual(b''.join(chunks), b''.join(dec_val),
'Expected value {%s} but got {%s}' %
(''.join(chunks), ''.join(dec_val)))
(b''.join(chunks), b''.join(dec_val)))
def test_decrypt_range(self):
chunks = ['0123456789abcdef', 'ghijklmnopqrstuv']
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
chunks = [b'0123456789abcdef', b'ghijklmnopqrstuv']
key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
iv = self.crypto.create_iv()
enc_ctxt = self.crypto.create_encryption_ctxt(key, iv)
enc_val = [enc_ctxt.update(chunk) for chunk in chunks]
self.assertTrue(''.join(enc_val) != chunks)
# Simulate a ranged GET from byte 19 to 32 : 'jklmnopqrstuv'
dec_ctxt = self.crypto.create_decryption_ctxt(key, iv, 19)
ranged_chunks = [enc_val[1][3:]]
dec_val = [dec_ctxt.update(chunk) for chunk in ranged_chunks]
self.assertEqual('jklmnopqrstuv', ''.join(dec_val),
self.assertEqual(b'jklmnopqrstuv', b''.join(dec_val),
'Expected value {%s} but got {%s}' %
('jklmnopqrstuv', ''.join(dec_val)))
(b'jklmnopqrstuv', b''.join(dec_val)))
def test_create_decryption_context_non_zero_offset(self):
# Verify that iv increments for each 16 bytes of offset.
@ -371,7 +392,7 @@ class TestCrypto(unittest.TestCase):
# body, until it reaches 2^128 -1 when it should wrap to zero. We check
# that is happening by verifying a decrypted value using various
# offsets.
key = 'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
key = b'objL7wjV6L79Sfs4y7dy41273l0k6Wki'
def do_test():
for offset, exp_iv in mappings.items():
@ -381,55 +402,55 @@ class TestCrypto(unittest.TestCase):
modes.CTR(exp_iv),
backend=default_backend())
expected = cipher.decryptor().update(
'p' * offset_in_block + 'ciphertext')
actual = dec_ctxt.update('ciphertext')
b'p' * offset_in_block + b'ciphertext')
actual = dec_ctxt.update(b'ciphertext')
expected = expected[offset % 16:]
self.assertEqual(expected, actual,
'Expected %r but got %r, iv=%s and offset=%s'
% (expected, actual, iv, offset))
iv = '0000000010000000'
iv = b'0000000010000000'
mappings = {
2: '0000000010000000',
16: '0000000010000001',
19: '0000000010000001',
48: '0000000010000003',
1024: '000000001000000p',
5119: '000000001000001o'
2: b'0000000010000000',
16: b'0000000010000001',
19: b'0000000010000001',
48: b'0000000010000003',
1024: b'000000001000000p',
5119: b'000000001000001o'
}
do_test()
# choose max iv value and test that it wraps to zero
iv = chr(0xff) * 16
iv = b'\xff' * 16
mappings = {
2: iv,
16: str(bytearray.fromhex('00' * 16)), # iv wraps to 0
19: str(bytearray.fromhex('00' * 16)),
48: str(bytearray.fromhex('00' * 15 + '02')),
1024: str(bytearray.fromhex('00' * 15 + '3f')),
5119: str(bytearray.fromhex('00' * 14 + '013E'))
16: bytes(bytearray.fromhex('00' * 16)), # iv wraps to 0
19: bytes(bytearray.fromhex('00' * 16)),
48: bytes(bytearray.fromhex('00' * 15 + '02')),
1024: bytes(bytearray.fromhex('00' * 15 + '3f')),
5119: bytes(bytearray.fromhex('00' * 14 + '013E'))
}
do_test()
iv = chr(0x0) * 16
iv = b'\x00' * 16
mappings = {
2: iv,
16: str(bytearray.fromhex('00' * 15 + '01')),
19: str(bytearray.fromhex('00' * 15 + '01')),
48: str(bytearray.fromhex('00' * 15 + '03')),
1024: str(bytearray.fromhex('00' * 15 + '40')),
5119: str(bytearray.fromhex('00' * 14 + '013F'))
16: bytes(bytearray.fromhex('00' * 15 + '01')),
19: bytes(bytearray.fromhex('00' * 15 + '01')),
48: bytes(bytearray.fromhex('00' * 15 + '03')),
1024: bytes(bytearray.fromhex('00' * 15 + '40')),
5119: bytes(bytearray.fromhex('00' * 14 + '013F'))
}
do_test()
iv = chr(0x0) * 8 + chr(0xff) * 8
iv = b'\x00' * 8 + b'\xff' * 8
mappings = {
2: iv,
16: str(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
19: str(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
48: str(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '02')),
1024: str(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '3F')),
5119: str(bytearray.fromhex('00' * 7 + '01' + '00' * 6 + '013E'))
16: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
19: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 8)),
48: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '02')),
1024: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 7 + '3F')),
5119: bytes(bytearray.fromhex('00' * 7 + '01' + '00' * 6 + '013E'))
}
do_test()
@ -438,33 +459,33 @@ class TestCrypto(unittest.TestCase):
with self.assertRaises(ValueError) as cm:
self.crypto.check_key(key)
self.assertEqual("Key must be length 32 bytes",
cm.exception.message)
cm.exception.args[0])
def test_check_crypto_meta(self):
meta = {'cipher': 'AES_CTR_256'}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'iv'",
cm.exception.message)
cm.exception.args[0])
for bad_iv in ('a little too long', 'too short'):
meta['iv'] = bad_iv
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: IV must be length 16 bytes",
cm.exception.message)
cm.exception.args[0])
meta = {'iv': os.urandom(16)}
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Missing 'cipher'",
cm.exception.message)
cm.exception.args[0])
meta['cipher'] = 'Mystery cipher'
with self.assertRaises(EncryptionException) as cm:
self.crypto.check_crypto_meta(meta)
self.assertEqual("Bad crypto meta: Cipher must be AES_CTR_256",
cm.exception.message)
cm.exception.args[0])
def test_create_iv(self):
self.assertEqual(16, len(self.crypto.create_iv()))
@ -520,7 +541,7 @@ class TestCrypto(unittest.TestCase):
with self.assertRaises(ValueError) as cm:
self.crypto.unwrap_key(wrapping_key, wrapped)
self.assertEqual(
cm.exception.message, 'Key must be length 32 bytes')
cm.exception.args[0], 'Key must be length 32 bytes')
if __name__ == '__main__':

View File

@ -26,7 +26,7 @@ from swift.common.middleware.crypto import decrypter
from swift.common.middleware.crypto.crypto_utils import CRYPTO_KEY_CALLBACK, \
dump_crypto_meta, Crypto, load_crypto_meta
from swift.common.swob import Request, HTTPException, HTTPOk, \
HTTPPreconditionFailed, HTTPNotFound, HTTPPartialContent
HTTPPreconditionFailed, HTTPNotFound, HTTPPartialContent, bytes_to_wsgi
from test.unit import FakeLogger
from test.unit.common.middleware.crypto.crypto_helpers import md5hex, \
@ -41,8 +41,10 @@ def get_crypto_meta_header(crypto_meta=None):
def encrypt_and_append_meta(value, key, crypto_meta=None):
if not isinstance(value, bytes):
value = value.encode('ascii')
return '%s; swift_meta=%s' % (
base64.b64encode(encrypt(value, key, FAKE_IV)),
base64.b64encode(encrypt(value, key, FAKE_IV)).decode('ascii'),
get_crypto_meta_header(crypto_meta))
@ -70,14 +72,16 @@ class TestDecrypterObjectRequests(unittest.TestCase):
'content-type': 'text/plain',
'content-length': content_length,
'X-Object-Sysmeta-Crypto-Etag': '%s; swift_meta=%s' % (
base64.b64encode(encrypt(plaintext_etag, object_key, FAKE_IV)),
bytes_to_wsgi(base64.b64encode(encrypt(
plaintext_etag.encode('ascii'), object_key, FAKE_IV))),
get_crypto_meta_header(other_crypto_meta)),
'X-Object-Sysmeta-Crypto-Body-Meta':
get_crypto_meta_header(body_crypto_meta),
'X-Object-Transient-Sysmeta-Crypto-Meta':
get_crypto_meta_header(other_crypto_meta),
'x-object-transient-sysmeta-crypto-meta-test':
base64.b64encode(encrypt('encrypt me', object_key, FAKE_IV)) +
bytes_to_wsgi(base64.b64encode(encrypt(
b'encrypt me', object_key, FAKE_IV))) +
';swift_meta=' + get_crypto_meta_header(other_crypto_meta),
'x-object-sysmeta-container-update-override-etag':
encrypt_and_append_meta('encrypt me, too', cont_key),
@ -124,7 +128,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
return resp
def test_GET_success(self):
body = 'FAKE APP'
body = b'FAKE APP'
resp = self._test_request_success('GET', body)
self.assertEqual(body, resp.body)
@ -137,17 +141,17 @@ class TestDecrypterObjectRequests(unittest.TestCase):
self.assertEqual(body, resp.body)
def test_HEAD_success(self):
body = 'FAKE APP'
body = b'FAKE APP'
resp = self._test_request_success('HEAD', body)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
key_id_val = {'secret_id': 'myid'}
resp = self._test_request_success('HEAD', body, key_id=key_id_val)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
key_id_val = {'secret_id': ''}
resp = self._test_request_success('HEAD', body, key_id=key_id_val)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def _check_different_keys_for_data_and_metadata(self, method):
env = {'REQUEST_METHOD': method,
@ -155,7 +159,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
req = Request.blank('/v1/a/c/o', environ=env)
data_key_id = {}
metadata_key_id = {'secret_id': 'myid'}
body = 'object data'
body = b'object data'
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
@ -188,17 +192,17 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_different_keys_for_data_and_metadata(self):
resp = self._check_different_keys_for_data_and_metadata('GET')
self.assertEqual('object data', resp.body)
self.assertEqual(b'object data', resp.body)
def test_HEAD_different_keys_for_data_and_metadata(self):
resp = self._check_different_keys_for_data_and_metadata('HEAD')
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def _check_unencrypted_data_and_encrypted_metadata(self, method):
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'object data'
body = b'object data'
plaintext_etag = md5hex(body)
metadata_key = fetch_crypto_keys()
# synthesise headers for unencrypted PUT + headers for encrypted POST
@ -226,26 +230,26 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_unencrypted_data_and_encrypted_metadata(self):
resp = self._check_unencrypted_data_and_encrypted_metadata('GET')
self.assertEqual('object data', resp.body)
self.assertEqual(b'object data', resp.body)
def test_HEAD_unencrypted_data_and_encrypted_metadata(self):
resp = self._check_unencrypted_data_and_encrypted_metadata('HEAD')
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def _check_encrypted_data_and_unencrypted_metadata(self, method):
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'object data'
body = b'object data'
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
data_key = fetch_crypto_keys()
hdrs = self._make_response_headers(
len(enc_body), plaintext_etag, data_key, body_key)
for k, v in hdrs.items():
if is_object_transient_sysmeta(k):
hdrs.pop(k)
to_remove = [k for k in hdrs if is_object_transient_sysmeta(k)]
for k in to_remove:
hdrs.pop(k)
hdrs['x-object-meta-test'] = 'unencrypted'
self.app.register(
@ -259,14 +263,14 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_encrypted_data_and_unencrypted_metadata(self):
resp = self._check_encrypted_data_and_unencrypted_metadata('GET')
self.assertEqual('object data', resp.body)
self.assertEqual(b'object data', resp.body)
def test_HEAD_encrypted_data_and_unencrypted_metadata(self):
resp = self._check_encrypted_data_and_unencrypted_metadata('HEAD')
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def test_headers_case(self):
body = 'fAkE ApP'
body = b'fAkE ApP'
req = Request.blank('/v1/a/c/o', body='FaKe')
req.environ[CRYPTO_KEY_CALLBACK] = fetch_crypto_keys
plaintext_etag = md5hex(body)
@ -294,23 +298,23 @@ class TestDecrypterObjectRequests(unittest.TestCase):
'Content-Type': 'text/plain',
}
self.assertEqual(dict(headers), expected)
self.assertEqual('fAkE ApP', ''.join(app_iter))
self.assertEqual(b'fAkE ApP', b''.join(app_iter))
def _test_412_response(self, method):
# simulate a 412 response to a conditional GET which has an Etag header
data = 'the object content'
data = b'the object content'
env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env, method=method)
resp_body = 'I am sorry, you have failed to meet a precondition'
resp_body = b'I am sorry, you have failed to meet a precondition'
hdrs = self._make_response_headers(
len(resp_body), md5hex(data), fetch_crypto_keys(), 'not used')
len(resp_body), md5hex(data), fetch_crypto_keys(), b'not used')
self.app.register(method, '/v1/a/c/o', HTTPPreconditionFailed,
body=resp_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('412 Precondition Failed', resp.status)
# the response body should not be decrypted, it is already plaintext
self.assertEqual(resp_body if method == 'GET' else '', resp.body)
self.assertEqual(resp_body if method == 'GET' else b'', resp.body)
# whereas the Etag and other headers should be decrypted
self.assertEqual(md5hex(data), resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
@ -328,7 +332,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
# simulate a 404 response, sanity check response headers
env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env, method=method)
resp_body = 'You still have not found what you are looking for'
resp_body = b'You still have not found what you are looking for'
hdrs = {'content-type': 'text/plain',
'content-length': len(resp_body)}
self.app.register(method, '/v1/a/c/o', HTTPNotFound,
@ -337,7 +341,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
self.assertEqual('404 Not Found', resp.status)
# the response body should not be decrypted, it is already plaintext
self.assertEqual(resp_body if method == 'GET' else '', resp.body)
self.assertEqual(resp_body if method == 'GET' else b'', resp.body)
# there should be no etag header inserted by decrypter
self.assertNotIn('Etag', resp.headers)
self.assertEqual('text/plain', resp.headers['Content-Type'])
@ -352,19 +356,19 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
len(body), md5hex(body), fetch_crypto_keys(), b'not used')
# simulate missing crypto meta from encrypted etag
hdrs['X-Object-Sysmeta-Crypto-Etag'] = \
base64.b64encode(encrypt(md5hex(body), key, FAKE_IV))
hdrs['X-Object-Sysmeta-Crypto-Etag'] = bytes_to_wsgi(base64.b64encode(
encrypt(md5hex(body).encode('ascii'), key, FAKE_IV)))
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertIn('Error decrypting header', resp.body)
self.assertIn(b'Error decrypting header', resp.body)
self.assertIn('Error decrypting header X-Object-Sysmeta-Crypto-Etag',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -372,11 +376,11 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
len(body), md5hex(body), fetch_crypto_keys(), b'not used')
# simulate missing crypto meta from encrypted override etag
hdrs['X-Object-Sysmeta-Container-Update-Override-Etag'] = \
encrypt_and_append_meta(
@ -392,43 +396,43 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_override_etag_bad_iv(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
bad_crypto_meta['iv'] = b'bad_iv'
resp = self._test_override_etag_bad_meta('GET', bad_crypto_meta)
self.assertIn('Error decrypting header', resp.body)
self.assertIn(b'Error decrypting header', resp.body)
def test_HEAD_override_etag_bad_iv(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
bad_crypto_meta['iv'] = b'bad_iv'
resp = self._test_override_etag_bad_meta('HEAD', bad_crypto_meta)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def test_GET_override_etag_bad_cipher(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['cipher'] = 'unknown cipher'
resp = self._test_override_etag_bad_meta('GET', bad_crypto_meta)
self.assertIn('Error decrypting header', resp.body)
self.assertIn(b'Error decrypting header', resp.body)
def test_HEAD_override_etag_bad_cipher(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['cipher'] = 'unknown cipher'
resp = self._test_override_etag_bad_meta('HEAD', bad_crypto_meta)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
def _test_bad_key(self, method):
# use bad key
def bad_fetch_crypto_keys(**kwargs):
keys = fetch_crypto_keys()
keys['object'] = 'bad key'
keys['object'] = b'bad key'
return keys
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: bad_fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
len(body), md5hex(body), fetch_crypto_keys(), b'not used')
self.app.register(method, '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
return req.get_response(self.decrypter)
@ -442,7 +446,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_with_bad_key(self):
resp = self._test_bad_key('GET')
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
self.assertEqual(b'Unable to retrieve encryption keys.',
resp.body)
self.assertIn("Bad key for 'object'",
self.decrypter.logger.get_lines_for_level('error')[0])
@ -452,12 +456,14 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
enc_val = base64.b64encode(encrypt('encrypt me', key, FAKE_IV))
len(body), md5hex(body),
fetch_crypto_keys(), b'not used')
enc_val = base64.b64encode(encrypt(
b'encrypt me', key, FAKE_IV)).decode('ascii')
if bad_crypto_meta:
enc_val += ';swift_meta=' + get_crypto_meta_header(
crypto_meta=bad_crypto_meta)
@ -483,7 +489,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_HEAD_with_bad_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
bad_crypto_meta['iv'] = b'bad_iv'
self._test_bad_crypto_meta_for_user_metadata('HEAD', bad_crypto_meta)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -497,10 +503,10 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_with_bad_iv_for_user_metadata(self):
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['iv'] = 'bad_iv'
bad_crypto_meta['iv'] = b'bad_iv'
resp = self._test_bad_crypto_meta_for_user_metadata(
'GET', bad_crypto_meta)
self.assertEqual('Error decrypting header', resp.body)
self.assertEqual(b'Error decrypting header', resp.body)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -509,7 +515,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
bad_crypto_meta.pop('iv')
resp = self._test_bad_crypto_meta_for_user_metadata(
'GET', bad_crypto_meta)
self.assertEqual('Error decrypting header', resp.body)
self.assertEqual(b'Error decrypting header', resp.body)
self.assertIn(
'iv', self.decrypter.logger.get_lines_for_level('error')[0])
@ -518,24 +524,24 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
len(body), md5hex(body), fetch_crypto_keys(), b'not used')
hdrs['X-Object-Sysmeta-Crypto-Body-Meta'] = \
get_crypto_meta_header(crypto_meta=bad_crypto_meta)
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=enc_body,
headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting object', resp.body)
self.assertEqual(b'Error decrypting object', resp.body)
self.assertIn('Error decrypting object',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_iv_for_object_body(self):
bad_crypto_meta = fake_get_crypto_meta(key=os.urandom(32))
bad_crypto_meta['iv'] = 'bad_iv'
bad_crypto_meta['iv'] = b'bad_iv'
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -548,7 +554,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_with_bad_body_key_for_object_body(self):
body_key_meta = {'key': 'wrapped too short key', 'iv': FAKE_IV}
body_key_meta = {'key': b'wrapped too short key', 'iv': FAKE_IV}
bad_crypto_meta = fake_get_crypto_meta(body_key=body_key_meta)
self._test_GET_with_bad_crypto_meta_for_object_body(bad_crypto_meta)
self.assertIn('Key must be length 32',
@ -566,7 +572,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
enc_body = encrypt(body, body_key, FAKE_IV)
@ -595,13 +601,14 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
obj_key = fetch_crypto_keys()['object']
hdrs = {'Etag': md5hex(body),
'content-type': 'text/plain',
'content-length': len(body),
'x-object-transient-sysmeta-crypto-meta-test':
base64.b64encode(encrypt('encrypt me', obj_key, FAKE_IV)) +
bytes_to_wsgi(base64.b64encode(encrypt(
b'encrypt me', obj_key, FAKE_IV))) +
';swift_meta=' + get_crypto_meta_header(),
'x-object-sysmeta-test': 'do not encrypt me'}
self.app.register('GET', '/v1/a/c/o', HTTPOk, body=body, headers=hdrs)
@ -620,8 +627,8 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
chunks = ['some', 'chunks', 'of data']
body = ''.join(chunks)
chunks = [b'some', b'chunks', b'of data']
body = b''.join(chunks)
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
ctxt = Crypto().create_encryption_ctxt(body_key, FAKE_IV)
@ -642,8 +649,8 @@ class TestDecrypterObjectRequests(unittest.TestCase):
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
req.headers['Content-Range'] = 'bytes 3-10/17'
chunks = ['0123', '45678', '9abcdef']
body = ''.join(chunks)
chunks = [b'0123', b'45678', b'9abcdef']
body = b''.join(chunks)
plaintext_etag = md5hex(body)
body_key = os.urandom(32)
ctxt = Crypto().create_encryption_ctxt(body_key, FAKE_IV)
@ -656,7 +663,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('3456789a', resp.body)
self.assertEqual(b'3456789a', resp.body)
self.assertEqual('200 OK', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual('text/plain', resp.headers['Content-Type'])
@ -670,20 +677,20 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_multipart_ciphertext(self):
# build fake multipart response body
body_key = os.urandom(32)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext = b'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(ciphertext)
body = ''
body = b''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + ciphertext[start:end] + '\r\n'
body += '--multipartboundary--'
body += b'--multipartboundary\r\n'
body += b'Content-Type: %s\r\n' % ctype.encode('utf-8')
body += b'Content-Range: bytes %d-%d/%d' % (start, end - 1, length)
body += b'\r\n\r\n' + ciphertext[start:end] + b'\r\n'
body += b'--multipartboundary--'
# register request with fake swift
hdrs = self._make_response_headers(
@ -707,18 +714,18 @@ class TestDecrypterObjectRequests(unittest.TestCase):
# the multipart headers could be re-ordered, so parse response body to
# verify expected content
resp_lines = resp.body.split('\r\n')
resp_lines = resp.body.split(b'\r\n')
resp_lines.reverse()
for start, end, ctype in parts:
self.assertEqual('--multipartboundary', resp_lines.pop())
self.assertEqual(b'--multipartboundary', resp_lines.pop())
expected_header_lines = {
'Content-Type: %s' % ctype,
'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)}
b'Content-Type: %s' % ctype.encode('utf8'),
b'Content-Range: bytes %d-%d/%d' % (start, end - 1, length)}
resp_header_lines = {resp_lines.pop(), resp_lines.pop()}
self.assertEqual(expected_header_lines, resp_header_lines)
self.assertEqual('', resp_lines.pop())
self.assertEqual(b'', resp_lines.pop())
self.assertEqual(plaintext[start:end], resp_lines.pop())
self.assertEqual('--multipartboundary--', resp_lines.pop())
self.assertEqual(b'--multipartboundary--', resp_lines.pop())
# we should have consumed the whole response body
self.assertFalse(resp_lines)
@ -727,7 +734,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
# *just* having multipart content type shouldn't trigger the mime doc
# code path
body_key = os.urandom(32)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext = b'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
@ -754,19 +761,19 @@ class TestDecrypterObjectRequests(unittest.TestCase):
def test_GET_multipart_no_body_crypto_meta(self):
# build fake multipart response body
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext = b'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(plaintext)
body = ''
body = b''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + plaintext[start:end] + '\r\n'
body += '--multipartboundary--'
body += b'--multipartboundary\r\n'
body += b'Content-Type: %s\r\n' % ctype.encode('utf-8')
body += b'Content-Range: bytes %d-%d/%d' % (start, end - 1, length)
body += b'\r\n\r\n' + plaintext[start:end] + b'\r\n'
body += b'--multipartboundary--'
# register request with fake swift
hdrs = {
@ -795,24 +802,24 @@ class TestDecrypterObjectRequests(unittest.TestCase):
# build fake multipart response body
key = fetch_crypto_keys()['object']
ctxt = Crypto().create_encryption_ctxt(key, FAKE_IV)
plaintext = 'Cwm fjord veg balks nth pyx quiz'
plaintext = b'Cwm fjord veg balks nth pyx quiz'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, ctxt=ctxt)
parts = ((0, 3, 'text/plain'),
(4, 9, 'text/plain; charset=us-ascii'),
(24, 32, 'text/plain'))
length = len(ciphertext)
body = ''
body = b''
for start, end, ctype in parts:
body += '--multipartboundary\r\n'
body += 'Content-Type: %s\r\n' % ctype
body += 'Content-Range: bytes %s-%s/%s' % (start, end - 1, length)
body += '\r\n\r\n' + ciphertext[start:end] + '\r\n'
body += '--multipartboundary--'
body += b'--multipartboundary\r\n'
body += b'Content-Type: %s\r\n' % ctype.encode('utf-8')
body += b'Content-Range: bytes %d-%d/%d' % (start, end - 1, length)
body += b'\r\n\r\n' + ciphertext[start:end] + b'\r\n'
body += b'--multipartboundary--'
# register request with fake swift
hdrs = self._make_response_headers(
len(body), plaintext_etag, fetch_crypto_keys(), 'not used')
len(body), plaintext_etag, fetch_crypto_keys(), b'not used')
hdrs['content-type'] = \
'multipart/byteranges;boundary=multipartboundary'
hdrs['X-Object-Sysmeta-Crypto-Body-Meta'] = \
@ -826,31 +833,31 @@ class TestDecrypterObjectRequests(unittest.TestCase):
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting object', resp.body)
self.assertEqual(b'Error decrypting object', resp.body)
self.assertIn('Error decrypting object',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_bad_body_cipher(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'Mystery cipher', 'iv': '1234567887654321'})
{'cipher': 'Mystery cipher', 'iv': b'1234567887654321'})
self.assertIn('Cipher must be AES_CTR_256',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_missing_body_cipher(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'iv': '1234567887654321'})
{'iv': b'1234567887654321'})
self.assertIn('cipher',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_too_short_body_iv(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'AES_CTR_256', 'iv': 'too short'})
{'cipher': 'AES_CTR_256', 'iv': b'too short'})
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
def test_GET_multipart_too_long_body_iv(self):
self._test_GET_multipart_bad_body_crypto_meta(
{'cipher': 'AES_CTR_256', 'iv': 'a little too long'})
{'cipher': 'AES_CTR_256', 'iv': b'a little too long'})
self.assertIn('IV must be length 16',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -864,15 +871,16 @@ class TestDecrypterObjectRequests(unittest.TestCase):
# Do not provide keys, and do not set override flag
env = {'REQUEST_METHOD': 'GET'}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex('not the body'), fetch_crypto_keys(), 'not used')
len(body), md5hex(b'not the body'),
fetch_crypto_keys(), b'not used')
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
self.assertEqual(b'Unable to retrieve encryption keys.',
resp.body)
self.assertIn('missing callback',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -884,15 +892,15 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: raise_exc}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
hdrs = self._make_response_headers(
len(body), md5hex(body), fetch_crypto_keys(), 'not used')
len(body), md5hex(body), fetch_crypto_keys(), b'not used')
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Unable to retrieve encryption keys.',
self.assertEqual(b'Unable to retrieve encryption keys.',
resp.body)
self.assertIn('from callback: Testing',
self.decrypter.logger.get_lines_for_level('error')[0])
@ -902,19 +910,19 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
enc_body = encrypt(body, fetch_crypto_keys()['object'], FAKE_IV)
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['cipher'] = 'unknown_cipher'
hdrs = self._make_response_headers(
len(enc_body), md5hex(body), fetch_crypto_keys(), 'not used')
len(enc_body), md5hex(body), fetch_crypto_keys(), b'not used')
hdrs['X-Object-Sysmeta-Crypto-Body-Meta'] = \
get_crypto_meta_header(crypto_meta=bad_crypto_meta)
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting object', resp.body)
self.assertEqual(b'Error decrypting object', resp.body)
self.assertIn('Error decrypting object',
self.decrypter.logger.get_lines_for_level('error')[0])
self.assertIn('Bad crypto meta: Cipher',
@ -925,22 +933,24 @@ class TestDecrypterObjectRequests(unittest.TestCase):
env = {'REQUEST_METHOD': 'GET',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
key = fetch_crypto_keys()['object']
enc_body = encrypt(body, key, FAKE_IV)
bad_crypto_meta = fake_get_crypto_meta()
bad_crypto_meta['cipher'] = 'unknown_cipher'
hdrs = self._make_response_headers(
len(enc_body), md5hex(body), fetch_crypto_keys(), 'not used')
len(enc_body), md5hex(body), fetch_crypto_keys(), b'not used')
enc_val = bytes_to_wsgi(base64.b64encode(
encrypt(b'encrypt me', key, FAKE_IV)))
hdrs.update({'x-object-transient-sysmeta-crypto-meta-test':
base64.b64encode(encrypt('encrypt me', key, FAKE_IV)) +
';swift_meta=' +
enc_val + ';swift_meta=' +
get_crypto_meta_header(crypto_meta=bad_crypto_meta)})
self.app.register(
'GET', '/v1/a/c/o', HTTPOk, body=enc_body, headers=hdrs)
resp = req.get_response(self.decrypter)
self.assertEqual('500 Internal Error', resp.status)
self.assertEqual('Error decrypting header', resp.body)
self.assertEqual(b'Error decrypting header', resp.body)
self.assertIn(
'Error decrypting header X-Object-Transient-Sysmeta-Crypto-Meta-'
'Test', self.decrypter.logger.get_lines_for_level('error')[0])
@ -951,7 +961,7 @@ class TestDecrypterObjectRequests(unittest.TestCase):
CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
'swift.crypto.override': True}
req = Request.blank('/v1/a/c/o', environ=env)
body = 'FAKE APP'
body = b'FAKE APP'
hdrs = {'Etag': md5hex(body),
'content-type': 'text/plain',
'content-length': len(body),
@ -993,7 +1003,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
def test_GET_container_success(self):
# no format requested, listing has names only
fake_body = 'testfile1\ntestfile2\n'
fake_body = b'testfile1\ntestfile2\n'
calls = [0]
def wrapped_fetch_crypto_keys():
@ -1004,11 +1014,11 @@ class TestDecrypterContainerRequests(unittest.TestCase):
callback=wrapped_fetch_crypto_keys)
self.assertEqual('200 OK', resp.status)
names = resp.body.split('\n')
self.assertEqual(3, len(names))
self.assertIn('testfile1', names)
self.assertIn('testfile2', names)
self.assertIn('', names)
self.assertEqual(resp.body.split(b'\n'), [
b'testfile1',
b'testfile2',
b'',
])
self.assertEqual(0, calls[0])
def test_GET_container_json(self):
@ -1035,7 +1045,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"content_type": content_type_2}
listing = [subdir, obj_dict_1, obj_dict_2]
fake_body = json.dumps(listing)
fake_body = json.dumps(listing).encode('ascii')
resp = self._make_cont_get_req(fake_body, 'json')
@ -1069,7 +1079,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"content_type": content_type_2}
listing = [obj_dict_1, obj_dict_2]
fake_body = json.dumps(listing)
fake_body = json.dumps(listing).encode('ascii')
resp = self._make_cont_get_req(fake_body, 'json', override=True)
@ -1096,13 +1106,14 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"content_type": "image/jpeg"}
listing = [obj_dict_1]
fake_body = json.dumps(listing)
fake_body = json.dumps(listing).encode('ascii')
resp = self._make_cont_get_req(fake_body, 'json')
self.assertEqual('200 OK', resp.status)
self.assertEqual(['<unknown>'],
[x['hash'] for x in json.loads(resp.body)])
self.assertEqual(
['<unknown>'],
[x['hash'] for x in json.loads(resp.body)])
self.assertIn("Cipher must be AES_CTR_256",
self.decrypter.logger.get_lines_for_level('error')[0])
self.assertIn('Error decrypting container listing',
@ -1123,13 +1134,14 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"content_type": "image/jpeg"}
listing = [obj_dict_1]
fake_body = json.dumps(listing)
fake_body = json.dumps(listing).encode('ascii')
resp = self._make_cont_get_req(fake_body, 'json')
self.assertEqual('200 OK', resp.status)
self.assertEqual(['<unknown>'],
[x['hash'] for x in json.loads(resp.body)])
self.assertEqual(
['<unknown>'],
[x['hash'] for x in json.loads(resp.body)])
self.assertEqual(self.decrypter.logger.get_lines_for_level('error'), [
'get_keys(): unknown key id: unknown_key',
'Error decrypting container listing: unknown_key',
@ -1145,7 +1157,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"content_type": 'application/symlink'}
listing = [obj_dict]
fake_body = json.dumps(listing)
fake_body = json.dumps(listing).encode('ascii')
resp = self._make_cont_get_req(fake_body, 'json')

View File

@ -26,7 +26,8 @@ from swift.common.middleware.crypto import encrypter
from swift.common.middleware.crypto.crypto_utils import (
CRYPTO_KEY_CALLBACK, Crypto)
from swift.common.swob import (
Request, HTTPException, HTTPCreated, HTTPAccepted, HTTPOk, HTTPBadRequest)
Request, HTTPException, HTTPCreated, HTTPAccepted, HTTPOk, HTTPBadRequest,
wsgi_to_bytes, bytes_to_wsgi)
from swift.common.utils import FileLikeIter
from test.unit import FakeLogger, EMPTY_ETAG
@ -57,8 +58,8 @@ class TestEncrypter(unittest.TestCase):
meta_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(FAKE_IV, meta_iv)
self.assertEqual(
base64.b64encode(encrypt(value, key, meta_iv)),
enc_val)
base64.b64encode(encrypt(wsgi_to_bytes(value), key, meta_iv)),
wsgi_to_bytes(enc_val))
# if there is any encrypted user metadata then this header should exist
self.assertIn('X-Object-Transient-Sysmeta-Crypto-Meta', req_hdrs)
common_meta = json.loads(urlparse.unquote_plus(
@ -70,7 +71,7 @@ class TestEncrypter(unittest.TestCase):
def test_PUT_req(self):
body_key = os.urandom(32)
object_key = fetch_crypto_keys()['object']
plaintext = 'FAKE APP'
plaintext = b'FAKE APP'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
ciphertext_etag = md5hex(ciphertext)
@ -126,14 +127,17 @@ class TestEncrypter(unittest.TestCase):
# verify encrypted version of plaintext etag
actual = base64.b64decode(encrypted_etag)
etag_iv = base64.b64decode(actual_meta['iv'])
enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
enc_etag = encrypt(plaintext_etag.encode('ascii'), object_key, etag_iv)
self.assertEqual(enc_etag, actual)
# verify etag MAC for conditional requests
actual_hmac = base64.b64decode(
req_hdrs['X-Object-Sysmeta-Crypto-Etag-Mac'])
self.assertEqual(actual_hmac, hmac.new(
object_key, plaintext_etag, hashlib.sha256).digest())
exp_hmac = hmac.new(
object_key,
plaintext_etag.encode('ascii'),
hashlib.sha256).digest()
self.assertEqual(actual_hmac, exp_hmac)
# verify encrypted etag for container update
self.assertIn(
@ -154,8 +158,9 @@ class TestEncrypter(unittest.TestCase):
cont_key = fetch_crypto_keys()['container']
cont_etag_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(FAKE_IV, cont_etag_iv)
self.assertEqual(encrypt(plaintext_etag, cont_key, cont_etag_iv),
base64.b64decode(parts[0]))
exp_etag = encrypt(plaintext_etag.encode('ascii'),
cont_key, cont_etag_iv)
self.assertEqual(exp_etag, base64.b64decode(parts[0]))
# content-type is not encrypted
self.assertEqual('text/plain', req_hdrs['Content-Type'])
@ -222,14 +227,14 @@ class TestEncrypter(unittest.TestCase):
# verify object is empty by getting direct from the app
get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
resp = get_req.get_response(self.app)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertEqual(EMPTY_ETAG, resp.headers['Etag'])
def _test_PUT_with_other_footers(self, override_etag):
# verify handling of another middleware's footer callback
body_key = os.urandom(32)
object_key = fetch_crypto_keys()['object']
plaintext = 'FAKE APP'
plaintext = b'FAKE APP'
plaintext_etag = md5hex(plaintext)
ciphertext = encrypt(plaintext, body_key, FAKE_IV)
ciphertext_etag = md5hex(ciphertext)
@ -283,7 +288,8 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(ciphertext_etag, req_hdrs['Etag'])
actual = base64.b64decode(encrypted_etag)
etag_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(encrypt(plaintext_etag, object_key, etag_iv), actual)
exp_etag = encrypt(plaintext_etag.encode('ascii'), object_key, etag_iv)
self.assertEqual(exp_etag, actual)
# verify encrypted etag for container update
self.assertIn(
@ -303,8 +309,9 @@ class TestEncrypter(unittest.TestCase):
cont_key = fetch_crypto_keys()['container']
cont_etag_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(FAKE_IV, cont_etag_iv)
self.assertEqual(encrypt(override_etag, cont_key, cont_etag_iv),
base64.b64decode(parts[0]))
exp_etag = encrypt(override_etag.encode('ascii'),
cont_key, cont_etag_iv)
self.assertEqual(exp_etag, base64.b64decode(parts[0]))
# verify body crypto meta
actual = req_hdrs['X-Object-Sysmeta-Crypto-Body-Meta']
@ -331,7 +338,7 @@ class TestEncrypter(unittest.TestCase):
def _test_PUT_with_etag_override_in_headers(self, override_etag):
# verify handling of another middleware's
# container-update-override-etag in headers
plaintext = 'FAKE APP'
plaintext = b'FAKE APP'
plaintext_etag = md5hex(plaintext)
env = {'REQUEST_METHOD': 'PUT',
@ -373,8 +380,9 @@ class TestEncrypter(unittest.TestCase):
cont_etag_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(FAKE_IV, cont_etag_iv)
self.assertEqual(encrypt(override_etag, cont_key, cont_etag_iv),
base64.b64decode(parts[0]))
exp_etag = encrypt(override_etag.encode('ascii'),
cont_key, cont_etag_iv)
self.assertEqual(exp_etag, base64.b64decode(parts[0]))
def test_PUT_with_etag_override_in_headers(self):
self._test_PUT_with_etag_override_in_headers('override_etag')
@ -413,10 +421,10 @@ class TestEncrypter(unittest.TestCase):
req_hdrs['X-Object-Sysmeta-Container-Update-Override-Etag'])
def test_PUT_with_empty_etag_override_in_headers(self):
self._test_PUT_with_empty_etag_override_in_headers('body')
self._test_PUT_with_empty_etag_override_in_headers(b'body')
def test_PUT_with_empty_etag_override_in_headers_no_body(self):
self._test_PUT_with_empty_etag_override_in_headers('')
self._test_PUT_with_empty_etag_override_in_headers(b'')
def _test_PUT_with_empty_etag_override_in_footers(self, plaintext):
# verify that an override etag value of '' from other middleware is
@ -450,15 +458,15 @@ class TestEncrypter(unittest.TestCase):
req_hdrs['X-Object-Sysmeta-Container-Update-Override-Etag'])
def test_PUT_with_empty_etag_override_in_footers(self):
self._test_PUT_with_empty_etag_override_in_footers('body')
self._test_PUT_with_empty_etag_override_in_footers(b'body')
def test_PUT_with_empty_etag_override_in_footers_no_body(self):
self._test_PUT_with_empty_etag_override_in_footers('')
self._test_PUT_with_empty_etag_override_in_footers(b'')
def test_PUT_with_bad_etag_in_other_footers(self):
# verify that etag supplied in footers from other middleware overrides
# header etag when validating inbound plaintext etags
plaintext = 'FAKE APP'
plaintext = b'FAKE APP'
plaintext_etag = md5hex(plaintext)
other_footers = {
'Etag': 'bad etag',
@ -567,7 +575,7 @@ class TestEncrypter(unittest.TestCase):
cont_etag_iv = base64.b64decode(actual_meta['iv'])
self.assertEqual(FAKE_IV, cont_etag_iv)
self.assertEqual(encrypt('other override', cont_key, cont_etag_iv),
self.assertEqual(encrypt(b'other override', cont_key, cont_etag_iv),
base64.b64decode(parts[0]))
# verify that other middleware's footers made it to app
@ -625,7 +633,7 @@ class TestEncrypter(unittest.TestCase):
self.assertFalse(k.lower().startswith('x-object-sysmeta-crypto-'))
def test_POST_req(self):
body = 'FAKE APP'
body = b'FAKE APP'
env = {'REQUEST_METHOD': 'POST',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {'x-object-meta-test': 'encrypt me',
@ -700,14 +708,16 @@ class TestEncrypter(unittest.TestCase):
# masked values for secret_id None
key = fetch_crypto_keys()['object']
masked_etags = [
'"%s"' % base64.b64encode(hmac.new(
key, etag.strip('"'), hashlib.sha256).digest())
'"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
key, wsgi_to_bytes(etag.strip('"')),
hashlib.sha256).digest()))
for etag in plain_etags if etag not in ('*', '')]
# masked values for secret_id myid
key = fetch_crypto_keys(key_id={'secret_id': 'myid'})['object']
masked_etags_myid = [
'"%s"' % base64.b64encode(hmac.new(
key, etag.strip('"'), hashlib.sha256).digest())
'"%s"' % bytes_to_wsgi(base64.b64encode(hmac.new(
key, wsgi_to_bytes(etag.strip('"')),
hashlib.sha256).digest()))
for etag in plain_etags if etag not in ('*', '')]
expected_etags = set((expected_plain_etags or plain_etags) +
masked_etags + masked_etags_myid)
@ -793,12 +803,12 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
actual_headers['X-Backend-Etag-Is-At'])
self.assertIn('"%s"' % base64.b64encode(
hmac.new(key, 'an etag', hashlib.sha256).digest()),
self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode(
hmac.new(key, b'an etag', hashlib.sha256).digest())),
actual_headers['If-Match'])
self.assertIn('"another etag"', actual_headers['If-None-Match'])
self.assertIn('"%s"' % base64.b64encode(
hmac.new(key, 'another etag', hashlib.sha256).digest()),
self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode(
hmac.new(key, b'another etag', hashlib.sha256).digest())),
actual_headers['If-None-Match'])
def test_GET_etag_is_at_not_duplicated(self):
@ -824,8 +834,8 @@ class TestEncrypter(unittest.TestCase):
def test_PUT_multiseg_no_client_etag(self):
body_key = os.urandom(32)
chunks = ['some', 'chunks', 'of data']
body = ''.join(chunks)
chunks = [b'some', b'chunks', b'of data']
body = b''.join(chunks)
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
'wsgi.input': FileLikeIter(chunks)}
@ -848,8 +858,8 @@ class TestEncrypter(unittest.TestCase):
def test_PUT_multiseg_good_client_etag(self):
body_key = os.urandom(32)
chunks = ['some', 'chunks', 'of data']
body = ''.join(chunks)
chunks = [b'some', b'chunks', b'of data']
body = b''.join(chunks)
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
'wsgi.input': FileLikeIter(chunks)}
@ -872,8 +882,8 @@ class TestEncrypter(unittest.TestCase):
get_req.get_response(self.app).body)
def test_PUT_multiseg_bad_client_etag(self):
chunks = ['some', 'chunks', 'of data']
body = ''.join(chunks)
chunks = [b'some', b'chunks', b'of data']
body = b''.join(chunks)
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
'wsgi.input': FileLikeIter(chunks)}
@ -886,7 +896,7 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('422 Unprocessable Entity', resp.status)
def test_PUT_missing_key_callback(self):
body = 'FAKE APP'
body = b'FAKE APP'
env = {'REQUEST_METHOD': 'PUT'}
hdrs = {'content-type': 'text/plain',
'content-length': str(len(body))}
@ -895,13 +905,13 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('500 Internal Error', resp.status)
self.assertIn('missing callback',
self.encrypter.logger.get_lines_for_level('error')[0])
self.assertEqual('Unable to retrieve encryption keys.', resp.body)
self.assertEqual(b'Unable to retrieve encryption keys.', resp.body)
def test_PUT_error_in_key_callback(self):
def raise_exc(*args, **kwargs):
raise Exception('Testing')
body = 'FAKE APP'
body = b'FAKE APP'
env = {'REQUEST_METHOD': 'PUT',
CRYPTO_KEY_CALLBACK: raise_exc}
hdrs = {'content-type': 'text/plain',
@ -911,7 +921,7 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('500 Internal Error', resp.status)
self.assertIn('from callback: Testing',
self.encrypter.logger.get_lines_for_level('error')[0])
self.assertEqual('Unable to retrieve encryption keys.', resp.body)
self.assertEqual(b'Unable to retrieve encryption keys.', resp.body)
def test_PUT_encryption_override(self):
# set crypto override to disable encryption.
@ -921,7 +931,7 @@ class TestEncrypter(unittest.TestCase):
'X-Object-Sysmeta-Other': 'other sysmeta',
'X-Object-Sysmeta-Container-Update-Override-Etag':
'other override'}
body = 'FAKE APP'
body = b'FAKE APP'
env = {'REQUEST_METHOD': 'PUT',
'swift.crypto.override': True,
'swift.callback.update_footers':
@ -944,7 +954,7 @@ class TestEncrypter(unittest.TestCase):
def _test_constraints_checking(self, method):
# verify that the check_metadata function is called on PUT and POST
body = 'FAKE APP'
body = b'FAKE APP'
env = {'REQUEST_METHOD': method,
CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
hdrs = {'content-type': 'text/plain',
@ -952,7 +962,7 @@ class TestEncrypter(unittest.TestCase):
req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
mocked_func = 'swift.common.middleware.crypto.encrypter.check_metadata'
with mock.patch(mocked_func) as mocked:
mocked.side_effect = [HTTPBadRequest('testing')]
mocked.side_effect = [HTTPBadRequest(b'testing')]
resp = req.get_response(self.encrypter)
self.assertEqual('400 Bad Request', resp.status)
self.assertEqual(1, mocked.call_count)
@ -991,10 +1001,10 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(2, len(encrypted))
crypted_val, crypt_info = encrypted
expected_crypt_val = base64.b64encode(
encrypt('aaa', object_key, FAKE_IV))
encrypt(b'aaa', object_key, FAKE_IV))
expected_crypt_info = {
'cipher': 'AES_CTR_256', 'iv': 'This is an IV123'}
self.assertEqual(expected_crypt_val, crypted_val)
'cipher': 'AES_CTR_256', 'iv': b'This is an IV123'}
self.assertEqual(expected_crypt_val, wsgi_to_bytes(crypted_val))
self.assertEqual(expected_crypt_info, crypt_info)
# - Empty string raises a ValueError for safety
@ -1002,14 +1012,14 @@ class TestEncrypter(unittest.TestCase):
encrypter.encrypt_header_val(Crypto(), '', object_key)
self.assertEqual('empty value is not acceptable',
cm.exception.message)
cm.exception.args[0])
# - None also raises a ValueError for safety
with self.assertRaises(ValueError) as cm:
encrypter.encrypt_header_val(Crypto(), None, object_key)
self.assertEqual('empty value is not acceptable',
cm.exception.message)
cm.exception.args[0])
if __name__ == '__main__':

View File

@ -55,7 +55,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
def setUp(self):
skip_if_no_xattrs()
self.plaintext = 'unencrypted body content'
self.plaintext = b'unencrypted body content'
self.plaintext_etag = md5hex(self.plaintext)
self._setup_crypto_app()
@ -82,6 +82,8 @@ class TestCryptoPipelineChanges(unittest.TestCase):
self.object_name = 'o'
self.object_path = self.container_path + '/' + self.object_name
container_path = self.container_path
if not isinstance(container_path, bytes):
container_path = container_path.encode('utf8')
req = Request.blank(
container_path, method='PUT',
headers={'X-Storage-Policy': policy_name})
@ -95,7 +97,10 @@ class TestCryptoPipelineChanges(unittest.TestCase):
self.assertEqual(policy_name, resp.headers['X-Storage-Policy'])
def _put_object(self, app, body):
req = Request.blank(self.object_path, method='PUT', body=body,
object_path = self.object_path
if not isinstance(object_path, bytes):
object_path = object_path.encode('utf8')
req = Request.blank(object_path, method='PUT', body=body,
headers={'Content-Type': 'application/test'})
resp = req.get_response(app)
self.assertEqual('201 Created', resp.status)
@ -103,7 +108,10 @@ class TestCryptoPipelineChanges(unittest.TestCase):
return resp
def _post_object(self, app):
req = Request.blank(self.object_path, method='POST',
object_path = self.object_path
if not isinstance(object_path, bytes):
object_path = object_path.encode('utf8')
req = Request.blank(object_path, method='POST',
headers={'Content-Type': 'application/test',
'X-Object-Meta-Fruit': 'Kiwi'})
resp = req.get_response(app)
@ -111,7 +119,10 @@ class TestCryptoPipelineChanges(unittest.TestCase):
return resp
def _copy_object(self, app, destination):
req = Request.blank(self.object_path, method='COPY',
object_path = self.object_path
if not isinstance(object_path, bytes):
object_path = object_path.encode('utf8')
req = Request.blank(object_path, method='COPY',
headers={'Destination': destination})
resp = req.get_response(app)
self.assertEqual('201 Created', resp.status)
@ -120,6 +131,8 @@ class TestCryptoPipelineChanges(unittest.TestCase):
def _check_GET_and_HEAD(self, app, object_path=None):
object_path = object_path or self.object_path
if not isinstance(object_path, bytes):
object_path = object_path.encode('utf8')
req = Request.blank(object_path, method='GET')
resp = req.get_response(app)
self.assertEqual('200 OK', resp.status)
@ -129,13 +142,15 @@ class TestCryptoPipelineChanges(unittest.TestCase):
req = Request.blank(object_path, method='HEAD')
resp = req.get_response(app)
self.assertEqual('200 OK', resp.status)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertEqual('Kiwi', resp.headers['X-Object-Meta-Fruit'])
def _check_match_requests(self, method, app, object_path=None):
object_path = object_path or self.object_path
if not isinstance(object_path, bytes):
object_path = object_path.encode('utf8')
# verify conditional match requests
expected_body = self.plaintext if method == 'GET' else ''
expected_body = self.plaintext if method == 'GET' else b''
# If-Match matches
req = Request.blank(object_path, method=method,
@ -160,7 +175,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
headers={'If-Match': '"not the etag"'})
resp = req.get_response(app)
self.assertEqual('412 Precondition Failed', resp.status)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertEqual(self.plaintext_etag, resp.headers['Etag'])
# If-None-Match matches
@ -169,7 +184,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
headers={'If-None-Match': '"%s"' % self.plaintext_etag})
resp = req.get_response(app)
self.assertEqual('304 Not Modified', resp.status)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertEqual(self.plaintext_etag, resp.headers['Etag'])
# If-None-Match wildcard
@ -177,7 +192,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
headers={'If-None-Match': '*'})
resp = req.get_response(app)
self.assertEqual('304 Not Modified', resp.status)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertEqual(self.plaintext_etag, resp.headers['Etag'])
# If-None-Match does not match
@ -191,6 +206,8 @@ class TestCryptoPipelineChanges(unittest.TestCase):
def _check_listing(self, app, expect_mismatch=False, container_path=None):
container_path = container_path or self.container_path
if not isinstance(container_path, bytes):
container_path = container_path.encode('utf8')
req = Request.blank(
container_path, method='GET', query_string='format=json')
resp = req.get_response(app)
@ -351,7 +368,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
req = Request.blank(self.object_path, method='HEAD')
resp = req.get_response(app)
self.assertEqual('200 OK', resp.status)
self.assertEqual('', resp.body)
self.assertEqual(b'', resp.body)
self.assertNotEqual('Kiwi', resp.headers['X-Object-Meta-Fruit'])
def test_write_with_crypto_read_without_crypto(self):
@ -427,9 +444,9 @@ class TestCryptoPipelineChanges(unittest.TestCase):
crypto_meta = crypto_meta_param[len('swift_meta='):]
listing_etag_iv = load_crypto_meta(crypto_meta)['iv']
exp_enc_listing_etag = base64.b64encode(
encrypt(self.plaintext_etag,
encrypt(self.plaintext_etag.encode('ascii'),
self.km.create_key('/a/%s' % self.container_name),
listing_etag_iv))
listing_etag_iv)).decode('ascii')
self.assertEqual(exp_enc_listing_etag, parts[0])
# Verify diskfile data and metadata is encrypted
@ -446,7 +463,7 @@ class TestCryptoPipelineChanges(unittest.TestCase):
policy=policy)
with df.open():
meta = df.get_metadata()
contents = ''.join(df.reader())
contents = b''.join(df.reader())
metadata = dict((k.lower(), v) for k, v in meta.items())
# verify on disk data - body
body_iv = load_crypto_meta(
@ -463,8 +480,8 @@ class TestCryptoPipelineChanges(unittest.TestCase):
'x-object-transient-sysmeta-crypto-meta-fruit'].split(';')
meta = meta.strip()[len('swift_meta='):]
metadata_iv = load_crypto_meta(meta)['iv']
exp_enc_meta = base64.b64encode(encrypt('Kiwi', obj_key,
metadata_iv))
exp_enc_meta = base64.b64encode(encrypt(
b'Kiwi', obj_key, metadata_iv)).decode('ascii')
self.assertEqual(exp_enc_meta, enc_val)
self.assertNotIn('x-object-meta-fruit', metadata)
@ -486,14 +503,16 @@ class TestCryptoPipelineChanges(unittest.TestCase):
actual_enc_etag, _junk, actual_etag_meta = metadata[
'x-object-sysmeta-crypto-etag'].partition('; swift_meta=')
etag_iv = load_crypto_meta(actual_etag_meta)['iv']
exp_enc_etag = base64.b64encode(encrypt(self.plaintext_etag,
obj_key, etag_iv))
exp_enc_etag = base64.b64encode(encrypt(
self.plaintext_etag.encode('ascii'),
obj_key, etag_iv)).decode('ascii')
self.assertEqual(exp_enc_etag, actual_enc_etag)
# verify etag hmac
exp_etag_mac = hmac.new(
obj_key, self.plaintext_etag, digestmod=hashlib.sha256)
exp_etag_mac = base64.b64encode(exp_etag_mac.digest())
obj_key, self.plaintext_etag.encode('ascii'),
digestmod=hashlib.sha256).digest()
exp_etag_mac = base64.b64encode(exp_etag_mac).decode('ascii')
self.assertEqual(exp_etag_mac,
metadata['x-object-sysmeta-crypto-etag-mac'])
@ -505,8 +524,8 @@ class TestCryptoPipelineChanges(unittest.TestCase):
listing_etag_iv = load_crypto_meta(crypto_meta)['iv']
cont_key = self.km.create_key('/a/%s' % self.container_name)
exp_enc_listing_etag = base64.b64encode(
encrypt(self.plaintext_etag, cont_key,
listing_etag_iv))
encrypt(self.plaintext_etag.encode('ascii'), cont_key,
listing_etag_iv)).decode('ascii')
self.assertEqual(exp_enc_listing_etag, parts[0])
self._check_GET_and_HEAD(self.crypto_app)

View File

@ -65,7 +65,7 @@ class TestKeymaster(unittest.TestCase):
('HEAD', swob.HTTPNoContent, '204')):
resp_headers = {}
self.swift.register(
method, '/v1' + path, resp_class, resp_headers, '')
method, '/v1' + path, resp_class, resp_headers, b'')
req = Request.blank(
'/v1' + path, environ={'REQUEST_METHOD': method})
start_response, calls = capture_start_response()
@ -142,10 +142,11 @@ class TestKeymaster(unittest.TestCase):
def do_test(dflt_id):
for secret in (os.urandom(32), os.urandom(33), os.urandom(50)):
encoded_secret = base64.b64encode(secret)
self.assertIsInstance(encoded_secret, bytes)
for conf_val in (
bytes(encoded_secret),
unicode(encoded_secret),
encoded_secret[:30] + '\n' + encoded_secret[30:]):
encoded_secret,
encoded_secret.decode('ascii'),
encoded_secret[:30] + b'\n' + encoded_secret[30:]):
try:
app = keymaster.KeyMaster(
self.swift, {'encryption_root_secret': conf_val,
@ -208,7 +209,7 @@ class TestKeymaster(unittest.TestCase):
root_key = base64.b64decode(conf_inner['encryption_root_secret_22'])
self.assertIn('container', keys)
self.assertEqual(keys.pop('container'),
hmac.new(root_key, '/a/c',
hmac.new(root_key, b'/a/c',
digestmod=hashlib.sha256).digest())
self.assertIn('all_ids', keys)
all_keys = set()
@ -249,11 +250,11 @@ class TestKeymaster(unittest.TestCase):
root_key = base64.b64decode(conf_inner['encryption_root_secret_22'])
self.assertIn('container', keys)
self.assertEqual(keys.pop('container'),
hmac.new(root_key, '/a/c',
hmac.new(root_key, b'/a/c',
digestmod=hashlib.sha256).digest())
self.assertIn('object', keys)
self.assertEqual(keys.pop('object'),
hmac.new(root_key, '/a/c/o',
hmac.new(root_key, b'/a/c/o',
digestmod=hashlib.sha256).digest())
self.assertIn('all_ids', keys)
at_least_one_old_style_id = False
@ -324,9 +325,9 @@ class TestKeymaster(unittest.TestCase):
self.app = keymaster.KeyMaster(self.swift, conf)
keys = self.verify_keys_for_path('/a/c/o', ('container', 'object'))
expected_keys = {
'container': hmac.new(secrets[None], '/a/c',
'container': hmac.new(secrets[None], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[None], '/a/c/o',
'object': hmac.new(secrets[None], b'/a/c/o',
digestmod=hashlib.sha256).digest()}
self.assertEqual(expected_keys, keys)
@ -335,9 +336,9 @@ class TestKeymaster(unittest.TestCase):
self.app = keymaster.KeyMaster(self.swift, conf)
keys = self.verify_keys_for_path('/a/c/o', ('container', 'object'))
expected_keys = {
'container': hmac.new(secrets['22'], '/a/c',
'container': hmac.new(secrets['22'], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets['22'], '/a/c/o',
'object': hmac.new(secrets['22'], b'/a/c/o',
digestmod=hashlib.sha256).digest()}
self.assertEqual(expected_keys, keys)
@ -346,9 +347,9 @@ class TestKeymaster(unittest.TestCase):
keys = self.verify_keys_for_path('/a/c/o', ('container', 'object'),
key_id={'secret_id': secret_id})
expected_keys = {
'container': hmac.new(secrets[secret_id], '/a/c',
'container': hmac.new(secrets[secret_id], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[secret_id], '/a/c/o',
'object': hmac.new(secrets[secret_id], b'/a/c/o',
digestmod=hashlib.sha256).digest()}
self.assertEqual(expected_keys, keys)
@ -374,9 +375,9 @@ class TestKeymaster(unittest.TestCase):
with mock.patch.object(self.app, 'create_key', mock_create_key):
keys = context.fetch_crypto_keys()
expected_keys = {
'container': hmac.new(secrets['22'], '/a/c',
'container': hmac.new(secrets['22'], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets['22'], '/a/c/o',
'object': hmac.new(secrets['22'], b'/a/c/o',
digestmod=hashlib.sha256).digest(),
'id': {'path': '/a/c/o', 'secret_id': '22', 'v': '1'},
'all_ids': [
@ -393,9 +394,9 @@ class TestKeymaster(unittest.TestCase):
with mock.patch.object(self.app, 'create_key', mock_create_key):
keys = context.fetch_crypto_keys(key_id={'secret_id': None})
expected_keys = {
'container': hmac.new(secrets[None], '/a/c',
'container': hmac.new(secrets[None], b'/a/c',
digestmod=hashlib.sha256).digest(),
'object': hmac.new(secrets[None], '/a/c/o',
'object': hmac.new(secrets[None], b'/a/c/o',
digestmod=hashlib.sha256).digest(),
'id': {'path': '/a/c/o', 'v': '1'},
'all_ids': [
@ -411,9 +412,10 @@ class TestKeymaster(unittest.TestCase):
def test_keymaster_config_path(self, mock_readconf):
for secret in (os.urandom(32), os.urandom(33), os.urandom(50)):
enc_secret = base64.b64encode(secret)
for conf_val in (bytes(enc_secret), unicode(enc_secret),
enc_secret[:30] + '\n' + enc_secret[30:],
enc_secret[:30] + '\r\n' + enc_secret[30:]):
self.assertIsInstance(enc_secret, bytes)
for conf_val in (enc_secret, enc_secret.decode('ascii'),
enc_secret[:30] + b'\n' + enc_secret[30:],
enc_secret[:30] + b'\r\n' + enc_secret[30:]):
mock_readconf.reset_mock()
mock_readconf.return_value = {
'encryption_root_secret': conf_val}
@ -428,8 +430,8 @@ class TestKeymaster(unittest.TestCase):
self.fail(str(err) + ' for secret %r' % secret)
def test_invalid_root_secret(self):
for secret in (bytes(base64.b64encode(os.urandom(31))), # too short
unicode(base64.b64encode(os.urandom(31))),
for secret in (base64.b64encode(os.urandom(31)), # too short
base64.b64encode(os.urandom(31)).decode('ascii'),
u'a' * 44 + u'????', b'a' * 44 + b'????', # not base64
u'a' * 45, b'a' * 45, # bad padding
99, None):
@ -447,7 +449,7 @@ class TestKeymaster(unittest.TestCase):
@mock.patch('swift.common.middleware.crypto.keymaster.readconf')
def test_root_secret_path_invalid_secret(self, mock_readconf):
for secret in (bytes(base64.b64encode(os.urandom(31))), # too short
unicode(base64.b64encode(os.urandom(31))),
base64.b64encode(os.urandom(31)).decode('ascii'),
u'a' * 44 + u'????', b'a' * 44 + b'????', # not base64
u'a' * 45, b'a' * 45, # bad padding
99, None):

View File

@ -147,7 +147,7 @@ class TestKmipKeymaster(unittest.TestCase):
key_id = 4321
"""
km_config_file = os.path.join(self.tempdir, 'km.conf')
with open(km_config_file, 'wb') as fd:
with open(km_config_file, 'wt') as fd:
fd.write(dedent(km_conf))
conf = {'__file__': '/etc/swift/proxy-server.conf',
@ -174,7 +174,7 @@ class TestKmipKeymaster(unittest.TestCase):
active_root_secret_id = secret_id
"""
km_config_file = os.path.join(self.tempdir, 'km.conf')
with open(km_config_file, 'wb') as fd:
with open(km_config_file, 'wt') as fd:
fd.write(dedent(km_conf))
conf = {'__file__': '/etc/swift/proxy-server.conf',
@ -219,7 +219,7 @@ class TestKmipKeymaster(unittest.TestCase):
key_id = 789
""" % km_config_file
with open(km_config_file, 'wb') as fd:
with open(km_config_file, 'wt') as fd:
fd.write(dedent(km_conf))
conf = {'__file__': proxy_server_conf_dir,

View File

@ -324,7 +324,7 @@ class TestKmsKeymaster(unittest.TestCase):
self.swift, TEST_PROXYSERVER_CONF_EXTERNAL_KEYMASTER_CONF)
raise Exception('Success even though key id invalid')
except ValueError as e:
self.assertEqual(e.message,
self.assertEqual(e.args[0],
ERR_MESSAGE_SECRET_INCORRECTLY_SPECIFIED)
except Exception:
print("Unexpected error: %s" % sys.exc_info()[0])
@ -369,7 +369,7 @@ class TestKmsKeymaster(unittest.TestCase):
except Exception as e:
expected_message = ('Key not found, uuid: ' +
TEST_KMS_NONEXISTENT_KEY_ID)
self.assertEqual(e.message, expected_message)
self.assertEqual(e.args[0], expected_message)
@mock.patch('swift.common.middleware.crypto.kms_keymaster.'
'keystone_password.KeystonePassword')
@ -453,7 +453,7 @@ class TestKmsKeymaster(unittest.TestCase):
except Exception as e:
expected_message = ('keymaster_config_path is set, but there are '
'other config options specified:')
self.assertTrue(e.message.startswith(expected_message),
self.assertTrue(e.args[0].startswith(expected_message),
"Error message does not start with '%s'" %
expected_message)

View File

@ -121,7 +121,7 @@ class FakeSwift(object):
# simulate object PUT
if method == 'PUT' and obj:
put_body = ''.join(iter(env['wsgi.input'].read, ''))
put_body = b''.join(iter(env['wsgi.input'].read, b''))
if 'swift.callback.update_footers' in env:
footers = HeaderKeyDict()
env['swift.callback.update_footers'](footers)
@ -202,7 +202,7 @@ class FakeSwift(object):
def call_count(self):
return len(self._calls)
def register(self, method, path, response_class, headers, body=''):
def register(self, method, path, response_class, headers, body=b''):
self._responses[(method, path)] = (response_class, headers, body)
def register_responses(self, method, path, responses):
@ -210,7 +210,7 @@ class FakeSwift(object):
class FakeAppThatExcepts(object):
MESSAGE = "We take exception to that!"
MESSAGE = b"We take exception to that!"
def __init__(self, exception_class=Exception):
self.exception_class = exception_class

View File

@ -254,49 +254,49 @@ def setup_servers(the_object_server=object_server, extra_conf=None):
assert(resp.status == 201)
# Create containers, 1 per test policy
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd = sock.makefile('rwb')
fd.write(b'PUT /v1/a/c HTTP/1.1\r\nHost: localhost\r\n'
b'Connection: close\r\nX-Auth-Token: t\r\n'
b'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
exp = b'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
# Create container in other account
# used for account-to-account tests
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd.write('PUT /v1/a1/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\n'
'Content-Length: 0\r\n\r\n')
fd = sock.makefile('rwb')
fd.write(b'PUT /v1/a1/c1 HTTP/1.1\r\nHost: localhost\r\n'
b'Connection: close\r\nX-Auth-Token: t\r\n'
b'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
exp = b'HTTP/1.1 201'
assert headers[:len(exp)] == exp, "Expected '%s', encountered '%s'" % (
exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd = sock.makefile('rwb')
fd.write(
'PUT /v1/a/c1 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: one\r\n'
'Content-Length: 0\r\n\r\n')
b'PUT /v1/a/c1 HTTP/1.1\r\nHost: localhost\r\n'
b'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: one\r\n'
b'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
exp = b'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
"Expected %r, encountered %r" % (exp, headers[:len(exp)])
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
fd = sock.makefile()
fd = sock.makefile('rwb')
fd.write(
'PUT /v1/a/c2 HTTP/1.1\r\nHost: localhost\r\n'
'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: two\r\n'
'Content-Length: 0\r\n\r\n')
b'PUT /v1/a/c2 HTTP/1.1\r\nHost: localhost\r\n'
b'Connection: close\r\nX-Auth-Token: t\r\nX-Storage-Policy: two\r\n'
b'Content-Length: 0\r\n\r\n')
fd.flush()
headers = readuntil2crlfs(fd)
exp = 'HTTP/1.1 201'
exp = b'HTTP/1.1 201'
assert headers[:len(exp)] == exp, \
"Expected '%s', encountered '%s'" % (exp, headers[:len(exp)])
return context

View File

@ -41,6 +41,7 @@ commands =
test/unit/cli/test_relinker.py \
test/unit/cli/test_ring_builder_analyzer.py \
test/unit/cli/test_ringbuilder.py \
test/unit/common/middleware/crypto \
test/unit/common/middleware/test_catch_errors.py \
test/unit/common/middleware/test_crossdomain.py \
test/unit/common/middleware/test_domain_remap.py \