Hello everyone,
I’m struggling to reset the offset for a single partition.
I have a cluster with a “state” topic that has two partitions. I start the cluster with a single consumer, and it gets assigned both partitions.
I then add another consumer, and each is correctly assigned one partition. I’m using the CooperativeStickyAssignor.
What I want, however, is for each consumer in this consumer group to reset the offset of its assigned partition when it joins.
I’m doing the following:
final KafkaConsumer<String, String> stateConsumer = new KafkaConsumer<>(stateConsumerProperties.getConsumerProps());
stateConsumer.subscribe(STATE_TOPIC);
...
stateConsumer.seekToBeginning(stateConsumer.assignment());
...
stateConsumer.poll(Duration.ofMillis(1000)) // timeout too long, testing only
    .forEach(record -> {
        log.info("Warmup state read: " + record.value() + ", partition: " + record.partition());
        stateMessages.add(record.value());
    });
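One thing worth noting about the snippet above: assignment() stays empty until a poll() has completed the group rebalance, so a seek issued right after subscribe() may have no partitions to act on. A minimal sketch of doing the seek from a rebalance callback instead. The class name SeekToBeginningOnAssign is hypothetical, and the sketch assumes that with a cooperative assignor onPartitionsAssigned receives only the newly added partitions:

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

/** Hypothetical helper: rewinds each partition newly assigned to this consumer. */
public class SeekToBeginningOnAssign implements ConsumerRebalanceListener {
    private final Consumer<?, ?> consumer;

    public SeekToBeginningOnAssign(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to clean up in this sketch.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Guard against an empty collection: seekToBeginning with no
        // partitions rewinds ALL currently assigned partitions, which is
        // not wanted when a rebalance adds nothing to this consumer.
        if (!partitions.isEmpty()) {
            consumer.seekToBeginning(partitions);
        }
    }
}
```

It would be registered at subscribe time, for example stateConsumer.subscribe(List.of("state"), new SeekToBeginningOnAssign(stateConsumer)), assuming the topic is passed as a collection of names. The seek is lazy, so it only takes effect on the next poll().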
This is my consumer config:
this.stateConsumerProperties.setConsumer(Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
    ConsumerConfig.GROUP_ID_CONFIG, "state",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "30",
    ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "2000",
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitionAssignor.class.getName()
));
CustomPartitionAssignor is just a subclass of CooperativeStickyAssignor that calls super in every override and adds some extra logging (omitted for brevity, I can paste it if needed).
However, this is what I get in the log when the second consumer joins (it starts from the last committed offset):
consumer_2 | 2022-10-10 18:46:26.833 INFO 7 --- [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-state-2, groupId=state] Setting offset for partition state-1 to the committed offset FetchPosition{offset=361, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[179bd3c6448e:9092 (id: 1001 rack: null)], epoch=0}}
Another detail: when each of these consumers joins, it reads the state topic up to a certain point and then starts producing to the state topic. So there is a point at which the first consumer both writes to and reads from its state partitions.
Any help or guidance is much appreciated. Thanks in advance.