fixed pattern matching/globbing in swift-log-uploader

Clay Gerrard 2011-03-12 02:17:00 +00:00 committed by Tarmac
commit ca51e87967
3 changed files with 369 additions and 114 deletions

View File

@@ -14,7 +14,7 @@ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = log_data
source_filename_format = access-%Y%m%d%H
source_filename_pattern = access-%Y%m%d%H
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.access_processor.AccessLogProcessor
@@ -31,9 +31,9 @@ class_path = swift.stats.access_processor.AccessLogProcessor
# log_dir = /var/log/swift/
swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
container_name = account_stats
source_filename_format = stats-%Y%m%d%H_*
source_filename_pattern = stats-%Y%m%d%H_.*
# new_log_cutoff = 7200
# unlink_log = True
class_path = swift.stats.stats_processor.StatsLogProcessor
# account_server_conf = /etc/swift/account-server.conf
# user = swift
# user = swift
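
The config change above is a rename plus a semantic switch: source_filename_format values were shell-style globs, while source_filename_pattern values are regular expressions, so * becomes .* (and literal dots would need escaping). A quick illustrative check of the difference, with the date markers already expanded for a sample hour and a hypothetical file name (not part of the diff):

import fnmatch
import re

name = 'stats-2011031202_web.log'                  # hypothetical log file name
glob_style = 'stats-2011031202_*'                  # expanded old source_filename_format
regex_style = r'stats-2011031202_.*'               # expanded new source_filename_pattern

assert fnmatch.fnmatch(name, glob_style)                            # glob: '*' matches the whole tail
assert re.match(glob_style, name).group(0) == 'stats-2011031202_'   # as a regex, '*' only repeats the '_'
assert re.match(regex_style, name).group(0) == name                 # regex: '.*' matches the whole tail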

View File

@@ -18,7 +18,7 @@ import os
import hashlib
import time
import gzip
import glob
import re
from paste.deploy import appconfig
from swift.common.internal_proxy import InternalProxy
@@ -44,29 +44,30 @@ class LogUploader(Daemon):
def __init__(self, uploader_conf, plugin_name):
super(LogUploader, self).__init__(uploader_conf)
log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
swift_account = uploader_conf['swift_account']
container_name = uploader_conf['container_name']
source_filename_format = uploader_conf['source_filename_format']
log_name = '%s-log-uploader' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name,
log_route=plugin_name)
self.log_dir = uploader_conf.get('log_dir', '/var/log/swift/')
self.swift_account = uploader_conf['swift_account']
self.container_name = uploader_conf['container_name']
proxy_server_conf_loc = uploader_conf.get('proxy_server_conf',
'/etc/swift/proxy-server.conf')
proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
name='proxy-server')
new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
('true', 'on', '1', 'yes')
self.unlink_log = unlink_log
self.new_log_cutoff = new_log_cutoff
if not log_dir.endswith('/'):
log_dir = log_dir + '/'
self.log_dir = log_dir
self.swift_account = swift_account
self.container_name = container_name
self.filename_format = source_filename_format
self.internal_proxy = InternalProxy(proxy_server_conf)
log_name = '%s-log-uploader' % plugin_name
self.logger = utils.get_logger(uploader_conf, log_name,
log_route=plugin_name)
self.new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
self.unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
utils.TRUE_VALUES
# source_filename_format is deprecated
source_filename_format = uploader_conf.get('source_filename_format')
source_filename_pattern = uploader_conf.get('source_filename_pattern')
if source_filename_format and not source_filename_pattern:
self.logger.warning(_('source_filename_format is unreliable and '
'deprecated; use source_filename_pattern'))
self.pattern = self.convert_glob_to_regex(source_filename_format)
else:
self.pattern = source_filename_pattern or '%Y%m%d%H'
def run_once(self, *args, **kwargs):
self.logger.info(_("Uploading logs"))
@@ -75,70 +76,114 @@ class LogUploader(Daemon):
self.logger.info(_("Uploading logs complete (%0.2f minutes)") %
((time.time() - start) / 60))
def convert_glob_to_regex(self, glob):
"""
Make a best effort to support old style config globs
:param glob: old style config source_filename_format
:returns: new style config source_filename_pattern
"""
pattern = glob
pattern = pattern.replace('.', r'\.')
pattern = pattern.replace('*', r'.*')
pattern = pattern.replace('?', r'.?')
return pattern
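
As a rough illustration of what this best-effort conversion produces, here is a tiny stand-in with the same three replacements, checked against glob values that appear in the old config and tests (the date markers are left untouched):

def convert_glob_to_regex(glob):
    # same replacements as the method above: escape dots, widen * and ?
    return glob.replace('.', r'\.').replace('*', r'.*').replace('?', r'.?')

assert convert_glob_to_regex('stats-%Y%m%d%H_*') == r'stats-%Y%m%d%H_.*'
assert convert_glob_to_regex('*.%Y%m%d%H.log') == r'.*\.%Y%m%d%H\.log'
assert convert_glob_to_regex('swift-blah_98764.%Y%m%d-%H*.tar.gz') == \
    r'swift-blah_98764\.%Y%m%d-%H.*\.tar\.gz'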
def validate_filename_pattern(self):
"""
Validate source_filename_pattern
:returns: valid regex pattern based on source_filename_pattern with
group matches substituted for date format markers
"""
pattern = self.pattern
markers = {
'%Y': ('year', '(?P<year>[0-9]{4})'),
'%m': ('month', '(?P<month>[0-1][0-9])'),
'%d': ('day', '(?P<day>[0-3][0-9])'),
'%H': ('hour', '(?P<hour>[0-2][0-9])'),
}
for marker, (type, group) in markers.items():
if marker not in self.pattern:
self.logger.error(_('source_filename_pattern must contain a '
'marker %(marker)s to match the '
'%(type)s') % {'marker': marker,
'type': type})
return
pattern = pattern.replace(marker, group)
return pattern
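
For the access pattern from the sample config, the marker substitution expands into named groups roughly as follows; a pattern missing any of the four markers logs an error and the method returns None. An illustrative check, outside the diff:

import re

expanded = ('access-(?P<year>[0-9]{4})(?P<month>[0-1][0-9])'
            '(?P<day>[0-3][0-9])(?P<hour>[0-2][0-9])')
match = re.match(expanded, 'access-2011031202')
assert match.groupdict() == {'year': '2011', 'month': '03',
                             'day': '12', 'hour': '02'}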
def get_relpath_to_files_under_log_dir(self):
"""
Look under log_dir recursively and return all filenames as relpaths
:returns: list of strs, the relpaths of all filenames under log_dir
"""
all_files = []
for path, dirs, files in os.walk(self.log_dir):
all_files.extend(os.path.join(path, f) for f in files)
return [os.path.relpath(f, start=self.log_dir) for f in all_files]
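
Because this uses os.walk, files in subdirectories of log_dir are picked up too, with the subdirectory kept in the relative path, so a pattern may include path components. A small sanity check with a made-up layout (hypothetical 'hourly' subdirectory, not from the diff):

import os
import tempfile

log_dir = tempfile.mkdtemp()
os.mkdir(os.path.join(log_dir, 'hourly'))
open(os.path.join(log_dir, 'access-2011031202'), 'w').close()
open(os.path.join(log_dir, 'hourly', 'access-2011031203'), 'w').close()

relpaths = []
for path, dirs, files in os.walk(log_dir):
    relpaths.extend(os.path.relpath(os.path.join(path, f), log_dir) for f in files)
assert sorted(relpaths) == ['access-2011031202',
                            os.path.join('hourly', 'access-2011031203')]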
def filter_files(self, all_files, pattern):
"""
Filter files based on regex pattern
:param all_files: list of strs, relpath of the filenames under log_dir
:param pattern: regex pattern to match against filenames
:returns: dict mapping full path of file to match group dict
"""
filename2match = {}
found_match = False
for filename in all_files:
match = re.match(pattern, filename)
if match:
found_match = True
full_path = os.path.join(self.log_dir, filename)
filename2match[full_path] = match.groupdict()
else:
self.logger.debug(_('%(filename)s does not match '
'%(pattern)s') % {'filename': filename,
'pattern': pattern})
return filename2match
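
The keys of each match group dict are exactly the named groups produced by validate_filename_pattern(), which line up with upload_one_log's year/month/day/hour parameters; that is what lets upload_all_logs below call self.upload_one_log(filename, **match). A minimal stand-in showing the keyword expansion (illustrative values):

match = {'year': '2011', 'month': '03', 'day': '12', 'hour': '02'}

def upload_one_log(filename, year, month, day, hour):       # same signature as the real method
    return (filename, year, month, day, hour)

assert upload_one_log('/var/log/swift/access-2011031202', **match) == \
    ('/var/log/swift/access-2011031202', '2011', '03', '12', '02')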
def upload_all_logs(self):
i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
i.sort()
year_offset = month_offset = day_offset = hour_offset = None
base_offset = len(self.log_dir.rstrip('/')) + 1
for start, c in i:
offset = base_offset + start
if c == '%Y':
year_offset = offset, offset + 4
# Add in the difference between len(%Y) and the expanded
# version of %Y (????). This makes sure the codes after this
# one will align properly in the final filename.
base_offset += 2
elif c == '%m':
month_offset = offset, offset + 2
elif c == '%d':
day_offset = offset, offset + 2
elif c == '%H':
hour_offset = offset, offset + 2
if not (year_offset and month_offset and day_offset and hour_offset):
# don't have all the parts, can't upload anything
"""
Match files under log_dir to source_filename_pattern and upload to swift
"""
pattern = self.validate_filename_pattern()
if not pattern:
self.logger.error(_('Invalid filename_format'))
return
glob_pattern = self.filename_format
glob_pattern = glob_pattern.replace('%Y', '????', 1)
glob_pattern = glob_pattern.replace('%m', '??', 1)
glob_pattern = glob_pattern.replace('%d', '??', 1)
glob_pattern = glob_pattern.replace('%H', '??', 1)
filelist = glob.iglob(os.path.join(self.log_dir, glob_pattern))
current_hour = int(time.strftime('%H'))
today = int(time.strftime('%Y%m%d'))
self.internal_proxy.create_container(self.swift_account,
self.container_name)
for filename in filelist:
try:
# From the filename, we need to derive the year, month, day,
# and hour for the file. These values are used in the uploaded
# object's name, so they should be a reasonably accurate
# representation of the time for which the data in the file was
# collected. The file's last modified time is not a reliable
# representation of the data in the file. For example, an old
# log file (from hour A) may be uploaded or moved into the
# log_dir in hour Z. The file's modified time will be for hour
# Z, and therefore the object's name in the system will not
# represent the data in it.
# If the filename doesn't match the format, it shouldn't be
# uploaded.
year = filename[slice(*year_offset)]
month = filename[slice(*month_offset)]
day = filename[slice(*day_offset)]
hour = filename[slice(*hour_offset)]
except IndexError:
# unexpected filename format, move on
self.logger.error(_("Unexpected log: %s") % filename)
all_files = self.get_relpath_to_files_under_log_dir()
filename2match = self.filter_files(all_files, pattern)
if not filename2match:
self.logger.info(_('No files in %(log_dir)s match %(pattern)s') %
{'log_dir': self.log_dir, 'pattern': pattern})
return
if not self.internal_proxy.create_container(self.swift_account,
self.container_name):
self.logger.error(_('Unable to create container for '
'%(account)s/%(container)s') % {
'account': self.swift_account,
'container': self.container_name})
return
for filename, match in filename2match.items():
# don't process very new logs
seconds_since_mtime = time.time() - os.stat(filename).st_mtime
if seconds_since_mtime < self.new_log_cutoff:
self.logger.debug(_("Skipping log: %(file)s "
"(< %(cutoff)d seconds old)") % {
'file': filename,
'cutoff': self.new_log_cutoff})
continue
if ((time.time() - os.stat(filename).st_mtime) <
self.new_log_cutoff):
# don't process very new logs
self.logger.debug(
_("Skipping log: %(file)s (< %(cutoff)d seconds old)") %
{'file': filename, 'cutoff': self.new_log_cutoff})
continue
self.upload_one_log(filename, year, month, day, hour)
self.upload_one_log(filename, **match)
def upload_one_log(self, filename, year, month, day, hour):
"""
Upload one file to swift
"""
if os.path.getsize(filename) == 0:
self.logger.debug(_("Log %s is 0 length, skipping") % filename)
return

View File

@@ -13,14 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO: More tests
import unittest
import os
from datetime import datetime
from tempfile import mkdtemp
from shutil import rmtree
from functools import partial
from collections import defaultdict
import random
import string
from test.unit import temptree
from swift.stats import log_uploader
import logging
@@ -29,34 +32,62 @@ LOGGER = logging.getLogger()
DEFAULT_GLOB = '%Y%m%d%H'
COMPRESSED_DATA = '\x1f\x8b\x08\x08\x87\xa5zM\x02\xffdata\x00KI,I\x04\x00c' \
'\xf3\xf3\xad\x04\x00\x00\x00'
def mock_appconfig(*args, **kwargs):
pass
class MockInternalProxy():
def __init__(self, *args, **kwargs):
pass
def create_container(self, *args, **kwargs):
return True
def upload_file(self, *args, **kwargs):
return True
_orig_LogUploader = log_uploader.LogUploader
class MockLogUploader(_orig_LogUploader):
def __init__(self, conf, logger=LOGGER):
conf['swift_account'] = conf.get('swift_account', '')
conf['container_name'] = conf.get('container_name', '')
conf['new_log_cutoff'] = conf.get('new_log_cutoff', '0')
conf['source_filename_format'] = conf.get(
'source_filename_format', conf.get('filename_format'))
log_uploader.LogUploader.__init__(self, conf, 'plugin')
self.logger = logger
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
_orig_LogUploader.upload_one_log(self, filename, year, month,
day, hour)
class TestLogUploader(unittest.TestCase):
def test_upload_all_logs(self):
def setUp(self):
# mock internal proxy
self._orig_InternalProxy = log_uploader.InternalProxy
self._orig_appconfig = log_uploader.appconfig
log_uploader.InternalProxy = MockInternalProxy
log_uploader.appconfig = mock_appconfig
class MockInternalProxy():
def create_container(self, *args, **kwargs):
pass
class MonkeyLogUploader(log_uploader.LogUploader):
def __init__(self, conf, logger=LOGGER):
self.log_dir = conf['log_dir']
self.filename_format = conf.get('filename_format',
DEFAULT_GLOB)
self.new_log_cutoff = 0
self.logger = logger
self.internal_proxy = MockInternalProxy()
self.swift_account = ''
self.container_name = ''
self.uploaded_files = []
def upload_one_log(self, filename, year, month, day, hour):
d = {'year': year, 'month': month, 'day': day, 'hour': hour}
self.uploaded_files.append((filename, d))
def tearDown(self):
log_uploader.appconfig = self._orig_appconfig
log_uploader.InternalProxy = self._orig_InternalProxy
def test_deprecated_glob_style_upload_all_logs(self):
tmpdir = mkdtemp()
try:
today = datetime.now()
@@ -72,7 +103,7 @@ class TestLogUploader(unittest.TestCase):
open(os.path.join(tmpdir, ts), 'w').close()
conf = {'log_dir': tmpdir}
uploader = MonkeyLogUploader(conf)
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
@@ -112,7 +143,7 @@ class TestLogUploader(unittest.TestCase):
'log_dir': '%s/' % tmpdir,
'filename_format': 'swift-blah_98764.%Y%m%d-%H*.tar.gz',
}
uploader = MonkeyLogUploader(conf)
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
@@ -146,22 +177,201 @@ class TestLogUploader(unittest.TestCase):
'log_dir': tmpdir,
'filename_format': '*.%Y%m%d%H.log',
}
uploader = MonkeyLogUploader(conf)
uploader = MockLogUploader(conf)
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 24)
for i, file_date in enumerate(sorted(uploader.uploaded_files)):
fname_to_int = lambda x: int(os.path.basename(x[0]).split('.')[0])
numerically = lambda x, y: cmp(fname_to_int(x),
fname_to_int(y))
for i, file_date in enumerate(sorted(uploader.uploaded_files,
cmp=numerically)):
d = {'year': year, 'month': month, 'day': day, 'hour': i}
for k, v in d.items():
d[k] = '%0.2d' % v
expected = (os.path.join(tmpdir, '%s.%s%0.2d.log' %
(i, today_str, i)), d)
# TODO: support wildcards before the date pattern
# (i.e. relative offsets)
#print file_date
#self.assertEquals(file_date, expected)
self.assertEquals(file_date, expected)
finally:
rmtree(tmpdir)
def test_bad_pattern_in_config(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
# invalid pattern
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%h'} # should be %H
uploader = MockLogUploader(conf)
self.assertFalse(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(uploader.uploaded_files, [])
conf = {'log_dir': t, 'source_filename_pattern': '%Y%m%d%H'}
uploader = MockLogUploader(conf)
self.assert_(uploader.validate_filename_pattern())
uploader.upload_all_logs()
self.assertEquals(len(uploader.uploaded_files), 1)
# deprecated warning on source_filename_format
class MockLogger():
def __init__(self):
self.msgs = defaultdict(list)
def log(self, level, msg):
self.msgs[level].append(msg)
def __getattr__(self, attr):
return partial(self.log, attr)
logger = MockLogger.logger = MockLogger()
def mock_get_logger(*args, **kwargs):
return MockLogger.logger
_orig_get_logger = log_uploader.utils.get_logger
try:
log_uploader.utils.get_logger = mock_get_logger
conf = {'source_filename_format': '%Y%m%d%H'}
uploader = MockLogUploader(conf, logger=logger)
self.assert_([m for m in logger.msgs['warning']
if 'deprecated' in m])
finally:
log_uploader.utils.get_logger = _orig_get_logger
# convert source_filename_format to regex
conf = {'source_filename_format': 'pattern-*.%Y%m%d%H.*.gz'}
uploader = MockLogUploader(conf)
expected = r'pattern-.*\.%Y%m%d%H\..*\.gz'
self.assertEquals(uploader.pattern, expected)
# use source_filename_pattern if we have the choice!
conf = {
'source_filename_format': 'bad',
'source_filename_pattern': 'good',
}
uploader = MockLogUploader(conf)
self.assertEquals(uploader.pattern, 'good')
def test_pattern_upload_all_logs(self):
# test empty dir
with temptree([]) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
def get_random_length_str(max_len=10, chars=string.ascii_letters):
return ''.join(random.choice(chars) for x in
range(random.randint(1, max_len)))
template = 'prefix_%(random)s_%(digits)s.blah.' \
'%(datestr)s%(hour)0.2d00-%(next_hour)0.2d00-%(number)s.gz'
pattern = r'prefix_.*_[0-9]+\.blah\.%Y%m%d%H00-[0-9]{2}00' \
'-[0-9]?[0-9]\.gz'
files_that_should_match = []
# add some files that match
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m%d'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files_that_should_match.append(fname)
# add some files that don't match
files = list(files_that_should_match)
for i in range(24):
fname = template % {
'random': get_random_length_str(),
'digits': get_random_length_str(16, string.digits),
'datestr': datetime.now().strftime('%Y%m'),
'hour': i,
'next_hour': i + 1,
'number': random.randint(0, 20),
}
files.append(fname)
for fname in files:
print fname
with temptree(files, contents=[COMPRESSED_DATA] * len(files)) as t:
self.assertEquals(len(os.listdir(t)), 48)
conf = {'source_filename_pattern': pattern, 'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(os.listdir(t)), 24)
self.assertEquals(len(uploader.uploaded_files), 24)
files_that_were_uploaded = set(x[0] for x in
uploader.uploaded_files)
for f in files_that_should_match:
self.assert_(os.path.join(t, f) in files_that_were_uploaded)
def test_log_cutoff(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
conf = {'log_dir': t, 'new_log_cutoff': '7200'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
conf = {'log_dir': t, 'new_log_cutoff': '0'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
def test_create_container_fail(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
with temptree(files) as t:
conf = {'log_dir': t}
uploader = MockLogUploader(conf)
# mock create_container to fail
uploader.internal_proxy.create_container = lambda *args: False
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
def test_unlink_log(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'false'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
conf = {'log_dir': t, 'unlink_log': 'true'}
uploader = MockLogUploader(conf)
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 1)
# file gone
self.assertEquals(len(os.listdir(t)), 0)
def test_upload_file_failed(self):
files = [datetime.now().strftime('%Y%m%d%H')]
with temptree(files, contents=[COMPRESSED_DATA]) as t:
conf = {'log_dir': t, 'unlink_log': 'true'}
uploader = MockLogUploader(conf)
# mock upload_file to fail, and clean up mock
def mock_upload_file(self, *args, **kwargs):
uploader.uploaded_files.pop()
return False
uploader.internal_proxy.upload_file = mock_upload_file
uploader.run_once()
self.assertEquals(len(uploader.uploaded_files), 0)
# file still there
self.assertEquals(len(os.listdir(t)), 1)
if __name__ == '__main__':
unittest.main()