Fix encryption-delimiter interaction

Previously, if a container listing produced `subdir` elements
the decrypter would raise a KeyError.

Additionally, update the functests so this sort of thing would
have been caught at the gate.

Closes-Bug: 1609904
Change-Id: Idc1907d19f90af7a086f45f8faecee9fbc3c69c2
This commit is contained in:
Tim Burke 2016-08-02 21:50:45 -07:00
parent ab2b844669
commit fb5fcb189e
4 changed files with 70 additions and 30 deletions

View File

@ -395,6 +395,7 @@ class DecrypterContContext(BaseDecrypterContext):
return [new_body] return [new_body]
def decrypt_obj_dict(self, obj_dict, key): def decrypt_obj_dict(self, obj_dict, key):
if 'hash' in obj_dict:
ciphertext = obj_dict['hash'] ciphertext = obj_dict['hash']
obj_dict['hash'] = self.decrypt_value_with_meta(ciphertext, key) obj_dict['hash'] = self.decrypt_value_with_meta(ciphertext, key)
return obj_dict return obj_dict

View File

@ -565,23 +565,34 @@ class Container(Base):
files = json.loads(self.conn.response.read()) files = json.loads(self.conn.response.read())
for file_item in files: for file_item in files:
file_item['name'] = file_item['name'].encode('utf-8') for key in ('name', 'subdir', 'content_type'):
file_item['content_type'] = file_item['content_type'].\ if key in file_item:
encode('utf-8') file_item[key] = file_item[key].encode('utf-8')
return files return files
elif format_type == 'xml': elif format_type == 'xml':
files = [] files = []
tree = minidom.parseString(self.conn.response.read()) tree = minidom.parseString(self.conn.response.read())
for x in tree.getElementsByTagName('object'): container = tree.getElementsByTagName('container')[0]
for x in container.childNodes:
file_item = {} file_item = {}
if x.tagName == 'object':
for key in ['name', 'hash', 'bytes', 'content_type', for key in ['name', 'hash', 'bytes', 'content_type',
'last_modified']: 'last_modified']:
file_item[key] = x.getElementsByTagName(key)[0].\ file_item[key] = x.getElementsByTagName(key)[0].\
childNodes[0].nodeValue childNodes[0].nodeValue
elif x.tagName == 'subdir':
file_item['subdir'] = x.getElementsByTagName(
'name')[0].childNodes[0].nodeValue
else:
raise ValueError('Found unexpected element %s'
% x.tagName)
files.append(file_item) files.append(file_item)
for file_item in files: for file_item in files:
if 'subdir' in file_item:
file_item['subdir'] = file_item['subdir'].\
encode('utf-8')
else:
file_item['name'] = file_item['name'].encode('utf-8') file_item['name'] = file_item['name'].encode('utf-8')
file_item['content_type'] = file_item['content_type'].\ file_item['content_type'] = file_item['content_type'].\
encode('utf-8') encode('utf-8')

View File

@ -573,13 +573,19 @@ class TestContainer(Base):
for format_type in [None, 'json', 'xml']: for format_type in [None, 'json', 'xml']:
for prefix in prefixs: for prefix in prefixs:
files = cont.files(parms={'prefix': prefix}) files = cont.files(parms={'prefix': prefix,
'format': format_type})
if isinstance(files[0], dict):
files = [x.get('name', x.get('subdir')) for x in files]
self.assertEqual(files, sorted(prefix_files[prefix])) self.assertEqual(files, sorted(prefix_files[prefix]))
for format_type in [None, 'json', 'xml']: for format_type in [None, 'json', 'xml']:
for prefix in prefixs: for prefix in prefixs:
files = cont.files(parms={'limit': limit_count, files = cont.files(parms={'limit': limit_count,
'prefix': prefix}) 'prefix': prefix,
'format': format_type})
if isinstance(files[0], dict):
files = [x.get('name', x.get('subdir')) for x in files]
self.assertEqual(len(files), limit_count) self.assertEqual(len(files), limit_count)
for file_item in files: for file_item in files:
@ -596,11 +602,23 @@ class TestContainer(Base):
file_item = cont.file(f) file_item = cont.file(f)
self.assertTrue(file_item.write_random()) self.assertTrue(file_item.write_random())
results = cont.files() for format_type in [None, 'json', 'xml']:
results = cont.files(parms={'delimiter': delimiter}) results = cont.files(parms={'format': format_type})
if isinstance(results[0], dict):
results = [x.get('name', x.get('subdir')) for x in results]
self.assertEqual(results, ['test', 'test-bar', 'test-foo'])
results = cont.files(parms={'delimiter': delimiter,
'format': format_type})
if isinstance(results[0], dict):
results = [x.get('name', x.get('subdir')) for x in results]
self.assertEqual(results, ['test', 'test-']) self.assertEqual(results, ['test', 'test-'])
results = cont.files(parms={'delimiter': delimiter, 'reverse': 'yes'}) results = cont.files(parms={'delimiter': delimiter,
'format': format_type,
'reverse': 'yes'})
if isinstance(results[0], dict):
results = [x.get('name', x.get('subdir')) for x in results]
self.assertEqual(results, ['test-', 'test']) self.assertEqual(results, ['test-', 'test'])
def testListDelimiterAndPrefix(self): def testListDelimiterAndPrefix(self):

