commit ea690795df
updated with changes and suggestions from code review
@@ -24,4 +24,4 @@ if __name__ == '__main__':
         print "Usage: swift-log-stats-collector CONFIG_FILE"
         sys.exit()
     conf = utils.readconf(sys.argv[1], log_name='log-stats-collector')
-    stats = LogProcessorDaemon(conf).run(once=True, capture_stdout=False)
+    stats = LogProcessorDaemon(conf).run(once=True)
@@ -15,6 +15,8 @@ swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
 container_name = log_data
 source_filename_format = access-%Y%m%d%H
+# new_log_cutoff = 7200
+# unlink_log = True
 class_path = swift.stats.access_processor.AccessLogProcessor
 # service ips is for client ip addresses that should be counted as servicenet
 # service_ips =
@@ -28,7 +30,9 @@ class_path = swift.stats.access_processor.AccessLogProcessor
 # log_dir = /var/log/swift/
 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
 container_name = account_stats
-source_filename_format = stats-%Y%m%d%H
+source_filename_format = stats-%Y%m%d%H_*
+# new_log_cutoff = 7200
+# unlink_log = True
 class_path = swift.stats.stats_processor.StatsLogProcessor
 # account_server_conf = /etc/swift/account-server.conf
 # user = swift
@@ -16,8 +16,7 @@
 import zlib
 import struct

-
-class CompressedFileReader(object):
+class CompressingFileReader(object):
     '''
     Wraps a file object and provides a read method that returns gzip'd data.

@@ -17,7 +17,7 @@ import webob
 from urllib import quote, unquote
 from json import loads as json_loads

-from swift.common.compressed_file_reader import CompressedFileReader
+from swift.common.compressing_file_reader import CompressingFileReader
 from swift.proxy.server import BaseApplication

 class MemcacheStub(object):
@@ -45,7 +45,8 @@ class InternalProxy(object):
         self.retries = retries

     def upload_file(self, source_file, account, container, object_name,
-                    compress=True, content_type='application/x-gzip'):
+                    compress=True, content_type='application/x-gzip',
+                    etag=None):
         """
         Upload a file to cloud files.

@@ -69,9 +70,9 @@ class InternalProxy(object):
                                  headers={'Transfer-Encoding': 'chunked'})
         if compress:
             if hasattr(source_file, 'read'):
-                compressed_file = CompressedFileReader(source_file)
+                compressed_file = CompressingFileReader(source_file)
             else:
-                compressed_file = CompressedFileReader(open(source_file, 'rb'))
+                compressed_file = CompressingFileReader(open(source_file, 'rb'))
             req.body_file = compressed_file
         else:
             if not hasattr(source_file, 'read'):
@@ -80,6 +81,8 @@ class InternalProxy(object):
         req.account = account
         req.content_type = content_type
         req.content_length = None # to make sure we send chunked data
+        if etag:
+            req.etag = etag
         resp = self.upload_app.handle_request(self.upload_app.update_request(req))
         tries = 1
         while (resp.status_int < 200 or resp.status_int > 299) \
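
The new etag keyword on InternalProxy.upload_file lets a caller hand in a precomputed MD5 hex digest; when given, upload_file sets it on the request so the stored object can be verified. A minimal sketch of how a caller might use it; the helper name and chunk size below are illustrative, not part of this commit, and the digest must match the exact body uploaded (hence compress=False here):

    import hashlib

    def upload_with_etag(proxy, path, account, container, object_name):
        # Hash the raw file first so the stored object can be verified.
        hasher = hashlib.md5()
        with open(path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), ''):
                hasher.update(chunk)
        # upload_file sets req.etag from this value when it is not None.
        return proxy.upload_file(path, account, container, object_name,
                                 compress=False, etag=hasher.hexdigest())
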
@@ -21,7 +21,9 @@ from swift.common.utils import split_path

 month_map = '_ Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec'.split()


 class AccessLogProcessor(object):
+    """Transform proxy server access logs"""
+
     def __init__(self, conf):
         self.server_name = conf.get('server_name', 'proxy')
@@ -112,8 +114,8 @@ class AccessLogProcessor(object):
         d['account'] = account
         d['container_name'] = container_name
         d['object_name'] = object_name
-        d['bytes_out'] = int(d['bytes_out'].replace('-','0'))
-        d['bytes_in'] = int(d['bytes_in'].replace('-','0'))
+        d['bytes_out'] = int(d['bytes_out'].replace('-', '0'))
+        d['bytes_in'] = int(d['bytes_in'].replace('-', '0'))
         d['code'] = int(d['code'])
         return d

@@ -151,12 +153,12 @@ class AccessLogProcessor(object):
             source = 'service'
         else:
             source = 'public'

         if line_data['client_ip'] in self.service_ips:
             source = 'service'

-        d[(source, 'bytes_out')] = d.setdefault((source, 'bytes_out'), 0) + \
-                                   bytes_out
+        d[(source, 'bytes_out')] = d.setdefault((
+            source, 'bytes_out'), 0) + bytes_out
         d[(source, 'bytes_in')] = d.setdefault((source, 'bytes_in'), 0) + \
                                   bytes_in

@@ -171,7 +173,7 @@ class AccessLogProcessor(object):
         path = line_data.get('path', 0)
         d['path_query'] = d.setdefault('path_query', 0) + path

-        code = '%dxx' % (code/100)
+        code = '%dxx' % (code / 100)
         key = (source, op_level, method, code)
         d[key] = d.setdefault(key, 0) + 1

@@ -220,5 +222,5 @@ class AccessLogProcessor(object):
                 keylist_mapping[code].add(
                     (source, level, verb, code))
                 keylist_mapping['ops_count'].add(
-                    (source,level,verb,code))
+                    (source, level, verb, code))
         return keylist_mapping
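
The whitespace cleanups above leave the aggregation logic itself untouched: each request increments a counter keyed by (source, op_level, method, code), with the status code collapsed to a class such as '2xx'. A small self-contained sketch of that bucketing pattern, with illustrative names:

    def count_request(counters, source, op_level, method, code):
        # 204 -> '2xx', 404 -> '4xx'; integer division keeps only the class.
        code_class = '%dxx' % (code // 100)
        key = (source, op_level, method, code_class)
        counters[key] = counters.setdefault(key, 0) + 1

    counters = {}
    count_request(counters, 'public', 'object', 'GET', 200)
    count_request(counters, 'public', 'object', 'GET', 404)
    # counters == {('public', 'object', 'GET', '2xx'): 1,
    #              ('public', 'object', 'GET', '4xx'): 1}
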
@@ -16,18 +16,26 @@
 import os
 import time
 from paste.deploy import appconfig
+import shutil
+import hashlib

 from swift.account.server import DATADIR as account_server_data_dir
 from swift.common.db import AccountBroker
 from swift.common.internal_proxy import InternalProxy
-from swift.common.utils import renamer, get_logger, readconf
+from swift.common.utils import renamer, get_logger, readconf, mkdirs
+from swift.common.constraints import check_mount
 from swift.common.daemon import Daemon

+
 class AccountStat(Daemon):
+    """
+    Extract storage stats from account databases on the account
+    storage nodes
+    """
+
     def __init__(self, stats_conf):
         super(AccountStat, self).__init__(stats_conf)
         target_dir = stats_conf.get('log_dir', '/var/log/swift')
-        #TODO: figure out the server configs. also figure out internal_proxy
         account_server_conf_loc = stats_conf.get('account_server_conf',
                                                  '/etc/swift/account-server.conf')
         server_conf = appconfig('config:%s' % account_server_conf_loc,
@@ -35,6 +43,7 @@ class AccountStat(Daemon):
         filename_format = stats_conf['source_filename_format']
         self.filename_format = filename_format
         self.target_dir = target_dir
+        mkdirs(self.target_dir)
         self.devices = server_conf.get('devices', '/srv/node')
         self.mount_check = server_conf.get('mount_check', 'true').lower() in \
             ('true', 't', '1', 'on', 'yes', 'y')
@@ -45,18 +54,19 @@ class AccountStat(Daemon):
         start = time.time()
         self.find_and_process()
         self.logger.info("Gathering account stats complete (%0.2f minutes)" %
-            ((time.time()-start)/60))
+            ((time.time() - start) / 60))

     def find_and_process(self):
-        #TODO: handle a counter in the filename to prevent overwrites?
         src_filename = time.strftime(self.filename_format)
-        #TODO: don't use /tmp?
-        tmp_filename = os.path.join('/tmp', src_filename)
+        working_dir = os.path.join(self.target_dir, '.stats_tmp')
+        shutil.rmtree(working_dir, ignore_errors=True)
+        tmp_filename = os.path.join(working_dir, src_filename)
+        hasher = hashlib.md5()
         with open(tmp_filename, 'wb') as statfile:
-            #statfile.write('Account Name, Container Count, Object Count, Bytes Used\n')
+            # csv has the following columns:
+            # Account Name, Container Count, Object Count, Bytes Used
             for device in os.listdir(self.devices):
-                if self.mount_check and \
-                        not os.path.ismount(os.path.join(self.devices, device)):
+                if self.mount_check and not check_mount(self.devices, device):
                     self.logger.error("Device %s is not mounted, skipping." %
                         device)
                     continue
@@ -70,7 +80,8 @@ class AccountStat(Daemon):
                 for root, dirs, files in os.walk(accounts, topdown=False):
                     for filename in files:
                         if filename.endswith('.db'):
-                            broker = AccountBroker(os.path.join(root, filename))
+                            db_path = os.path.join(root, filename)
+                            broker = AccountBroker(db_path)
                             if not broker.is_deleted():
                                 (account_name,
                                 _, _, _,
@@ -78,9 +89,12 @@ class AccountStat(Daemon):
                                 object_count,
                                 bytes_used,
                                 _, _) = broker.get_info()
-                                line_data = '"%s",%d,%d,%d\n' % (account_name,
-                                                                 container_count,
-                                                                 object_count,
-                                                                 bytes_used)
+                                line_data = '"%s",%d,%d,%d\n' % (
+                                    account_name, container_count,
+                                    object_count, bytes_used)
                                 statfile.write(line_data)
+                                hasher.update(line_data)
+        file_hash = hasher.hexdigest()
+        src_filename = '_'.join([src_filename, file_hash])
         renamer(tmp_filename, os.path.join(self.target_dir, src_filename))
+        shutil.rmtree(working_dir, ignore_errors=True)
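
The reworked AccountStat.find_and_process stages the stats file in a .stats_tmp directory under the log dir, hashes every CSV line as it is written, and publishes the file under a name that ends with that hash, which is what the new stats-%Y%m%d%H_* pattern in the sample config is meant to match. A simplified sketch of the write-hash-rename flow, using plain os.rename in place of Swift's renamer helper and made-up argument names:

    import hashlib
    import os
    import shutil

    def write_stats_file(lines, target_dir, name_prefix):
        # Stage the file in a scratch directory that is wiped on each run.
        working_dir = os.path.join(target_dir, '.stats_tmp')
        shutil.rmtree(working_dir, ignore_errors=True)
        os.makedirs(working_dir)
        tmp_path = os.path.join(working_dir, name_prefix)
        hasher = hashlib.md5()
        with open(tmp_path, 'wb') as statfile:
            for line in lines:
                statfile.write(line)
                hasher.update(line)
        # Final name carries the content hash, e.g. stats-2010061418_<md5>.
        final_name = '_'.join([name_prefix, hasher.hexdigest()])
        os.rename(tmp_path, os.path.join(target_dir, final_name))
        shutil.rmtree(working_dir, ignore_errors=True)
        return final_name
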
@@ -23,32 +23,29 @@ from paste.deploy import appconfig
 import multiprocessing
 import Queue
 import cPickle
+import hashlib

 from swift.common.internal_proxy import InternalProxy
 from swift.common.exceptions import ChunkReadTimeout
 from swift.common.utils import get_logger, readconf
 from swift.common.daemon import Daemon


 class BadFileDownload(Exception):
     pass


 class LogProcessor(object):
+    """Load plugins, process logs"""
+
     def __init__(self, conf, logger):
         stats_conf = conf.get('log-processor', {})

-        proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
-                                        '/etc/swift/proxy-server.conf')
-        self.proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
-                                           name='proxy-server')
         if isinstance(logger, tuple):
             self.logger = get_logger(*logger)
         else:
             self.logger = logger
-        self.internal_proxy = InternalProxy(self.proxy_server_conf,
-                                            self.logger,
-                                            retries=3)

         # load the processing plugins
         self.plugins = {}
         plugin_prefix = 'log-processor-'
@@ -56,11 +53,26 @@ class LogProcessor(object):
             plugin_name = section[len(plugin_prefix):]
             plugin_conf = conf.get(section, {})
             self.plugins[plugin_name] = plugin_conf
-            import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
+            class_path = self.plugins[plugin_name]['class_path']
+            import_target, class_name = class_path.rsplit('.', 1)
             module = __import__(import_target, fromlist=[import_target])
             klass = getattr(module, class_name)
             self.plugins[plugin_name]['instance'] = klass(plugin_conf)

+    @property
+    def internal_proxy(self):
+        '''Lazy load internal proxy'''
+        if self._internal_proxy is None:
+            proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
+                                            '/etc/swift/proxy-server.conf')
+            self.proxy_server_conf = appconfig(
+                                        'config:%s' % proxy_server_conf_loc,
+                                        name='proxy-server')
+            self._internal_proxy = InternalProxy(self.proxy_server_conf,
+                                                 self.logger,
+                                                 retries=3)
+        return self._internal_proxy
+
     def process_one_file(self, plugin_name, account, container, object_name):
         # get an iter of the object data
         compressed = object_name.endswith('.gz')
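
internal_proxy is now built on first access instead of in __init__, so a LogProcessor that never talks to the proxy, for example one whose test injects a stub into _internal_proxy as the updated tests below do, never loads the proxy server config. The general shape of that lazy-initialization property, sketched here independently of the Swift classes (make_proxy stands in for the appconfig plus InternalProxy construction):

    def make_proxy(conf, logger):
        # Placeholder for the expensive appconfig() + InternalProxy(...) setup.
        return object()


    class ProxyUser(object):
        def __init__(self, conf, logger):
            self.conf = conf
            self.logger = logger
            self._internal_proxy = None      # created on first use

        @property
        def internal_proxy(self):
            '''Lazy load internal proxy'''
            if self._internal_proxy is None:
                self._internal_proxy = make_proxy(self.conf, self.logger)
            return self._internal_proxy
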
@@ -72,25 +84,27 @@ class LogProcessor(object):
                                                  container,
                                                  object_name)

-    def get_data_list(self, start_date=None, end_date=None, listing_filter=None):
+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
         total_list = []
-        for name, data in self.plugins.items():
+        for plugin_name, data in self.plugins.items():
             account = data['swift_account']
             container = data['container_name']
-            l = self.get_container_listing(account,
+            listing = self.get_container_listing(account,
                                            container,
                                            start_date,
                                            end_date)
-            for i in l:
+            for object_name in listing:
                 # The items in this list end up being passed as positional
                 # parameters to process_one_file.
-                x = (name, account, container, i)
+                x = (plugin_name, account, container, object_name)
                 if x not in listing_filter:
                     total_list.append(x)
         return total_list

-    def get_container_listing(self, swift_account, container_name, start_date=None,
-                              end_date=None, listing_filter=None):
+    def get_container_listing(self, swift_account, container_name,
+                              start_date=None, end_date=None,
+                              listing_filter=None):
         '''
         Get a container listing, filtered by start_date, end_date, and
         listing_filter. Dates, if given, should be in YYYYMMDDHH format
@@ -162,15 +176,16 @@ class LogProcessor(object):
         last_part = ''
         last_compressed_part = ''
         # magic in the following zlib.decompressobj argument is courtesy of
-        # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
-        d = zlib.decompressobj(16+zlib.MAX_WBITS)
+        # Python decompressing gzip chunk-by-chunk
+        # http://stackoverflow.com/questions/2423866
+        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
         try:
             for chunk in o:
                 if compressed:
                     try:
                         chunk = d.decompress(chunk)
                     except zlib.error:
                         raise BadFileDownload() # bad compressed data
                 parts = chunk.split('\n')
                 parts[0] = last_part + parts[0]
                 for part in parts[:-1]:
@@ -208,6 +223,11 @@ class LogProcessor(object):


 class LogProcessorDaemon(Daemon):
+    """
+    Gather raw log data and farm proccessing to generate a csv that is
+    uploaded to swift.
+    """
+
     def __init__(self, conf):
         c = conf.get('log-processor')
         super(LogProcessorDaemon, self).__init__(c)
@@ -220,6 +240,7 @@ class LogProcessorDaemon(Daemon):
         self.log_processor_account = c['swift_account']
         self.log_processor_container = c.get('container_name',
                                              'log_processing_data')
+        self.worker_count = int(c.get('worker_count', '1'))

     def run_once(self):
         self.logger.info("Beginning log processing")
@@ -228,19 +249,26 @@ class LogProcessorDaemon(Daemon):
             lookback_start = None
             lookback_end = None
         else:
-            lookback_start = datetime.datetime.now() - \
-                             datetime.timedelta(hours=self.lookback_hours)
+            delta_hours = datetime.timedelta(hours=self.lookback_hours)
+            lookback_start = datetime.datetime.now() - delta_hours
             lookback_start = lookback_start.strftime('%Y%m%d%H')
             if self.lookback_window == 0:
                 lookback_end = None
             else:
+                delta_window = datetime.timedelta(hours=self.lookback_window)
                 lookback_end = datetime.datetime.now() - \
-                               datetime.timedelta(hours=self.lookback_hours) + \
-                               datetime.timedelta(hours=self.lookback_window)
+                               delta_hours + \
+                               delta_window
                 lookback_end = lookback_end.strftime('%Y%m%d%H')
         self.logger.debug('lookback_start: %s' % lookback_start)
         self.logger.debug('lookback_end: %s' % lookback_end)
         try:
+            # Note: this file (or data set) will grow without bound.
+            # In practice, if it becomes a problem (say, after many months of
+            # running), one could manually prune the file to remove older
+            # entries. Automatically pruning on each run could be dangerous.
+            # There is not a good way to determine when an old entry should be
+            # pruned (lookback_hours could be set to anything and could change)
             processed_files_stream = self.log_processor.get_object_data(
                                         self.log_processor_account,
                                         self.log_processor_container,
@@ -261,12 +289,13 @@ class LogProcessorDaemon(Daemon):
         self.logger.info('loaded %d files to process' % len(logs_to_process))
         if not logs_to_process:
             self.logger.info("Log processing done (%0.2f minutes)" %
-                ((time.time()-start)/60))
+                ((time.time() - start) / 60))
             return

         # map
         processor_args = (self.total_conf, self.logger)
-        results = multiprocess_collate(processor_args, logs_to_process)
+        results = multiprocess_collate(processor_args, logs_to_process,
+                                       self.worker_count)

         #reduce
         aggr_data = {}
@@ -307,16 +336,23 @@ class LogProcessorDaemon(Daemon):

         # output
         sorted_keylist_mapping = sorted(keylist_mapping)
-        columns = 'bill_ts,data_ts,account,' + ','.join(sorted_keylist_mapping)
-        print columns
+        columns = 'data_ts,account,' + ','.join(sorted_keylist_mapping)
+        out_buf = [columns]
         for (account, year, month, day, hour), d in final_info.items():
-            bill_ts = ''
             data_ts = '%s/%s/%s %s:00:00' % (year, month, day, hour)
-            row = [bill_ts, data_ts]
+            row = [data_ts]
             row.append('%s' % account)
             for k in sorted_keylist_mapping:
-                row.append('%s'%d[k])
-            print ','.join(row)
+                row.append('%s' % d[k])
+            out_buf.append(','.join(row))
+        out_buf = '\n'.join(out_buf)
+        h = hashlib.md5(out_buf).hexdigest()
+        upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
+        f = cStringIO.StringIO(out_buf)
+        self.log_processor.internal_proxy.upload_file(f,
+                                        self.log_processor_account,
+                                        self.log_processor_container,
+                                        upload_name)

         # cleanup
         s = cPickle.dumps(processed_files, cPickle.HIGHEST_PROTOCOL)
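
run_once no longer prints the aggregated report; it buffers the rows, derives the object name from the MD5 of the finished CSV, and hands the buffer to the internal proxy for upload. A rough standalone sketch of that buffer-hash-upload step, with the actual uploader call stubbed out as a parameter:

    import hashlib
    import time
    import cStringIO

    def publish_csv(columns, rows, upload_file):
        out_buf = [','.join(columns)]
        for row in rows:
            out_buf.append(','.join('%s' % v for v in row))
        out_buf = '\n'.join(out_buf)
        # Hashing the finished CSV gives a content-derived object name.
        h = hashlib.md5(out_buf).hexdigest()
        upload_name = time.strftime('%Y/%m/%d/%H/') + '%s.csv.gz' % h
        upload_file(cStringIO.StringIO(out_buf), upload_name)
        return upload_name
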
@@ -327,11 +363,11 @@ class LogProcessorDaemon(Daemon):
                 'processed_files.pickle.gz')

         self.logger.info("Log processing done (%0.2f minutes)" %
-                ((time.time()-start)/60))
+                ((time.time() - start) / 60))

-def multiprocess_collate(processor_args, logs_to_process):
+
+def multiprocess_collate(processor_args, logs_to_process, worker_count):
     '''yield hourly data from logs_to_process'''
-    worker_count = multiprocessing.cpu_count()
     results = []
     in_queue = multiprocessing.Queue()
     out_queue = multiprocessing.Queue()
@@ -361,6 +397,7 @@ def multiprocess_collate(processor_args, logs_to_process):
     for r in results:
         r.join()

+
 def collate_worker(processor_args, in_queue, out_queue):
     '''worker process for multiprocess_collate'''
     p = LogProcessor(*processor_args)
@@ -373,4 +410,4 @@ def collate_worker(processor_args, in_queue, out_queue):
             time.sleep(.1)
         else:
             ret = p.process_one_file(*item)
             out_queue.put((item, ret))
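
multiprocess_collate now takes its process count from the daemon (the new worker_count option, default 1) instead of always spawning one worker per CPU. A cut-down sketch of the queue-based fan-out it drives; the real module's sentinel handling and timeouts are richer than shown, and the worker callable here is assumed to read items until it sees None and put one result per item:

    import multiprocessing

    def fan_out(worker, items, worker_count):
        in_queue = multiprocessing.Queue()
        out_queue = multiprocessing.Queue()
        procs = [multiprocessing.Process(target=worker,
                                         args=(in_queue, out_queue))
                 for _ in range(worker_count)]
        for p in procs:
            p.start()
        for item in items:
            in_queue.put(item)
        for _ in procs:
            in_queue.put(None)          # one stop marker per worker
        results = [out_queue.get() for _ in items]
        for p in procs:
            p.join()
        return results
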
@@ -25,18 +25,19 @@ from swift.common.internal_proxy import InternalProxy
 from swift.common.daemon import Daemon
 from swift.common import utils

+
 class LogUploader(Daemon):
     '''
     Given a local directory, a swift account, and a container name, LogParser
-    will upload all files in the local directory to the given account/container.
-    All but the newest files will be uploaded, and the files' md5 sum will be
-    computed. The hash is used to prevent duplicate data from being uploaded
-    multiple times in different files (ex: log lines). Since the hash is
-    computed, it is also used as the uploaded object's etag to ensure data
-    integrity.
+    will upload all files in the local directory to the given account/
+    container. All but the newest files will be uploaded, and the files' md5
+    sum will be computed. The hash is used to prevent duplicate data from
+    being uploaded multiple times in different files (ex: log lines). Since
+    the hash is computed, it is also used as the uploaded object's etag to
+    ensure data integrity.

     Note that after the file is successfully uploaded, it will be unlinked.

     The given proxy server config is used to instantiate a proxy server for
     the object uploads.
     '''
@@ -51,6 +52,11 @@ class LogUploader(Daemon):
                                     '/etc/swift/proxy-server.conf')
         proxy_server_conf = appconfig('config:%s' % proxy_server_conf_loc,
                                       name='proxy-server')
+        new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
+        unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
+                ('true', 'on', '1', 'yes')
+        self.unlink_log = unlink_log
+        self.new_log_cutoff = new_log_cutoff
         if not log_dir.endswith('/'):
             log_dir = log_dir + '/'
         self.log_dir = log_dir
@@ -66,7 +72,7 @@ class LogUploader(Daemon):
         start = time.time()
         self.upload_all_logs()
         self.logger.info("Uploading logs complete (%0.2f minutes)" %
-            ((time.time()-start)/60))
+            ((time.time() - start) / 60))

     def upload_all_logs(self):
         i = [(self.filename_format.index(c), c) for c in '%Y %m %d %H'.split()]
@@ -76,17 +82,17 @@ class LogUploader(Daemon):
         for start, c in i:
             offset = base_offset + start
             if c == '%Y':
-                year_offset = offset, offset+4
+                year_offset = offset, offset + 4
                 # Add in the difference between len(%Y) and the expanded
                 # version of %Y (????). This makes sure the codes after this
                 # one will align properly in the final filename.
                 base_offset += 2
             elif c == '%m':
-                month_offset = offset, offset+2
+                month_offset = offset, offset + 2
             elif c == '%d':
-                day_offset = offset, offset+2
+                day_offset = offset, offset + 2
             elif c == '%H':
-                hour_offset = offset, offset+2
+                hour_offset = offset, offset + 2
         if not (year_offset and month_offset and day_offset and hour_offset):
             # don't have all the parts, can't upload anything
             return
@@ -122,9 +128,12 @@ class LogUploader(Daemon):
                 # unexpected filename format, move on
                 self.logger.error("Unexpected log: %s" % filename)
                 continue
-            if (time.time() - os.stat(filename).st_mtime) < 7200:
+            if ((time.time() - os.stat(filename).st_mtime) <
+                    self.new_log_cutoff):
                 # don't process very new logs
-                self.logger.debug("Skipping log: %s (< 2 hours old)" % filename)
+                self.logger.debug(
+                    "Skipping log: %s (< %d seconds old)" % (filename,
+                    self.new_log_cutoff))
                 continue
             self.upload_one_log(filename, year, month, day, hour)

@@ -147,7 +156,7 @@ class LogUploader(Daemon):
         # By adding a hash to the filename, we ensure that uploaded files
        # have unique filenames and protect against uploading one file
        # more than one time. By using md5, we get an etag for free.
-        target_filename = '/'.join([year, month, day, hour, filehash+'.gz'])
+        target_filename = '/'.join([year, month, day, hour, filehash + '.gz'])
         if self.internal_proxy.upload_file(filename,
                                            self.swift_account,
                                            self.container_name,
@@ -155,6 +164,7 @@ class LogUploader(Daemon):
                                            compress=(not already_compressed)):
             self.logger.debug("Uploaded log %s to %s" %
                     (filename, target_filename))
-            os.unlink(filename)
+            if self.unlink_log:
+                os.unlink(filename)
         else:
             self.logger.error("ERROR: Upload of log %s failed!" % filename)
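
new_log_cutoff and unlink_log replace the hard-coded two-hour threshold and the unconditional unlink after upload, which is why both now appear, commented out, in the sample config above. The same option parsing in isolation:

    def parse_uploader_options(uploader_conf):
        # Seconds a log must age before it is uploaded; default is two hours.
        new_log_cutoff = int(uploader_conf.get('new_log_cutoff', '7200'))
        # Whether to delete the local file once it has been uploaded.
        unlink_log = uploader_conf.get('unlink_log', 'True').lower() in \
                ('true', 'on', '1', 'yes')
        return new_log_cutoff, unlink_log

    # parse_uploader_options({'new_log_cutoff': '3600'}) -> (3600, True)
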
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.


 class StatsLogProcessor(object):
+    """Transform account storage stat logs"""
+
     def __init__(self, conf):
         pass
@@ -112,7 +112,7 @@ class TestLogProcessor(unittest.TestCase):

     def test_get_container_listing(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         result = p.get_container_listing('a', 'foo')
         expected = ['2010/03/14/13/obj1']
         self.assertEquals(result, expected)
@@ -133,7 +133,7 @@ class TestLogProcessor(unittest.TestCase):

     def test_get_object_data(self):
         p = log_processor.LogProcessor(self.proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         result = list(p.get_object_data('a', 'c', 'o', False))
         expected = ['obj','data']
         self.assertEquals(result, expected)
@@ -148,7 +148,7 @@ class TestLogProcessor(unittest.TestCase):
                 'swift.stats.stats_processor.StatsLogProcessor'
             }})
         p = log_processor.LogProcessor(stats_proxy_config, DumbLogger())
-        p.internal_proxy = DumbInternalProxy()
+        p._internal_proxy = DumbInternalProxy()
         def get_object_data(*a,**kw):
             return [self.stats_test_line]
         p.get_object_data = get_object_data
@@ -158,4 +158,4 @@ class TestLogProcessor(unittest.TestCase):
                 'object_count': 2,
                 'container_count': 1,
                 'bytes_used': 3}}
         self.assertEquals(result, expected)