I have read the below discussions
- java - Difference between KakaConsumer.poll(timeoutMs) and KafkaConsumer.poll(Duration.ZERO) - Stack Overflow
- rebalancing - How to check if Kafka Consumer is ready - Stack Overflow
But it is still not clear to me on how to determine a consumer is ready while starting a consumer using assign()
My code looks something like below
List<TopicPartition> topicPartitions = List.of(new TopicPartition("topic", 1));
// auto.offset.reset = latest
consumer.assign(partitions);
// first poll to initiate connection as mentioned in https://stackoverflow.com/a/54336476/785523
consumer.poll(Duration.ofSeconds(1));
boolean keepOnReading = true;
while(keepOnReading){
final var records = consumer.poll(Duration.ofSeconds(1));
// process records and stop polling by setting keepOnReading to false
}
But I am observing that if I make the first poll to 100ms (consumer.poll(Duration.ofMillis(100))
) then consumer.poll()
on the while loop does not get any records. It works fine if poll duration is 1 second or more. Can someone let me know
- What is the reliable way to determine if a consumer is ready using
assign()
. - In case of
assign
what is the correct way to do a seek after starting a consumer?
- Kafka Client Version - 3.4.0
- Kafka Broker Version - confluentinc/cp-kafka:7.3.1