openstack-ansible-ops/elk_metrics_7x/templates/logstash-pipelines.yml.j2
Commit 68664a9dc1 by Georgina Shippey, 2019-08-05: Config updates for elk 7.x
Updated ELK config files to elk 7.x reference samples, bringing over
existing customisation from elk_metrics_6x.

Removed the deprecated use of --pipeline in elastic_beat_setup/tasks/main.yml;
--pipeline is no longer a valid CLI argument.

Updated logstash-pipelines and removed the dynamic insertion of the date into
index names. This is now handled by the ILM feature in Elasticsearch rather
than by Logstash.

Installing each beat creates an ILM policy for that beat; this patch does not
change the default policy. The default policy may eventually exhaust the
available storage, and future work is needed to address this.
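
For illustration, both index-naming schemes appear in the elasticsearch
outputs of this template: the 7.x path writes to an index named for the beat
and its version, leaving rollover to the ILM policy the beat created, while
the legacy path keeps the date in the index name:

index => "%{[@metadata][beat]}-%{[@metadata][version]}"
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"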

The non-beat elements of the logstash pipeline (syslog, collectd and others)
have not yet been updated to be compatible with ILM.

Change-Id: I735b64c2b7b93e23562f35266134a176a00af1b7


# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
- pipeline.id: "elk_metrics_7x"
queue.type: "persisted"
config.string: |
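# Inputs: all Beats ship to a single listener and are tagged with
# [@metadata][source_type] so the filter block below can route them.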
input {
beats {
id => "inputBeats"
client_inactivity_timeout => 300
port => {{ logstash_beat_input_port }}
add_field => {
"[@metadata][source_type]" => "beats"
}
}
}
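# Optional syslog input, controlled by logstash_syslog_input_enabled;
# logstash_syslog_input_mode selects a TCP or UDP listener.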
{% if logstash_syslog_input_enabled | bool %}
input {
{% if logstash_syslog_input_mode == 'tcp' %}
tcp {
id => "inputSyslogTcp"
port => {{ logstash_syslog_input_port }}
type => syslog
add_field => {
"[@metadata][source_type]" => "syslog"
}
}
{% elif logstash_syslog_input_mode == 'udp' %}
udp {
id => "inputSyslogUdp"
port => {{ logstash_syslog_input_port }}
type => syslog
add_field => {
"[@metadata][source_type]" => "syslog"
}
}
{% endif %}
}
{% endif %}
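# Optional collectd input; security_level and authfile are rendered only
# when both variables are defined.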
{% if logstash_collectd_input_enabled | bool %}
input {
udp {
port => {{ logstash_collectd_port }}
buffer_size => {{ logstash_collectd_buffer_size }}
codec => collectd { }
{% if (logstash_collectd_security_level is defined) and (logstash_collectd_authfile is defined) %}
security_level => {{ logstash_collectd_security_level }}
authfile => {{ logstash_collectd_authfile }}
{% endif %}
add_field => {
"[@metadata][source_type]" => "collectd"
}
}
}
{% endif %}
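# Filters: tag events by source type, then apply service-specific parsing
# (auditd, OpenStack services, swift, rabbitmq and others).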
filter {
if [@metadata][source_type] == "syslog" {
mutate {
add_tag => ["syslog"]
}
}
if [@metadata][source_type] == "collectd" {
mutate {
add_tag => ["collectd"]
}
}
# NOTE(mnaser): Filebeat doesn't support shipping to different outputs,
#               which means we need to parse the `auditd` fileset here
#               rather than relying on ingest pipelines.
if [fileset][module] == "auditd" {
grok {
break_on_match => false
match => {
message => [
"type=%{WORD:[auditd][log][record_type]}",
"msg=audit\(%{NUMBER:timestamp}:%{NUMBER:[auditd][log][sequence]}\)",
"a0=\"%{DATA:[auditd][log][a0]}\"",
"acct=\"%{DATA:[auditd][log][acct]}\"",
"addr=%{IPORHOST:[auditd][log][addr]}"
]
}
}
date {
match => [ "timestamp", "UNIX" ]
remove_field => "timestamp"
}
if [auditd][log][addr] {
geoip {
source => "[auditd][log][addr]"
target => "[auditd][geoip]"
}
}
# NOTE(mnaser): We don't match all fields so `grok` thinks that we
# failed.
mutate {
remove_tag => ["_grokparsefailure"]
}
}
if [@metadata][source_type] == "beats" or [@metadata][source_type] == "syslog" {
if [systemd_slice] {
mutate {
copy => { "systemd_slice" => "systemd_slice_tag" }
}
mutate {
gsub => [ "systemd_slice_tag", ".slice", "" ]
}
if [systemd_slice_tag] != "-" {
mutate {
add_tag => [
"%{systemd_slice_tag}"
]
}
mutate {
add_tag => [
"filebeat"
]
}
}
mutate {
remove_field => [ "systemd_slice_tag" ]
}
}
if "filebeat" in [tags] {
if "Traceback" in [message] {
mutate {
add_tag => ["traceback"]
remove_tag => ["_grokparsefailure"]
}
}
if "auth" in [tags] {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG}: (?:%{SPACE})?%{GREEDYDATA:logmessage}" }
}
mutate {
add_field => { "module" => "auth" }
}
} else if "elasticsearch" in [tags] {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\]\[%{LOGLEVEL:loglevel}\s*\]\[%{NOTSPACE:module}\s*\] %{GREEDYDATA:logmessage}" }
}
mutate {
replace => { "module" => "elasticsearch.%{module}" }
}
} else if "ceph" in [tags] {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:date} %{NOTSPACE:osd_epoch} ?%{SPACE}?%{NOTSPACE:error_bool} %{GREEDYDATA:logmessage}" }
}
if "ceph-osd" in [tags] {
grok {
match => { "message" => "-- (?<src_host>(%{IPORHOST}\:%{POSINT}/%{POSINT})) (?:[<|>]){1,2} (?<dst_host>(%{IPORHOST}\:%{POSINT}/%{POSINT}))" }
}
}
} else if "libvirt" in [tags] {
grok {
match => { "message" => "(?m)^%{TIMESTAMP_ISO8601:logdate}:%{SPACE}%{NUMBER:code}:?%{SPACE}\[?\b%{NOTSPACE:loglevel}\b\]?%{SPACE}?:?%{SPACE}\[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?" }
add_field => { "received_at" => "%{@timestamp}"}
}
mutate {
uppercase => [ "loglevel" ]
}
} else if "logstash" in [tags] {
grok {
match => {
"message" => "\{\:timestamp=>\"%{TIMESTAMP_ISO8601:timestamp}\", \:message=>\"%{DATA:logmessage}\"(;|)(, \:address=>\"%{URIHOST:address}\", \:exception=>#<\"%{DATA:exception}\">, \:backtrace=>\[%{DATA:backtrace}\]|)(, \:level=>:\"%{LOGLEVEL:loglevel}\"|)\}"
}
}
mutate {
add_field => { "module" => "logstash" }
uppercase => [ "loglevel" ]
}
if [loglevel] == "WARN" {
mutate {
replace => { "loglevel" => "WARNING" }
}
} else if ![loglevel] {
mutate {
add_field => { "loglevel" => "ERROR" }
}
}
} else if "mysql" in [tags] {
grok {
match => { "message" => "# User@Host: %{WORD:user}\[%{WORD}\] @ (%{HOSTNAME:client_hostname}|) \[(%{IP:client_ip}|)\]" }
}
grok {
match => { "message" => "# Thread_id: %{NUMBER:thread_id:int} \s*Schema: (%{WORD:schema}| ) \s*QC_hit: %{WORD:qc_hit}" }
}
grok {
match => { "message" => "# Query_time: %{NUMBER:query_time:float} \s*Lock_time: %{NUMBER:lock_time:float} \s*Rows_sent: %{NUMBER:rows_sent:int} \s*Rows_examined: %{NUMBER:rows_examined:int}" }
}
grok {
match => { "message" => "(?m)SET timestamp=%{NUMBER:timestamp};%{GREEDYDATA:logmessage}" }
}
geoip {
source => "client_ip"
}
date {
match => [ "timestamp", "UNIX" ]
}
mutate {
remove_field => "timestamp"
}
mutate {
gsub => [ "logmessage", "^\n", "" ]
add_field => { "module" => "mysql" }
add_field => { "loglevel" => "WARNING" }
}
} else if "nginx" in [tags] {
if "nginx-access" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => "%{IP:client_ip} - %{USER:client_user} \[%{NGINX_TIMESTAMP:timestamp}\] \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" %{INT:response_code} %{INT:bytes} %{QUOTEDSTRING:referer} %{QUOTEDSTRING:user_agent} %{QUOTEDSTRING:gzip_ratio}"
}
}
geoip {
source => "client_ip"
}
}
if "nginx-error" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => "%{NGINX_ERROR_TIMESTAMP:timestamp} \[%{LOGLEVEL:loglevel}\] %{GREEDYDATA:error_msg}"
}
}
}
} else if "openstack" in [tags] {
if "Can not find policy directory: policy.d" in [message] {
drop { }
}
grok {
match => {
"message" => [
"^%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}?%{SPACE}?(?<loglevel>AUDIT|CRITICAL|DEBUG|INFO|TRACE|WARNING|ERROR) \[?\b%{NOTSPACE:module}\b\]?%{SPACE}?%{GREEDYDATA:logmessage}?",
"^%{CISCOTIMESTAMP:journalddate}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{SYSLOGPROG:prog}%{SPACE}%{TIMESTAMP_ISO8601:logdate}%{SPACE}%{NUMBER:pid}%{SPACE}%{NOTSPACE:loglevel}%{SPACE}%{NOTSPACE:module}%{SPACE}%{GREEDYDATA:logmessage}"
]
}
}
grok {
match => {
"logmessage" => ["\[(%{NOTSPACE:request_id} %{NOTSPACE:user} %{NOTSPACE:tenant} %{NOTSPACE:domain} %{NOTSPACE:user_domain} %{NOTSPACE:project_domain}|\-)\] %{GREEDYDATA:logmessage}?"]
}
overwrite => [ "logmessage" ]
}
date {
match => [ "logdate", ISO8601 ]
remove_field => [ "logdate" ]
}
if "nova" in [tags] {
# Instance ID from logs (i.e. "[instance: 5ee83c6e-3604-467a-be54-e48429086e3f]")
grok {
match => {
"logmessage" => ["(\[instance\: %{NOTSPACE:instance_id}\] )?%{GREEDYDATA:logmessage}?"]
}
overwrite => [ "logmessage" ]
}
if [module] == "nova.api.openstack.requestlog" {
grok {
match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request}\" status\: %{NUMBER:response} len\: %{NUMBER:bytes} microversion\: (%{NUMBER:microversion}|\-) time\: %{NUMBER:duration:float}" }
add_tag => ["api"]
remove_field => [ "logmessage", "message" ]
}
} else if [module] == "nova.api.openstack.placement.requestlog" {
grok {
match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request}\" status\: %{NUMBER:response} len\: %{NUMBER:bytes} microversion\: (%{NUMBER:microversion}|\-)" }
add_tag => ["api"]
remove_field => [ "logmessage", "message" ]
}
}
} else if "neutron" in [tags] {
if [module] == "neutron.wsgi" {
grok {
match => { "logmessage" => "%{IPORHOST:client_ip} \"%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:http_version}\" status\: %{NUMBER:response} len\: %{NUMBER:bytes} time\: %{NUMBER:duration:float}" }
add_tag => ["api"]
remove_field => [ "logmessage", "message" ]
}
} else if "neutron-ha-tool" in [source] {
mutate {
add_tag => ["neutron-ha-tool"]
remove_tag => ["_grokparsefailure"]
}
}
if "starting" in [message] and "_grokparsefailure" in [tags] {
grok {
match => { "logmessage" => "(%{SPACE}\(%{NUMBER:pid}\)) %{GREEDYDATA:servicemessage}" }
}
mutate {
remove_tag => ["_grokparsefailure"]
}
}
} else if "glance" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["api"]
}
mutate {
replace => { "module" => "glance.%{module}" }
}
}
} else if "cinder" in [tags] {
if [module] == "cinder.eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["api"]
}
}
mutate {
replace => { "module" => "cinder.%{module}" }
}
}
} else if "horizon" in [tags] {
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"message" => [
"%{COMMONAPACHELOG}",
"\[%{APACHE_ERROR_TIMESTAMP:timestamp}\] \[%{DATA:module}:%{DATA:loglevel}\] \[pid %{POSINT:apache_pid}\:tid %{POSINT:apache_tid}\] ?(?:\[client %{IP:clientip}:%{POSINT:clientport}\] )?%{GREEDYDATA:logmessage}",
"%{SYSLOGTIMESTAMP:timestamp}%{SPACE}%{SYSLOGHOST:host}%{SPACE}%{PROG:prog}%{SPACE}%{IP:clientip}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NUMBER}%{SPACE}%{QS}%{SPACE}%{QS}"
]
}
}
geoip {
source => "clientip"
}
if ![loglevel] {
mutate {
add_field => { "logmessage" => "%{request}" }
add_field => { "module" => "horizon.access" }
add_field => { "loglevel" => "INFO" }
add_tag => [ "apache-access" ]
}
} else {
mutate {
replace => { "module" => "horizon.error.%{module}" }
add_tag => [ "apache-error" ]
uppercase => [ "loglevel" ]
}
}
} else if "heat" in [tags] {
if [module] == "eventlet.wsgi.server" {
if "accepted" not in [logmessage] {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "%{NOTSPACE:requesterip} %{NOTSPACE} %{NOTSPACE} \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes} %{BASE10NUM:httptime}" }
add_tag => ["api"]
}
}
mutate {
replace => { "module" => "heat.%{module}" }
}
} else if [module] == "heat.engine.service" {
grok {
match => { "logmessage" => "%{GREEDYDATA:servicemessage}" }
add_tag => ["api"]
}
}
} else if "swift-account" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP}%{SPACE}%{HOSTNAME}%{SPACE}%{PROG}%{SPACE}%{SYSLOGTIMESTAMP}%{SPACE}%{S3_REQUEST_LINE}%{SPACE}%{IP}%{SPACE}%{NOTSPACE}%{SPACE}%{NOTSPACE}%{SPACE}%{SYSLOG5424SD}%{SPACE}%{QS}%{SPACE}%{POSINT}%{SPACE}%{NOTSPACE}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{QS}%{SPACE}%{SECOND}%{SPACE}%{QS}%{SPACE}%{NUMBER}%{SPACE}%{NOTSPACE}"
}
}
} else if "swift" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{NOTSPACE:logsource} %{SYSLOGPROG:module}: (?:%{SPACE})?%{GREEDYDATA:logmessage}"
}
}
grok {
patterns_dir => ["/opt/logstash/patterns"]
match => {
"logmessage" => [
"%{COMBINEDAPACHELOG}",
"%{SWIFTPROXY_ACCESS}",
"%{GREEDYDATA:logmessage} \(txn\: %{DATA:swift_txn}\)"
]
}
tag_on_failure => []
overwrite => [ "logmessage" ]
}
if [request] {
mutate {
replace => { "logmessage" => "%{request}" }
}
}
mutate {
replace => { "module" => "swift.%{module}" }
}
if [file] =~ "error.log$" {
mutate {
add_field => { "loglevel" => "NOTICE" }
}
} else {
mutate {
add_field => { "loglevel" => "INFO" }
}
}
} else if "keystone" in [tags] {
if [loglevel] == "INFO" and [module] == "keystone.common.wsgi" {
grok {
match => { "logmessage" => "%{WORD:verb} %{NOTSPACE:request}" }
remove_field => [ "logmessage", "message" ]
}
}
} else if "magnum" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["api"]
}
mutate {
replace => { "module" => "magnum.%{module}" }
}
}
} else if "octavia" in [tags] {
if [module] == "eventlet.wsgi.server" {
mutate {
gsub => ["logmessage","\"",""]
}
grok {
match => { "logmessage" => "%{NOTSPACE:requesterip} \- \- \[%{NOTSPACE:req_date} %{NOTSPACE:req_time}\] %{NOTSPACE:verb} %{NOTSPACE:url_path} %{NOTSPACE:http_ver} %{NUMBER:response} %{NUMBER:bytes:int} %{BASE10NUM:httptime:float}" }
add_tag => ["api"]
}
mutate {
replace => { "module" => "octavia.%{module}" }
}
}
}
} else if "rabbitmq" in [tags] {
if [message] == "" {
drop { }
}
grok {
match => { "message" => "^\=%{LOGLEVEL:loglevel} REPORT\=\=\=\= %{MONTHDAY:event_day}\-%{MONTH:event_month}\-%{YEAR:event_year}\:\:%{TIME:event_time} \=\=\=\n%{GREEDYDATA:logmessage}" }
}
mutate {
replace => { "module" => "rabbitmq" }
add_field => { "timestamp" => "%{event_day} %{event_month} %{event_year} %{event_time}" }
}
date {
match => [ "timestamp", "dd MMM YYYY HH:mm:ss" ]
remove_field => [ "event_day", "event_month", "event_year", "event_time", "timestamp" ]
}
}
}
}
if [source][ip] {
geoip {
id => "setGeoIpSource"
source => "[source][ip]"
}
} else if [ip] {
geoip {
id => "setGeoIp"
source => "ip"
}
}
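# Fingerprint the message, keyed per Logstash node, so replayed events
# produce a stable document ID for de-duplication in the outputs below.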
if [message] {
fingerprint {
id => "setSHA1"
target => "[@metadata][fingerprint]"
method => "SHA1"
key => "{{ inventory_hostname | to_uuid }}"
}
}
}
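# Outputs: fingerprinted events are written with an explicit document_id
# for de-duplication. Beats that report a version go to
# "%{[@metadata][beat]}-%{[@metadata][version]}", whose lifecycle the
# beat's ILM policy manages; everything else falls back to legacy
# date-suffixed indices.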
output {
if [@metadata][fingerprint] {
if [@metadata][version] {
elasticsearch {
id => "elasticsearchDocIDOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
} else if [@metadata][beat] {
elasticsearch {
id => "elasticsearchLegacyDocIDOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
} else if "syslog" in [tags] {
elasticsearch {
id => "elasticsearchSyslogDocIDOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "syslog-%{+YYYY.MM.dd}"
}
} else if "collectd" in [tags] {
elasticsearch {
id => "elasticsearchCollectdDocIDOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "collectd-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
id => "elasticsearchUndefinedDocIDOutputPipeline"
document_id => "%{[@metadata][fingerprint]}"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "undefined-%{+YYYY.MM.dd}"
}
}
} else {
if [@metadata][version] {
elasticsearch {
id => "elasticsearchOutputPipeline"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{[@metadata][version]}"
}
} else if [@metadata][beat] {
elasticsearch {
id => "elasticsearchLegacyOutputPipeline"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
} else if "syslog" in [tags] {
elasticsearch {
id => "elasticsearchSyslogOutputPipeline"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "syslog-%{+YYYY.MM.dd}"
}
} else if "collectd" in [tags] {
elasticsearch {
id => "elasticsearchCollectdOutputPipeline"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "collectd-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
id => "elasticsearchUndefinedOutputPipeline"
hosts => ["{{ '127.0.0.1:' ~ elastic_port }}"]
sniffing => {{ (elastic_sniffing_enabled | default(not data_node)) | bool | string | lower }}
manage_template => {{ (data_node | bool) | lower }}
index => "undefined-%{+YYYY.MM.dd}"
}
}
}
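# Optional Kafka output rendered from logstash_kafka_options: numbers are
# emitted bare, lists are joined with commas, everything else is quoted.
# A hypothetical {"topic_id": "osa-logs", "bootstrap_servers":
# ["kafka1:9092", "kafka2:9092"]} would render as:
#   topic_id => "osa-logs"
#   bootstrap_servers => "kafka1:9092,kafka2:9092"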
{% if logstash_kafka_options is defined %}
kafka {
{% for key, value in logstash_kafka_options.items() %}
{% if value is number %}
{{ key }} => {{ value }}
{% elif value is iterable and value is not string %}
{{ key }} => "{{ value | join(',') }}"
{% else %}
{{ key }} => "{{ value }}"
{% endif %}
{% endfor %}
}
{% endif %}
}