I am using Kafka 3.5.0 in KRaft mode, deployed to a Kubernetes cluster with the Bitnami Helm chart. When the brokers start up, they fail to talk to each other. This is the error message in the broker log:
[2023-07-17 06:56:47,311] WARN [RaftManager id=2] Error connecting to node kafka-0.kafka-headless.kafka.svc.cluster.local:9093 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka-0.kafka-headless.kafka.svc.cluster.local
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:801)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1533)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1385)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:1030)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
    at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:107)
    at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:103)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:103)
    at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:77)
    at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:98)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
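From what I understand, a per-pod name like kafka-0.kafka-headless.kafka.svc.cluster.local only resolves if a headless Service with that name exists, selects the Kafka pods, and publishes addresses for pods that are not yet ready (the brokers have to reach each other before they pass readiness). I have not pasted the Service here, but I assume the chart renders something roughly like the sketch below (the port names and exact layout are my guess at the chart defaults, not copied from my cluster):

apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None                  # headless, so each pod gets its own DNS record
  publishNotReadyAddresses: true   # so records exist before the brokers become ready
  selector:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/name: kafka
  ports:
  - name: tcp-client
    port: 9092
  - name: tcp-internal
    port: 9094
  - name: tcp-controller
    port: 9093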
The StatefulSet looks like the following:
spec:
  podManagementPolicy: Parallel
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/component: kafka
      app.kubernetes.io/instance: kafka
      app.kubernetes.io/name: kafka
  serviceName: kafka-headless
  template:
    metadata:
      annotations:
        checksum/tls-secret: f73481f4d88da94584ed3268c06cf55b18bcf8f8fc61a4bc0b76eeec8b991bb6
      creationTimestamp: null
      labels:
        app.kubernetes.io/component: kafka
        app.kubernetes.io/instance: kafka
        app.kubernetes.io/managed-by: Helm
        app.kubernetes.io/name: kafka
        helm.sh/chart: kafka-23.0.1
    spec:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - podAffinityTerm:
              labelSelector:
                matchLabels:
                  app.kubernetes.io/component: kafka
                  app.kubernetes.io/instance: kafka
                  app.kubernetes.io/name: kafka
              topologyKey: kubernetes.io/hostname
            weight: 1
      containers:
      - command:
        - /scripts/setup.sh
        env:
        - name: BITNAMI_DEBUG
          value: "true"
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INTERNAL
        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: INTERNAL:SSL,CLIENT:SSL,CONTROLLER:SSL
        - name: KAFKA_CFG_LISTENERS
          value: INTERNAL://:9094,CLIENT://:9092,CONTROLLER://:9093
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: INTERNAL://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9094,CLIENT://$(MY_POD_NAME).kafka-headless.kafka.svc.cluster.local:9092
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_ZOOKEEPER_PROTOCOL
          value: PLAINTEXT
        - name: KAFKA_TLS_TYPE
          value: PEM
        - name: KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: KAFKA_TLS_CLIENT_AUTH
          value: none
        - name: KAFKA_CERTIFICATE_PASSWORD
        - name: KAFKA_VOLUME_DIR
          value: /bitnami/kafka
        - name: KAFKA_LOG_DIR
          value: /opt/bitnami/kafka/logs
        - name: KAFKA_CFG_DELETE_TOPIC_ENABLE
          value: "true"
        - name: KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE
          value: "true"
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1024m -Xms1024m
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES
          value: "10000"
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MS
          value: "1000"
        - name: KAFKA_CFG_LOG_RETENTION_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS
          value: "300000"
        - name: KAFKA_CFG_LOG_RETENTION_HOURS
          value: "168"
        - name: KAFKA_CFG_MESSAGE_MAX_BYTES
          value: "1000012"
        - name: KAFKA_CFG_LOG_SEGMENT_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_DIRS
          value: /bitnami/kafka/data
        - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
          value: "1"
        - name: KAFKA_CFG_NUM_IO_THREADS
          value: "8"
        - name: KAFKA_CFG_NUM_NETWORK_THREADS
          value: "3"
        - name: KAFKA_CFG_NUM_PARTITIONS
          value: "1"
        - name: KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR
          value: "1"
        - name: KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES
          value: "104857600"
        - name: KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
          value: "6000"
        - name: KAFKA_CFG_AUTHORIZER_CLASS_NAME
        - name: KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND
          value: "true"
        - name: KAFKA_CFG_SUPER_USERS
          value: User:admin
        - name: KAFKA_ENABLE_KRAFT
          value: "true"
        - name: KAFKA_KRAFT_CLUSTER_ID
          value: kafka_cluster_id_test1
        - name: KAFKA_CFG_PROCESS_ROLES
          value: broker,controller
        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: CONTROLLER
        image: docker.io/bitnami/kafka:3.5.0-debian-11-r1
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 3
          initialDelaySeconds: 10
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka-client
          timeoutSeconds: 5
        name: kafka
        ports:
        - containerPort: 9092
          name: kafka-client
          protocol: TCP
        - containerPort: 9094
          name: kafka-internal
          protocol: TCP
        - containerPort: 9093
          name: kafka-ctlr
          protocol: TCP
        readinessProbe:
          failureThreshold: 6
          initialDelaySeconds: 60
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka-client
          timeoutSeconds: 5
        resources: {}
        securityContext:
          allowPrivilegeEscalation: false
          runAsNonRoot: false
          runAsUser: 0
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /bitnami/kafka
          name: data
        - mountPath: /opt/bitnami/kafka/logs
          name: logs
        - mountPath: /scripts/setup.sh
          name: scripts
          subPath: setup.sh
        - mountPath: /certs-0
          name: kafka-certs-0
          readOnly: true
        - mountPath: /certs-1
          name: kafka-certs-1
          readOnly: true
        - mountPath: /certs-2
          name: kafka-certs-2
          readOnly: true
      dnsPolicy: ClusterFirst
      nodeSelector:
        storage-type: ceph
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1001
      serviceAccount: kafka
      serviceAccountName: kafka
      terminationGracePeriodSeconds: 30
      tolerations:
      - effect: NoSchedule
        key: node-role.kubernetes.io/control-plane
        operator: Exists
      - effect: NoSchedule
        key: node-role.kubernetes.io/master
        operator: Exists
      volumes:
      - configMap:
          defaultMode: 493
          name: kafka-scripts
        name: scripts
      - name: kafka-certs-0
        secret:
          defaultMode: 256
          secretName: kafka-0-tls
      - name: kafka-certs-1
        secret:
          defaultMode: 256
          secretName: kafka-1-tls
      - name: kafka-certs-2
        secret:
          defaultMode: 256
          secretName: kafka-2-tls
      - name: data
        persistentVolumeClaim:
          claimName: kafka-pvc
      - emptyDir: {}
        name: logs
  updateStrategy:
    rollingUpdate:
      partition: 0
    type: RollingUpdate
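In case it helps narrow things down, a throwaway pod along the lines of the sketch below could be used to check whether that per-pod name resolves inside the cluster at all (illustrative only; the pod name and busybox image are arbitrary choices, not something from the chart):

apiVersion: v1
kind: Pod
metadata:
  name: dns-check
  namespace: kafka
spec:
  restartPolicy: Never
  containers:
  - name: dns-check
    image: busybox:1.36
    # one-shot lookup of the exact name the RaftManager fails to resolve
    command: ["nslookup", "kafka-0.kafka-headless.kafka.svc.cluster.local"]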
Can anyone please share some insight into what I have missed?
Many thanks,
Mike