diff --git a/elk_metrics_6x/templates/logstash-pipelines.yml.j2 b/elk_metrics_6x/templates/logstash-pipelines.yml.j2 index 9ef5864c..d59e8b6c 100644 --- a/elk_metrics_6x/templates/logstash-pipelines.yml.j2 +++ b/elk_metrics_6x/templates/logstash-pipelines.yml.j2 @@ -44,6 +44,43 @@ add_tag => ["syslog"] } } + + # NOTE(mnaser): Filebeat doesn't support shipping to different outputs + # which means we need to parse `auditd` fileset here rather + # than rely on ingest. + if [fileset][module] == "auditd" { + grok { + break_on_match => false + match => { + message => [ + "type=%{WORD:[auditd][log][record_type]}", + "msg=audit\(%{NUMBER:timestamp}:%{NUMBER:[auditd][log][sequence]}\)", + "a0=\"%{DATA:[auditd][log][a0]}\"", + "acct=\"%{DATA:[auditd][log][acct]}\"", + "addr=%{IPORHOST:[auditd][log][addr]}" + ] + } + } + + date { + match => [ "timestamp", "UNIX" ] + remove_field => "timestamp" + } + + if [auditd][log][addr] { + geoip { + source => "[auditd][log][addr]" + target => "[auditd][geoip]" + } + } + + # NOTE(mnaser): We don't match all fields so `grok` thinks that we + # failed. + mutate { + remove_tag => ["_grokparsefailure"] + } + } + if [@metadata][source_type] == "beats" or [@metadata][source_type] == "syslog" { if [systemd_slice] { mutate {