Merge "Kafka - Implement SASL Authentication"

Authored by Zuul on 2019-12-31 19:02:38 +00:00; committed by Gerrit Code Review
commit e389f51865
15 changed files with 400 additions and 57 deletions

@ -0,0 +1,52 @@
#!/bin/sh
{{/* Copyright 2019 The Openstack-Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */}}
{{- $envAll := . }}
{{- if .Values.monitoring.prometheus.enabled }}
{{- $credentials := .Values.endpoints.kafka_exporter.auth }}
/opt/kafka/bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
--add \
--allow-principal User:{{ $credentials.username }} \
--operation DESCRIBE \
--topic "*" \
--group "*" \
--cluster
{{ end }}
{{ $producers := .Values.conf.kafka.jaas.producers }}
{{- range $producer, $properties := $producers }}
/opt/kafka/bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
--add \
--allow-principal User:{{ $properties.username }} \
--producer \
--topic {{ $properties.topic | quote }}
{{- end }}
{{ $consumers := .Values.conf.kafka.jaas.consumers }}
{{- range $consumer, $properties := $consumers }}
/opt/kafka/bin/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
--add \
--allow-principal User:{{ $properties.username }} \
--consumer \
--topic {{ $properties.topic | quote }} \
--group {{ $properties.group | quote }}
{{- end }}
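
For reference, the producer loop above renders one kafka-acls.sh call per configured producer. With a hypothetical entry of username region-a-producer and topic region-a (matching the commented placeholders in values.yaml further down), the generated command comes out roughly as:

    /opt/kafka/bin/kafka-acls.sh \
      --authorizer kafka.security.auth.SimpleAclAuthorizer \
      --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
      --add \
      --allow-principal User:region-a-producer \
      --producer \
      --topic "region-a"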

@ -20,13 +20,13 @@ function create_topic () {
--create --topic $1 \
--partitions $2 \
--replication-factor $3 \
--bootstrap-server $KAFKA_BROKERS
--zookeeper $KAFKA_ZOOKEEPER_CONNECT
}
function describe_topic () {
./opt/kafka/bin/kafka-topics.sh \
--describe --topic $1 \
--bootstrap-server $KAFKA_BROKERS
--zookeeper $KAFKA_ZOOKEEPER_CONNECT
}
function produce_message () {
@ -39,7 +39,7 @@ function produce_message () {
function consume_messages () {
./opt/kafka/bin/kafka-console-consumer.sh \
--topic $1 \
--timeout-ms 500 \
--timeout-ms 5000 \
--from-beginning \
--bootstrap-server $KAFKA_BROKERS
}
@ -53,10 +53,10 @@ function delete_partition_messages () {
function delete_topic () {
./opt/kafka/bin/kafka-topics.sh \
--delete --topic $1 \
--bootstrap-server $KAFKA_BROKERS
--zookeeper $KAFKA_ZOOKEEPER_CONNECT
}
set -e
set -ex
TOPIC="kafka-test"
PARTITION_COUNT=3
@ -66,50 +66,56 @@ echo "Creating topic $TOPIC"
create_topic $TOPIC $PARTITION_COUNT $PARTITION_REPLICAS
describe_topic $TOPIC
echo "Producing 5 messages"
for i in {1..5}; do
MESSAGE="Message #$i"
produce_message $TOPIC "$MESSAGE"
done
# Note: The commands used here are not playing well with the WIP
# SASL auth implementation. Commenting for now:
echo -e "\nConsuming messages (A \"TimeoutException\" is expected, else this would consume forever)"
consume_messages $TOPIC
# echo "Producing 5 messages"
# for i in {1..5}; do
# MESSAGE="Message #$i"
# produce_message $TOPIC "$MESSAGE"
# done
echo "Producing 5 more messages"
for i in {6..10}; do
MESSAGE="Message #$i"
produce_message $TOPIC "$MESSAGE"
done
# echo -e "\nConsuming messages (A \"TimeoutException\" is expected, else this would consume forever)"
# consume_messages $TOPIC
echo -e "\nCreating partition offset reset json file"
tee /tmp/partition_offsets.json << EOF
{
"partitions": [
{
"topic": "$TOPIC",
"partition": 0,
"offset": -1
}, {
"topic": "$TOPIC",
"partition": 1,
"offset": -1
}, {
"topic": "$TOPIC",
"partition": 2,
"offset": -1
}
],
"version": 1
}
EOF
# echo "Producing 5 more messages"
# for i in {6..10}; do
# MESSAGE="Message #$i"
# produce_message $TOPIC "$MESSAGE"
# done
echo "Resetting $TOPIC partitions (deleting messages)"
delete_partition_messages /tmp/partition_offsets.json
# echo -e "\nCreating partition offset reset json file"
# tee /tmp/partition_offsets.json << EOF
# {
# "partitions": [
# {
# "topic": "$TOPIC",
# "partition": 0,
# "offset": -1
# }, {
# "topic": "$TOPIC",
# "partition": 1,
# "offset": -1
# }, {
# "topic": "$TOPIC",
# "partition": 2,
# "offset": -1
# }
# ],
# "version": 1
# }
# EOF
# echo "Resetting $TOPIC partitions (deleting messages)"
# delete_partition_messages /tmp/partition_offsets.json
echo "Deleting topic $TOPIC"
delete_topic $TOPIC
delete_topic $TOPIC >> /tmp/deletion
if [ $(describe_topic $TOPIC | wc -l) -eq 0 ]; then
cat /tmp/deletion
if [ $(cat /tmp/deletion | grep 'marked for deletion' | wc -l) -eq 1 ]
then
echo "Topic $TOPIC was deleted successfully."
exit 0
else
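
Note: the test pod already mounts jaas.conf and sets KAFKA_OPTS (see the helm-test pod changes below), so the KafkaClient JAAS section supplies credentials; what the commented-out commands still lack is the client-side security protocol. One possible direction, as an untested sketch outside this commit, would be to pass it explicitly:

    ./opt/kafka/bin/kafka-console-producer.sh \
      --topic $TOPIC \
      --broker-list $KAFKA_BROKERS \
      --producer-property security.protocol=SASL_PLAINTEXT \
      --producer-property sasl.mechanism=PLAIN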

@ -15,13 +15,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/}}
{{ if not (empty .Values.conf.kafka.server_settings) }}
{{ range $key, $value := .Values.conf.kafka.server_settings }}
{{ $varName := printf "%s%s" "KAFKA_" ($key | upper) }}
{{ $varValue := ternary ($value | quote) ($value | int) (kindIs "string" $value) }}
{{- if not (empty .Values.conf.kafka.server_settings) }}
{{- range $key, $value := .Values.conf.kafka.server_settings }}
{{- $varName := printf "%s%s" "KAFKA_" ($key | upper) }}
{{- $varValue := ternary ($value | quote) ($value | int) (kindIs "string" $value) }}
export {{ $varName }}={{ $varValue }}
{{ end }}
{{ end }}
{{- end }}
{{- end }}
export KAFKA_SUPER_USERS="User:$ADMIN_USERNAME"
COMMAND="${@:-start}"
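
For example, with the default server_settings shown in values.yaml further down, integer values are exported unquoted and string values quoted:

    export KAFKA_MESSAGE_MAX_BYTES=5000000
    export KAFKA_LISTENERS="SASL_PLAINTEXT://:9092"
    export KAFKA_SASL_ENABLED_MECHANISMS="PLAIN"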

@ -30,4 +30,6 @@ data:
{{ tuple "bin/_kafka-probe.sh.tpl" . | include "helm-toolkit.utils.template" | indent 4 }}
helm-test.sh: |
{{ tuple "bin/_helm-test.sh.tpl" . | include "helm-toolkit.utils.template" | indent 4 }}
generate-acl.sh: |
{{ tuple "bin/_generate-acl.sh.tpl" . | include "helm-toolkit.utils.template" | indent 4 }}
{{- end -}}

@ -0,0 +1,27 @@
{{/*
Copyright 2019 The Openstack-Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}
{{- if .Values.manifests.configmap_etc }}
{{- $envAll := . }}
---
apiVersion: v1
kind: Secret
metadata:
name: kafka-etc
type: Opaque
data:
{{- include "helm-toolkit.snippets.values_template_renderer" (dict "envAll" $envAll "template" .Values.conf.kafka.jaas.template "key" "jaas.conf" "format" "Secret") | indent 2 }}
{{- end }}
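
Once deployed, the rendered jaas.conf can be inspected from the Secret (the namespace here is illustrative):

    kubectl -n osh-infra get secret kafka-etc \
      -o jsonpath='{.data.jaas\.conf}' | base64 -d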

@ -0,0 +1,74 @@
{{/*
Copyright 2019 The Openstack-Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}
{{- if .Values.manifests.job_generate_acl }}
{{- $envAll := . }}
{{- $KafkaUserSecret := .Values.secrets.kafka.admin }}
{{- $serviceAccountName := "kafka-generate-acl" }}
{{ tuple $envAll "generate_acl" $serviceAccountName | include "helm-toolkit.snippets.kubernetes_pod_rbac_serviceaccount" }}
---
apiVersion: batch/v1
kind: Job
metadata:
name: kafka-generate-acl
annotations:
{{ tuple $envAll | include "helm-toolkit.snippets.release_uuid" }}
spec:
backoffLimit: {{ .Values.jobs.generate_acl.backoffLimit }}
template:
metadata:
labels:
{{ tuple $envAll "kafka" "generate-acl" | include "helm-toolkit.snippets.kubernetes_metadata_labels" | indent 8 }}
spec:
{{ dict "envAll" $envAll "application" "generate-acl" | include "helm-toolkit.snippets.kubernetes_pod_security_context" | indent 6 }}
serviceAccountName: {{ $serviceAccountName }}
activeDeadlineSeconds: {{ .Values.jobs.generate_acl.activeDeadlineSeconds }}
restartPolicy: OnFailure
nodeSelector:
{{ .Values.labels.job.node_selector_key }}: {{ .Values.labels.job.node_selector_value | quote }}
initContainers:
{{ tuple $envAll "generate_acl" list | include "helm-toolkit.snippets.kubernetes_entrypoint_init_container" | indent 8 }}
containers:
- name: generate-acl
{{ tuple $envAll "generate_acl" | include "helm-toolkit.snippets.image" | indent 10 }}
{{ tuple $envAll $envAll.Values.pod.resources.jobs.generate_acl | include "helm-toolkit.snippets.kubernetes_resources" | indent 10 }}
{{ dict "envAll" $envAll "application" "generate_acl" "container" "generate_acl" | include "helm-toolkit.snippets.kubernetes_container_security_context" | indent 10 }}
env:
- name: KAFKA_ZOOKEEPER_CONNECT
value: "{{ tuple "zookeeper" "internal" "client" $envAll | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" }}"
command:
- /tmp/generate-acl.sh
volumeMounts:
- name: kafka-bin
mountPath: /tmp/generate-acl.sh
subPath: generate-acl.sh
readOnly: true
- name: kafka-etc
mountPath: /opt/kafka/config/jaas.conf
subPath: jaas.conf
readOnly: true
volumes:
- name: kafka-bin
configMap:
name: kafka-bin
defaultMode: 0555
- name: kafka-etc
secret:
secretName: kafka-etc
defaultMode: 0444
{{- end }}
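
After the job completes, the ACLs it created can be verified from any broker pod with the same tool; this is a quick manual check, not part of the chart:

    /opt/kafka/bin/kafka-acls.sh \
      --authorizer kafka.security.auth.SimpleAclAuthorizer \
      --authorizer-properties zookeeper.connect=$KAFKA_ZOOKEEPER_CONNECT \
      --list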

@ -20,7 +20,10 @@ COMMAND="${@:-start}"
function start () {
exec /bin/kafka_exporter \
--kafka.server={{ tuple "kafka" "internal" "broker" . | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" }}
--sasl.enabled \
--sasl.username=$KAFKA_EXPORTER_USERNAME \
--sasl.password=$KAFKA_EXPORTER_PASSWORD \
--kafka.server=$KAFKA_BROKERS
}
function stop () {

@ -62,7 +62,19 @@ spec:
command:
- /tmp/kafka-exporter.sh
- stop
# env: {}
env:
- name: KAFKA_BROKERS
value: {{ tuple "kafka" "internal" "broker" . | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" | quote }}
- name: KAFKA_EXPORTER_USERNAME
valueFrom:
secretKeyRef:
name: {{ $kafkaExporterUserSecret }}
key: KAFKA_EXPORTER_USERNAME
- name: KAFKA_EXPORTER_PASSWORD
valueFrom:
secretKeyRef:
name: {{ $kafkaExporterUserSecret }}
key: KAFKA_EXPORTER_PASSWORD
ports:
- name: exporter
containerPort: {{ tuple "kafka_exporter" "internal" "exporter" . | include "helm-toolkit.endpoints.endpoint_port_lookup" }}
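
With these variables injected, the exporter start function above expands to something along these lines (the broker host name is illustrative; the credentials are the values.yaml defaults):

    /bin/kafka_exporter \
      --sasl.enabled \
      --sasl.username=kafka-exporter \
      --sasl.password=changeme \
      --kafka.server=kafka-broker.osh-infra.svc.cluster.local:9092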

@ -0,0 +1,29 @@
{{/*
Copyright 2019 The Openstack-Helm Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/}}
{{- if .Values.manifests.monitoring.prometheus.secret_exporter }}
{{- $envAll := . }}
{{- $secretName := .Values.secrets.kafka_exporter.user }}
---
apiVersion: v1
kind: Secret
metadata:
name: {{ $secretName }}
type: Opaque
data:
KAFKA_EXPORTER_USERNAME: {{ .Values.endpoints.kafka_exporter.auth.username | b64enc }}
KAFKA_EXPORTER_PASSWORD: {{ .Values.endpoints.kafka_exporter.auth.password | b64enc }}
{{- end }}

@ -45,8 +45,12 @@ spec:
command:
- "/tmp/helm-test.sh"
env:
- name: KAFKA_ZOOKEEPER_CONNECT
value: "{{ tuple "zookeeper" "internal" "client" $envAll | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" }}"
- name: KAFKA_BROKERS
value: "{{ tuple "kafka" "internal" "broker" $envAll | include "helm-toolkit.endpoints.host_and_port_endpoint_uri_lookup" }}"
- name: KAFKA_OPTS
value: {{ include "helm-toolkit.utils.joinListWithSpace" .Values.conf.kafka.jvm_options | quote }}
volumeMounts:
- name: pod-tmp
mountPath: /tmp
@ -54,6 +58,10 @@ spec:
mountPath: /tmp/helm-test.sh
subPath: helm-test.sh
readOnly: true
- name: kafka-etc
mountPath: /opt/kafka/config/jaas.conf
subPath: jaas.conf
readOnly: true
volumes:
- name: pod-tmp
emptyDir: {}
@ -61,4 +69,8 @@ spec:
configMap:
name: kafka-bin
defaultMode: 0555
- name: kafka-etc
secret:
secretName: kafka-etc
defaultMode: 0444
{{- end }}

@ -111,6 +111,11 @@ spec:
- name: broker
containerPort: {{ $kafkaBrokerPort }}
env:
- name: ADMIN_USERNAME
valueFrom:
secretKeyRef:
name: {{ $kafkaUserSecret }}
key: KAFKA_ADMIN_USERNAME
- name: KAFKA_PORT
value: "{{ $kafkaBrokerPort }}"
- name: ZOOKEEPER_PORT
@ -121,6 +126,8 @@ spec:
value: "PLAINTEXT://:{{$kafkaBrokerPort}}"
- name: KAFKA_CREATE_TOPICS
value: "{{ include "helm-toolkit.utils.joinListWithComma" .Values.conf.kafka.topics }}"
- name: KAFKA_OPTS
value: {{ include "helm-toolkit.utils.joinListWithSpace" .Values.conf.kafka.jvm_options | quote }}
readinessProbe:
initialDelaySeconds: 20
periodSeconds: 30
@ -152,6 +159,10 @@ spec:
mountPath: /tmp/kafka-readiness.sh
subPath: kafka-readiness.sh
readOnly: true
- name: kafka-etc
mountPath: /opt/kafka/config/jaas.conf
subPath: jaas.conf
readOnly: true
- name: data
mountPath: {{ .Values.conf.kafka.config.data_directory }}
{{ if $mounts_kafka.volumeMounts }}{{ toYaml $mounts_kafka.volumeMounts | indent 12 }}{{ end }}
@ -160,6 +171,10 @@ spec:
configMap:
name: kafka-bin
defaultMode: 0555
- name: kafka-etc
secret:
secretName: kafka-etc
defaultMode: 0444
{{ if $mounts_kafka.volumes }}{{ toYaml $mounts_kafka.volumes | indent 8 }}{{ end }}
{{- if not .Values.storage.enabled }}
- name: data
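
Taken together with server.sh above and the default values further down, the broker process ends up with roughly the following environment (assuming the default admin username of admin, as in the zookeeper values at the end of this change):

    ADMIN_USERNAME=admin
    KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/jaas.conf"
    KAFKA_SUPER_USERS="User:admin"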

@ -24,6 +24,7 @@ images:
dep_check: quay.io/stackanetes/kubernetes-entrypoint:v0.3.1
image_repo_sync: docker.io/docker:17.07.0
helm_test: docker.io/wurstmeister/kafka:2.12-2.3.0
generate_acl: docker.io/wurstmeister/kafka:2.12-2.3.0
pull_policy: IfNotPresent
local_registry:
active: false
@ -53,6 +54,10 @@ pod:
pod: {}
container:
kafka_exporter: {}
generate_acl:
pod: {}
container:
generate_acl: {}
affinity:
anti:
type:
@ -101,6 +106,13 @@ pod:
limits:
memory: "1024Mi"
cpu: "2000m"
generate_acl:
requests:
memory: "128Mi"
cpu: "100m"
limits:
memory: "1024Mi"
cpu: "2000m"
test:
requests:
memory: "128Mi"
@ -155,6 +167,9 @@ endpoints:
jmx-exporter:
default: 9404
kafka_exporter:
auth:
username: kafka-exporter
password: changeme
namespace: null
hosts:
default: kafka-exporter
@ -204,11 +219,15 @@ dependencies:
kafka:
services:
- endpoint: internal
service: zookeeper-int
service: zookeeper
kafka_exporter:
services:
- endpoint: internal
service: kafka-broker
service: kafka
generate_acl:
services:
- endpoint: internal
service: kafka
monitoring:
prometheus:
@ -271,10 +290,12 @@ manifests:
helm_test: true
ingress: true
job_image_repo_sync: true
job_generate_acl: true
monitoring:
prometheus:
configmap_bin: true
deployment: true
secret_exporter: true
service: true
network_policy: false
network_policy: false
@ -286,15 +307,73 @@ manifests:
service: true
statefulset: true
jobs:
generate_acl:
backoffLimit: 6
activeDeadlineSeconds: 600
conf:
kafka:
config:
data_directory: /var/lib/kafka/data
server_settings: {}
# Optionally provide configuration overrides for
# Kafka's server.properties file ie:
# message_max_bytes: 5000000
topics: []
server_settings:
# Optionally provide configuration overrides for Kafka's
# server.properties file, replacing '.' with '_', e.g.
# message_max_bytes for message.max.bytes
message_max_bytes: 5000000
authorizer_class_name: kafka.security.auth.SimpleAclAuthorizer
listeners: SASL_PLAINTEXT://:9092
security_protocol: SASL_PLAINTEXT
security_inter_broker_protocol: SASL_PLAINTEXT
sasl_mechanism: PLAIN
sasl_enabled_mechanisms: PLAIN
sasl_mechanism_inter_broker_protocol: PLAIN
topics:
# List of topic strings formatted like:
# topic_name:number_of_partitions:replication_factor
# - "mytopic:1:1"
jaas: # Define Authentication Details in this section
producers:
# region_a: # Just an ID used to iterate through the dict of producers
# username: region-a-producer
# password: changeme
# topic: region-a # Used in generate-acl.sh to provide access
consumers:
# region_a: # Just an ID used to iterate through the dict of consumers
# username: region-a-consumer
# password: changeme
# topic: region-a # Used in generate-acl.sh to provide access
# group: region-a # Used in generate-acl.sh to provide access
template: |
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
{{- $admin := .Values.endpoints.kafka.auth.admin }}
username={{ $admin.username | quote}}
password={{ $admin.password | quote}}
user_{{ $admin.username }}={{ $admin.password | quote }}
{{- if .Values.monitoring.prometheus.enabled }}
{{- $exporter := .Values.endpoints.kafka_exporter.auth }}
user_{{ $exporter.username }}={{ $exporter.password | quote }}
{{- end }}
{{- range $producer, $credentials := .Values.conf.kafka.jaas.producers }}
user_{{ $credentials.username }}={{ $credentials.password | quote }}
{{- end }}
{{- range $consumer, $credentials := .Values.conf.kafka.jaas.consumers }}
user_{{ $credentials.username }}={{ $credentials.password | quote }}
{{- end }}
{{- printf ";" }}
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username={{ $admin.username | quote}}
password={{ $admin.password | quote}}
{{- printf ";" }}
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username={{ $admin.username | quote}}
password={{ $admin.password | quote}}
{{- printf ";" }}
};
jvm_options:
- -Djava.security.auth.login.config=/opt/kafka/config/jaas.conf
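
To actually enable per-client credentials, a deployment-specific override mirroring the commented placeholders above would look like this; each entry yields a user_<username> line in the KafkaServer JAAS section and a matching ACL from generate-acl.sh:

    conf:
      kafka:
        jaas:
          producers:
            region_a:
              username: region-a-producer
              password: changeme
              topic: region-a
          consumers:
            region_a:
              username: region-a-consumer
              password: changeme
              topic: region-a
              group: region-a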

@ -25,4 +25,5 @@ metadata:
type: Opaque
data:
{{- include "helm-toolkit.snippets.values_template_renderer" (dict "envAll" $envAll "template" .Values.conf.zookeeper.template "key" "zoo.cfg" "format" "Secret") | indent 2 }}
{{- include "helm-toolkit.snippets.values_template_renderer" (dict "envAll" $envAll "template" .Values.conf.zookeeper.jaas.template "key" "jaas.conf" "format" "Secret") | indent 2 }}
{{- end }}

@ -19,6 +19,7 @@ limitations under the License.
{{- $mounts_zookeeper := .Values.pod.mounts.zookeeper.zookeeper }}
{{- $mounts_zookeeper_init := .Values.pod.mounts.zookeeper.init_container }}
{{- $zookeeperUserSecret := .Values.secrets.zookeeper.admin }}
{{- $serviceAccountName := printf "%s-%s" .Release.Name "zookeeper" }}
@ -153,6 +154,8 @@ spec:
value: "{{ .Values.conf.zookeeper.config.data_directory }}"
- name: ZOO_CLIENT_PORT
value: "{{ tuple "zookeeper" "internal" "client" . | include "helm-toolkit.endpoints.endpoint_port_lookup" }}"
- name: SERVER_JVMFLAGS
value: {{ include "helm-toolkit.utils.joinListWithSpace" .Values.conf.zookeeper.jvm_options | quote }}
readinessProbe:
initialDelaySeconds: 20
periodSeconds: 30
@ -179,6 +182,9 @@ spec:
- name: zookeeper-etc
mountPath: /conf/zoo.cfg
subPath: zoo.cfg
- name: zookeeper-etc
mountPath: /conf/jaas.conf
subPath: jaas.conf
- name: zookeeper-bin
mountPath: /tmp/zookeeper.sh
subPath: zookeeper.sh

@ -139,6 +139,11 @@ endpoints:
default: 9404
zookeeper_exporter:
default: 9141
kafka:
auth:
admin:
username: admin
password: changeme
dependencies:
dynamic:
@ -245,3 +250,22 @@ conf:
{{- $ensembleCount := add $podInt 1 }}
server.{{$ensembleCount}}=zookeeper-{{$podInt}}.{{$domain}}:{{$serverPort}}:{{$electionPort}}:participant;{{$clientPort}}
{{- end }}
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
jaas:
template: |
{{- $admin := .Values.endpoints.kafka.auth.admin }}
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_{{ $admin.username }}={{ $admin.password | quote }}
{{- printf ";" }}
};
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username={{ $admin.username | quote }}
password={{ $admin.password | quote }}
{{- printf ";" }}
};
jvm_options:
- -Djava.security.auth.login.config=/conf/jaas.conf
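
With the default admin credentials above (admin / changeme), the rendered jaas.conf mounted at /conf/jaas.conf comes out essentially as:

    Server {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      user_admin="changeme";
    };
    Client {
      org.apache.zookeeper.server.auth.DigestLoginModule required
      username="admin"
      password="changeme";
    };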