Unable to reset offset for a single partition

Hello everyone,

I’m struggling to reset the offset for a single partition.
I have a cluster with a “state” topic that has two partitions. I start with a single consumer, and it gets assigned both partitions.
I then add another consumer and they correctly get assigned one partition each; I’m using the CooperativeStickyAssignor.
What I want, however, is for each consumer in this consumer group to reset the offset of its assigned partition when it joins.

I’m doing the following:

final KafkaConsumer<String, String> stateConsumer = new KafkaConsumer<>(stateConsumerProperties.getConsumerProps());
stateConsumer.subscribe(STATE_TOPIC);
...
stateConsumer.seekToBeginning(stateConsumer.assignment());
...
stateConsumer.poll(Duration.ofMillis(1000)) // timeout too long, testing only
    .forEach(record -> {
        log.info("Warmup state read: " + record.value() + ", partition: " + record.partition());
        stateMessages.add(record.value());
    });

This is my consumer config:

this.stateConsumerProperties.setConsumer(Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
    ConsumerConfig.GROUP_ID_CONFIG, "state",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "30",
    ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "2000",
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitionAssignor.class.getName()
));

CustomPartitionAssignor here is just a CooperativeStickyAssignor subclass that calls super in all of its overrides and adds some additional logging (I can paste it if needed, but I’ll omit it for brevity).

This is what I get in the log when the second consumer joins, however (it starts from the last committed offset):

consumer_2 | 2022-10-10 18:46:26.833 INFO 7 — [pool-1-thread-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-state-2, groupId=state] Setting offset for partition state-1 to the committed offset FetchPosition{offset=361, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[179bd3c6448e:9092 (id: 1001 rack: null)], epoch=0}}

Another detail is that when each of these consumers joins, it reads the state topic up to a certain point and then starts producing to the state topic. So there is a point where the first consumer both writes to and reads from its state partitions.

Any help and/or guidance is much appreciated, thanks in advance :slight_smile:

Hi Simeon!

What’s Happening

auto.offset.reset has some annoying nuances… and it doesn’t always do what you think it should do. When the consumers start up, they first look for their committed offsets and use those as bookmarks from which to begin processing. If no committed offset is present – for example, because the consumer group is brand new and nothing has been committed yet – the consumer falls back to auto.offset.reset.

What I think is happening here is that your consumers have likely been around for a bit, and they’ll have had time to commit their offsets. So when you start your consumers up, they’re ignoring auto.offset.reset and using that offset.

What You Can Do

  • Do you want your consumers to start from the beginning just this once? Meaning, you need them to start from the earliest offset to get all of the state and from there they should be free to commit offsets and use bookmarks? To do this, change ConsumerConfig.GROUP_ID_CONFIG so that it’s referring to a new consumer group. By doing so, when your consumers start up, they won’t have a consumer offset to refer to, and they’ll default to that auto.offset.reset value.
  • Do you want your consumers to always start at the earliest offset no matter what? In this case, what you really want is a non-committing consumer. In addition to changing ConsumerConfig.GROUP_ID_CONFIG to a new name, you’ll also want to add ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" to your configuration (see the sketch after this list). By doing so, your consumers will not attempt to store their offsets, and, as such, they will always start from auto.offset.reset.
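
A minimal sketch of what that non-committing configuration could look like, reusing the values from your original post (the new group name "state-warmup" is just a placeholder, not a requirement):

this.stateConsumerProperties.setConsumer(Map.of(
    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092",
    ConsumerConfig.GROUP_ID_CONFIG, "state-warmup",      // fresh group, so no stored offsets to resume from
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", // used whenever no committed offset exists
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",   // never commit, so the fallback applies every time
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CustomPartitionAssignor.class.getName()
));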

Hope that helps! Give it a try and let me know what happens!

Hey Danica,

Thanks a LOT for the reply :slight_smile:

I looked at your suggestions and I don’t think I can get away with placing the state consumers in separate groups, as I need a sticky cooperative rebalance. I want to avoid the case where a consumer that has loaded sufficient state disconnects and has to catch up again (of course, I may have to go this way if there’s no other choice :)).

The thing that bothers me the most here is that calling

consumer.seekToBeginning()

doesn’t seem to work (or I’m misunderstanding how it’s supposed to work :)). I’m not trying to rely on auto.offset.reset; I’m trying to explicitly reset the offset for each consumer when it gets a state partition assigned. I tried calling .seekToBeginning before I start looping .poll calls, and I also tried calling it in a ConsumerRebalanceListener#onPartitionsAssigned callback; neither worked.

What I found is that consumer.subscribe(topic) doesn’t actually cause a rebalance until I call .poll. Could it be that an explicit offset reset only takes effect after the rebalance is complete? I see no offset-related messages in the log before the first poll.
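
Roughly, what I observe looks like this (a sketch only, with the topic name hard-coded and the stateConsumer from my earlier snippet):

stateConsumer.subscribe(List.of("state"));
log.info("Before first poll: " + stateConsumer.assignment()); // empty: the group join hasn't started yet
stateConsumer.poll(Duration.ofMillis(1000));                  // poll() is what drives the join/rebalance protocol
log.info("After poll: " + stateConsumer.assignment());        // populated once the rebalance has completed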

I actually tried

.poll()
.seekToBeginning()
.poll()

to check if this helps, but there seems to be no change. .seekToBeginning() worked when I placed it after every .poll call in the “poller loop”, but it still doesn’t seem to do anything prior to the first .poll call.

Thanks again for the help!
Hope this gives a little more clarity.

Ah I think there was a bit of confusion. I’m not suggesting that you have a consumer group for each consumer. You can still have a single consumer group. The renaming of ConsumerConfig.GROUP_ID_CONFIG was simply to trigger a new consumer group so that there weren’t any offsets stored for that consumer group and they would start afresh.

Unfortunately, consumer.seekToBeginning() only takes effect on partitions the consumer has actually been assigned, and the consumer only joins the group and receives its assignment once it starts polling, so poll() needs to be called first.
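
As a sketch of that ordering (reusing the stateConsumer and topic from your snippet): poll until an assignment actually arrives, then seek, then keep polling.

stateConsumer.subscribe(List.of("state"));
// Drive the group join until partitions have actually been assigned;
// records returned by these warm-up polls can be ignored, since the seek rewinds anyway.
while (stateConsumer.assignment().isEmpty()) {
    stateConsumer.poll(Duration.ofMillis(100));
}
// The seek now applies to partitions the consumer really owns...
stateConsumer.seekToBeginning(stateConsumer.assignment());
// ...and subsequent polls fetch from the beginning of each assigned partition.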

I think the second option I mentioned would help you. Set all of your consumers to have the same group id (but change it so that it’s not “state” like it is now). Use ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false" so that you don’t commit offsets. That way, when consumers rebalance and receive new partitions to handle, they still won’t have a committed offset for those partitions, and they will read from the beginning no matter what, regardless of when they receive their partition assignment.

You’re right that the second of your original suggestions would solve the problem, so I’ll be accepting that answer as the solution :slight_smile:

A detail that I forgot to mention, however, is that I was actually using the diff between the consumer and producer offsets to find out when a state consumer has caught up to a state producer. This way I know when they’re done with “warmup” and are ready to (manually) subscribe to another topic that they need the state in order to process.

consumer_2 | 2022-10-13 08:53:07.842 INFO 7 — [ main] c.endava.fdi.poc.kafka.service.Consumer : Lags: [MessageLag{consumerOffset=400, producerOffset=430, lagMessages=30, timestamp=2022-10-13T08:53:05.840337Z}, MessageLag{consumerOffset=null, producerOffset=400, lagMessages=null, timestamp=2022-10-13T08:52:05.840337Z}]

So when I disabled auto offset commits, this lag measurement stopped working, of course :slight_smile:
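
For reference, a committed-offset-based lag check can be computed roughly like this with the consumer API (this is only a sketch of the idea; the actual MessageLag code is omitted, so names and details are approximate):

Set<TopicPartition> assigned = stateConsumer.assignment();
Map<TopicPartition, OffsetAndMetadata> committed = stateConsumer.committed(assigned);
Map<TopicPartition, Long> endOffsets = stateConsumer.endOffsets(assigned);
for (TopicPartition tp : assigned) {
    OffsetAndMetadata c = committed.get(tp);   // null when nothing has ever been committed (e.g. auto commit disabled)
    Long consumerOffset = (c == null) ? null : c.offset();
    Long producerOffset = endOffsets.get(tp);  // the next offset a producer would write to
    Long lagMessages = (consumerOffset == null) ? null : producerOffset - consumerOffset;
    log.info("Lag for " + tp + ": consumerOffset=" + consumerOffset
            + ", producerOffset=" + producerOffset + ", lagMessages=" + lagMessages);
}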

What I ended up doing (and it worked!) is to actually “rewind” the partition on revocation in the “old” consumer synchronously, before the new one joins.

Essentially I do the following when I set up the state consumer:

stateConsumer.subscribe(STATE_TOPIC, new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        stateConsumer.seekToBeginning(partitions);
        partitions.forEach(stateConsumer::position); // call position() because seekToBeginning evaluates lazily
        stateConsumer.commitSync(); // commit the rewound positions so the next owner of the partition starts from the beginning
    }
    ...
});

I wouldn’t have gotten here without help, however, so thanks again for the involvement :slight_smile:
