# Copyright (c) 2015-2016 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import hashlib
import hmac
import json
import os
import unittest

import mock
from six.moves.urllib import parse as urlparse

from swift.common.middleware.crypto import encrypter
from swift.common.middleware.crypto.crypto_utils import (
    CRYPTO_KEY_CALLBACK, Crypto)
from swift.common.swob import (
    Request, HTTPException, HTTPCreated, HTTPAccepted, HTTPOk, HTTPBadRequest)
from swift.common.utils import FileLikeIter
from test.unit import FakeLogger, EMPTY_ETAG
from test.unit.common.middleware.crypto.crypto_helpers import (
    fetch_crypto_keys, md5hex, FAKE_IV, encrypt)
from test.unit.common.middleware.helpers import FakeSwift, FakeAppThatExcepts


@mock.patch('swift.common.middleware.crypto.crypto_utils.Crypto.create_iv',
            lambda *args: FAKE_IV)
class TestEncrypter(unittest.TestCase):
    """Unit tests for the encrypter middleware.

    ``Crypto.create_iv`` is patched for the whole class so that it always
    returns ``FAKE_IV``; this lets each test compute the expected ciphertext
    with the ``encrypt`` helper and compare it to what reached the fake app.
    """

    def setUp(self):
        self.app = FakeSwift()
        self.encrypter = encrypter.Encrypter(self.app, {})
        self.encrypter.logger = FakeLogger()

    def _verify_user_metadata(self, req_hdrs, name, value, key):
        """Assert that one user metadata item was forwarded encrypted.

        :param req_hdrs: headers of the request received by the app
        :param name: user metadata name, i.e. the part of the header name
                     that follows ``X-Object-Meta-``
        :param value: the plaintext value that the client sent
        :param key: the key expected to have been used to encrypt the value
        """
        # verify encrypted version of user metadata
        self.assertNotIn('X-Object-Meta-' + name, req_hdrs)
        expected_hdr = 'X-Object-Transient-Sysmeta-Crypto-Meta-' + name
        self.assertIn(expected_hdr, req_hdrs)
        enc_val, param = req_hdrs[expected_hdr].split(';')
        param = param.strip()
        self.assertTrue(param.startswith('swift_meta='))
        actual_meta = json.loads(
            urlparse.unquote_plus(param[len('swift_meta='):]))
        self.assertEqual(Crypto.cipher, actual_meta['cipher'])
        meta_iv = base64.b64decode(actual_meta['iv'])
        self.assertEqual(FAKE_IV, meta_iv)
        self.assertEqual(
            base64.b64encode(encrypt(value, key, meta_iv)),
            enc_val)
        # if there is any encrypted user metadata then this header should exist
        self.assertIn('X-Object-Transient-Sysmeta-Crypto-Meta', req_hdrs)
        common_meta = json.loads(urlparse.unquote_plus(
            req_hdrs['X-Object-Transient-Sysmeta-Crypto-Meta']))
        self.assertDictEqual({'cipher': Crypto.cipher,
                              'key_id': {'v': 'fake', 'path': '/a/c/fake'}},
                             common_meta)

    def _verify_body_crypto_meta(self, req_hdrs, body_key, object_key):
        """Assert that the body crypto meta header describes ``body_key``.

        :param req_hdrs: headers of the request received by the app
        :param body_key: the key that was used to encrypt the object body
        :param object_key: the key expected to have wrapped ``body_key``
        """
        actual = req_hdrs['X-Object-Sysmeta-Crypto-Body-Meta']
        actual = json.loads(urlparse.unquote_plus(actual))
        self.assertEqual(Crypto().cipher, actual['cipher'])
        self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))

        # verify wrapped body key
        expected_wrapped_key = encrypt(body_key, object_key, FAKE_IV)
        self.assertEqual(expected_wrapped_key,
                         base64.b64decode(actual['body_key']['key']))
        self.assertEqual(FAKE_IV,
                         base64.b64decode(actual['body_key']['iv']))
        self.assertEqual(fetch_crypto_keys()['id'], actual['key_id'])

    def _verify_container_update_override_etag(self, req_hdrs, expected_etag):
        """Assert that the container update override etag is encrypted.

        :param req_hdrs: headers of the request received by the app
        :param expected_etag: the plaintext value that is expected to have
                              been encrypted with the container key
        """
        override_hdr = 'X-Object-Sysmeta-Container-Update-Override-Etag'
        self.assertIn(override_hdr, req_hdrs)
        parts = req_hdrs[override_hdr].rsplit(';', 1)
        self.assertEqual(2, len(parts))

        # extract crypto_meta from end of etag for container update
        param = parts[1].strip()
        crypto_meta_tag = 'swift_meta='
        self.assertTrue(param.startswith(crypto_meta_tag), param)
        actual_meta = json.loads(
            urlparse.unquote_plus(param[len(crypto_meta_tag):]))
        self.assertEqual(Crypto().cipher, actual_meta['cipher'])
        self.assertEqual(fetch_crypto_keys()['id'], actual_meta['key_id'])

        cont_key = fetch_crypto_keys()['container']
        cont_etag_iv = base64.b64decode(actual_meta['iv'])
        self.assertEqual(FAKE_IV, cont_etag_iv)
        self.assertEqual(encrypt(expected_etag, cont_key, cont_etag_iv),
                         base64.b64decode(parts[0]))

    def test_PUT_req(self):
        body_key = os.urandom(32)
        object_key = fetch_crypto_keys()['object']
        plaintext = 'FAKE APP'
        plaintext_etag = md5hex(plaintext)
        ciphertext = encrypt(plaintext, body_key, FAKE_IV)
        ciphertext_etag = md5hex(ciphertext)

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'etag': plaintext_etag,
                'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'x-object-meta-etag': 'not to be confused with the Etag!',
                'x-object-meta-test': 'encrypt me',
                'x-object-sysmeta-test': 'do not encrypt me'}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
                return_value=body_key):
            resp = req.get_response(self.encrypter)
        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('PUT', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # verify body crypto meta, including the wrapped body key
        self._verify_body_crypto_meta(req_hdrs, body_key, object_key)

        # verify etag
        self.assertEqual(ciphertext_etag, req_hdrs['Etag'])

        encrypted_etag, _junk, etag_meta = \
            req_hdrs['X-Object-Sysmeta-Crypto-Etag'].partition('; swift_meta=')
        # verify crypto_meta was appended to this etag
        self.assertTrue(etag_meta)
        actual_meta = json.loads(urlparse.unquote_plus(etag_meta))
        self.assertEqual(Crypto().cipher, actual_meta['cipher'])

        # verify encrypted version of plaintext etag
        actual = base64.b64decode(encrypted_etag)
        etag_iv = base64.b64decode(actual_meta['iv'])
        enc_etag = encrypt(plaintext_etag, object_key, etag_iv)
        self.assertEqual(enc_etag, actual)

        # verify etag MAC for conditional requests
        actual_hmac = base64.b64decode(
            req_hdrs['X-Object-Sysmeta-Crypto-Etag-Mac'])
        self.assertEqual(actual_hmac, hmac.new(
            object_key, plaintext_etag, hashlib.sha256).digest())

        # verify encrypted etag for container update
        self._verify_container_update_override_etag(req_hdrs, plaintext_etag)

        # content-type is not encrypted
        self.assertEqual('text/plain', req_hdrs['Content-Type'])

        # user meta is encrypted
        self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', object_key)
        self._verify_user_metadata(
            req_hdrs, 'Etag', 'not to be confused with the Etag!', object_key)

        # sysmeta is not encrypted
        self.assertEqual('do not encrypt me',
                         req_hdrs['X-Object-Sysmeta-Test'])

        # verify object is encrypted by getting direct from the app
        get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
        resp = get_req.get_response(self.app)
        self.assertEqual(ciphertext, resp.body)
        self.assertEqual(ciphertext_etag, resp.headers['Etag'])

    def test_PUT_zero_size_object(self):
        # object body encryption should be skipped for zero sized object body
        object_key = fetch_crypto_keys()['object']
        plaintext_etag = EMPTY_ETAG

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'etag': EMPTY_ETAG,
                'content-type': 'text/plain',
                'content-length': '0',
                'x-object-meta-etag': 'not to be confused with the Etag!',
                'x-object-meta-test': 'encrypt me',
                'x-object-sysmeta-test': 'do not encrypt me'}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body='', headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

        resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('PUT', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # verify that there is no body crypto meta; the header that carries
        # it is X-Object-Sysmeta-Crypto-Body-Meta, so check that name as well
        # as the shorter name that was originally checked here
        self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', req_hdrs)
        self.assertNotIn('X-Object-Sysmeta-Crypto-Body-Meta', req_hdrs)
        # verify etag is md5 of plaintext
        self.assertEqual(EMPTY_ETAG, req_hdrs['Etag'])
        # verify there is no etag crypto meta
        self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', req_hdrs)
        # verify there is no container update override for etag
        self.assertNotIn(
            'X-Object-Sysmeta-Container-Update-Override-Etag', req_hdrs)

        # user meta is still encrypted
        self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', object_key)
        self._verify_user_metadata(
            req_hdrs, 'Etag', 'not to be confused with the Etag!', object_key)

        # sysmeta is not encrypted
        self.assertEqual('do not encrypt me',
                         req_hdrs['X-Object-Sysmeta-Test'])

        # verify object is empty by getting direct from the app
        get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
        resp = get_req.get_response(self.app)
        self.assertEqual('', resp.body)
        self.assertEqual(EMPTY_ETAG, resp.headers['Etag'])

    def _test_PUT_with_other_footers(self, override_etag):
        """Verify handling of another middleware's footer callback.

        :param override_etag: value that the other middleware's footers
                              supply for the container update override etag
        """
        body_key = os.urandom(32)
        object_key = fetch_crypto_keys()['object']
        plaintext = 'FAKE APP'
        plaintext_etag = md5hex(plaintext)
        ciphertext = encrypt(plaintext, body_key, FAKE_IV)
        ciphertext_etag = md5hex(ciphertext)
        other_footers = {
            'Etag': plaintext_etag,
            'X-Object-Sysmeta-Other': 'other sysmeta',
            'X-Object-Sysmeta-Container-Update-Override-Size':
                'other override',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
                override_etag}

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'swift.callback.update_footers':
                   lambda footers: footers.update(other_footers)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'Etag': 'correct etag is in footers'}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

        with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
                lambda *args: body_key):
            resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('PUT', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # verify that other middleware's footers made it to app, including any
        # container update overrides but nothing Etag-related
        other_footers.pop('Etag')
        other_footers.pop('X-Object-Sysmeta-Container-Update-Override-Etag')
        for k, v in other_footers.items():
            self.assertEqual(v, req_hdrs[k])

        # verify encryption footers are ok
        encrypted_etag, _junk, etag_meta = \
            req_hdrs['X-Object-Sysmeta-Crypto-Etag'].partition('; swift_meta=')
        self.assertTrue(etag_meta)
        actual_meta = json.loads(urlparse.unquote_plus(etag_meta))
        self.assertEqual(Crypto().cipher, actual_meta['cipher'])

        self.assertEqual(ciphertext_etag, req_hdrs['Etag'])
        actual = base64.b64decode(encrypted_etag)
        etag_iv = base64.b64decode(actual_meta['iv'])
        self.assertEqual(encrypt(plaintext_etag, object_key, etag_iv), actual)

        # verify encrypted etag for container update
        self._verify_container_update_override_etag(req_hdrs, override_etag)

        # verify body crypto meta, including the wrapped body key
        self._verify_body_crypto_meta(req_hdrs, body_key, object_key)

    def test_PUT_with_other_footers(self):
        self._test_PUT_with_other_footers('override etag')

    def test_PUT_with_other_footers_and_empty_etag(self):
        # verify that an override etag value of EMPTY_ETAG will be encrypted
        # when there was a non-zero body length
        self._test_PUT_with_other_footers(EMPTY_ETAG)

    def _test_PUT_with_etag_override_in_headers(self, override_etag):
        """Verify handling of a container update override etag in headers.

        :param override_etag: value that another middleware is simulated to
                              have set in the request headers
        """
        plaintext = 'FAKE APP'
        plaintext_etag = md5hex(plaintext)

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'Etag': plaintext_etag,
                'X-Object-Sysmeta-Container-Update-Override-Etag':
                    override_etag}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        self.assertEqual(plaintext_etag, resp.headers['Etag'])

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
        req_hdrs = self.app.headers[0]

        # verify encrypted etag for container update
        self._verify_container_update_override_etag(req_hdrs, override_etag)

    def test_PUT_with_etag_override_in_headers(self):
        self._test_PUT_with_etag_override_in_headers('override_etag')

    def test_PUT_with_etag_override_in_headers_and_empty_etag(self):
        # verify that an override etag value of EMPTY_ETAG will be encrypted
        # when there was a non-zero body length
        self._test_PUT_with_etag_override_in_headers(EMPTY_ETAG)

    def test_PUT_with_bad_etag_in_other_footers(self):
        # verify that etag supplied in footers from other middleware overrides
        # header etag when validating inbound plaintext etags
        plaintext = 'FAKE APP'
        plaintext_etag = md5hex(plaintext)
        other_footers = {
            'Etag': 'bad etag',
            'X-Object-Sysmeta-Other': 'other sysmeta',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
                'other override'}

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'swift.callback.update_footers':
                   lambda footers: footers.update(other_footers)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'Etag': plaintext_etag}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('422 Unprocessable Entity', resp.status)
        self.assertNotIn('Etag', resp.headers)

    def test_PUT_with_bad_etag_in_headers_and_other_footers(self):
        # verify that etag supplied in headers from other middleware is used if
        # none is supplied in footers when validating inbound plaintext etags
        plaintext = 'FAKE APP'
        other_footers = {
            'X-Object-Sysmeta-Other': 'other sysmeta',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
                'other override'}

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'swift.callback.update_footers':
                   lambda footers: footers.update(other_footers)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(plaintext)),
                'Etag': 'bad etag'}
        req = Request.blank(
            '/v1/a/c/o', environ=env, body=plaintext, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('422 Unprocessable Entity', resp.status)
        self.assertNotIn('Etag', resp.headers)

    def test_PUT_nothing_read(self):
        # simulate an artificial scenario of a downstream filter/app not
        # actually reading the input stream from encrypter.
        class NonReadingApp(object):
            def __call__(self, env, start_response):
                # note: no read from wsgi.input
                req = Request(env)
                env['swift.callback.update_footers'](req.headers)
                # call_headers is looked up in the enclosing test method at
                # call time, so each rebinding below is picked up here
                call_headers.append(req.headers)
                resp = HTTPCreated(req=req, headers={'Etag': 'response etag'})
                return resp(env, start_response)

        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'content-type': 'text/plain',
                'content-length': 0,
                'etag': 'etag from client'}
        req = Request.blank('/v1/a/c/o', environ=env, body='', headers=hdrs)

        call_headers = []
        resp = req.get_response(encrypter.Encrypter(NonReadingApp(), {}))
        self.assertEqual('201 Created', resp.status)
        self.assertEqual('response etag', resp.headers['Etag'])
        self.assertEqual(1, len(call_headers))
        self.assertEqual('etag from client', call_headers[0]['etag'])
        # verify no encryption footers
        for k in call_headers[0]:
            self.assertFalse(k.lower().startswith('x-object-sysmeta-crypto-'))

        # check that an upstream footer callback gets called
        other_footers = {
            'Etag': EMPTY_ETAG,
            'X-Object-Sysmeta-Other': 'other sysmeta',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
                'other override'}
        env.update({'swift.callback.update_footers':
                    lambda footers: footers.update(other_footers)})
        req = Request.blank('/v1/a/c/o', environ=env, body='', headers=hdrs)

        call_headers = []
        resp = req.get_response(encrypter.Encrypter(NonReadingApp(), {}))

        self.assertEqual('201 Created', resp.status)
        self.assertEqual('response etag', resp.headers['Etag'])
        self.assertEqual(1, len(call_headers))

        # verify encrypted override etag for container update.
        self._verify_container_update_override_etag(
            call_headers[0], 'other override')

        # verify that other middleware's footers made it to app
        other_footers.pop('X-Object-Sysmeta-Container-Update-Override-Etag')
        for k, v in other_footers.items():
            self.assertEqual(v, call_headers[0][k])
        # verify no encryption footers
        for k in call_headers[0]:
            self.assertFalse(k.lower().startswith('x-object-sysmeta-crypto-'))

        # if upstream footer override etag is for an empty body then check that
        # it is not encrypted
        other_footers = {
            'Etag': EMPTY_ETAG,
            'X-Object-Sysmeta-Container-Update-Override-Etag': EMPTY_ETAG}
        env.update({'swift.callback.update_footers':
                    lambda footers: footers.update(other_footers)})
        req = Request.blank('/v1/a/c/o', environ=env, body='', headers=hdrs)

        call_headers = []
        resp = req.get_response(encrypter.Encrypter(NonReadingApp(), {}))

        self.assertEqual('201 Created', resp.status)
        self.assertEqual('response etag', resp.headers['Etag'])
        self.assertEqual(1, len(call_headers))

        # verify that other middleware's footers made it to app
        for k, v in other_footers.items():
            self.assertEqual(v, call_headers[0][k])
        # verify no encryption footers
        for k in call_headers[0]:
            self.assertFalse(k.lower().startswith('x-object-sysmeta-crypto-'))

    def test_POST_req(self):
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'POST',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'x-object-meta-test': 'encrypt me',
                'x-object-meta-test2': '',
                'x-object-sysmeta-test': 'do not encrypt me'}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        key = fetch_crypto_keys()['object']
        self.app.register('POST', '/v1/a/c/o', HTTPAccepted, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('202 Accepted', resp.status)
        self.assertNotIn('Etag', resp.headers)

        # verify metadata items
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual('POST', self.app.calls[0][0])
        req_hdrs = self.app.headers[0]

        # user meta is encrypted
        self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', key)
        # unless it had no value
        self.assertEqual('', req_hdrs['X-Object-Meta-Test2'])

        # sysmeta is not encrypted
        self.assertEqual('do not encrypt me',
                         req_hdrs['X-Object-Sysmeta-Test'])

    def _test_no_user_metadata(self, method):
        """Verify that no user metadata crypto meta header is set.

        :param method: the request method to use, e.g. 'PUT' or 'POST'
        """
        # verify that x-object-transient-sysmeta-crypto-meta is not set when
        # there is no user metadata
        env = {'REQUEST_METHOD': method,
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank('/v1/a/c/o', environ=env, body='body')
        self.app.register(method, '/v1/a/c/o', HTTPAccepted, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('202 Accepted', resp.status)
        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(method, self.app.calls[0][0])
        self.assertNotIn('x-object-transient-sysmeta-crypto-meta',
                         self.app.headers[0])

    def test_PUT_no_user_metadata(self):
        self._test_no_user_metadata('PUT')

    def test_POST_no_user_metadata(self):
        self._test_no_user_metadata('POST')

    def _test_if_match(self, method, match_header_name):
        """Verify that conditional request etags gain masked equivalents.

        :param method: the request method to use, e.g. 'GET' or 'HEAD'
        :param match_header_name: 'If-Match' or 'If-None-Match'
        """
        def do_test(method, plain_etags, expected_plain_etags=None):
            env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
            match_header_value = ', '.join(plain_etags)
            req = Request.blank(
                '/v1/a/c/o', environ=env, method=method,
                headers={match_header_name: match_header_value})
            app = FakeSwift()
            app.register(method, '/v1/a/c/o', HTTPOk, {})
            resp = req.get_response(encrypter.Encrypter(app, {}))
            self.assertEqual('200 OK', resp.status)

            self.assertEqual(1, len(app.calls), app.calls)
            self.assertEqual(method, app.calls[0][0])
            actual_headers = app.headers[0]

            # verify the alternate etag location has been specified
            if match_header_value and match_header_value != '*':
                self.assertIn('X-Backend-Etag-Is-At', actual_headers)
                self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                                 actual_headers['X-Backend-Etag-Is-At'])

            # verify etags have been supplemented with masked values
            self.assertIn(match_header_name, actual_headers)
            actual_etags = set(actual_headers[match_header_name].split(', '))
            key = fetch_crypto_keys()['object']
            masked_etags = [
                '"%s"' % base64.b64encode(hmac.new(
                    key, etag.strip('"'), hashlib.sha256).digest())
                for etag in plain_etags if etag not in ('*', '')]
            expected_etags = set((expected_plain_etags or plain_etags) +
                                 masked_etags)
            self.assertEqual(expected_etags, actual_etags)
            # check that the request environ was returned to original state
            self.assertEqual(set(plain_etags),
                             set(req.headers[match_header_name].split(', ')))

        do_test(method, [''])
        do_test(method, ['"an etag"'])
        do_test(method, ['"an etag"', '"another_etag"'])
        do_test(method, ['*'])
        # rfc2616 does not allow wildcard *and* etag but test it anyway
        do_test(method, ['*', '"an etag"'])
        # etags should be quoted but check we can cope if they are not
        do_test(
            method, ['*', 'an etag', 'another_etag'],
            expected_plain_etags=['*', '"an etag"', '"another_etag"'])

    def test_GET_if_match(self):
        self._test_if_match('GET', 'If-Match')

    def test_HEAD_if_match(self):
        self._test_if_match('HEAD', 'If-Match')

    def test_GET_if_none_match(self):
        self._test_if_match('GET', 'If-None-Match')

    def test_HEAD_if_none_match(self):
        self._test_if_match('HEAD', 'If-None-Match')

    def _test_existing_etag_is_at_header(self, method, match_header_name):
        """Verify that an existing X-Backend-Etag-Is-At value is retained.

        :param method: the request method to use, e.g. 'GET' or 'HEAD'
        :param match_header_name: 'If-Match' or 'If-None-Match'
        """
        # if another middleware has already set X-Backend-Etag-Is-At then
        # encrypter should not override that value
        env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank(
            '/v1/a/c/o', environ=env, method=method,
            headers={match_header_name: "an etag",
                     'X-Backend-Etag-Is-At': 'X-Object-Sysmeta-Other-Etag'})
        self.app.register(method, '/v1/a/c/o', HTTPOk, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('200 OK', resp.status)

        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(method, self.app.calls[0][0])
        actual_headers = self.app.headers[0]
        self.assertIn('X-Backend-Etag-Is-At', actual_headers)
        self.assertEqual(
            'X-Object-Sysmeta-Other-Etag,X-Object-Sysmeta-Crypto-Etag-Mac',
            actual_headers['X-Backend-Etag-Is-At'])
        actual_etags = set(actual_headers[match_header_name].split(', '))
        self.assertIn('"an etag"', actual_etags)

    def test_GET_if_match_with_existing_etag_is_at_header(self):
        self._test_existing_etag_is_at_header('GET', 'If-Match')

    def test_HEAD_if_match_with_existing_etag_is_at_header(self):
        self._test_existing_etag_is_at_header('HEAD', 'If-Match')

    def test_GET_if_none_match_with_existing_etag_is_at_header(self):
        self._test_existing_etag_is_at_header('GET', 'If-None-Match')

    def test_HEAD_if_none_match_with_existing_etag_is_at_header(self):
        self._test_existing_etag_is_at_header('HEAD', 'If-None-Match')

    def _test_etag_is_at_not_duplicated(self, method):
        """Verify X-Backend-Etag-Is-At has no duplicated crypto entry.

        :param method: the request method to use, e.g. 'GET' or 'HEAD'
        """
        # verify only one occurrence of X-Object-Sysmeta-Crypto-Etag-Mac in
        # X-Backend-Etag-Is-At
        key = fetch_crypto_keys()['object']
        env = {CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        req = Request.blank(
            '/v1/a/c/o', environ=env, method=method,
            headers={'If-Match': '"an etag"',
                     'If-None-Match': '"another etag"'})
        self.app.register(method, '/v1/a/c/o', HTTPOk, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('200 OK', resp.status)

        self.assertEqual(1, len(self.app.calls), self.app.calls)
        self.assertEqual(method, self.app.calls[0][0])
        actual_headers = self.app.headers[0]
        self.assertIn('X-Backend-Etag-Is-At', actual_headers)
        self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
                         actual_headers['X-Backend-Etag-Is-At'])

        self.assertIn('"%s"' % base64.b64encode(
            hmac.new(key, 'an etag', hashlib.sha256).digest()),
            actual_headers['If-Match'])
        self.assertIn('"another etag"', actual_headers['If-None-Match'])
        self.assertIn('"%s"' % base64.b64encode(
            hmac.new(key, 'another etag', hashlib.sha256).digest()),
            actual_headers['If-None-Match'])

    def test_GET_etag_is_at_not_duplicated(self):
        self._test_etag_is_at_not_duplicated('GET')

    def test_HEAD_etag_is_at_not_duplicated(self):
        self._test_etag_is_at_not_duplicated('HEAD')

    def test_PUT_response_inconsistent_etag_is_not_replaced(self):
        # if response is success but etag does not match the ciphertext md5
        # then verify that we do *not* replace it with the plaintext etag
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated,
                          {'Etag': 'not the ciphertext etag'})
        resp = req.get_response(self.encrypter)
        self.assertEqual('201 Created', resp.status)
        self.assertEqual('not the ciphertext etag', resp.headers['Etag'])

    def test_PUT_multiseg_no_client_etag(self):
        body_key = os.urandom(32)
        chunks = ['some', 'chunks', 'of data']
        body = ''.join(chunks)
        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'wsgi.input': FileLikeIter(chunks)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

        with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
                lambda *args: body_key):
            resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        # verify object is encrypted by getting direct from the app
        get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
        self.assertEqual(encrypt(body, body_key, FAKE_IV),
                         get_req.get_response(self.app).body)

    def test_PUT_multiseg_good_client_etag(self):
        body_key = os.urandom(32)
        chunks = ['some', 'chunks', 'of data']
        body = ''.join(chunks)
        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'wsgi.input': FileLikeIter(chunks)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body)),
                'Etag': md5hex(body)}
        req = Request.blank('/v1/a/c/o', environ=env, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})

        with mock.patch(
            'swift.common.middleware.crypto.crypto_utils.'
            'Crypto.create_random_key',
                lambda *args: body_key):
            resp = req.get_response(self.encrypter)

        self.assertEqual('201 Created', resp.status)
        # verify object is encrypted by getting direct from the app
        get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
        self.assertEqual(encrypt(body, body_key, FAKE_IV),
                         get_req.get_response(self.app).body)

    def test_PUT_multiseg_bad_client_etag(self):
        chunks = ['some', 'chunks', 'of data']
        body = ''.join(chunks)
        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys,
               'wsgi.input': FileLikeIter(chunks)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body)),
                'Etag': 'badclientetag'}
        req = Request.blank('/v1/a/c/o', environ=env, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('422 Unprocessable Entity', resp.status)

    def test_PUT_missing_key_callback(self):
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'PUT'}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        resp = req.get_response(self.encrypter)
        self.assertEqual('500 Internal Error', resp.status)
        self.assertIn('missing callback',
                      self.encrypter.logger.get_lines_for_level('error')[0])
        self.assertEqual('Unable to retrieve encryption keys.', resp.body)

    def test_PUT_error_in_key_callback(self):
        def raise_exc():
            raise Exception('Testing')

        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'PUT',
               CRYPTO_KEY_CALLBACK: raise_exc}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        resp = req.get_response(self.encrypter)
        self.assertEqual('500 Internal Error', resp.status)
        self.assertIn('from callback: Testing',
                      self.encrypter.logger.get_lines_for_level('error')[0])
        self.assertEqual('Unable to retrieve encryption keys.', resp.body)

    def test_PUT_encryption_override(self):
        # set crypto override to disable encryption.
        # simulate another middleware wanting to set footers
        other_footers = {
            'Etag': 'other etag',
            'X-Object-Sysmeta-Other': 'other sysmeta',
            'X-Object-Sysmeta-Container-Update-Override-Etag':
                'other override'}
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': 'PUT',
               'swift.crypto.override': True,
               'swift.callback.update_footers':
                   lambda footers: footers.update(other_footers)}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        self.app.register('PUT', '/v1/a/c/o', HTTPCreated, {})
        resp = req.get_response(self.encrypter)
        self.assertEqual('201 Created', resp.status)

        # verify that other middleware's footers made it to app
        req_hdrs = self.app.headers[0]
        for k, v in other_footers.items():
            self.assertEqual(v, req_hdrs[k])

        # verify object is NOT encrypted by getting direct from the app
        get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
        self.assertEqual(body, get_req.get_response(self.app).body)

    def _test_constraints_checking(self, method):
        """Verify that check_metadata is called and its error returned.

        :param method: the request method to use, e.g. 'PUT' or 'POST'
        """
        # verify that the check_metadata function is called on PUT and POST
        body = 'FAKE APP'
        env = {'REQUEST_METHOD': method,
               CRYPTO_KEY_CALLBACK: fetch_crypto_keys}
        hdrs = {'content-type': 'text/plain',
                'content-length': str(len(body))}
        req = Request.blank('/v1/a/c/o', environ=env, body=body, headers=hdrs)
        mocked_func = 'swift.common.middleware.crypto.encrypter.check_metadata'
        with mock.patch(mocked_func) as mocked:
            mocked.side_effect = [HTTPBadRequest('testing')]
            resp = req.get_response(self.encrypter)
        self.assertEqual('400 Bad Request', resp.status)
        self.assertEqual(1, mocked.call_count)
        mocked.assert_called_once_with(mock.ANY, 'object')
        self.assertEqual(req.headers,
                         mocked.call_args_list[0][0][0].headers)

    def test_PUT_constraints_checking(self):
        self._test_constraints_checking('PUT')

    def test_POST_constraints_checking(self):
        self._test_constraints_checking('POST')

    def test_config_true_value_on_disable_encryption(self):
        app = FakeSwift()
        self.assertFalse(encrypter.Encrypter(app, {}).disable_encryption)
        for val in ('true', '1', 'yes', 'on', 't', 'y'):
            # use a separate name so that each Encrypter wraps the fake app
            # rather than the Encrypter built on the previous iteration
            enc = encrypter.Encrypter(app, {'disable_encryption': val})
            self.assertTrue(enc.disable_encryption)

    def test_PUT_app_exception(self):
        app = encrypter.Encrypter(FakeAppThatExcepts(HTTPException), {})
        req = Request.blank('/', environ={'REQUEST_METHOD': 'PUT'})
        with self.assertRaises(HTTPException) as catcher:
            req.get_response(app)
        # must follow the with block: nothing after the raising call inside
        # the block would be executed
        self.assertEqual(FakeAppThatExcepts.MESSAGE, catcher.exception.body)

    def test_encrypt_header_val(self):
        # Prepare key and Crypto instance
        object_key = fetch_crypto_keys()['object']

        # - Normal string can be crypted
        encrypted = encrypter.encrypt_header_val(Crypto(), 'aaa', object_key)
        # sanity: return value is 2 item tuple
        self.assertEqual(2, len(encrypted))
        crypted_val, crypt_info = encrypted
        expected_crypt_val = base64.b64encode(
            encrypt('aaa', object_key, FAKE_IV))
        expected_crypt_info = {
            'cipher': 'AES_CTR_256', 'iv': 'This is an IV123'}
        self.assertEqual(expected_crypt_val, crypted_val)
        self.assertEqual(expected_crypt_info, crypt_info)

        # - Empty string raises a ValueError for safety
        with self.assertRaises(ValueError) as cm:
            encrypter.encrypt_header_val(Crypto(), '', object_key)

        # str() is used rather than the .message attribute because the
        # latter only exists on Python 2 exceptions
        self.assertEqual('empty value is not acceptable',
                         str(cm.exception))

        # - None also raises a ValueError for safety
        with self.assertRaises(ValueError) as cm:
            encrypter.encrypt_header_val(Crypto(), None, object_key)

        self.assertEqual('empty value is not acceptable',
                         str(cm.exception))


if __name__ == '__main__':
    unittest.main()