Honor sample_rate in StatsD logging.
It's there to let administrators turn down the barrage of stats data that StatsD must cope with, but it wasn't actually honored. Worse, if the sample rate was set to e.g. 0.2, the stats would all be multiplied by its inverse, e.g. 2. This patch actually drops packets when sample_rate < 1, so you get correct measurements. Fortunately, the default sample rate is 1 (i.e. drop nothing), and multiplying by 1/1 doesn't change anything, so stats with the default sample rate of 1.0 are, and have been, just fine. Fixes bug 1065643. Also, make the two touched files compliant with pep8 v1.3.3. Change-Id: I66663144009ae4c9ee96f6a111745d8f5d2f5ca3
This commit is contained in:
parent
8cacf5aaf8
commit
4cf96b3791
@ -23,7 +23,7 @@ import sys
|
||||
import time
|
||||
import functools
|
||||
from hashlib import md5
|
||||
from random import shuffle
|
||||
from random import random, shuffle
|
||||
from urllib import quote
|
||||
from contextlib import contextmanager, closing
|
||||
import ctypes
|
||||
@ -233,7 +233,7 @@ def drop_buffer_cache(fd, offset, length):
|
||||
_posix_fadvise = load_libc_function('posix_fadvise64')
|
||||
# 4 means "POSIX_FADV_DONTNEED"
|
||||
ret = _posix_fadvise(fd, ctypes.c_uint64(offset),
|
||||
ctypes.c_uint64(length), 4)
|
||||
ctypes.c_uint64(length), 4)
|
||||
if ret != 0:
|
||||
logging.warn("posix_fadvise64(%s, %s, %s, 4) -> %s"
|
||||
% (fd, offset, length, ret))
|
||||
@ -311,16 +311,17 @@ def split_path(path, minsegs=1, maxsegs=None, rest_with_last=False):
|
||||
minsegs += 1
|
||||
maxsegs += 1
|
||||
count = len(segs)
|
||||
if segs[0] or count < minsegs or count > maxsegs or \
|
||||
'' in segs[1:minsegs]:
|
||||
if (segs[0] or count < minsegs or count > maxsegs or
|
||||
'' in segs[1:minsegs]):
|
||||
raise ValueError('Invalid path: %s' % quote(path))
|
||||
else:
|
||||
minsegs += 1
|
||||
maxsegs += 1
|
||||
segs = path.split('/', maxsegs)
|
||||
count = len(segs)
|
||||
if segs[0] or count < minsegs or count > maxsegs + 1 or \
|
||||
'' in segs[1:minsegs] or (count == maxsegs + 1 and segs[maxsegs]):
|
||||
if (segs[0] or count < minsegs or count > maxsegs + 1 or
|
||||
'' in segs[1:minsegs] or
|
||||
(count == maxsegs + 1 and segs[maxsegs])):
|
||||
raise ValueError('Invalid path: %s' % quote(path))
|
||||
segs = segs[1:maxsegs]
|
||||
segs.extend([None] * (maxsegs - 1 - len(segs)))
|
||||
@ -407,6 +408,7 @@ class StatsdClient(object):
|
||||
self.set_prefix(tail_prefix)
|
||||
self._default_sample_rate = default_sample_rate
|
||||
self._target = (self._host, self._port)
|
||||
self.random = random
|
||||
|
||||
def set_prefix(self, new_prefix):
|
||||
if new_prefix and self._base_prefix:
|
||||
@ -423,12 +425,18 @@ class StatsdClient(object):
|
||||
sample_rate = self._default_sample_rate
|
||||
parts = ['%s%s:%s' % (self._prefix, m_name, m_value), m_type]
|
||||
if sample_rate < 1:
|
||||
parts.append('@%s' % (sample_rate,))
|
||||
if self.random() < sample_rate:
|
||||
parts.append('@%s' % (sample_rate,))
|
||||
else:
|
||||
return
|
||||
# Ideally, we'd cache a sending socket in self, but that
|
||||
# results in a socket getting shared by multiple green threads.
|
||||
with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock:
|
||||
with closing(self._open_socket()) as sock:
|
||||
return sock.sendto('|'.join(parts), self._target)
|
||||
|
||||
def _open_socket(self):
|
||||
return socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
|
||||
def update_stats(self, m_name, m_value, sample_rate=None):
|
||||
return self._send(m_name, m_value, 'c', sample_rate)
|
||||
|
||||
@ -587,10 +595,10 @@ class SwiftLogFormatter(logging.Formatter):
|
||||
def format(self, record):
|
||||
msg = logging.Formatter.format(self, record)
|
||||
if (record.txn_id and record.levelno != logging.INFO and
|
||||
record.txn_id not in msg):
|
||||
record.txn_id not in msg):
|
||||
msg = "%s (txn: %s)" % (msg, record.txn_id)
|
||||
if (record.client_ip and record.levelno != logging.INFO and
|
||||
record.client_ip not in msg):
|
||||
record.client_ip not in msg):
|
||||
msg = "%s (client_ip: %s)" % (msg, record.client_ip)
|
||||
return msg
|
||||
|
||||
@ -1084,7 +1092,7 @@ def readconf(conffile, section_name=None, log_name=None, defaults=None,
|
||||
conf = dict(c.items(section_name))
|
||||
else:
|
||||
print _("Unable to find %s config section in %s") % \
|
||||
(section_name, conffile)
|
||||
(section_name, conffile)
|
||||
sys.exit(1)
|
||||
if "log_name" not in conf:
|
||||
if log_name is not None:
|
||||
|
@ -21,6 +21,7 @@ import errno
|
||||
import logging
|
||||
import mimetools
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import socket
|
||||
import sys
|
||||
@ -36,8 +37,8 @@ from tempfile import TemporaryFile, NamedTemporaryFile
|
||||
|
||||
from eventlet import sleep
|
||||
|
||||
from swift.common.exceptions import Timeout, MessageTimeout, \
|
||||
ConnectionTimeout
|
||||
from swift.common.exceptions import (Timeout, MessageTimeout,
|
||||
ConnectionTimeout)
|
||||
from swift.common import utils
|
||||
|
||||
|
||||
@ -82,6 +83,17 @@ class MockOs():
|
||||
return getattr(os, name)
|
||||
|
||||
|
||||
class MockUdpSocket():
|
||||
def __init__(self):
|
||||
self.sent = []
|
||||
|
||||
def sendto(self, data, target):
|
||||
self.sent.append((data, target))
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class MockSys():
|
||||
|
||||
def __init__(self):
|
||||
@ -197,21 +209,30 @@ class TestUtils(unittest.TestCase):
|
||||
def test_validate_device_partition(self):
|
||||
""" Test swift.common.utils.validate_device_partition """
|
||||
utils.validate_device_partition('foo', 'bar')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, '', '')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, '', 'foo')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, 'foo/bar', 'foo')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, 'foo', 'foo/bar')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, '.', 'foo')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, '..', 'foo')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '.')
|
||||
self.assertRaises(ValueError, utils.validate_device_partition, 'foo', '..')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, '', '')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, '', 'foo')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, 'foo', '')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, 'foo/bar', 'foo')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, 'foo', 'foo/bar')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, '.', 'foo')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, '..', 'foo')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, 'foo', '.')
|
||||
self.assertRaises(ValueError,
|
||||
utils.validate_device_partition, 'foo', '..')
|
||||
try:
|
||||
utils.validate_device_partition,('o\nn e', 'foo')
|
||||
utils.validate_device_partition('o\nn e', 'foo')
|
||||
except ValueError, err:
|
||||
self.assertEquals(str(err), 'Invalid device: o%0An%20e')
|
||||
try:
|
||||
utils.validate_device_partition,('foo', 'o\nn e')
|
||||
utils.validate_device_partition('foo', 'o\nn e')
|
||||
except ValueError, err:
|
||||
self.assertEquals(str(err), 'Invalid partition: o%0An%20e')
|
||||
|
||||
@ -243,21 +264,21 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n')
|
||||
print >> sys.stderr, 'test6'
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||
'STDOUT: test6\n')
|
||||
'STDOUT: test6\n')
|
||||
sys.stderr = orig_stderr
|
||||
print 'test8'
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||
'STDOUT: test6\n')
|
||||
'STDOUT: test6\n')
|
||||
lfo.writelines(['a', 'b', 'c'])
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\n')
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\n')
|
||||
lfo.close()
|
||||
lfo.write('d')
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n')
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n')
|
||||
lfo.flush()
|
||||
self.assertEquals(sio.getvalue(), 'STDOUT: test2\nSTDOUT: test4\n'
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n')
|
||||
'STDOUT: test6\nSTDOUT: a#012b#012c\nSTDOUT: d\n')
|
||||
got_exc = False
|
||||
try:
|
||||
for line in lfo:
|
||||
@ -503,7 +524,7 @@ class TestUtils(unittest.TestCase):
|
||||
|
||||
def test_storage_directory(self):
|
||||
self.assertEquals(utils.storage_directory('objects', '1', 'ABCDEF'),
|
||||
'objects/1/DEF/ABCDEF')
|
||||
'objects/1/DEF/ABCDEF')
|
||||
|
||||
def test_whataremyips(self):
|
||||
myips = utils.whataremyips()
|
||||
@ -522,7 +543,8 @@ class TestUtils(unittest.TestCase):
|
||||
self.assertEquals(utils.hash_path('a', 'c', 'o', raw_digest=False),
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83')
|
||||
self.assertEquals(utils.hash_path('a', 'c', 'o', raw_digest=True),
|
||||
'\x06\xfb\xf0\xb5\x14\xe5\x19\x9d\xfcN\x00\xf4.\xb5\xea\x83')
|
||||
'\x06\xfb\xf0\xb5\x14\xe5\x19\x9d\xfcN'
|
||||
'\x00\xf4.\xb5\xea\x83')
|
||||
self.assertRaises(ValueError, utils.hash_path, 'a', object='o')
|
||||
|
||||
def test_load_libc_function(self):
|
||||
@ -870,9 +892,9 @@ log_name = %(yarr)s'''
|
||||
'http://1.1.1.1/v1/a/c/o?query=param',
|
||||
'http://1.1.1.1/v1/a/c/o?query=param#frag',
|
||||
'http://1.1.1.2/v1/a/c/o'):
|
||||
self.assertNotEquals(utils.validate_sync_to(badurl,
|
||||
['1.1.1.1', '2.2.2.2']),
|
||||
None)
|
||||
self.assertNotEquals(
|
||||
utils.validate_sync_to(badurl, ['1.1.1.1', '2.2.2.2']),
|
||||
None)
|
||||
|
||||
def test_TRUE_VALUES(self):
|
||||
for v in utils.TRUE_VALUES:
|
||||
@ -906,7 +928,8 @@ class TestStatsdLogging(unittest.TestCase):
|
||||
logger = utils.get_logger({'log_statsd_host': 'some.host.com'},
|
||||
'some-name', log_route='some-route')
|
||||
# white-box construction validation
|
||||
self.assert_(isinstance(logger.logger.statsd_client, utils.StatsdClient))
|
||||
self.assert_(isinstance(logger.logger.statsd_client,
|
||||
utils.StatsdClient))
|
||||
self.assertEqual(logger.logger.statsd_client._host, 'some.host.com')
|
||||
self.assertEqual(logger.logger.statsd_client._port, 8125)
|
||||
self.assertEqual(logger.logger.statsd_client._prefix, 'some-name.')
|
||||
@ -934,7 +957,29 @@ class TestStatsdLogging(unittest.TestCase):
|
||||
self.assertEqual(logger.logger.statsd_client._prefix, 'tomato.sauce.')
|
||||
self.assertEqual(logger.logger.statsd_client._host, 'another.host.com')
|
||||
self.assertEqual(logger.logger.statsd_client._port, 9876)
|
||||
self.assertEqual(logger.logger.statsd_client._default_sample_rate, 0.75)
|
||||
self.assertEqual(logger.logger.statsd_client._default_sample_rate,
|
||||
0.75)
|
||||
|
||||
def test_sample_rates(self):
|
||||
logger = utils.get_logger({'log_statsd_host': 'some.host.com'})
|
||||
|
||||
mock_socket = MockUdpSocket()
|
||||
# encapsulation? what's that?
|
||||
statsd_client = logger.logger.statsd_client
|
||||
self.assertTrue(statsd_client.random is random.random)
|
||||
|
||||
statsd_client._open_socket = lambda *_: mock_socket
|
||||
statsd_client.random = lambda: 0.50001
|
||||
|
||||
logger.increment('tribbles', sample_rate=0.5)
|
||||
self.assertEqual(len(mock_socket.sent), 0)
|
||||
|
||||
statsd_client.random = lambda: 0.49999
|
||||
logger.increment('tribbles', sample_rate=0.5)
|
||||
self.assertEqual(len(mock_socket.sent), 1)
|
||||
|
||||
payload = mock_socket.sent[0][0]
|
||||
self.assertTrue(payload.endswith("|@0.5"))
|
||||
|
||||
|
||||
class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
@ -1003,7 +1048,8 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
# Delegate methods are no-ops
|
||||
self.assertEqual(None, logger.update_stats('foo', 88))
|
||||
self.assertEqual(None, logger.update_stats('foo', 88, 0.57))
|
||||
self.assertEqual(None, logger.update_stats('foo', 88, sample_rate=0.61))
|
||||
self.assertEqual(None, logger.update_stats('foo', 88,
|
||||
sample_rate=0.61))
|
||||
self.assertEqual(None, logger.increment('foo'))
|
||||
self.assertEqual(None, logger.increment('foo', 0.57))
|
||||
self.assertEqual(None, logger.increment('foo', sample_rate=0.61))
|
||||
@ -1175,7 +1221,7 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
|
||||
self.assertEquals(logger.thread_locals, ('5678', '5.6.7.8'))
|
||||
finally:
|
||||
logger.thread_locals = orig_thread_locals
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user