From 9ef1be1c389658cb57c44bd5a351ad94ac3df61c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Lemoine?= Date: Fri, 5 Feb 2016 10:52:07 +0100 Subject: [PATCH] Add Heka log decoder infrastructure In the future the Heka Lua sandboxes will be moved out of Kolla, and installed in the Heka container using a deb or rpm package. Partially implements: blueprint heka Change-Id: I34cc80f62ddbca8ee330c971f58fee8686e245e6 --- docker/heka/Dockerfile.j2 | 4 + docker/heka/plugins/decoders/os_syslog.lua | 55 ++++++++ docker/heka/plugins/encoders/os_syslog.lua | 26 ++++ docker/heka/plugins/modules/os_patterns.lua | 144 ++++++++++++++++++++ docker/heka/plugins/modules/os_utils.lua | 89 ++++++++++++ 5 files changed, 318 insertions(+) create mode 100644 docker/heka/plugins/decoders/os_syslog.lua create mode 100644 docker/heka/plugins/encoders/os_syslog.lua create mode 100644 docker/heka/plugins/modules/os_patterns.lua create mode 100644 docker/heka/plugins/modules/os_utils.lua diff --git a/docker/heka/Dockerfile.j2 b/docker/heka/Dockerfile.j2 index 8ce52f4e58..b5e8ea7f8e 100644 --- a/docker/heka/Dockerfile.j2 +++ b/docker/heka/Dockerfile.j2 @@ -14,6 +14,10 @@ RUN curl --location https://github.com/mozilla-services/heka/releases/download/v {% endif %} +COPY plugins/modules /usr/share/heka/lua_modules/ +COPY plugins/decoders /usr/share/heka/lua_decoders/ +COPY plugins/encoders /usr/share/heka/lua_encoders/ + COPY heka_sudoers /etc/sudoers.d/heka_sudoers COPY extend_start.sh /usr/local/bin/kolla_extend_start diff --git a/docker/heka/plugins/decoders/os_syslog.lua b/docker/heka/plugins/decoders/os_syslog.lua new file mode 100644 index 0000000000..34fdd0bcbd --- /dev/null +++ b/docker/heka/plugins/decoders/os_syslog.lua @@ -0,0 +1,55 @@ +-- Copyright 2016 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- The code in this file was inspired by Heka's rsyslog.lua decoder plugin. +-- https://github.com/mozilla-services/heka/blob/master/sandbox/lua/decoders/rsyslog.lua + +local syslog = require "syslog" +local utils = require "os_utils" + +local msg = { + Timestamp = nil, + Type = 'Syslog', + Hostname = read_config("hostname"), + Payload = nil, + Pid = nil, + Severity = nil, + Fields = nil +} + +-- See https://tools.ietf.org/html/rfc3164 +local grammar = syslog.build_rsyslog_grammar('<%PRI%>%TIMESTAMP% %syslogtag% %msg%') + +function process_message () + local log = read_message("Payload") + local fields = grammar:match(log) + if not fields then return -1 end + + msg.Timestamp = fields.timestamp + fields.timestamp = nil + + msg.Severity = fields.pri.severity + fields.syslogfacility = fields.pri.facility + fields.pri = nil + + fields.programname = fields.syslogtag.programname + msg.Pid = fields.syslogtag.pid + fields.syslogtag = nil + + msg.Payload = fields.msg + fields.msg = nil + + msg.Fields = fields + return utils.safe_inject_message(msg) +end diff --git a/docker/heka/plugins/encoders/os_syslog.lua b/docker/heka/plugins/encoders/os_syslog.lua new file mode 100644 index 0000000000..0acab8d58d --- /dev/null +++ b/docker/heka/plugins/encoders/os_syslog.lua @@ -0,0 +1,26 @@ +-- Copyright 2016 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +require "string" + +local interpolate = require "msg_interpolate" +local utils = require "os_utils" + +local header_template = "<%{Severity}>%{%FT%TZ} %{Hostname} %{programname}[%{Pid}]:" + +function process_message() + local timestamp = read_message("Timestamp") / 1e9 + local header = interpolate.interpolate_from_msg(header_template, timestamp) + local payload = string.format("%s %s\n", header, read_message("Payload")) + return utils.safe_inject_payload("txt", "", payload) +end diff --git a/docker/heka/plugins/modules/os_patterns.lua b/docker/heka/plugins/modules/os_patterns.lua new file mode 100644 index 0000000000..4cf0680306 --- /dev/null +++ b/docker/heka/plugins/modules/os_patterns.lua @@ -0,0 +1,144 @@ +-- Copyright 2015-2016 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +local table = require 'table' +local dt = require "date_time" +local l = require 'lpeg' +l.locale(l) + +local tonumber = tonumber + +local M = {} +setfenv(1, M) -- Remove external access to contain everything in the module + +function format_uuid(t) + return table.concat(t, '-') +end + +function anywhere (patt) + return l.P { + patt + 1 * l.V(1) + } +end + +sp = l.space +colon = l.P":" +dash = l.P"-" +dot = l.P'.' +quote = l.P'"' + +local x4digit = l.xdigit * l.xdigit * l.xdigit * l.xdigit +local uuid_dash = l.C(x4digit * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * dash * x4digit * x4digit * x4digit) +local uuid_nodash = l.Ct(l.C(x4digit * x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit) * l.C(x4digit * x4digit * x4digit)) / format_uuid + +-- Return a UUID string in canonical format (eg with dashes) +Uuid = uuid_nodash + uuid_dash + +-- Parse a datetime string and return a table with the following keys +-- year (string) +-- month (string) +-- day (string) +-- hour (string) +-- min (string) +-- sec (string) +-- sec_frac (number less than 1, can be nil) +-- offset_sign ('-' or '+', can be nil) +-- offset_hour (number, can be nil) +-- offset_min (number, can be nil) +-- +-- The datetime string can be formatted as +-- 'YYYY-MM-DD( |T)HH:MM:SS(.ssssss)?(offset indicator)?' +TimestampTable = l.Ct(dt.rfc3339_full_date * (sp + l.P"T") * dt.rfc3339_partial_time * (dt.rfc3339_time_offset + dt.timezone_offset)^-1) + +-- Returns the parsed datetime converted to nanosec +Timestamp = TimestampTable / dt.time_to_ns + +programname = (l.R("az", "AZ", "09") + l.P"." + dash + l.P"_")^1 +Pid = l.digit^1 +SeverityLabel = l.P"CRITICAL" + l.P"ERROR" + l.P"WARNING" + l.P"INFO" + l.P"AUDIT" + l.P"DEBUG" +Message = l.P(1)^0 + +-- Capture for OpenStack logs producing four values: Timestamp, Pid, +-- SeverityLabel, PythonModule and Message. +-- +-- OpenStack log messages are of this form: +-- 2015-11-30 08:38:59.306 3434 INFO oslo_service.periodic_task [-] Blabla... +-- +-- [-] is the "request" part, it can take multiple forms. See below. +openstack = l.Ct(l.Cg(Timestamp, "Timestamp")* sp * l.Cg(Pid, "Pid") * sp * + l.Cg(SeverityLabel, "SeverityLabel") * sp * l.Cg(programname, "PythonModule") * + sp * l.Cg(Message, "Message")) + +-- Capture for OpenStack request context producing three values: RequestId, +-- UserId and TenantId. +-- +-- Notes: +-- +-- OpenStack logs include a request context, enclosed between square brackets. +-- It takes one of these forms: +-- +-- [-] +-- [req-0fd2a9ba-448d-40f5-995e-33e32ac5a6ba - - - - -] +-- [req-4db318af-54c9-466d-b365-fe17fe4adeed 8206d40abcc3452d8a9c1ea629b4a8d0 112245730b1f4858ab62e3673e1ee9e2 - - -] +-- +-- In the 1st case the capture produces nil. +-- In the 2nd case the capture produces one value: RequestId. +-- In the 3rd case the capture produces three values: RequestId, UserId, TenantId. +-- +-- The request id may be formatted as 'req-xxx' or 'xxx' depending on the project. +-- The user id and tenant id may not be present depending on the OpenStack release. +openstack_request_context = (l.P(1) - "[" )^0 * "[" * l.P"req-"^-1 * + l.Ct(l.Cg(Uuid, "RequestId") * sp * ((l.Cg(Uuid, "UserId") * sp * + l.Cg(Uuid, "TenantId")) + l.P(1)^0)) - "]" + +local http_method = l.Cg(l.R"AZ"^3, "http_method") +local url = l.Cg( (1 - sp)^1, "http_url") +local http_version = l.Cg(l.digit * dot * l.digit, "http_version") + +-- Pattern for the " HTTP/" format found +-- found in both OpenStack and Apache log files. +-- Example : OPTIONS /example.com HTTP/1.0 +http_request = http_method * sp * url * sp * l.P'HTTP/' * http_version + +-- Patterns for HTTP status, HTTP response size and HTTP response time in +-- OpenLayers logs. +-- +-- Notes: +-- Nova changes the default log format of eventlet.wsgi (see nova/wsgi.py) and +-- prefixes the HTTP status, response size and response time values with +-- respectively "status: ", "len: " and "time: ". +-- Other OpenStack services just rely on the default log format. +-- TODO(pasquier-s): build the LPEG grammar based on the log_format parameter +-- passed to eventlet.wsgi.server similar to what the build_rsyslog_grammar +-- function does for RSyslog. +local openstack_http_status = l.P"status: "^-1 * l.Cg(l.digit^3, "http_status") +local openstack_response_size = l.P"len: "^-1 * l.Cg(l.digit^1 / tonumber, "http_response_size") +local openstack_response_time = l.P"time: "^-1 * l.Cg(l.digit^1 * dot^0 * l.digit^0 / tonumber, "http_response_time") + +-- Capture for OpenStack HTTP producing six values: http_method, http_url, +-- http_version, http_status, http_response_size and http_response_time. +openstack_http = anywhere(l.Ct( + quote * http_request * quote * sp * + openstack_http_status * sp * openstack_response_size * sp * + openstack_response_time +)) + +-- Capture for IP addresses producing one value: ip_address. +ip_address = anywhere(l.Ct( + l.Cg(l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3 * dot * l.digit^-3, "ip_address") +)) + +-- Pattern used to match the beginning of a Python Traceback. +traceback = l.P'Traceback (most recent call last):' + +return M diff --git a/docker/heka/plugins/modules/os_utils.lua b/docker/heka/plugins/modules/os_utils.lua new file mode 100644 index 0000000000..c5b71c72a4 --- /dev/null +++ b/docker/heka/plugins/modules/os_utils.lua @@ -0,0 +1,89 @@ +-- Copyright 2015-2016 Mirantis, Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +local cjson = require 'cjson' +local string = require 'string' + +local patt = require 'os_patterns' + +local pairs = pairs +local inject_message = inject_message +local inject_payload = inject_payload +local read_message = read_message +local pcall = pcall + +local M = {} +setfenv(1, M) -- Remove external access to contain everything in the module + +severity_to_label_map = { + [0] = 'EMERGENCY', + [1] = 'ALERT', + [2] = 'CRITICAL', + [3] = 'ERROR', + [4] = 'WARNING', + [5] = 'NOTICE', + [6] = 'INFO', + [7] = 'DEBUG', +} + +label_to_severity_map = { + EMERGENCY = 0, + ALERT = 1, + CRITICAL = 2, + ERROR = 3, + WARNING = 4, + NOTICE = 5, + INFO= 6, + DEBUG = 7, +} + +function chomp(s) + return string.gsub(s, "\n$", "") +end + +-- Call inject_message() wrapped by pcall() +function safe_inject_message(msg) + local ok, err_msg = pcall(inject_message, msg) + if not ok then + return -1, err_msg + else + return 0 + end +end + +-- Call inject_payload() wrapped by pcall() +function safe_inject_payload(payload_type, payload_name, data) + local ok, err_msg = pcall(inject_payload, payload_type, payload_name, data) + if not ok then + return -1, err_msg + else + return 0 + end +end + +-- Shallow comparison between two tables. +-- Return true if the two tables have the same keys with identical +-- values, otherwise false. +function table_equal(t1, t2) + -- all key-value pairs in t1 must be in t2 + for k, v in pairs(t1) do + if t2[k] ~= v then return false end + end + -- there must not be other keys in t2 + for k, v in pairs(t2) do + if t1[k] == nil then return false end + end + return true +end + +return M