View File

@ -874,6 +874,8 @@ class TestDecrypterContainerRequests(unittest.TestCase):
pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10' pt_etag2 = 'ac0374ed4d43635f803c82469d0b5a10'
key = fetch_crypto_keys()['container'] key = fetch_crypto_keys()['container']
subdir = {"subdir": "pseudo-dir/"}
obj_dict_1 = {"bytes": 16, obj_dict_1 = {"bytes": 16,
"last_modified": "2015-04-14T23:33:06.439040", "last_modified": "2015-04-14T23:33:06.439040",
"hash": encrypt_and_append_meta( "hash": encrypt_and_append_meta(
@ -888,7 +890,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
"name": "testfile2", "name": "testfile2",
"content_type": content_type_2} "content_type": content_type_2}
listing = [obj_dict_1, obj_dict_2] listing = [subdir, obj_dict_1, obj_dict_2]
fake_body = json.dumps(listing) fake_body = json.dumps(listing)
resp = self._make_cont_get_req(fake_body, 'json') resp = self._make_cont_get_req(fake_body, 'json')
@ -897,11 +899,12 @@ class TestDecrypterContainerRequests(unittest.TestCase):
body = resp.body body = resp.body
self.assertEqual(len(body), int(resp.headers['Content-Length'])) self.assertEqual(len(body), int(resp.headers['Content-Length']))
body_json = json.loads(body) body_json = json.loads(body)
self.assertEqual(2, len(body_json)) self.assertEqual(3, len(body_json))
self.assertDictEqual(subdir, body_json[0])
obj_dict_1['hash'] = pt_etag1 obj_dict_1['hash'] = pt_etag1
self.assertDictEqual(obj_dict_1, body_json[0]) self.assertDictEqual(obj_dict_1, body_json[1])
obj_dict_2['hash'] = pt_etag2 obj_dict_2['hash'] = pt_etag2
self.assertDictEqual(obj_dict_2, body_json[1]) self.assertDictEqual(obj_dict_2, body_json[2])
def test_GET_container_json_with_crypto_override(self): def test_GET_container_json_with_crypto_override(self):
content_type_1 = 'image/jpeg' content_type_1 = 'image/jpeg'
@ -958,6 +961,10 @@ class TestDecrypterContainerRequests(unittest.TestCase):
self.assertIn("Cipher must be AES_CTR_256", self.assertIn("Cipher must be AES_CTR_256",
self.decrypter.logger.get_lines_for_level('error')[0]) self.decrypter.logger.get_lines_for_level('error')[0])
def _assert_element(self, name, expected, element):
self.assertEqual(element.tagName, name)
self._assert_element_contains_dict(expected, element)
def _assert_element_contains_dict(self, expected, element): def _assert_element_contains_dict(self, expected, element):
for k, v in expected.items(): for k, v in expected.items():
entry = element.getElementsByTagName(k) entry = element.getElementsByTagName(k)
@ -976,6 +983,7 @@ class TestDecrypterContainerRequests(unittest.TestCase):
fake_body = '''<?xml version="1.0" encoding="UTF-8"?> fake_body = '''<?xml version="1.0" encoding="UTF-8"?>
<container name="testc">\ <container name="testc">\
<subdir name="test-subdir"><name>test-subdir</name></subdir>\
<object><hash>\ <object><hash>\
''' + encrypt_and_append_meta(pt_etag1.encode('utf8'), key) + '''\ ''' + encrypt_and_append_meta(pt_etag1.encode('utf8'), key) + '''\
</hash><content_type>\ </hash><content_type>\
@ -1001,21 +1009,23 @@ class TestDecrypterContainerRequests(unittest.TestCase):
self.assertEqual('testc', self.assertEqual('testc',
containers[0].attributes.getNamedItem("name").value) containers[0].attributes.getNamedItem("name").value)
objs = tree.getElementsByTagName('object') results = containers[0].childNodes
self.assertEqual(2, len(objs)) self.assertEqual(3, len(results))
self._assert_element('subdir', {"name": "test-subdir"}, results[0])
obj_dict_1 = {"bytes": "16", obj_dict_1 = {"bytes": "16",
"last_modified": "2015-04-19T02:37:39.601660", "last_modified": "2015-04-19T02:37:39.601660",
"hash": pt_etag1, "hash": pt_etag1,
"name": "testfile", "name": "testfile",
"content_type": content_type_1} "content_type": content_type_1}
self._assert_element_contains_dict(obj_dict_1, objs[0]) self._assert_element('object', obj_dict_1, results[1])
obj_dict_2 = {"bytes": "24", obj_dict_2 = {"bytes": "24",
"last_modified": "2015-04-19T02:37:39.684740", "last_modified": "2015-04-19T02:37:39.684740",
"hash": pt_etag2, "hash": pt_etag2,
"name": "testfile2", "name": "testfile2",
"content_type": content_type_2} "content_type": content_type_2}
self._assert_element_contains_dict(obj_dict_2, objs[1]) self._assert_element('object', obj_dict_2, results[2])
def test_GET_container_xml_with_crypto_override(self): def test_GET_container_xml_with_crypto_override(self):
content_type_1 = 'image/jpeg' content_type_1 = 'image/jpeg'