Merge "close socket on exceptions"

This commit is contained in:
Jenkins 2013-05-22 13:12:14 +00:00 committed by Gerrit Code Review
commit 6b4cba8371
2 changed files with 31 additions and 8 deletions

View File

@ -88,13 +88,26 @@ class MemcacheRing(object):
self._allow_pickle = allow_pickle self._allow_pickle = allow_pickle
self._allow_unpickle = allow_unpickle or allow_pickle self._allow_unpickle = allow_unpickle or allow_pickle
def _exception_occurred(self, server, e, action='talking'): def _exception_occurred(self, server, e, action='talking',
sock=None, fp=None):
if isinstance(e, socket.timeout): if isinstance(e, socket.timeout):
logging.error(_("Timeout %(action)s to memcached: %(server)s"), logging.error(_("Timeout %(action)s to memcached: %(server)s"),
{'action': action, 'server': server}) {'action': action, 'server': server})
else: else:
logging.exception(_("Error %(action)s to memcached: %(server)s"), logging.exception(_("Error %(action)s to memcached: %(server)s"),
{'action': action, 'server': server}) {'action': action, 'server': server})
try:
if fp:
fp.close()
del fp
except Exception:
pass
try:
if sock:
sock.close()
del sock
except Exception:
pass
now = time.time() now = time.time()
self._errors[server].append(time.time()) self._errors[server].append(time.time())
if len(self._errors[server]) > ERROR_LIMIT_COUNT: if len(self._errors[server]) > ERROR_LIMIT_COUNT:
@ -119,6 +132,7 @@ class MemcacheRing(object):
served.append(server) served.append(server)
if self._error_limited[server] > time.time(): if self._error_limited[server] > time.time():
continue continue
sock = None
try: try:
fp, sock = self._client_cache[server].pop() fp, sock = self._client_cache[server].pop()
yield server, fp, sock yield server, fp, sock
@ -136,7 +150,8 @@ class MemcacheRing(object):
sock.settimeout(self._io_timeout) sock.settimeout(self._io_timeout)
yield server, sock.makefile(), sock yield server, sock.makefile(), sock
except Exception, e: except Exception, e:
self._exception_occurred(server, e, 'connecting') self._exception_occurred(
server, e, action='connecting', sock=sock)
def _return_conn(self, server, fp, sock): def _return_conn(self, server, fp, sock):
""" Returns a server connection to the pool """ """ Returns a server connection to the pool """
@ -182,7 +197,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)
def get(self, key): def get(self, key):
""" """
@ -215,7 +230,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return value return value
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)
def incr(self, key, delta=1, time=0, timeout=0): def incr(self, key, delta=1, time=0, timeout=0):
""" """
@ -267,7 +282,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return ret return ret
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)
raise MemcacheConnectionError("No Memcached connections succeeded.") raise MemcacheConnectionError("No Memcached connections succeeded.")
def decr(self, key, delta=1, time=0, timeout=0): def decr(self, key, delta=1, time=0, timeout=0):
@ -304,7 +319,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)
def set_multi(self, mapping, server_key, serialize=True, timeout=0, def set_multi(self, mapping, server_key, serialize=True, timeout=0,
time=0, min_compress_len=0): time=0, min_compress_len=0):
@ -352,7 +367,7 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return return
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)
def get_multi(self, keys, server_key): def get_multi(self, keys, server_key):
""" """
@ -393,4 +408,4 @@ class MemcacheRing(object):
self._return_conn(server, fp, sock) self._return_conn(server, fp, sock)
return values return values
except Exception, e: except Exception, e:
self._exception_occurred(server, e) self._exception_occurred(server, e, sock=sock, fp=fp)

View File

@ -42,6 +42,8 @@ class ExplodingMockMemcached(object):
self.exploded = True self.exploded = True
raise socket.error() raise socket.error()
def close(self):
pass
class MockMemcached(object): class MockMemcached(object):
@ -52,6 +54,7 @@ class MockMemcached(object):
self.down = False self.down = False
self.exc_on_delete = False self.exc_on_delete = False
self.read_return_none = False self.read_return_none = False
self.close_called = False
def sendall(self, string): def sendall(self, string):
if self.down: if self.down:
@ -130,6 +133,10 @@ class MockMemcached(object):
self.outbuf = self.outbuf[size:] self.outbuf = self.outbuf[size:]
return response return response
def close(self):
self.close_called = True
pass
class TestMemcached(unittest.TestCase): class TestMemcached(unittest.TestCase):
""" Tests for swift.common.memcached""" """ Tests for swift.common.memcached"""
@ -206,6 +213,7 @@ class TestMemcached(unittest.TestCase):
mock.read_return_none = True mock.read_return_none = True
self.assertRaises(memcached.MemcacheConnectionError, self.assertRaises(memcached.MemcacheConnectionError,
memcache_client.incr, 'some_key', delta=-15) memcache_client.incr, 'some_key', delta=-15)
self.assertTrue(mock.close_called)
def test_incr_w_timeout(self): def test_incr_w_timeout(self):
memcache_client = memcached.MemcacheRing(['1.2.3.4:11211']) memcache_client = memcached.MemcacheRing(['1.2.3.4:11211'])