kafka.server.KafkaServer (Kafka Broker) not starting (programmatically) properly after upgrade to 3.4.0

We use the kafka.server.KafkaServer class to programmatically start 3 Kafka brokers for our integration tests, in which microservices send Kafka messages that are received by client microservices.

Until now we were using Kafka 2.8.0 (org.apache.kafka:kafka_2.12:2.8.0) and the brokers ran just fine.

While starting the Kafka brokers we use the following properties:

- broker.id: 1
- listeners: SSL://kafka01.dev.localhost:9092
- port: 9092
- log.dir: /srv/tmp/kafka-broker-service/kafka-internal
- advertised.port: 9092
- advertised.listeners: SSL://kafka01.dev.localhost:9092
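
For reference, this is roughly how we construct and start each broker from the Properties object (a simplified sketch, not our exact code; the four-argument KafkaServer constructor is the 3.4 one as far as I can tell, and the ssl.* settings are elided):

import java.util.Properties;
import org.apache.kafka.common.utils.Time;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import scala.Option;

public class LocalKafkaBroker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("broker.id", "1");
        props.put("listeners", "SSL://kafka01.dev.localhost:9092");
        props.put("advertised.listeners", "SSL://kafka01.dev.localhost:9092");
        props.put("log.dir", "/srv/tmp/kafka-broker-service/kafka-internal");
        props.put("zookeeper.connect", "localhost:2181");
        // ssl.keystore.* / ssl.truststore.* settings elided here

        // 3.4 constructor: (config, time, threadNamePrefix, enableZkApiForwarding)
        KafkaServer broker = new KafkaServer(
                new KafkaConfig(props), Time.SYSTEM, Option.empty(), false);
        broker.startup();
        Runtime.getRuntime().addShutdownHook(new Thread(broker::shutdown));
    }
}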

After upgrading to 3.4.0 (org.apache.kafka:kafka_2.13:3.4.0), the Kafka brokers start but then get disconnected with an error after about 2 minutes (ZooKeeper starts fine). Some corresponding logs:

2023-02-24 16:48:32.636 +0000 WARN [Controller-1-to-broker-1-send-thread] [kafka-brokers] [] [] [] [] RequestSendThread - [RequestSendThread controllerId=1] Controller 1 epoch 1 fails to send request (type: UpdateMetadataRequest=, controllerId=1, controllerEpoch=1, brokerEpoch=25, partitionStates=[UpdateMetadataPartitionState(topicName='xxx', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.2', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.3', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='test.1', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]), UpdateMetadataPartitionState(topicName='', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[])], liveBrokers=UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9092, host='kafka01.dev.localhost', listener='SSL', securityProtocol=1)], rack=null)) to broker kafka01.dev.localhost:9092 (id: 1 rack: null). Reconnecting to broker.

java.io.IOException: Connection to *1* was disconnected before the response was read

at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) ~[kafka-clients-kafka-clients-3.4.0.jar:na]

at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]

at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]

This causes the tests to fail: when a service tries to send a Kafka message, it gets the error below:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic test.1 not present in metadata after 60000 ms.

at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.&lt;init&gt;(KafkaProducer.java:1442) ~[kafka-clients-kafka-clients-3.4.0.jar:na]

at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1084) ~[kafka-clients-kafka-clients-3.4.0.jar:na]

at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962) ~[kafka-clients-kafka-clients-3.4.0.jar:na]

at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:847) ~[kafka-clients-kafka-clients-3.4.0.jar:na]

I tried comparing the Kafka config values for both versions and found some differences; for example, the properties below are missing in 3.4.0:

advertised.host.name = kafka01.dev.localhost

advertised.port = 9092

port = 9092

zookeeper.sync.time.ms = 2000

I tried to add them programmatically via the Java Properties object, but they were still ignored! I don't know why.

I also compared the Kafka broker logs from 2.8.0 vs 3.4.0, and noticed that the lines below appear in 2.8.0 but are missing when I start 3.4.0:

2023-02-20 14:05:10.306 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Starting socket server acceptors and processors

2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started data-plane acceptor and processor(s) for endpoint : ListenerName(SSL)

2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started socket server acceptors and processors

Our setup uses the Kafka client and server libraries in one big project, which means we cannot upgrade just the Kafka client libraries from 2.8.0 to 3.4.0; we need to be able to start the Kafka server on 3.4.0 as well.

So my questions are:

  1. I tried to find documentation (with regard to the upgrade) illustrating how to use kafka.server.KafkaServer, but there is no Javadoc for it. There is some Scaladoc for kafka.server, but not much info. Does anyone know of any materials?
  2. Do we know which configuration properties have changed since version 3.0.0?

Hi @kashyap2011.mk and welcome!

  1. I believe that the scaladoc is all that gets published.
  2. A lot has changed – you can compare 2.8 broker configs to 3.4’s. The things that jump out at me to try are (a) remove advertised.port since it was deprecated in 2.8 and removed in 3.4, and (b) configure zookeeper.connect which defaults to null. Did it work in 2.8 without setting zookeeper.connect?

HTH,
Dave

Hi dtroiano,
First of all thanks very much for replying.

When you say advertised.port was deprecated – that could be why, when I tried to insert it into the KafkaConfig, it never ended up in the configuration. In fact I tried to insert the following properties, but they never made it through, and they showed up as differences:

advertised.host.name = kafka01.dev.localhost
advertised.port = 9092
port = 9092
zookeeper.sync.time.ms = 2000

Thanks for clarifying at least one mystery – that the above properties were deprecated. I'm not sure about port and zookeeper.sync.time.ms, but I will go through the configuration comparison you suggest.

Secondly, regarding additional properties like zookeeper.connect: this was set within some convoluted program, but it was set. To be exactly clear, let me paste the full Kafka config values for 2.8 and 3.4.
For 2.8, which was working perfectly:

~		advertised.host.name = kafka01.dev.localhost
~		advertised.listeners = SSL://kafka01.dev.localhost:9092
~		advertised.port = 9092
~		alter.config.policy.class.name = null
~		alter.log.dirs.replication.quota.window.num = 11
~		alter.log.dirs.replication.quota.window.size.seconds = 1
~		authorizer.class.name =
~		auto.create.topics.enable = true
~		auto.leader.rebalance.enable = true
~		background.threads = 10
~		broker.heartbeat.interval.ms = 2000
~		broker.id = 1
~		broker.id.generation.enable = true
~		broker.rack = null
~		broker.session.timeout.ms = 9000
~		client.quota.callback.class = null
~		compression.type = producer
~		connection.failed.authentication.delay.ms = 100
~		connections.max.idle.ms = 600000
~		connections.max.reauth.ms = 0
~		control.plane.listener.name = null
~		controlled.shutdown.enable = true
~		controlled.shutdown.max.retries = 3
~		controlled.shutdown.retry.backoff.ms = 5000
~		controller.listener.names = null
~		controller.quorum.append.linger.ms = 25
~		controller.quorum.election.backoff.max.ms = 1000
~		controller.quorum.election.timeout.ms = 1000
~		controller.quorum.fetch.timeout.ms = 2000
~		controller.quorum.request.timeout.ms = 2000
~		controller.quorum.retry.backoff.ms = 20
~		controller.quorum.voters = []
~		controller.quota.window.num = 11
~		controller.quota.window.size.seconds = 1
~		controller.socket.timeout.ms = 30000
~		create.topic.policy.class.name = null
~		default.replication.factor = 1
~		delegation.token.expiry.check.interval.ms = 3600000
~		delegation.token.expiry.time.ms = 86400000
~		delegation.token.master.key = null
~		delegation.token.max.lifetime.ms = 604800000
~		delegation.token.secret.key = null
~		delete.records.purgatory.purge.interval.requests = 1
~		delete.topic.enable = true
~		fetch.max.bytes = 57671680
~		fetch.purgatory.purge.interval.requests = 1000
~		group.initial.rebalance.delay.ms = 3000
~		group.max.session.timeout.ms = 1800000
~		group.max.size = 2147483647
~		group.min.session.timeout.ms = 6000
~		host.name =
~		initial.broker.registration.timeout.ms = 60000
~		inter.broker.listener.name = null
~		inter.broker.protocol.version = 2.8-IV1
~		kafka.metrics.polling.interval.secs = 10
~		kafka.metrics.reporters = []
~		leader.imbalance.check.interval.seconds = 300
~		leader.imbalance.per.broker.percentage = 10
~		listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
~		listeners = SSL://kafka01.dev.localhost:9092
~		log.cleaner.backoff.ms = 15000
~		log.cleaner.dedupe.buffer.size = 134217728
~		log.cleaner.delete.retention.ms = 86400000
~		log.cleaner.enable = true
~		log.cleaner.io.buffer.load.factor = 0.9
~		log.cleaner.io.buffer.size = 524288
~		log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
~		log.cleaner.max.compaction.lag.ms = 9223372036854775807
~		log.cleaner.min.cleanable.ratio = 0.5
~		log.cleaner.min.compaction.lag.ms = 0
~		log.cleaner.threads = 1
~		log.cleanup.policy = [delete]
~		log.dir = /srv/tmp/kafka-broker-service/kafka-internal
~		log.dirs = null
~		log.flush.interval.messages = 9223372036854775807
~		log.flush.interval.ms = null
~		log.flush.offset.checkpoint.interval.ms = 60000
~		log.flush.scheduler.interval.ms = 9223372036854775807
~		log.flush.start.offset.checkpoint.interval.ms = 60000
~		log.index.interval.bytes = 4096
~		log.index.size.max.bytes = 10485760
~		log.message.downconversion.enable = true
~		log.message.format.version = 2.8-IV1
~		log.message.timestamp.difference.max.ms = 9223372036854775807
~		log.message.timestamp.type = CreateTime
~		log.preallocate = false
~		log.retention.bytes = -1
~		log.retention.check.interval.ms = 300000
~		log.retention.hours = 168
~		log.retention.minutes = null
~		log.retention.ms = null
~		log.roll.hours = 168
~		log.roll.jitter.hours = 0
~		log.roll.jitter.ms = null
~		log.roll.ms = null
~		log.segment.bytes = 1073741824
~		log.segment.delete.delay.ms = 60000
~		max.connection.creation.rate = 2147483647
~		max.connections = 2147483647
~		max.connections.per.ip = 2147483647
~		max.connections.per.ip.overrides =
~		max.incremental.fetch.session.cache.slots = 1000
~		message.max.bytes = 41943040
~		metadata.log.dir = null
~		metric.reporters = []
~		metrics.num.samples = 2
~		metrics.recording.level = INFO
~		metrics.sample.window.ms = 30000
~		min.insync.replicas = 1
~		node.id = -1
~		num.io.threads = 8
~		num.network.threads = 3
~		num.partitions = 3
~		num.recovery.threads.per.data.dir = 1
~		num.replica.alter.log.dirs.threads = null
~		num.replica.fetchers = 1
~		offset.metadata.max.bytes = 4096
~		offsets.commit.required.acks = -1
~		offsets.commit.timeout.ms = 5000
~		offsets.load.buffer.size = 5242880
~		offsets.retention.check.interval.ms = 600000
~		offsets.retention.minutes = 10080
~		offsets.topic.compression.codec = 0
~		offsets.topic.num.partitions = 50
~		offsets.topic.replication.factor = 1
~		offsets.topic.segment.bytes = 104857600
~		password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
~		password.encoder.iterations = 4096
~		password.encoder.key.length = 128
~		password.encoder.keyfactory.algorithm = null
~		password.encoder.old.secret = null
~		password.encoder.secret = null
~		port = 9092
~		principal.builder.class = null
~		process.roles = []
~		producer.purgatory.purge.interval.requests = 1000
~		queued.max.request.bytes = -1
~		queued.max.requests = 500
~		quota.consumer.default = 9223372036854775807
~		quota.producer.default = 9223372036854775807
~		quota.window.num = 11
~		quota.window.size.seconds = 1
~		replica.fetch.backoff.ms = 1000
~		replica.fetch.max.bytes = 42943040
~		replica.fetch.min.bytes = 1
~		replica.fetch.response.max.bytes = 10485760
~		replica.fetch.wait.max.ms = 500
~		replica.high.watermark.checkpoint.interval.ms = 5000
~		replica.lag.time.max.ms = 30000
~		replica.selector.class = null
~		replica.socket.receive.buffer.bytes = 65536
~		replica.socket.timeout.ms = 30000
~		replication.quota.window.num = 11
~		replication.quota.window.size.seconds = 1
~		request.timeout.ms = 30000
~		reserved.broker.max.id = 1000
~		sasl.client.callback.handler.class = null
~		sasl.enabled.mechanisms = [GSSAPI]
~		sasl.jaas.config = null
~		sasl.kerberos.kinit.cmd = /usr/bin/kinit
~		sasl.kerberos.min.time.before.relogin = 60000
~		sasl.kerberos.principal.to.local.rules = [DEFAULT]
~		sasl.kerberos.service.name = null
~		sasl.kerberos.ticket.renew.jitter = 0.05
~		sasl.kerberos.ticket.renew.window.factor = 0.8
~		sasl.login.callback.handler.class = null
~		sasl.login.class = null
~		sasl.login.refresh.buffer.seconds = 300
~		sasl.login.refresh.min.period.seconds = 60
~		sasl.login.refresh.window.factor = 0.8
~		sasl.login.refresh.window.jitter = 0.05
~		sasl.mechanism.controller.protocol = GSSAPI
~		sasl.mechanism.inter.broker.protocol = GSSAPI
~		sasl.server.callback.handler.class = null
~		sasl.server.max.receive.size = 524288
~		security.inter.broker.protocol = SSL
~		security.providers = null
~		socket.connection.setup.timeout.max.ms = 30000
~		socket.connection.setup.timeout.ms = 10000
~		socket.receive.buffer.bytes = 102400
~		socket.request.max.bytes = 104857600
~		socket.send.buffer.bytes = 102400
~		ssl.cipher.suites = [TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
~		ssl.client.auth = required
~		ssl.enabled.protocols = [TLSv1.2]
~		ssl.endpoint.identification.algorithm = HTTPS
~		ssl.engine.factory.class = null
~		ssl.key.password = [hidden]
~		ssl.keymanager.algorithm = SunX509
~		ssl.keystore.certificate.chain = null
~		ssl.keystore.key = null
~		ssl.keystore.location = /srv/etc/appserver.jks
~		ssl.keystore.password = [hidden]
~		ssl.keystore.type = JKS
~		ssl.principal.mapping.rules = DEFAULT
~		ssl.protocol = TLSv1.3
~		ssl.provider = null
~		ssl.secure.random.implementation = null
~		ssl.trustmanager.algorithm = PKIX
~		ssl.truststore.certificates = null
~		ssl.truststore.location = /srv/etc/trust_app.jks
~		ssl.truststore.password = [hidden]
~		ssl.truststore.type = JKS
~		transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
~		transaction.max.timeout.ms = 900000
~		transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
~		transaction.state.log.load.buffer.size = 5242880
~		transaction.state.log.min.isr = 2
~		transaction.state.log.num.partitions = 50
~		transaction.state.log.replication.factor = 3
~		transaction.state.log.segment.bytes = 104857600
~		transactional.id.expiration.ms = 604800000
~		unclean.leader.election.enable = true
~		zookeeper.clientCnxnSocket = null
~		zookeeper.connect = localhost:2181
~		zookeeper.connection.timeout.ms = 300000
~		zookeeper.max.in.flight.requests = 10
~		zookeeper.session.timeout.ms = 18000
~		zookeeper.set.acl = false
~		zookeeper.ssl.cipher.suites = null
~		zookeeper.ssl.client.enable = false
~		zookeeper.ssl.crl.enable = false
~		zookeeper.ssl.enabled.protocols = null
~		zookeeper.ssl.endpoint.identification.algorithm = HTTPS
~		zookeeper.ssl.keystore.location = null
~		zookeeper.ssl.keystore.password = null
~		zookeeper.ssl.keystore.type = null
~		zookeeper.ssl.ocsp.enable = false
~		zookeeper.ssl.protocol = TLSv1.2
~		zookeeper.ssl.truststore.location = null
~		zookeeper.ssl.truststore.password = null
~		zookeeper.ssl.truststore.type = null
~		zookeeper.sync.time.ms = 2000

For 3.4, which does not work:

~		advertised.listeners = SSL://kafka01.dev.localhost:9092
~		alter.config.policy.class.name = null
~		alter.log.dirs.replication.quota.window.num = 11
~		alter.log.dirs.replication.quota.window.size.seconds = 1
~		authorizer.class.name =
~		auto.create.topics.enable = true
~		auto.include.jmx.reporter = true
~		auto.leader.rebalance.enable = true
~		background.threads = 10
~		broker.heartbeat.interval.ms = 2000
~		broker.id = 1
~		broker.id.generation.enable = true
~		broker.rack = null
~		broker.session.timeout.ms = 9000
~		client.quota.callback.class = null
~		compression.type = producer
~		connection.failed.authentication.delay.ms = 100
~		connections.max.idle.ms = 600000
~		connections.max.reauth.ms = 0
~		control.plane.listener.name = null
~		controlled.shutdown.enable = true
~		controlled.shutdown.max.retries = 3
~		controlled.shutdown.retry.backoff.ms = 5000
~		controller.listener.names = null
~		controller.quorum.append.linger.ms = 25
~		controller.quorum.election.backoff.max.ms = 1000
~		controller.quorum.election.timeout.ms = 1000
~		controller.quorum.fetch.timeout.ms = 2000
~		controller.quorum.request.timeout.ms = 2000
~		controller.quorum.retry.backoff.ms = 20
~		controller.quorum.voters = []
~		controller.quota.window.num = 11
~		controller.quota.window.size.seconds = 1
~		controller.socket.timeout.ms = 30000
~		create.topic.policy.class.name = null
~		default.replication.factor = 1
~		delegation.token.expiry.check.interval.ms = 3600000
~		delegation.token.expiry.time.ms = 86400000
~		delegation.token.master.key = null
~		delegation.token.max.lifetime.ms = 604800000
~		delegation.token.secret.key = null
~		delete.records.purgatory.purge.interval.requests = 1
~		delete.topic.enable = true
~		early.start.listeners = null
~		fetch.max.bytes = 57671680
~		fetch.purgatory.purge.interval.requests = 1000
~		group.initial.rebalance.delay.ms = 3000
~		group.max.session.timeout.ms = 1800000
~		group.max.size = 2147483647
~		group.min.session.timeout.ms = 6000
~		initial.broker.registration.timeout.ms = 60000
~		inter.broker.listener.name = null
~		inter.broker.protocol.version = 3.4-IV0
~		kafka.metrics.polling.interval.secs = 10
~		kafka.metrics.reporters = []
~		leader.imbalance.check.interval.seconds = 300
~		leader.imbalance.per.broker.percentage = 10
~		listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
~		listeners = SSL://kafka01.dev.localhost:9092
~		log.cleaner.backoff.ms = 15000
~		log.cleaner.dedupe.buffer.size = 134217728
~		log.cleaner.delete.retention.ms = 86400000
~		log.cleaner.enable = true
~		log.cleaner.io.buffer.load.factor = 0.9
~		log.cleaner.io.buffer.size = 524288
~		log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
~		log.cleaner.max.compaction.lag.ms = 9223372036854775807
~		log.cleaner.min.cleanable.ratio = 0.5
~		log.cleaner.min.compaction.lag.ms = 0
~		log.cleaner.threads = 1
~		log.cleanup.policy = [delete]
~		log.dir = /srv/tmp/kafka-broker-service/kafka-internal
~		log.dirs = null
~		log.flush.interval.messages = 9223372036854775807
~		log.flush.interval.ms = null
~		log.flush.offset.checkpoint.interval.ms = 60000
~		log.flush.scheduler.interval.ms = 9223372036854775807
~		log.flush.start.offset.checkpoint.interval.ms = 60000
~		log.index.interval.bytes = 4096
~		log.index.size.max.bytes = 10485760
~		log.message.downconversion.enable = true
~		log.message.format.version = 3.0-IV1
~		log.message.timestamp.difference.max.ms = 9223372036854775807
~		log.message.timestamp.type = CreateTime
~		log.preallocate = false
~		log.retention.bytes = -1
~		log.retention.check.interval.ms = 300000
~		log.retention.hours = 168
~		log.retention.minutes = null
~		log.retention.ms = null
~		log.roll.hours = 168
~		log.roll.jitter.hours = 0
~		log.roll.jitter.ms = null
~		log.roll.ms = null
~		log.segment.bytes = 1073741824
~		log.segment.delete.delay.ms = 60000
~		max.connection.creation.rate = 2147483647
~		max.connections = 2147483647
~		max.connections.per.ip = 2147483647
~		max.connections.per.ip.overrides =
~		max.incremental.fetch.session.cache.slots = 1000
~		message.max.bytes = 41943040
~		metadata.log.dir = null
~		metadata.log.max.record.bytes.between.snapshots = 20971520
~		metadata.log.max.snapshot.interval.ms = 3600000
~		metadata.log.segment.bytes = 1073741824
~		metadata.log.segment.min.bytes = 8388608
~		metadata.log.segment.ms = 604800000
~		metadata.max.idle.interval.ms = 500
~		metadata.max.retention.bytes = 104857600
~		metadata.max.retention.ms = 604800000
~		metric.reporters = []
~		metrics.num.samples = 2
~		metrics.recording.level = INFO
~		metrics.sample.window.ms = 30000
~		min.insync.replicas = 1
~		node.id = 1
~		num.io.threads = 8
~		num.network.threads = 3
~		num.partitions = 3
~		num.recovery.threads.per.data.dir = 1
~		num.replica.alter.log.dirs.threads = null
~		num.replica.fetchers = 1
~		offset.metadata.max.bytes = 4096
~		offsets.commit.required.acks = -1
~		offsets.commit.timeout.ms = 5000
~		offsets.load.buffer.size = 5242880
~		offsets.retention.check.interval.ms = 600000
~		offsets.retention.minutes = 10080
~		offsets.topic.compression.codec = 0
~		offsets.topic.num.partitions = 50
~		offsets.topic.replication.factor = 1
~		offsets.topic.segment.bytes = 104857600
~		password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
~		password.encoder.iterations = 4096
~		password.encoder.key.length = 128
~		password.encoder.keyfactory.algorithm = null
~		password.encoder.old.secret = null
~		password.encoder.secret = null
~		principal.builder.class = class org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
~		process.roles = []
~		producer.id.expiration.check.interval.ms = 600000
~		producer.id.expiration.ms = 86400000
~		producer.purgatory.purge.interval.requests = 1000
~		queued.max.request.bytes = -1
~		queued.max.requests = 500
~		quota.window.num = 11
~		quota.window.size.seconds = 1
~		remote.log.index.file.cache.total.size.bytes = 1073741824
~		remote.log.manager.task.interval.ms = 30000
~		remote.log.manager.task.retry.backoff.max.ms = 30000
~		remote.log.manager.task.retry.backoff.ms = 500
~		remote.log.manager.task.retry.jitter = 0.2
~		remote.log.manager.thread.pool.size = 10
~		remote.log.metadata.manager.class.name = null
~		remote.log.metadata.manager.class.path = null
~		remote.log.metadata.manager.impl.prefix = null
~		remote.log.metadata.manager.listener.name = null
~		remote.log.reader.max.pending.tasks = 100
~		remote.log.reader.threads = 10
~		remote.log.storage.manager.class.name = null
~		remote.log.storage.manager.class.path = null
~		remote.log.storage.manager.impl.prefix = null
~		remote.log.storage.system.enable = false
~		replica.fetch.backoff.ms = 1000
~		replica.fetch.max.bytes = 42943040
~		replica.fetch.min.bytes = 1
~		replica.fetch.response.max.bytes = 10485760
~		replica.fetch.wait.max.ms = 500
~		replica.high.watermark.checkpoint.interval.ms = 5000
~		replica.lag.time.max.ms = 30000
~		replica.selector.class = null
~		replica.socket.receive.buffer.bytes = 65536
~		replica.socket.timeout.ms = 30000
~		replication.quota.window.num = 11
~		replication.quota.window.size.seconds = 1
~		request.timeout.ms = 30000
~		reserved.broker.max.id = 1000
~		sasl.client.callback.handler.class = null
~		sasl.enabled.mechanisms = [GSSAPI]
~		sasl.jaas.config = null
~		sasl.kerberos.kinit.cmd = /usr/bin/kinit
~		sasl.kerberos.min.time.before.relogin = 60000
~		sasl.kerberos.principal.to.local.rules = [DEFAULT]
~		sasl.kerberos.service.name = null
~		sasl.kerberos.ticket.renew.jitter = 0.05
~		sasl.kerberos.ticket.renew.window.factor = 0.8
~		sasl.login.callback.handler.class = null
~		sasl.login.class = null
~		sasl.login.connect.timeout.ms = null
~		sasl.login.read.timeout.ms = null
~		sasl.login.refresh.buffer.seconds = 300
~		sasl.login.refresh.min.period.seconds = 60
~		sasl.login.refresh.window.factor = 0.8
~		sasl.login.refresh.window.jitter = 0.05
~		sasl.login.retry.backoff.max.ms = 10000
~		sasl.login.retry.backoff.ms = 100
~		sasl.mechanism.controller.protocol = GSSAPI
~		sasl.mechanism.inter.broker.protocol = GSSAPI
~		sasl.oauthbearer.clock.skew.seconds = 30
~		sasl.oauthbearer.expected.audience = null
~		sasl.oauthbearer.expected.issuer = null
~		sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
~		sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
~		sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
~		sasl.oauthbearer.jwks.endpoint.url = null
~		sasl.oauthbearer.scope.claim.name = scope
~		sasl.oauthbearer.sub.claim.name = sub
~		sasl.oauthbearer.token.endpoint.url = null
~		sasl.server.callback.handler.class = null
~		sasl.server.max.receive.size = 524288
~		security.inter.broker.protocol = SSL
~		security.providers = null
~		socket.connection.setup.timeout.max.ms = 30000
~		socket.connection.setup.timeout.ms = 10000
~		socket.listen.backlog.size = 50
~		socket.receive.buffer.bytes = 102400
~		socket.request.max.bytes = 104857600
~		socket.send.buffer.bytes = 102400
~		ssl.cipher.suites = [TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]
~		ssl.client.auth = required
~		ssl.enabled.protocols = [TLSv1.2]
~		ssl.endpoint.identification.algorithm = HTTPS
~		ssl.engine.factory.class = null
~		ssl.key.password = [hidden]
~		ssl.keymanager.algorithm = SunX509
~		ssl.keystore.certificate.chain = null
~		ssl.keystore.key = null
~		ssl.keystore.location = /srv/etc/appserver.jks
~		ssl.keystore.password = [hidden]
~		ssl.keystore.type = JKS
~		ssl.principal.mapping.rules = DEFAULT
~		ssl.protocol = TLSv1.3
~		ssl.provider = null
~		ssl.secure.random.implementation = null
~		ssl.trustmanager.algorithm = PKIX
~		ssl.truststore.certificates = null
~		ssl.truststore.location = /srv/etc/trust_app.jks
~		ssl.truststore.password = [hidden]
~		ssl.truststore.type = JKS
~		transaction.abort.timed.out.transaction.cleanup.interval.ms = 10000
~		transaction.max.timeout.ms = 900000
~		transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
~		transaction.state.log.load.buffer.size = 5242880
~		transaction.state.log.min.isr = 2
~		transaction.state.log.num.partitions = 50
~		transaction.state.log.replication.factor = 3
~		transaction.state.log.segment.bytes = 104857600
~		transactional.id.expiration.ms = 604800000
~		unclean.leader.election.enable = true
~		zookeeper.clientCnxnSocket = null
~		zookeeper.connect = localhost:2181
~		zookeeper.connection.timeout.ms = 300000
~		zookeeper.max.in.flight.requests = 10
~		zookeeper.metadata.migration.enable = false
~		zookeeper.session.timeout.ms = 18000
~		zookeeper.set.acl = false
~		zookeeper.ssl.cipher.suites = null
~		zookeeper.ssl.client.enable = false
~		zookeeper.ssl.crl.enable = false
~		zookeeper.ssl.enabled.protocols = null
~		zookeeper.ssl.endpoint.identification.algorithm = HTTPS
~		zookeeper.ssl.keystore.location = null
~		zookeeper.ssl.keystore.password = null
~		zookeeper.ssl.keystore.type = null
~		zookeeper.ssl.ocsp.enable = false
~		zookeeper.ssl.protocol = TLSv1.2
~		zookeeper.ssl.truststore.location = null
~		zookeeper.ssl.truststore.password = null
~		zookeeper.ssl.truststore.type = null

Thanks once again and if you think of something else as an issue do let me know.
Regards,
Manish

One more question: we are not setting (in 2.8, nor in 3.4)

control.plane.listener.name

Could this be an issue? As I mentioned, the 3 log lines below did not appear in version 3.4:


2023-02-20 14:05:10.306 +0000 INFO [main] [kafka-brokers] [] [] [] [] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Starting socket server acceptors and processors

2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] [] [] [] [] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started data-plane acceptor and processor(s) for endpoint : ListenerName(SSL)

2023-02-20 14:05:10.307 +0000 INFO [main] [kafka-brokers] [] [] [] [] SocketServer - [SocketServer listenerType=ZK_BROKER, nodeId=3] Started socket server acceptors and processors

I tried comparing the Kafka broker logs from 2.8.0 vs 3.4.0, and noticed that the lines below appear in 2.8.0 but are missing when I start 3.4.0

I think you’re onto something with control.plane.listener.name. From the 3.4 docs:

Note that the default value is null, which means that the controller will use the same listener defined by inter.broker.listener

I think that inter.broker.listener is a doc bug and should be inter.broker.listener.name, since the former isn't a valid config as far as I can tell. And since you don't have this config set, I'm not sure what'll happen. inter.broker.listener.name will default to whatever security.inter.broker.protocol is set to (SSL in your case), but I am pretty sure that defaults don't cascade – i.e., I don't think security.inter.broker.protocol's value gets used for inter.broker.listener.name and then, in turn, for control.plane.listener.name.

So I’d recommend trying one or both of:

  1. set control.plane.listener.name to SSL
  2. set inter.broker.listener.name to SSL (and you may have to unset security.inter.broker.protocol).

I’ve tested #2 and it does work. Also, it doesn’t work if I only set security.inter.broker.protocol but don’t set inter.broker.listener.name, which suggests that defaults indeed don’t cascade.
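
In broker-properties terms, what I tested for option 2 looks roughly like this (a sketch, using your listener):

listeners = SSL://kafka01.dev.localhost:9092
advertised.listeners = SSL://kafka01.dev.localhost:9092
inter.broker.listener.name = SSL
# security.inter.broker.protocol deliberately left unset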

HTH,
Dave

Thanks
I will quickly give it a go and will reply back…

Hi Dave,

Tests still failed with the above changes.
However, we suspect this is because the Kafka broker is slower to start in 3.4.0, rather than failing outright.
To test this theory, we are putting a sleep before the tests start running, to give the Kafka brokers a chance to finish starting (if it is a delay problem).
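
If the sleep helps, a less blind option I may try is to poll cluster metadata with the standard AdminClient until all 3 brokers have registered. A rough sketch (assuming our test bootstrap address, and reusing the ssl.* client settings the tests already have):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public final class BrokerReadiness {
    // Block until the expected number of brokers shows up in cluster metadata.
    public static void awaitBrokers(int expected, long timeoutMs) throws InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka01.dev.localhost:9092");
        // plus the same ssl.* client settings the tests use
        long deadline = System.currentTimeMillis() + timeoutMs;
        try (Admin admin = Admin.create(props)) {
            while (System.currentTimeMillis() < deadline) {
                try {
                    if (admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).size() >= expected) {
                        return; // all brokers registered
                    }
                } catch (Exception e) {
                    // brokers not reachable yet; keep polling
                }
                Thread.sleep(500);
            }
        }
        throw new IllegalStateException("brokers did not register within " + timeoutMs + " ms");
    }
}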
Will get back to you
Manish

Ah, yes, I should have clarified: try only one of the options, not both, since the value of control.plane.listener.name can't be the same as that of inter.broker.listener.name (from here).

A startup race could definitely cause some flakiness – please do let me know how it goes on that front.

Dave

Hi Dave,

In spite of adding some sleep, my tests still failed.
I looked again, and after the Kafka brokers started I noticed the same errors again, overlapping (more or less) with the test run.

This is the sequence of events:

  1. All 3 Kafka brokers started; the 3rd broker started at 09:44:42.452:

2023-03-03 09:44:42.452 +0000 INFO  [main] [kafka-brokers] [] [] [] [] KafkaServer - [KafkaServer id=3] started
2023-03-03 09:44:42.452 +0000 INFO  [main] [kafka-brokers] [] [] [] [] LifeCycleManager - Successfully started: LocalKafkaBroker

  2. Getting some errors on the console:

 2023-03-03 09:45:17.744 +0000 WARN  [Controller-3-to-broker-3-send-thread] [kafka-brokers] [] [] [] [] RequestSendThread - [RequestSendThread controllerId=3] Controller 3 epoch 1 fails to send request (type: UpdateMetadataRequest=, controllerId=3, controllerEpoch=1, brokerEpoch=25, partitionStates=[], liveBrokers=UpdateMetadataBroker(id=3, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9094, host='kafka01.dev.localhost', listener='SSL', securityProtocol=1)], rack=null)) to broker kafka01.dev.localhost:9094 (id: 3 rack: null). Reconnecting to broker.
 java.io.IOException: Connection to 3 was disconnected before the response was read
	at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]
  3. There were 40 such errors; the last one at 09:54:20.374:

2023-03-03 09:54:20.374 +0000 WARN  [Controller-3-to-broker-3-send-thread] [kafka-brokers] [] [] [] [] RequestSendThread - [RequestSendThread controllerId=3] Controller 3 epoch 1 fails to send request (type=LeaderAndIsrRequest, controllerId=3, controllerEpoch=1, brokerEpoch=25, partitionStates=[LeaderAndIsrPartitionState(topicName='db.move-to-uc-core.moveToUcRequest', partitionIndex=2, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3], partitionEpoch=0, replicas=[3], addingReplicas=[], removingReplicas=[], isNew=true, leaderRecoveryState=0), LeaderAndIsrPartitionState(topicName='db.move-to-uc-core.moveToUcRequest', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3], partitionEpoch=0, replicas=[3], addingReplicas=[], removingReplicas=[], isNew=true, leaderRecoveryState=0), LeaderAndIsrPartitionState(topicName='db.move-to-uc-core.moveToUcRequest', partitionIndex=0, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3], partitionEpoch=0, replicas=[3], addingReplicas=[], removingReplicas=[], isNew=true, leaderRecoveryState=0)], topicIds={db.move-to-uc-core.moveToUcRequest=qAAwiRevSY6-IqoqUfUC4A}, liveLeaders=(kafka01.dev.localhost:9094 (id: 3 rack: null))) to broker kafka01.dev.localhost:9094 (id: 3 rack: null). Reconnecting to broker.
 java.io.IOException: Connection to 3 was disconnected before the response was read
	at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99) ~[kafka-clients-kafka-clients-3.4.0.jar:na]
	at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:252) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) ~[kafka_2.13-kafka_2.13-3.4.0.jar:na]

The Tests started at 2023-03-03 09:50:17.824 and they all failed.
To me it looks like something is wrong in the Kafka server code, but I don't know what.

Do you have any suggestions or advice? I mean, do these errors hint at something, even though they are only logged at INFO or WARN level?
Thanks, Manish

For sanity’s sake you might want to do some experiments to rule out some other usual suspects:

  1. use PLAINTEXT rather than SSL (see the sketch below)
  2. run all on the same host with different ports to rule out networking / security group type issues

Or, have you already ruled these out by successfully running the 2.8 stack on the same VMs?
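
For experiment 1, the broker side would be something like this (test-only, of course):

listeners = PLAINTEXT://localhost:9092
advertised.listeners = PLAINTEXT://localhost:9092
inter.broker.listener.name = PLAINTEXT

with the clients setting security.protocol=PLAINTEXT in place of the ssl.* configs.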

I’m not seeing much else at the moment without getting hands-on – if you’re able to share an easily runnable repro I can take a shot at debugging.

HTH,
Dave

Sure, will try and get back to you…

Update –
The failing BAT tests are passing now after I explicitly disabled enable.idempotence (set it to false).
From version 3.0 onward, enable.idempotence defaults to true, and since we set acks to all, I believe there is some sort of latency in the exactly-once / transactional machinery:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-679%3A+Producer+will+enable+the+strongest+delivery+guarantee+by+default
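
Concretely, the producer-side change was along these lines (using the standard ProducerConfig keys; just the relevant fragment):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
// the default flipped to true in 3.0 (KIP-679); we now disable it explicitly
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");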

However, some unit/integration tests (where we start the broker programmatically) are still failing because of timeouts, even though I have explicitly disabled enable.idempotence. Looking into that.

By the way, my BAT tests also passed when I set acks to 1 and left enable.idempotence = true (the default value). This was strange, as our ISR was set to 1 and there was only 1 broker. They likewise passed with acks = all and enable.idempotence = false.

All of my tests are passing now, thanks for your help. Since we now default idempotence to false, our tests pass.
As an extra check, I set ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true on the previous Kafka version 2.8 (client and server both), and the tests passed successfully, which implies there is some regression (maybe in server startup) from 2.8 to 3.0+.

So the tests pass for Kafka 3.0+ if we use:

acks = 1, and idempotence set to anything
acks = all, and idempotence set to false

However, they fail when acks = all and idempotence is true.

Can you think of what the reason could be?

In general, this is the error that gets logged and prevents the topic from being created when acks = all and idempotence = true, even with min.insync.replicas set to 1:

11:40:18.837 [data-plane-kafka-request-handler-3] INFO kafka.server.ZkAdminManager - [Admin Manager on Broker 1]: Error processing create topic request CreatableTopic(name='test', numPartitions=1, replicationFactor=1, assignments=[], configs=[])
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 1 larger than available brokers: 0.