Merge "memcache: Add an item_size_warning_threshold option"
This commit is contained in:
commit
36d32b907c
etc
swift/common
test/unit/common
@ -56,3 +56,9 @@
|
|||||||
# specified in tls_certfile. If tls_enabled is False, this option
|
# specified in tls_certfile. If tls_enabled is False, this option
|
||||||
# is ignored.
|
# is ignored.
|
||||||
# tls_keyfile =
|
# tls_keyfile =
|
||||||
|
#
|
||||||
|
# If an item size ever gets above item_size_warning_threshold then a warning will be
|
||||||
|
# logged. This can be used to alert when memcache item sizes are getting to their limit.
|
||||||
|
# It's an absolute size in bytes. Setting the value to 0 will warn on every memcache set.
|
||||||
|
# A value of -1 disables the warning.
|
||||||
|
# item_size_warning_threshold = -1
|
||||||
|
@ -56,7 +56,7 @@ from eventlet.pools import Pool
|
|||||||
from eventlet import Timeout
|
from eventlet import Timeout
|
||||||
from six.moves import range
|
from six.moves import range
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import md5
|
from swift.common.utils import md5, human_readable
|
||||||
|
|
||||||
DEFAULT_MEMCACHED_PORT = 11211
|
DEFAULT_MEMCACHED_PORT = 11211
|
||||||
|
|
||||||
@ -73,6 +73,7 @@ TRY_COUNT = 3
|
|||||||
# will be considered failed for ERROR_LIMIT_DURATION seconds.
|
# will be considered failed for ERROR_LIMIT_DURATION seconds.
|
||||||
ERROR_LIMIT_COUNT = 10
|
ERROR_LIMIT_COUNT = 10
|
||||||
ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60
|
ERROR_LIMIT_TIME = ERROR_LIMIT_DURATION = 60
|
||||||
|
DEFAULT_ITEM_SIZE_WARNING_THRESHOLD = -1
|
||||||
|
|
||||||
|
|
||||||
def md5hash(key):
|
def md5hash(key):
|
||||||
@ -172,13 +173,15 @@ class MemcacheRing(object):
|
|||||||
Simple, consistent-hashed memcache client.
|
Simple, consistent-hashed memcache client.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, servers, connect_timeout=CONN_TIMEOUT,
|
def __init__(
|
||||||
|
self, servers, connect_timeout=CONN_TIMEOUT,
|
||||||
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
io_timeout=IO_TIMEOUT, pool_timeout=POOL_TIMEOUT,
|
||||||
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
tries=TRY_COUNT, allow_pickle=False, allow_unpickle=False,
|
||||||
max_conns=2, tls_context=None, logger=None,
|
max_conns=2, tls_context=None, logger=None,
|
||||||
error_limit_count=ERROR_LIMIT_COUNT,
|
error_limit_count=ERROR_LIMIT_COUNT,
|
||||||
error_limit_time=ERROR_LIMIT_TIME,
|
error_limit_time=ERROR_LIMIT_TIME,
|
||||||
error_limit_duration=ERROR_LIMIT_DURATION):
|
error_limit_duration=ERROR_LIMIT_DURATION,
|
||||||
|
item_size_warning_threshold=DEFAULT_ITEM_SIZE_WARNING_THRESHOLD):
|
||||||
self._ring = {}
|
self._ring = {}
|
||||||
self._errors = dict(((serv, []) for serv in servers))
|
self._errors = dict(((serv, []) for serv in servers))
|
||||||
self._error_limited = dict(((serv, 0) for serv in servers))
|
self._error_limited = dict(((serv, 0) for serv in servers))
|
||||||
@ -203,6 +206,7 @@ class MemcacheRing(object):
|
|||||||
self.logger = logging.getLogger()
|
self.logger = logging.getLogger()
|
||||||
else:
|
else:
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
|
self.item_size_warning_threshold = item_size_warning_threshold
|
||||||
|
|
||||||
def _exception_occurred(self, server, e, action='talking',
|
def _exception_occurred(self, server, e, action='talking',
|
||||||
sock=None, fp=None, got_connection=True):
|
sock=None, fp=None, got_connection=True):
|
||||||
@ -325,6 +329,13 @@ class MemcacheRing(object):
|
|||||||
"Error setting value in memcached: "
|
"Error setting value in memcached: "
|
||||||
"%(server)s: %(msg)s",
|
"%(server)s: %(msg)s",
|
||||||
{'server': server, 'msg': msg})
|
{'server': server, 'msg': msg})
|
||||||
|
if 0 <= self.item_size_warning_threshold <= len(value):
|
||||||
|
self.logger.warning(
|
||||||
|
"Item size larger than warning threshold: "
|
||||||
|
"%d (%s) >= %d (%s)", len(value),
|
||||||
|
human_readable(len(value)),
|
||||||
|
self.item_size_warning_threshold,
|
||||||
|
human_readable(self.item_size_warning_threshold))
|
||||||
self._return_conn(server, fp, sock)
|
self._return_conn(server, fp, sock)
|
||||||
return
|
return
|
||||||
except (Exception, Timeout) as e:
|
except (Exception, Timeout) as e:
|
||||||
|
@ -20,7 +20,7 @@ from six.moves.configparser import ConfigParser, NoSectionError, NoOptionError
|
|||||||
|
|
||||||
from swift.common.memcached import (
|
from swift.common.memcached import (
|
||||||
MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT, IO_TIMEOUT, TRY_COUNT,
|
MemcacheRing, CONN_TIMEOUT, POOL_TIMEOUT, IO_TIMEOUT, TRY_COUNT,
|
||||||
ERROR_LIMIT_COUNT, ERROR_LIMIT_TIME)
|
ERROR_LIMIT_COUNT, ERROR_LIMIT_TIME, DEFAULT_ITEM_SIZE_WARNING_THRESHOLD)
|
||||||
from swift.common.utils import get_logger, config_true_value
|
from swift.common.utils import get_logger, config_true_value
|
||||||
|
|
||||||
|
|
||||||
@ -103,6 +103,9 @@ class MemcacheMiddleware(object):
|
|||||||
'error_suppression_interval', ERROR_LIMIT_TIME))
|
'error_suppression_interval', ERROR_LIMIT_TIME))
|
||||||
error_suppression_limit = float(memcache_options.get(
|
error_suppression_limit = float(memcache_options.get(
|
||||||
'error_suppression_limit', ERROR_LIMIT_COUNT))
|
'error_suppression_limit', ERROR_LIMIT_COUNT))
|
||||||
|
item_size_warning_threshold = int(memcache_options.get(
|
||||||
|
'item_size_warning_threshold',
|
||||||
|
DEFAULT_ITEM_SIZE_WARNING_THRESHOLD))
|
||||||
|
|
||||||
if not self.memcache_servers:
|
if not self.memcache_servers:
|
||||||
self.memcache_servers = '127.0.0.1:11211'
|
self.memcache_servers = '127.0.0.1:11211'
|
||||||
@ -126,7 +129,8 @@ class MemcacheMiddleware(object):
|
|||||||
logger=self.logger,
|
logger=self.logger,
|
||||||
error_limit_count=error_suppression_limit,
|
error_limit_count=error_suppression_limit,
|
||||||
error_limit_time=error_suppression_interval,
|
error_limit_time=error_suppression_interval,
|
||||||
error_limit_duration=error_suppression_interval)
|
error_limit_duration=error_suppression_interval,
|
||||||
|
item_size_warning_threshold=item_size_warning_threshold)
|
||||||
|
|
||||||
def __call__(self, env, start_response):
|
def __call__(self, env, start_response):
|
||||||
env['swift.cache'] = self.memcache
|
env['swift.cache'] = self.memcache
|
||||||
|
@ -49,11 +49,13 @@ class EmptyConfigParser(object):
|
|||||||
def get_config_parser(memcache_servers='1.2.3.4:5',
|
def get_config_parser(memcache_servers='1.2.3.4:5',
|
||||||
memcache_serialization_support='1',
|
memcache_serialization_support='1',
|
||||||
memcache_max_connections='4',
|
memcache_max_connections='4',
|
||||||
section='memcache'):
|
section='memcache',
|
||||||
|
item_size_warning_threshold='75'):
|
||||||
_srvs = memcache_servers
|
_srvs = memcache_servers
|
||||||
_sers = memcache_serialization_support
|
_sers = memcache_serialization_support
|
||||||
_maxc = memcache_max_connections
|
_maxc = memcache_max_connections
|
||||||
_section = section
|
_section = section
|
||||||
|
_warn_threshold = item_size_warning_threshold
|
||||||
|
|
||||||
class SetConfigParser(object):
|
class SetConfigParser(object):
|
||||||
|
|
||||||
@ -64,7 +66,7 @@ def get_config_parser(memcache_servers='1.2.3.4:5',
|
|||||||
'memcache_servers': memcache_servers,
|
'memcache_servers': memcache_servers,
|
||||||
'memcache_serialization_support':
|
'memcache_serialization_support':
|
||||||
memcache_serialization_support,
|
memcache_serialization_support,
|
||||||
'memcache_max_connections': memcache_max_connections,
|
'memcache_max_connections': memcache_max_connections
|
||||||
}
|
}
|
||||||
|
|
||||||
def read(self, path):
|
def read(self, path):
|
||||||
@ -85,6 +87,10 @@ def get_config_parser(memcache_servers='1.2.3.4:5',
|
|||||||
if _maxc == 'error':
|
if _maxc == 'error':
|
||||||
raise NoOptionError(option, section)
|
raise NoOptionError(option, section)
|
||||||
return _maxc
|
return _maxc
|
||||||
|
elif option == 'item_size_warning_threshold':
|
||||||
|
if _warn_threshold == 'error':
|
||||||
|
raise NoOptionError(option, section)
|
||||||
|
return _warn_threshold
|
||||||
else:
|
else:
|
||||||
raise NoOptionError(option, section)
|
raise NoOptionError(option, section)
|
||||||
else:
|
else:
|
||||||
@ -114,12 +120,19 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
{'memcache_servers': '6.7.8.9:10'},
|
{'memcache_servers': '6.7.8.9:10'},
|
||||||
{'memcache_serialization_support': '0'},
|
{'memcache_serialization_support': '0'},
|
||||||
{'memcache_max_connections': '30'},
|
{'memcache_max_connections': '30'},
|
||||||
|
{'item_size_warning_threshold': 75},
|
||||||
{'memcache_servers': '6.7.8.9:10',
|
{'memcache_servers': '6.7.8.9:10',
|
||||||
'memcache_serialization_support': '0'},
|
'memcache_serialization_support': '0'},
|
||||||
{'memcache_servers': '6.7.8.9:10',
|
{'memcache_servers': '6.7.8.9:10',
|
||||||
'memcache_max_connections': '30'},
|
'memcache_max_connections': '30'},
|
||||||
{'memcache_serialization_support': '0',
|
{'memcache_serialization_support': '0',
|
||||||
'memcache_max_connections': '30'}
|
'memcache_max_connections': '30'},
|
||||||
|
{'memcache_servers': '6.7.8.9:10',
|
||||||
|
'item_size_warning_threshold': '75'},
|
||||||
|
{'memcache_serialization_support': '0',
|
||||||
|
'item_size_warning_threshold': '75'},
|
||||||
|
{'item_size_warning_threshold': '75',
|
||||||
|
'memcache_max_connections': '30'},
|
||||||
):
|
):
|
||||||
with self.assertRaises(Exception) as catcher:
|
with self.assertRaises(Exception) as catcher:
|
||||||
memcache.MemcacheMiddleware(FakeApp(), d)
|
memcache.MemcacheMiddleware(FakeApp(), d)
|
||||||
@ -134,7 +147,8 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
memcache.MemcacheMiddleware(
|
memcache.MemcacheMiddleware(
|
||||||
FakeApp(), {'memcache_servers': '1.2.3.4:5',
|
FakeApp(), {'memcache_servers': '1.2.3.4:5',
|
||||||
'memcache_serialization_support': '2',
|
'memcache_serialization_support': '2',
|
||||||
'memcache_max_connections': '30'})
|
'memcache_max_connections': '30',
|
||||||
|
'item_size_warning_threshold': '80'})
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
exc = err
|
exc = err
|
||||||
self.assertIsNone(exc)
|
self.assertIsNone(exc)
|
||||||
@ -147,6 +161,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertEqual(app.memcache._allow_unpickle, False)
|
self.assertEqual(app.memcache._allow_unpickle, False)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
app.memcache._client_cache['127.0.0.1:11211'].max_size, 2)
|
app.memcache._client_cache['127.0.0.1:11211'].max_size, 2)
|
||||||
|
self.assertEqual(app.memcache.item_size_warning_threshold, -1)
|
||||||
|
|
||||||
def test_conf_inline(self):
|
def test_conf_inline(self):
|
||||||
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
||||||
@ -154,12 +169,14 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
FakeApp(),
|
FakeApp(),
|
||||||
{'memcache_servers': '6.7.8.9:10',
|
{'memcache_servers': '6.7.8.9:10',
|
||||||
'memcache_serialization_support': '0',
|
'memcache_serialization_support': '0',
|
||||||
'memcache_max_connections': '5'})
|
'memcache_max_connections': '5',
|
||||||
|
'item_size_warning_threshold': '75'})
|
||||||
self.assertEqual(app.memcache_servers, '6.7.8.9:10')
|
self.assertEqual(app.memcache_servers, '6.7.8.9:10')
|
||||||
self.assertEqual(app.memcache._allow_pickle, True)
|
self.assertEqual(app.memcache._allow_pickle, True)
|
||||||
self.assertEqual(app.memcache._allow_unpickle, True)
|
self.assertEqual(app.memcache._allow_unpickle, True)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
app.memcache._client_cache['6.7.8.9:10'].max_size, 5)
|
app.memcache._client_cache['6.7.8.9:10'].max_size, 5)
|
||||||
|
self.assertEqual(app.memcache.item_size_warning_threshold, 75)
|
||||||
|
|
||||||
def test_conf_inline_ratelimiting(self):
|
def test_conf_inline_ratelimiting(self):
|
||||||
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
||||||
@ -235,6 +252,17 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
app.memcache._client_cache['6.7.8.9:10'].max_size, 4)
|
app.memcache._client_cache['6.7.8.9:10'].max_size, 4)
|
||||||
|
|
||||||
|
def test_conf_inline_bad_item_warning_threshold(self):
|
||||||
|
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
||||||
|
with self.assertRaises(ValueError) as err:
|
||||||
|
memcache.MemcacheMiddleware(
|
||||||
|
FakeApp(),
|
||||||
|
{'memcache_servers': '6.7.8.9:10',
|
||||||
|
'memcache_serialization_support': '0',
|
||||||
|
'item_size_warning_threshold': 'bad42'})
|
||||||
|
self.assertIn('invalid literal for int() with base 10:',
|
||||||
|
str(err.exception))
|
||||||
|
|
||||||
def test_conf_from_extra_conf(self):
|
def test_conf_from_extra_conf(self):
|
||||||
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
with mock.patch.object(memcache, 'ConfigParser', get_config_parser()):
|
||||||
app = memcache.MemcacheMiddleware(FakeApp(), {})
|
app = memcache.MemcacheMiddleware(FakeApp(), {})
|
||||||
@ -332,6 +360,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
# tries is limited to server count
|
# tries is limited to server count
|
||||||
self.assertEqual(memcache_ring._tries, 1)
|
self.assertEqual(memcache_ring._tries, 1)
|
||||||
self.assertEqual(memcache_ring._io_timeout, 2.0)
|
self.assertEqual(memcache_ring._io_timeout, 2.0)
|
||||||
|
self.assertEqual(memcache_ring.item_size_warning_threshold, -1)
|
||||||
|
|
||||||
@with_tempdir
|
@with_tempdir
|
||||||
def test_real_config_with_options(self, tempdir):
|
def test_real_config_with_options(self, tempdir):
|
||||||
@ -351,6 +380,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
tries = 4
|
tries = 4
|
||||||
io_timeout = 1.0
|
io_timeout = 1.0
|
||||||
tls_enabled = true
|
tls_enabled = true
|
||||||
|
item_size_warning_threshold = 1000
|
||||||
"""
|
"""
|
||||||
config_path = os.path.join(tempdir, 'test.conf')
|
config_path = os.path.join(tempdir, 'test.conf')
|
||||||
with open(config_path, 'w') as f:
|
with open(config_path, 'w') as f:
|
||||||
@ -370,6 +400,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertIsInstance(
|
self.assertIsInstance(
|
||||||
list(memcache_ring._client_cache.values())[0]._tls_context,
|
list(memcache_ring._client_cache.values())[0]._tls_context,
|
||||||
ssl.SSLContext)
|
ssl.SSLContext)
|
||||||
|
self.assertEqual(memcache_ring.item_size_warning_threshold, 1000)
|
||||||
|
|
||||||
@with_tempdir
|
@with_tempdir
|
||||||
def test_real_memcache_config(self, tempdir):
|
def test_real_memcache_config(self, tempdir):
|
||||||
@ -399,6 +430,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
io_timeout = 1.0
|
io_timeout = 1.0
|
||||||
error_suppression_limit = 0
|
error_suppression_limit = 0
|
||||||
error_suppression_interval = 1.5
|
error_suppression_interval = 1.5
|
||||||
|
item_size_warning_threshold = 50
|
||||||
"""
|
"""
|
||||||
memcache_config_path = os.path.join(tempdir, 'memcache.conf')
|
memcache_config_path = os.path.join(tempdir, 'memcache.conf')
|
||||||
with open(memcache_config_path, 'w') as f:
|
with open(memcache_config_path, 'w') as f:
|
||||||
@ -415,6 +447,7 @@ class TestCacheMiddleware(unittest.TestCase):
|
|||||||
self.assertEqual(memcache_ring._error_limit_count, 0)
|
self.assertEqual(memcache_ring._error_limit_count, 0)
|
||||||
self.assertEqual(memcache_ring._error_limit_time, 1.5)
|
self.assertEqual(memcache_ring._error_limit_time, 1.5)
|
||||||
self.assertEqual(memcache_ring._error_limit_duration, 1.5)
|
self.assertEqual(memcache_ring._error_limit_duration, 1.5)
|
||||||
|
self.assertEqual(memcache_ring.item_size_warning_threshold, 50)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -15,7 +15,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
"""Tests for swift.common.utils"""
|
"""Tests for swift.common.utils"""
|
||||||
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import errno
|
import errno
|
||||||
import io
|
import io
|
||||||
@ -33,7 +32,7 @@ from eventlet import GreenPool, sleep, Queue
|
|||||||
from eventlet.pools import Pool
|
from eventlet.pools import Pool
|
||||||
|
|
||||||
from swift.common import memcached
|
from swift.common import memcached
|
||||||
from swift.common.utils import md5
|
from swift.common.utils import md5, human_readable
|
||||||
from mock import patch, MagicMock
|
from mock import patch, MagicMock
|
||||||
from test.debug_logger import debug_logger
|
from test.debug_logger import debug_logger
|
||||||
|
|
||||||
@ -947,6 +946,56 @@ class TestMemcached(unittest.TestCase):
|
|||||||
|
|
||||||
self.assertEqual(1, mock_sock.close.call_count)
|
self.assertEqual(1, mock_sock.close.call_count)
|
||||||
|
|
||||||
|
def test_item_size_warning_threshold(self):
|
||||||
|
mock = MockMemcached()
|
||||||
|
mocked_pool = MockedMemcachePool([(mock, mock)] * 2)
|
||||||
|
|
||||||
|
def do_test(d, threshold, should_warn, error=False):
|
||||||
|
self.logger.clear()
|
||||||
|
try:
|
||||||
|
memcache_client = memcached.MemcacheRing(
|
||||||
|
['1.2.3.4:11211'], item_size_warning_threshold=threshold,
|
||||||
|
logger=self.logger)
|
||||||
|
memcache_client._client_cache['1.2.3.4:11211'] = mocked_pool
|
||||||
|
memcache_client.set('some_key', d, serialize=False)
|
||||||
|
warning_lines = self.logger.get_lines_for_level('warning')
|
||||||
|
if should_warn:
|
||||||
|
self.assertIn(
|
||||||
|
'Item size larger than warning threshold: '
|
||||||
|
'%d (%s) >= %d (%s)' % (
|
||||||
|
len(d), human_readable(len(d)), threshold,
|
||||||
|
human_readable(threshold)),
|
||||||
|
warning_lines[0])
|
||||||
|
else:
|
||||||
|
self.assertFalse(warning_lines)
|
||||||
|
except ValueError as err:
|
||||||
|
if not err:
|
||||||
|
self.fail(err)
|
||||||
|
else:
|
||||||
|
self.assertIn(
|
||||||
|
'Config option must be a number, greater than 0, '
|
||||||
|
'less than 100, not "%s".' % threshold,
|
||||||
|
str(err))
|
||||||
|
|
||||||
|
data = '1' * 100
|
||||||
|
# let's start with something easy, say warning at 80
|
||||||
|
for data_size, warn in ((79, False), (80, True), (81, True),
|
||||||
|
(99, True), (100, True)):
|
||||||
|
do_test(data[:data_size], 80, warn)
|
||||||
|
|
||||||
|
# if we set the threshold to -1 will turn off the warning
|
||||||
|
for data_size, warn in ((79, False), (80, False), (81, False),
|
||||||
|
(99, False), (100, False)):
|
||||||
|
do_test(data[:data_size], -1, warn)
|
||||||
|
|
||||||
|
# Changing to 0 should warn on everything
|
||||||
|
for data_size, warn in ((0, True), (1, True), (50, True),
|
||||||
|
(99, True), (100, True)):
|
||||||
|
do_test(data[:data_size], 0, warn)
|
||||||
|
|
||||||
|
# Let's do a big number
|
||||||
|
do_test('1' * 2048576, 1000000, True)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user