Hi! I am querying a Kafka topic with a newly created consumer group. I chose the following properties:
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getCanonicalName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "someRandomGroup2");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
My polling logic is like so:
try (KafkaConsumer<String, JsonNode> consumer = new KafkaConsumer<>(properties)) {
    consumer.subscribe(List.of(topic));
    ConsumerRecords<String, JsonNode> records;
    records = consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_TIMEOUT_MS));
    System.out.println("Length of recs: " + records.count());
}
The issue is similar to the one described in "Consumer.poll() returns empty ConsumerRecords".
Setting KAFKA_CONSUMER_TIMEOUT_MS = 1000 (i.e. 1 s) returns empty results very often (4 out of 5 times).
Setting KAFKA_CONSUMER_TIMEOUT_MS = 10000 (i.e. 10 s) gets me at best 1 record from 1 partition.
When I change AUTO_OFFSET_RESET_CONFIG to "latest" and set a new consumer group id "latestConsumerGroup", I get 0 results even with the 10 s timeout.
Why? This feels so weird and buggy. Am I doing something incorrect here?
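One thing that may be relevant: the code above calls poll() exactly once. With subscribe(), a fresh consumer first has to join the group and receive its partition assignment, so a single poll() can return empty if the timeout runs out before any fetch completes. A common pattern is to call poll() in a loop until enough records arrive or an overall deadline passes. Below is a minimal sketch of that loop. It takes a stand-in "fetcher" function instead of a real consumer so that it runs without a broker; PollLoop, collect, and the parameter names are hypothetical and not part of the Kafka API.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PollLoop {

    /**
     * Calls the fetcher repeatedly until maxRecords have been collected
     * or the overall deadline has passed, whichever comes first.
     *
     * With a real consumer the fetcher would wrap consumer.poll(perPoll),
     * copying the returned ConsumerRecords into a List (assumption: not shown here).
     */
    public static <T> List<T> collect(Function<Duration, List<T>> fetcher,
                                      int maxRecords,
                                      Duration perPoll,
                                      Duration overall) {
        List<T> out = new ArrayList<>();
        long deadline = System.nanoTime() + overall.toNanos();

        // Keep polling: early calls may legitimately return nothing.
        while (out.size() < maxRecords && System.nanoTime() - deadline < 0) {
            for (T rec : fetcher.apply(perPoll)) {
                if (out.size() == maxRecords) {
                    break; // cap the result at maxRecords
                }
                out.add(rec);
            }
        }
        return out;
    }
}
```

With a real KafkaConsumer, each fetcher call would block for up to perPoll inside poll(), so the loop does not spin. Whether this explains the 10 s case above is an open question; it only addresses the single-poll pattern.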
Also, how does the Confluent Cloud UI list the messages on a partition? My hope was to replicate similar functionality (i.e. fetch N messages from the start or the end; anything will do).
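I do not know how the Confluent Cloud UI does it, so the following is only a sketch of one possible approach. It assumes partitions are assigned explicitly with consumer.assign(...) rather than subscribe(), that the offset bounds come from consumer.beginningOffsets(...) and consumer.endOffsets(...), and that the consumer is then positioned with consumer.seek(...). The helper below only computes the offset window; OffsetWindow and its method names are hypothetical.

```java
public class OffsetWindow {

    /**
     * First offset to seek to when reading the last n records of a partition.
     * Clamped so it never goes below the partition's beginning offset.
     */
    public static long startForLastN(long beginningOffset, long endOffset, long n) {
        return Math.max(beginningOffset, endOffset - n);
    }

    /**
     * Exclusive offset at which to stop when reading the first n records.
     * Clamped so it never goes past the partition's end offset.
     */
    public static long stopForFirstN(long beginningOffset, long endOffset, long n) {
        return Math.min(endOffset, beginningOffset + n);
    }
}
```

For example, with a beginning offset of 0 and an end offset of 100, the last 10 records would start at offset 90. Note that offsets are not guaranteed to be contiguous (compacted topics and transaction markers leave gaps), so the window can contain fewer than n records.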