py3: port common/memcached.py

Change-Id: I7f04b3977971f0581b04180e5372686d8186346f
This commit is contained in:
Tim Burke 2018-02-01 14:30:32 -08:00
parent 4b19ac7723
commit 5cb0869743
3 changed files with 132 additions and 105 deletions

View File

@ -76,7 +76,9 @@ ERROR_LIMIT_DURATION = 60
def md5hash(key):
return md5(key).hexdigest()
if not isinstance(key, bytes):
key = key.encode('utf-8')
return md5(key).hexdigest().encode('ascii')
def sanitize_timeout(timeout):
@ -88,7 +90,21 @@ def sanitize_timeout(timeout):
"""
if timeout > (30 * 24 * 60 * 60):
timeout += time.time()
return timeout
return int(timeout)
def set_msg(key, flags, timeout, value):
if not isinstance(key, bytes):
raise TypeError('key must be bytes')
if not isinstance(value, bytes):
raise TypeError('value must be bytes')
return b' '.join([
b'set',
key,
str(flags).encode('ascii'),
str(timeout).encode('ascii'),
str(len(value)).encode('ascii'),
]) + (b'\r\n' + value + b'\r\n')
class MemcacheConnectionError(Exception):
@ -253,13 +269,15 @@ class MemcacheRing(object):
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
elif serialize:
value = json.dumps(value)
value = json.dumps(value).encode('ascii')
flags |= JSON_FLAG
elif not isinstance(value, bytes):
value = str(value).encode('utf-8')
for (server, fp, sock) in self._get_conns(key):
try:
with Timeout(self._io_timeout):
sock.sendall('set %s %d %d %s\r\n%s\r\n' %
(key, flags, timeout, len(value), value))
sock.sendall(set_msg(key, flags, timeout, value))
# Wait for the set to complete
fp.readline()
self._return_conn(server, fp, sock)
@ -281,14 +299,14 @@ class MemcacheRing(object):
for (server, fp, sock) in self._get_conns(key):
try:
with Timeout(self._io_timeout):
sock.sendall('get %s\r\n' % key)
sock.sendall(b'get ' + key + b'\r\n')
line = fp.readline().strip().split()
while True:
if not line:
raise MemcacheConnectionError('incomplete read')
if line[0].upper() == 'END':
if line[0].upper() == b'END':
break
if line[0].upper() == 'VALUE' and line[1] == key:
if line[0].upper() == b'VALUE' and line[1] == key:
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
@ -297,7 +315,7 @@ class MemcacheRing(object):
else:
value = None
elif int(line[2]) & JSON_FLAG:
value = json.loads(value)
value = json.loads(value.decode('ascii'))
fp.readline()
line = fp.readline().strip().split()
self._return_conn(server, fp, sock)
@ -323,28 +341,31 @@ class MemcacheRing(object):
:raises MemcacheConnectionError:
"""
key = md5hash(key)
command = 'incr'
command = b'incr'
if delta < 0:
command = 'decr'
delta = str(abs(int(delta)))
command = b'decr'
delta = str(abs(int(delta))).encode('ascii')
timeout = sanitize_timeout(time)
for (server, fp, sock) in self._get_conns(key):
try:
with Timeout(self._io_timeout):
sock.sendall('%s %s %s\r\n' % (command, key, delta))
sock.sendall(b' '.join([
command, key, delta]) + b'\r\n')
line = fp.readline().strip().split()
if not line:
raise MemcacheConnectionError('incomplete read')
if line[0].upper() == 'NOT_FOUND':
if line[0].upper() == b'NOT_FOUND':
add_val = delta
if command == 'decr':
add_val = '0'
sock.sendall('add %s %d %d %s\r\n%s\r\n' %
(key, 0, timeout, len(add_val), add_val))
if command == b'decr':
add_val = b'0'
sock.sendall(b' '.join([
b'add', key, b'0', str(timeout).encode('ascii'),
str(len(add_val)).encode('ascii')
]) + b'\r\n' + add_val + b'\r\n')
line = fp.readline().strip().split()
if line[0].upper() == 'NOT_STORED':
sock.sendall('%s %s %s\r\n' % (command, key,
delta))
if line[0].upper() == b'NOT_STORED':
sock.sendall(b' '.join([
command, key, delta]) + b'\r\n')
line = fp.readline().strip().split()
ret = int(line[0].strip())
else:
@ -382,7 +403,7 @@ class MemcacheRing(object):
for (server, fp, sock) in self._get_conns(key):
try:
with Timeout(self._io_timeout):
sock.sendall('delete %s\r\n' % key)
sock.sendall(b'delete ' + key + b'\r\n')
# Wait for the delete to complete
fp.readline()
self._return_conn(server, fp, sock)
@ -409,7 +430,7 @@ class MemcacheRing(object):
"""
server_key = md5hash(server_key)
timeout = sanitize_timeout(time)
msg = ''
msg = []
for key, value in mapping.items():
key = md5hash(key)
flags = 0
@ -417,14 +438,13 @@ class MemcacheRing(object):
value = pickle.dumps(value, PICKLE_PROTOCOL)
flags |= PICKLE_FLAG
elif serialize:
value = json.dumps(value)
value = json.dumps(value).encode('ascii')
flags |= JSON_FLAG
msg += ('set %s %d %d %s\r\n%s\r\n' %
(key, flags, timeout, len(value), value))
msg.append(set_msg(key, flags, timeout, value))
for (server, fp, sock) in self._get_conns(server_key):
try:
with Timeout(self._io_timeout):
sock.sendall(msg)
sock.sendall(b''.join(msg))
# Wait for the set to complete
for line in range(len(mapping)):
fp.readline()
@ -447,15 +467,15 @@ class MemcacheRing(object):
for (server, fp, sock) in self._get_conns(server_key):
try:
with Timeout(self._io_timeout):
sock.sendall('get %s\r\n' % ' '.join(keys))
sock.sendall(b'get ' + b' '.join(keys) + b'\r\n')
line = fp.readline().strip().split()
responses = {}
while True:
if not line:
raise MemcacheConnectionError('incomplete read')
if line[0].upper() == 'END':
if line[0].upper() == b'END':
break
if line[0].upper() == 'VALUE':
if line[0].upper() == b'VALUE':
size = int(line[3])
value = fp.read(size)
if int(line[2]) & PICKLE_FLAG:
@ -464,7 +484,7 @@ class MemcacheRing(object):
else:
value = None
elif int(line[2]) & JSON_FLAG:
value = json.loads(value)
value = json.loads(value.decode('ascii'))
responses[line[1]] = value
fp.readline()
line = fp.readline().strip().split()

View File

@ -71,8 +71,8 @@ class MockMemcached(object):
# In particular, the "Storage commands" section may be interesting.
def __init__(self):
self.inbuf = ''
self.outbuf = ''
self.inbuf = b''
self.outbuf = b''
self.cache = {}
self.down = False
self.exc_on_delete = False
@ -84,81 +84,86 @@ class MockMemcached(object):
if self.down:
raise Exception('mock is down')
self.inbuf += string
while '\n' in self.inbuf:
cmd, self.inbuf = self.inbuf.split('\n', 1)
while b'\n' in self.inbuf:
cmd, self.inbuf = self.inbuf.split(b'\n', 1)
parts = cmd.split()
handler = getattr(self, 'handle_%s' % parts[0].lower(), None)
cmd_name = parts[0].decode('ascii').lower()
handler = getattr(self, 'handle_%s' % cmd_name, None)
if handler:
handler(*parts[1:])
else:
raise ValueError('Unhandled command: %s' % parts[0])
def handle_set(self, key, flags, exptime, num_bytes, noreply=''):
def handle_set(self, key, flags, exptime, num_bytes, noreply=b''):
self.cache[key] = flags, exptime, self.inbuf[:int(num_bytes)]
self.inbuf = self.inbuf[int(num_bytes) + 2:]
if noreply != 'noreply':
self.outbuf += 'STORED\r\n'
if noreply != b'noreply':
self.outbuf += b'STORED\r\n'
def handle_add(self, key, flags, exptime, num_bytes, noreply=''):
def handle_add(self, key, flags, exptime, num_bytes, noreply=b''):
value = self.inbuf[:int(num_bytes)]
self.inbuf = self.inbuf[int(num_bytes) + 2:]
if key in self.cache:
if noreply != 'noreply':
self.outbuf += 'NOT_STORED\r\n'
if noreply != b'noreply':
self.outbuf += b'NOT_STORED\r\n'
else:
self.cache[key] = flags, exptime, value
if noreply != 'noreply':
self.outbuf += 'STORED\r\n'
if noreply != b'noreply':
self.outbuf += b'STORED\r\n'
def handle_delete(self, key, noreply=''):
def handle_delete(self, key, noreply=b''):
if self.exc_on_delete:
raise Exception('mock is has exc_on_delete set')
if key in self.cache:
del self.cache[key]
if noreply != 'noreply':
self.outbuf += 'DELETED\r\n'
elif noreply != 'noreply':
self.outbuf += 'NOT_FOUND\r\n'
if noreply != b'noreply':
self.outbuf += b'DELETED\r\n'
elif noreply != b'noreply':
self.outbuf += b'NOT_FOUND\r\n'
def handle_get(self, *keys):
for key in keys:
if key in self.cache:
val = self.cache[key]
self.outbuf += 'VALUE %s %s %s\r\n' % (
key, val[0], len(val[2]))
self.outbuf += val[2] + '\r\n'
self.outbuf += 'END\r\n'
self.outbuf += b' '.join([
b'VALUE',
key,
val[0],
str(len(val[2])).encode('ascii')
]) + b'\r\n'
self.outbuf += val[2] + b'\r\n'
self.outbuf += b'END\r\n'
def handle_incr(self, key, value, noreply=''):
def handle_incr(self, key, value, noreply=b''):
if key in self.cache:
current = self.cache[key][2]
new_val = str(int(current) + int(value))
new_val = str(int(current) + int(value)).encode('ascii')
self.cache[key] = self.cache[key][:2] + (new_val, )
self.outbuf += str(new_val) + '\r\n'
self.outbuf += new_val + b'\r\n'
else:
self.outbuf += 'NOT_FOUND\r\n'
self.outbuf += b'NOT_FOUND\r\n'
def handle_decr(self, key, value, noreply=''):
def handle_decr(self, key, value, noreply=b''):
if key in self.cache:
current = self.cache[key][2]
new_val = str(int(current) - int(value))
if new_val[0] == '-': # ie, val is negative
new_val = '0'
new_val = str(int(current) - int(value)).encode('ascii')
if new_val[:1] == b'-': # ie, val is negative
new_val = b'0'
self.cache[key] = self.cache[key][:2] + (new_val, )
self.outbuf += str(new_val) + '\r\n'
self.outbuf += new_val + b'\r\n'
else:
self.outbuf += 'NOT_FOUND\r\n'
self.outbuf += b'NOT_FOUND\r\n'
def readline(self):
if self.read_return_empty_str:
return ''
return b''
if self.read_return_none:
return None
if self.down:
raise Exception('mock is down')
if '\n' in self.outbuf:
response, self.outbuf = self.outbuf.split('\n', 1)
return response + '\n'
if b'\n' in self.outbuf:
response, self.outbuf = self.outbuf.split(b'\n', 1)
return response + b'\n'
def read(self, size):
if self.down:
@ -199,7 +204,7 @@ class TestMemcached(unittest.TestCase):
memcache_client = memcached.MemcacheRing([sock1ipport, sock2ip])
one = two = True
while one or two: # Run until we match hosts one and two
key = uuid4().hex
key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key):
peeripport = '%s:%s' % conn[2].getpeername()
self.assertTrue(peeripport in (sock1ipport, sock2ipport))
@ -222,7 +227,7 @@ class TestMemcached(unittest.TestCase):
sock_addr = sock.getsockname()
server_socket = '[%s]:%s' % (sock_addr[0], sock_addr[1])
memcache_client = memcached.MemcacheRing([server_socket])
key = uuid4().hex
key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key):
peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
@ -243,7 +248,7 @@ class TestMemcached(unittest.TestCase):
server_host = '[%s]' % sock_addr[0]
memcached.DEFAULT_MEMCACHED_PORT = sock_addr[1]
memcache_client = memcached.MemcacheRing([server_host])
key = uuid4().hex
key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key):
peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0], peer_sockaddr[1])
@ -271,7 +276,7 @@ class TestMemcached(unittest.TestCase):
socket.SOCK_STREAM, 0, '',
('127.0.0.1', sock_addr[1]))]
memcache_client = memcached.MemcacheRing([server_socket])
key = uuid4().hex
key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key):
peer_sockaddr = conn[2].getpeername()
peer_socket = '%s:%s' % (peer_sockaddr[0],
@ -296,7 +301,7 @@ class TestMemcached(unittest.TestCase):
socket.SOCK_STREAM, 0, '',
('::1', sock_addr[1]))]
memcache_client = memcached.MemcacheRing([server_socket])
key = uuid4().hex
key = uuid4().hex.encode('ascii')
for conn in memcache_client._get_conns(key):
peer_sockaddr = conn[2].getpeername()
peer_socket = '[%s]:%s' % (peer_sockaddr[0],
@ -312,16 +317,16 @@ class TestMemcached(unittest.TestCase):
mock = MockMemcached()
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock, mock)] * 2)
cache_key = md5('some_key').hexdigest()
cache_key = md5(b'some_key').hexdigest().encode('ascii')
memcache_client.set('some_key', [1, 2, 3])
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
# See JSON_FLAG
self.assertEqual(mock.cache, {cache_key: ('2', '0', '[1, 2, 3]')})
self.assertEqual(mock.cache, {cache_key: (b'2', b'0', b'[1, 2, 3]')})
memcache_client.set('some_key', [4, 5, 6])
self.assertEqual(memcache_client.get('some_key'), [4, 5, 6])
self.assertEqual(mock.cache, {cache_key: ('2', '0', '[4, 5, 6]')})
self.assertEqual(mock.cache, {cache_key: (b'2', b'0', b'[4, 5, 6]')})
memcache_client.set('some_key', ['simple str', 'utf8 str éà'])
# As per http://wiki.openstack.org/encoding,
@ -329,10 +334,10 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(
memcache_client.get('some_key'), ['simple str', u'utf8 str éà'])
self.assertEqual(mock.cache, {cache_key: (
'2', '0', '["simple str", "utf8 str \\u00e9\\u00e0"]')})
b'2', b'0', b'["simple str", "utf8 str \\u00e9\\u00e0"]')})
memcache_client.set('some_key', [1, 2, 3], time=20)
self.assertEqual(mock.cache, {cache_key: ('2', '20', '[1, 2, 3]')})
self.assertEqual(mock.cache, {cache_key: (b'2', b'20', b'[1, 2, 3]')})
sixtydays = 60 * 24 * 60 * 60
esttimeout = time.time() + sixtydays
@ -347,7 +352,8 @@ class TestMemcached(unittest.TestCase):
[(mock, mock)] * 2)
memcache_client.set('some_key', [1, 2, 3])
self.assertEqual(memcache_client.get('some_key'), [1, 2, 3])
self.assertEqual(mock.cache.values()[0][1], '0')
self.assertEqual(list(mock.cache.values()),
[(b'2', b'0', b'[1, 2, 3]')])
# Now lets return an empty string, and make sure we aren't logging
# the error.
@ -371,15 +377,15 @@ class TestMemcached(unittest.TestCase):
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock, mock)] * 2)
self.assertEqual(memcache_client.incr('some_key', delta=5), 5)
self.assertEqual(memcache_client.get('some_key'), '5')
self.assertEqual(memcache_client.get('some_key'), b'5')
self.assertEqual(memcache_client.incr('some_key', delta=5), 10)
self.assertEqual(memcache_client.get('some_key'), '10')
self.assertEqual(memcache_client.get('some_key'), b'10')
self.assertEqual(memcache_client.incr('some_key', delta=1), 11)
self.assertEqual(memcache_client.get('some_key'), '11')
self.assertEqual(memcache_client.get('some_key'), b'11')
self.assertEqual(memcache_client.incr('some_key', delta=-5), 6)
self.assertEqual(memcache_client.get('some_key'), '6')
self.assertEqual(memcache_client.get('some_key'), b'6')
self.assertEqual(memcache_client.incr('some_key', delta=-15), 0)
self.assertEqual(memcache_client.get('some_key'), '0')
self.assertEqual(memcache_client.get('some_key'), b'0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.incr, 'some_key', delta=-15)
@ -391,9 +397,9 @@ class TestMemcached(unittest.TestCase):
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock, mock)] * 2)
self.assertEqual(memcache_client.incr('some_key', delta=5), 5)
self.assertEqual(memcache_client.get('some_key'), '5')
self.assertEqual(memcache_client.get('some_key'), b'5')
self.assertEqual(memcache_client.incr('some_key', delta=5), 10)
self.assertEqual(memcache_client.get('some_key'), '10')
self.assertEqual(memcache_client.get('some_key'), b'10')
# Now lets return an empty string, and make sure we aren't logging
# the error.
@ -417,11 +423,11 @@ class TestMemcached(unittest.TestCase):
mock = MockMemcached()
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock, mock)] * 2)
cache_key = md5('some_key').hexdigest()
cache_key = md5(b'some_key').hexdigest().encode('ascii')
memcache_client.incr('some_key', delta=5, time=55)
self.assertEqual(memcache_client.get('some_key'), '5')
self.assertEqual(mock.cache, {cache_key: ('0', '55', '5')})
self.assertEqual(memcache_client.get('some_key'), b'5')
self.assertEqual(mock.cache, {cache_key: (b'0', b'55', b'5')})
memcache_client.delete('some_key')
self.assertIsNone(memcache_client.get('some_key'))
@ -429,7 +435,7 @@ class TestMemcached(unittest.TestCase):
fiftydays = 50 * 24 * 60 * 60
esttimeout = time.time() + fiftydays
memcache_client.incr('some_key', delta=5, time=fiftydays)
self.assertEqual(memcache_client.get('some_key'), '5')
self.assertEqual(memcache_client.get('some_key'), b'5')
_junk, cache_timeout, _junk = mock.cache[cache_key]
self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
@ -437,12 +443,12 @@ class TestMemcached(unittest.TestCase):
self.assertIsNone(memcache_client.get('some_key'))
memcache_client.incr('some_key', delta=5)
self.assertEqual(memcache_client.get('some_key'), '5')
self.assertEqual(mock.cache, {cache_key: ('0', '0', '5')})
self.assertEqual(memcache_client.get('some_key'), b'5')
self.assertEqual(mock.cache, {cache_key: (b'0', b'0', b'5')})
memcache_client.incr('some_key', delta=5, time=55)
self.assertEqual(memcache_client.get('some_key'), '10')
self.assertEqual(mock.cache, {cache_key: ('0', '0', '10')})
self.assertEqual(memcache_client.get('some_key'), b'10')
self.assertEqual(mock.cache, {cache_key: (b'0', b'0', b'10')})
def test_decr(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])
@ -450,13 +456,13 @@ class TestMemcached(unittest.TestCase):
memcache_client._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock, mock)] * 2)
self.assertEqual(memcache_client.decr('some_key', delta=5), 0)
self.assertEqual(memcache_client.get('some_key'), '0')
self.assertEqual(memcache_client.get('some_key'), b'0')
self.assertEqual(memcache_client.incr('some_key', delta=15), 15)
self.assertEqual(memcache_client.get('some_key'), '15')
self.assertEqual(memcache_client.get('some_key'), b'15')
self.assertEqual(memcache_client.decr('some_key', delta=4), 11)
self.assertEqual(memcache_client.get('some_key'), '11')
self.assertEqual(memcache_client.get('some_key'), b'11')
self.assertEqual(memcache_client.decr('some_key', delta=15), 0)
self.assertEqual(memcache_client.get('some_key'), '0')
self.assertEqual(memcache_client.get('some_key'), b'0')
mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.decr, 'some_key', delta=15)
@ -510,27 +516,27 @@ class TestMemcached(unittest.TestCase):
self.assertEqual(
memcache_client.get_multi(('some_key2', 'some_key1'), 'multi_key'),
[[4, 5, 6], [1, 2, 3]])
for key in ('some_key1', 'some_key2'):
key = md5(key).hexdigest()
for key in (b'some_key1', b'some_key2'):
key = md5(key).hexdigest().encode('ascii')
self.assertIn(key, mock.cache)
_junk, cache_timeout, _junk = mock.cache[key]
self.assertEqual(cache_timeout, '0')
self.assertEqual(cache_timeout, b'0')
memcache_client.set_multi(
{'some_key1': [1, 2, 3], 'some_key2': [4, 5, 6]}, 'multi_key',
time=20)
for key in ('some_key1', 'some_key2'):
key = md5(key).hexdigest()
for key in (b'some_key1', b'some_key2'):
key = md5(key).hexdigest().encode('ascii')
_junk, cache_timeout, _junk = mock.cache[key]
self.assertEqual(cache_timeout, '20')
self.assertEqual(cache_timeout, b'20')
fortydays = 50 * 24 * 60 * 60
esttimeout = time.time() + fortydays
memcache_client.set_multi(
{'some_key1': [1, 2, 3], 'some_key2': [4, 5, 6]}, 'multi_key',
time=fortydays)
for key in ('some_key1', 'some_key2'):
key = md5(key).hexdigest()
for key in (b'some_key1', b'some_key2'):
key = md5(key).hexdigest().encode('ascii')
_junk, cache_timeout, _junk = mock.cache[key]
self.assertAlmostEqual(float(cache_timeout), esttimeout, delta=1)
self.assertEqual(memcache_client.get_multi(

View File

@ -36,6 +36,7 @@ commands =
test/unit/common/test_exceptions.py \
test/unit/common/test_header_key_dict.py \
test/unit/common/test_linkat.py \
test/unit/common/test_memcached.py \
test/unit/common/test_manager.py \
test/unit/common/test_splice.py \
test/unit/common/test_storage_policy.py \