We use the kafka.server.KafkaServer class to programmatically start 3 Kafka brokers for our integration tests, in which microservices send Kafka messages that are received by client microservices.
Until now we were using Kafka 2.8.0 (org.apache.kafka:kafka_2.12:2.8.0) and the brokers ran just fine.
When starting the Kafka brokers we used the following properties:
broker.id: 1
listeners: SSL://kafka01.dev.localhost:9092
port: 9092
log.dir: /srv/tmp/kafka-broker-service/kafka-internal
advertised.port: 9092
advertised.listeners: SSL://kafka01.dev.localhost:9092
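For reference, here is a minimal sketch of how these values end up in a java.util.Properties object before being handed to the broker. The class name BrokerProps and the helper brokerProperties are hypothetical names used only for illustration. It covers only the keys quoted above, whereas a real broker also needs its ZooKeeper and SSL settings.

```java
import java.util.Properties;

public class BrokerProps {
    // Hypothetical helper: collects the settings listed above for one broker.
    static Properties brokerProperties(int brokerId, String host, int port, String logDir) {
        String endpoint = "SSL://" + host + ":" + port;
        Properties props = new Properties();
        props.setProperty("broker.id", Integer.toString(brokerId));
        props.setProperty("listeners", endpoint);
        props.setProperty("advertised.listeners", endpoint);
        props.setProperty("log.dir", logDir);
        // Keys as we set them with 2.8.0; listed here exactly as in the post.
        props.setProperty("port", Integer.toString(port));
        props.setProperty("advertised.port", Integer.toString(port));
        return props;
    }

    public static void main(String[] args) {
        Properties props = brokerProperties(1, "kafka01.dev.localhost", 9092,
                "/srv/tmp/kafka-broker-service/kafka-internal");
        // Print every key/value pair so it can be compared with the broker's config dump.
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```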
After we upgraded to 3.4.0 (org.apache.kafka:kafka_2.13:3.4.0), the Kafka brokers start but then get disconnected with the error below after 2 minutes (ZooKeeper starts fine). Some corresponding logs:
2023-02-24 16:48:32.636 +0000 WARN [Controller-1-to-broker-1-send-thread] [kafka-brokers] [] [] [] [] RequestSendThread - [RequestSendThread controllerId=1] Controller 1 epoch 1 fails to send request (type: UpdateMetadataRequest=, controllerId=1, controllerEpoch=1, brokerEpoch=25, partitionStates=[UpdateMetadataPartitionState(topicName='xxx', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.2', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.3', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.1', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[])], liveBrokers=UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9092, host='kafka01.dev.localhost', listener='SSL', securityProtocol=1)], rack=null)) to broker kafka01.dev.localhost:9092 (id: 1 rack: null). Reconnecting to broker.
java.io.IOException: Connection to 1 was disconnected before the response was read
at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]
This causes the tests to fail, because when a service tries to send a Kafka message it gets the error below:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test.1 not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1442) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1084) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:847) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
I compared the Kafka config values for both versions and found some differences. For example, the properties below were missing in 3.4.0:
advertised.host.name = kafka01.dev.localhost
advertised.port = 9092
port = 9092
zookeeper.sync.time.ms = 2000
I tried adding them programmatically to the Java Properties object, but they were still ignored, and I don't know why.
I also compared the Kafka broker logs from 2.8.0 and 3.4.0 and noticed that 2.8.0 logged the lines below, which are missing when I start 3.4.0:
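The config comparison described above can be sketched roughly as follows. The class name ConfigDiff and the helper missingKeys are hypothetical, and the two maps are hand-copied excerpts of the config dumps, not the full broker configuration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ConfigDiff {
    // Returns the keys present in the older config dump but absent from the newer one, sorted.
    static Set<String> missingKeys(Map<String, String> older, Map<String, String> newer) {
        Set<String> missing = new TreeSet<>(older.keySet());
        missing.removeAll(newer.keySet());
        return missing;
    }

    public static void main(String[] args) {
        // Excerpt of the values logged by the 2.8.0 broker (sample only).
        Map<String, String> v280 = Map.of(
                "broker.id", "1",
                "advertised.host.name", "kafka01.dev.localhost",
                "advertised.port", "9092",
                "port", "9092",
                "zookeeper.sync.time.ms", "2000");
        // Excerpt of the values logged by the 3.4.0 broker (sample only).
        Map<String, String> v340 = Map.of("broker.id", "1");

        // Prints [advertised.host.name, advertised.port, port, zookeeper.sync.time.ms]
        System.out.println(missingKeys(v280, v340));
    }
}
```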
2023-02-20 14:05:10.306 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Starting socket server acceptors and processors
2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started data-plane acceptor and processor(s) for endpoint : ListenerName(SSL)
2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started socket server acceptors and processors
Our setup is that we use the Kafka client and server libraries in one big project, which means we cannot upgrade only the Kafka client libraries from 2.8.0 to 3.4.0.
We need to be able to start the Kafka server on 3.4.0 as well.
So my questions are:
- I tried to find documentation (with regard to the upgrade) that illustrates how to use kafka.server.KafkaServer, but there is no Javadoc for it. There is some Scaladoc for kafka.server, but it does not contain much information. Does anyone know of any material?
- Do we know whether any configuration properties were changed after version 3.0.0?