py3: Fix title-casing in HeaderKeyDict
Change-Id: I1152c47c52f6482ec877142c96845b00bf6dcc5b Related-Change: I130ba5014b7eff458d87ab29eb42fe45607c9a12
This commit is contained in:
parent
d6e790d1b7
commit
60c27d3aef
@ -16,10 +16,20 @@
|
||||
import six
|
||||
|
||||
|
||||
def _title(s):
|
||||
if six.PY2:
|
||||
return s.title()
|
||||
else:
|
||||
return s.encode('latin1').title().decode('latin1')
|
||||
|
||||
|
||||
class HeaderKeyDict(dict):
|
||||
"""
|
||||
A dict that title-cases all keys on the way in, so as to be
|
||||
case-insensitive.
|
||||
|
||||
Note that all keys and values are expected to be wsgi strings,
|
||||
though some allowances are made when setting values.
|
||||
"""
|
||||
def __init__(self, base_headers=None, **kwargs):
|
||||
if base_headers:
|
||||
@ -29,32 +39,32 @@ class HeaderKeyDict(dict):
|
||||
def update(self, other):
|
||||
if hasattr(other, 'keys'):
|
||||
for key in other.keys():
|
||||
self[key.title()] = other[key]
|
||||
self[_title(key)] = other[key]
|
||||
else:
|
||||
for key, value in other:
|
||||
self[key.title()] = value
|
||||
self[_title(key)] = value
|
||||
|
||||
def __getitem__(self, key):
|
||||
return dict.get(self, key.title())
|
||||
return dict.get(self, _title(key))
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if value is None:
|
||||
self.pop(key.title(), None)
|
||||
self.pop(_title(key), None)
|
||||
elif six.PY2 and isinstance(value, six.text_type):
|
||||
return dict.__setitem__(self, key.title(), value.encode('utf-8'))
|
||||
return dict.__setitem__(self, _title(key), value.encode('utf-8'))
|
||||
elif six.PY3 and isinstance(value, six.binary_type):
|
||||
return dict.__setitem__(self, key.title(), value.decode('latin-1'))
|
||||
return dict.__setitem__(self, _title(key), value.decode('latin-1'))
|
||||
else:
|
||||
return dict.__setitem__(self, key.title(), str(value))
|
||||
return dict.__setitem__(self, _title(key), str(value))
|
||||
|
||||
def __contains__(self, key):
|
||||
return dict.__contains__(self, key.title())
|
||||
return dict.__contains__(self, _title(key))
|
||||
|
||||
def __delitem__(self, key):
|
||||
return dict.__delitem__(self, key.title())
|
||||
return dict.__delitem__(self, _title(key))
|
||||
|
||||
def get(self, key, default=None):
|
||||
return dict.get(self, key.title(), default)
|
||||
return dict.get(self, _title(key), default)
|
||||
|
||||
def setdefault(self, key, value=None):
|
||||
if key not in self:
|
||||
@ -62,4 +72,4 @@ class HeaderKeyDict(dict):
|
||||
return self[key]
|
||||
|
||||
def pop(self, key, default=None):
|
||||
return dict.pop(self, key.title(), default)
|
||||
return dict.pop(self, _title(key), default)
|
||||
|
@ -133,7 +133,7 @@ class ObjectController(Controller):
|
||||
|
||||
# delete object metadata from response
|
||||
for key in list(resp.headers.keys()):
|
||||
if key.startswith('x-amz-meta-'):
|
||||
if key.lower().startswith('x-amz-meta-'):
|
||||
del resp.headers[key]
|
||||
|
||||
resp.status = HTTP_OK
|
||||
|
@ -582,9 +582,9 @@ class TestS3ApiObj(S3ApiTestCase):
|
||||
self.assertEqual('200 ', status[:4], body)
|
||||
# Check that s3api does not return an etag header,
|
||||
# specified copy source.
|
||||
self.assertTrue(headers.get('etag') is None)
|
||||
self.assertNotIn('etag', headers)
|
||||
# Check that s3api does not return custom metadata in response
|
||||
self.assertTrue(headers.get('x-amz-meta-something') is None)
|
||||
self.assertNotIn('x-amz-meta-something', headers)
|
||||
|
||||
_, _, headers = self.swift.calls_with_headers[-1]
|
||||
# Check that s3api converts a Content-MD5 header into an etag.
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import unittest
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.swob import bytes_to_wsgi
|
||||
|
||||
|
||||
class TestHeaderKeyDict(unittest.TestCase):
|
||||
@ -27,6 +28,20 @@ class TestHeaderKeyDict(unittest.TestCase):
|
||||
self.assertEqual(headers['content-length'], '20')
|
||||
self.assertEqual(headers['CONTENT-LENGTH'], '20')
|
||||
|
||||
def test_unicode(self):
|
||||
def mkstr(prefix):
|
||||
return bytes_to_wsgi((prefix + u'\U0001f44d').encode('utf8'))
|
||||
|
||||
headers = HeaderKeyDict()
|
||||
headers[mkstr('x-object-meta-')] = 'ok'
|
||||
self.assertIn(mkstr('x-object-meta-'), headers)
|
||||
self.assertIn(mkstr('X-Object-Meta-'), headers)
|
||||
self.assertIn(mkstr('X-OBJECT-META-'), headers)
|
||||
keys = list(headers)
|
||||
self.assertNotIn(mkstr('x-object-meta-'), keys)
|
||||
self.assertIn(mkstr('X-Object-Meta-'), keys)
|
||||
self.assertNotIn(mkstr('X-OBJECT-META-'), keys)
|
||||
|
||||
def test_setdefault(self):
|
||||
headers = HeaderKeyDict()
|
||||
|
||||
|
@ -38,6 +38,7 @@ from swift.obj.diskfile import (
|
||||
from swift.common.ring import RingData
|
||||
from swift.common import utils
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.swob import bytes_to_wsgi
|
||||
from swift.common.utils import (
|
||||
hash_path, normalize_timestamp, mkdirs, write_pickle)
|
||||
from swift.common.storage_policy import StoragePolicy, POLICIES
|
||||
@ -504,13 +505,13 @@ class TestObjectUpdater(unittest.TestCase):
|
||||
self.assertEqual(inc.readline(),
|
||||
b'PUT /sda1/0/a/c/o HTTP/1.1\r\n')
|
||||
headers = HeaderKeyDict()
|
||||
line = inc.readline()
|
||||
while line and line != b'\r\n':
|
||||
headers[line.split(b':')[0]] = \
|
||||
line.split(b':')[1].strip()
|
||||
line = inc.readline()
|
||||
self.assertIn(b'x-container-timestamp', headers)
|
||||
self.assertIn(b'X-Backend-Storage-Policy-Index',
|
||||
line = bytes_to_wsgi(inc.readline())
|
||||
while line and line != '\r\n':
|
||||
headers[line.split(':')[0]] = \
|
||||
line.split(':')[1].strip()
|
||||
line = bytes_to_wsgi(inc.readline())
|
||||
self.assertIn('x-container-timestamp', headers)
|
||||
self.assertIn('X-Backend-Storage-Policy-Index',
|
||||
headers)
|
||||
except BaseException as err:
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user