Merge "Add a Timeout when getting Memcache connections"
This commit is contained in:
commit
db89ff9b2e
@ -60,6 +60,7 @@ from swift.common.utils import json
|
||||
DEFAULT_MEMCACHED_PORT = 11211
|
||||
|
||||
CONN_TIMEOUT = 0.3
|
||||
POOL_TIMEOUT = 1.0 # WAG
|
||||
IO_TIMEOUT = 2.0
|
||||
PICKLE_FLAG = 1
|
||||
JSON_FLAG = 2
|
||||
@ -94,6 +95,10 @@ class MemcacheConnectionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class MemcachePoolTimeout(Timeout):
|
||||
pass
|
||||
|
||||
|
||||
class MemcacheConnPool(Pool):
|
||||
"""Connection pool for Memcache Connections"""
|
||||
|
||||
@ -128,8 +133,8 @@ class MemcacheRing(object):
|
||||
"""
|
||||
|
||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
||||
io_timeout=IO_TIMEOUT, tries=TRY_COUNT,
|
||||
allow_pickle=False, allow_unpickle=False,
|
||||
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||
max_conns=2):
|
||||
self._ring = {}
|
||||
self._errors = dict(((serv, []) for serv in servers))
|
||||
@ -145,11 +150,12 @@ class MemcacheRing(object):
|
||||
for server in servers))
|
||||
self._connect_timeout = connect_timeout
|
||||
self._io_timeout = io_timeout
|
||||
self._pool_timeout = pool_timeout
|
||||
self._allow_pickle = allow_pickle
|
||||
self._allow_unpickle = allow_unpickle or allow_pickle
|
||||
|
||||
def _exception_occurred(self, server, e, action='talking',
|
||||
sock=None, fp=None):
|
||||
sock=None, fp=None, got_connection=True):
|
||||
if isinstance(e, Timeout):
|
||||
logging.error(_("Timeout %(action)s to memcached: %(server)s"),
|
||||
{'action': action, 'server': server})
|
||||
@ -168,9 +174,10 @@ class MemcacheRing(object):
|
||||
del sock
|
||||
except Exception:
|
||||
pass
|
||||
# We need to return something to the pool
|
||||
# A new connection will be created the next time it is retreived
|
||||
self._return_conn(server, None, None)
|
||||
if got_connection:
|
||||
# We need to return something to the pool
|
||||
# A new connection will be created the next time it is retreived
|
||||
self._return_conn(server, None, None)
|
||||
now = time.time()
|
||||
self._errors[server].append(time.time())
|
||||
if len(self._errors[server]) > ERROR_LIMIT_COUNT:
|
||||
@ -197,17 +204,13 @@ class MemcacheRing(object):
|
||||
continue
|
||||
sock = None
|
||||
try:
|
||||
# NOTE: We do NOT place a Timeout over the MemcacheConnPool's
|
||||
# get() method. The MemcacheConnPool's create() method already
|
||||
# places a timeout around the connect() system call, which we
|
||||
# catch below. It is possible for the underlying Queue of the
|
||||
# MemcacheConnPool to be contended such that this greenlet is
|
||||
# waiting for one or more connections to be freed up. If there
|
||||
# is a particularly slow memcache server causing that problme,
|
||||
# then the IO_TIMEOUT will catch that behavior, so we do not
|
||||
# have to place a Timeout here.
|
||||
fp, sock = self._client_cache[server].get()
|
||||
with MemcachePoolTimeout(self._pool_timeout):
|
||||
fp, sock = self._client_cache[server].get()
|
||||
yield server, fp, sock
|
||||
except MemcachePoolTimeout as e:
|
||||
self._exception_occurred(
|
||||
server, e, action='getting a connection',
|
||||
got_connection=False)
|
||||
except (Exception, Timeout) as e:
|
||||
# Typically a Timeout exception caught here is the one raised
|
||||
# by the create() method of this server's MemcacheConnPool
|
||||
|
@ -17,6 +17,7 @@
|
||||
"""Tests for swift.common.utils"""
|
||||
|
||||
from __future__ import with_statement
|
||||
from collections import defaultdict
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
@ -27,7 +28,7 @@ from eventlet import GreenPool, sleep, Queue
|
||||
from eventlet.pools import Pool
|
||||
|
||||
from swift.common import memcached
|
||||
from mock import patch
|
||||
from mock import patch, MagicMock
|
||||
from test.unit import NullLoggingHandler
|
||||
|
||||
|
||||
@ -407,6 +408,65 @@ class TestMemcached(unittest.TestCase):
|
||||
connections.get_nowait()
|
||||
self.assertTrue(connections.empty())
|
||||
|
||||
def test_connection_pool_timeout(self):
|
||||
orig_conn_pool = memcached.MemcacheConnPool
|
||||
try:
|
||||
connections = defaultdict(Queue)
|
||||
pending = defaultdict(int)
|
||||
served = defaultdict(int)
|
||||
|
||||
class MockConnectionPool(orig_conn_pool):
|
||||
def get(self):
|
||||
pending[self.server] += 1
|
||||
conn = connections[self.server].get()
|
||||
pending[self.server] -= 1
|
||||
return conn
|
||||
|
||||
def put(self, *args, **kwargs):
|
||||
connections[self.server].put(*args, **kwargs)
|
||||
served[self.server] += 1
|
||||
|
||||
memcached.MemcacheConnPool = MockConnectionPool
|
||||
|
||||
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211',
|
||||
'1.2.3.5:11211'],
|
||||
io_timeout=0.5,
|
||||
pool_timeout=0.1)
|
||||
|
||||
p = GreenPool()
|
||||
for i in range(10):
|
||||
p.spawn(memcache_client.set, 'key', 'value')
|
||||
|
||||
# let everyone block
|
||||
sleep(0)
|
||||
self.assertEqual(pending['1.2.3.5:11211'], 10)
|
||||
|
||||
# hand out a couple slow connection
|
||||
mock_conn = MagicMock(), MagicMock()
|
||||
mock_conn[1].sendall = lambda x: sleep(0.2)
|
||||
connections['1.2.3.5:11211'].put(mock_conn)
|
||||
connections['1.2.3.5:11211'].put(mock_conn)
|
||||
|
||||
# so far so good, everyone is still waiting
|
||||
sleep(0)
|
||||
self.assertEqual(pending['1.2.3.5:11211'], 8)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 0)
|
||||
|
||||
# but they won't wait longer than pool_timeout
|
||||
mock_conn = MagicMock(), MagicMock()
|
||||
connections['1.2.3.4:11211'].put(mock_conn)
|
||||
connections['1.2.3.4:11211'].put(mock_conn)
|
||||
p.waitall()
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.5:11211']), 8)
|
||||
self.assertEqual(served['1.2.3.5:11211'], 2)
|
||||
self.assertEqual(len(memcache_client._errors['1.2.3.4:11211']), 0)
|
||||
self.assertEqual(served['1.2.3.4:11211'], 8)
|
||||
|
||||
# and we never got more put in that we gave out
|
||||
self.assertEqual(connections['1.2.3.5:11211'].qsize(), 2)
|
||||
self.assertEqual(connections['1.2.3.4:11211'].qsize(), 2)
|
||||
finally:
|
||||
memcached.MemcacheConnPool = orig_conn_pool
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user