Handling of a Kafka stream if a Kafka broker goes down


I am facing an issue: if any broker in my Kafka cluster goes down, my KSQL stream is not able to push data to the topic. How can we keep using a KSQL stream when a broker is unavailable?

This looks like a configuration problem. If your brokers and topics are configured correctly, a single broker being offline should not prevent ksqlDB from making progress.

What is the replication factor of your topics? If it is only 1, that would explain the issue, and you should increase it to 3 so that a different broker can take over when one broker goes offline.

I have a replication factor of 3 and have 10 partitions on each topic.

Command - ./bin/kafka-topics.sh --describe --topic ******* --bootstrap-server :9092,:9092,*******:9092

Output - Topic: ********* PartitionCount: 10 ReplicationFactor: 3 Configs: compression.type=lz4,min.insync.replicas=1,cleanup.policy=delete,segment.bytes=1073741824,retention.ms=900000,unclean.leader.election.enable=true

I also have another issue: how can I handle a NullPointerException?

Error Details :

"type": 1,
"deserializationError": null,
"recordProcessingError": {
"errorMessage": "Error computing expression M_BEFORE->IS_AVAILABLE_JCB for column M_BEFORE_IS_AVAILABLE_JCB with index 55",
"record": null,
"cause": [
"productionError": null,
"serializationError": null,
"kafkaStreamsThreadError": null

For insert events the before value is null, so the stream fails with a NullPointerException. How can I handle this?

Can you check on the broker side whether the leader for the topic partition changes? Also, does ksqlDB refresh its metadata about partition leaders? (You would need to inspect the logs.)

Error computing expression M_BEFORE->IS_AVAILABLE_JCB for column M_BEFORE_IS_AVAILABLE_JCB with index 55

It seems M_BEFORE is null? It's a known issue. You would need to use CASE to check for NULL first, return NULL in that case, and evaluate M_BEFORE->IS_AVAILABLE_JCB only if M_BEFORE is not NULL.
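
A minimal sketch of that CASE guard, assuming a source stream named m_source and a target stream named m_flat (both names are hypothetical; only M_BEFORE and IS_AVAILABLE_JCB come from the error message). With no ELSE branch, the CASE expression yields NULL when M_BEFORE is NULL, so the struct field is dereferenced only when the struct is present:

```sql
-- Hypothetical stream names: m_source (input) and m_flat (output).
-- M_BEFORE is the struct that is NULL on insert events.
CREATE STREAM m_flat AS
  SELECT
    CASE
      -- Dereference the struct only when it is present;
      -- with no ELSE branch the result is NULL otherwise.
      WHEN M_BEFORE IS NOT NULL THEN M_BEFORE->IS_AVAILABLE_JCB
    END AS M_BEFORE_IS_AVAILABLE_JCB
  FROM m_source
  EMIT CHANGES;
```

The same guard would need to be repeated for every column that reads a field from M_BEFORE.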

Hi @mjsax, I am getting the logs below from KSQL when one of the Kafka brokers in my cluster goes down. I have a 3-node cluster setup.

"type": 4,
"deserializationError": null,
"recordProcessingError": null,
"productionError": null,
"serializationError": null,
"kafkaStreamsThreadError": {
"errorMessage": "Unhandled exception caught in streams thread",
"threadName": "confluent-ksql-************query_CSAS_247-0c0fec9f-aa6b-4de7-83cf-c8f45dcaf795-StreamThread-1",
"cause": [
"Error encountered sending record to topic confluent-ksql-**************query_CSAS********************_247-Join-left-repartition for task 0_6 due to:\norg.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for confluent-ksql-**********query_CSAS
_247-Join-left-repartition-4:120001 ms has passed since batch creation\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response. \nConsider overwriting max.block.ms and /or delivery.timeout.ms to a larger value to wait longer for such scenarios and avoid timeout errors\nException handler choose to FAIL the processing, no more records would be sent.",
"Expiring 1 record(s) for confluent-ksql-**********query_CSAS************_247-Join-left-repartition-4:120001 ms has passed since batch creation"

WARN [Consumer clientId=confluent-ksql-***********query_CSAS**************_247-0c0fec9f-aa6b-4de7-83cf-c8f45dcaf795-StreamThread-3-consumer, groupId=confluent-ksql-***************query_CSAS*********_247] Connection to node 1 (/...:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient:796)

If you need any more details please let me know.

Thanks @mjsax, CASE solved my NullPointerException.
