Fault Tolerant Stream without Logging in State Store

I am wondering whether it is possible to have a fault-tolerant stream without enabling logging on the state store. For my use case, all I need is for my custom processor to commit records after punctuate, and never commit records that have not yet been punctuated.

For example, here is a simple processor that outputs the latest record for each key, based on an internal version that is not necessarily aligned with the Kafka timestamp (so a compacted topic is not an option).

import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class LatestProcessor implements Processor<Long, Dto> {
    private ProcessorContext context = null;
    private KeyValueStore<Long, Dto> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.kvStore = (KeyValueStore<Long, Dto>) context.getStateStore(Config.stateStore);
        // emit and clear the buffered state once per minute of wall-clock time
        this.context.schedule(Duration.ofMinutes(1),
                PunctuationType.WALL_CLOCK_TIME, this::punctuate);
    }

    @Override
    public void process(Long key, Dto message) {
        // keep only the newest version seen for each key
        Dto current = kvStore.get(key);
        if (current == null || current.version() <= message.version()) {
            kvStore.put(key, message);
        }
    }

    public void punctuate(long timestamp) {
        // forward all buffered messages downstream and clear the state
        KeyValueIterator<Long, Dto> iter = this.kvStore.all();
        while (iter.hasNext()) {
            KeyValue<Long, Dto> entry = iter.next();
            context.forward(entry.key, entry.value);
            kvStore.delete(entry.key);
        }
        iter.close();
        context.commit();
    }

    @Override
    public void close() {
        kvStore.close();
    }
}
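
For context, Dto is just a simple value object; only the version accessor matters here (a simplified stand-in, not the real class):

// simplified stand-in for the real Dto; only version() is relevant to the example
public record Dto(long version, String payload) { }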

Is it possible for this processor to be fault-tolerant with a state store that has logging disabled? As long as the stream does not commit records that have not gone through punctuate, it should be fault-tolerant. If the stream does commit records that have not been through punctuate, then I assume we would need to add logging to the state store.

We can assume the Topology is only this processor, and it forwards to a downstream topic.
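
For concreteness, the wiring is roughly like this (topic names, the serde, and the store builder are placeholders, assuming the older Processor API shown above):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

static Topology buildTopology(Serde<Dto> dtoSerde) {
    Topology topology = new Topology();
    topology.addSource("source",
            Serdes.Long().deserializer(), dtoSerde.deserializer(), "input-topic");
    topology.addProcessor("latest", LatestProcessor::new, "source");
    // the state store is attached without a changelog topic
    topology.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(Config.stateStore),
                    Serdes.Long(),
                    dtoSerde)
                .withLoggingDisabled(),
            "latest");
    topology.addSink("sink", "output-topic",
            Serdes.Long().serializer(), dtoSerde.serializer(), "latest");
    return topology;
}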

Well, this is all a little difficult. The short answer is "no".

  1. Kafka Streams commits on multiple occasions that you cannot fully control: based on commit.interval.ms (you could set it to MAX_VALUE to effectively disable it; see the sketch after this list), and a commit also happens when a rebalance starts, in order to hand a task off to a different instance.
  2. context.commit() is only a request to commit; when it returns, the commit has not been executed yet, so you are not in full control of when the commit you asked for actually happens. (cf. Agenda | Current 2023)
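
To make item 1 concrete, this is roughly what I mean by effectively disabling the interval-based commit (it does not help with the rebalance case):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// effectively disables interval-based commits;
// commits triggered by a rebalance (task hand-off) still happen
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.MAX_VALUE);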

Btw: As pointed out in the JavaDocs, you should not close the state store in close()… Kafka Streams might still need access to it after your Processor was closed, and Kafka Streams will close the state store automatically for you in the background.

I would also recommend using a try-with-resources construct to ensure the iterator is closed even in case of a failure.
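
Roughly, punctuate() and close() could look like this (same logic as yours, just restructured; it does not fix the commit problem):

public void punctuate(long timestamp) {
    // try-with-resources closes the iterator even if forward() throws
    try (KeyValueIterator<Long, Dto> iter = kvStore.all()) {
        while (iter.hasNext()) {
            KeyValue<Long, Dto> entry = iter.next();
            context.forward(entry.key, entry.value);
            kvStore.delete(entry.key);
        }
    }
    context.commit();
}

@Override
public void close() {
    // nothing to do here; Kafka Streams closes the state store itself
}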

Thanks!

I had one other quick question if you have time. Within punctuate we are deleting from the state store.

public void punctuate(long timestamp) {
    // forward all buffered messages downstream and clear the state
    KeyValueIterator<Long, Dto> iter = this.kvStore.all();
    while (iter.hasNext()) {
        KeyValue<Long, Dto> entry = iter.next();
        context.forward(entry.key, entry.value);
        kvStore.delete(entry.key);
    }
    iter.close();
    context.commit();
}

I think for this Topology to be fault-tolerant, the next step must be a Kafka sink, right?

If the next step is not a Kafka sink and this processor fails (at any stage), we would not be able to restore, since messages may have been deleted from the state store without being committed.

If the next step is a Kafka sink, then it is fault-tolerant, because ProcessorContext::forward would have sent every message we deleted downstream.

Not sure I fully understand. I guess it depends on what you really mean by "fault-tolerant" in this case.

You also need to consider how the Kafka Streams runtime "chains" Processors (it's also covered in the talk I linked previously), plus the fact that the producer first buffers data in its write buffer, so output records are sent asynchronously.

Kafka Streams would of course flush() the producer before committing input topic offsets, but for your use case this might not help. Depending on the producer-internal "auto flushing", writes (i.e., deletes) into the store/changelog could make it, while writes into the result topic might still be pending.

In the end, you would need to enable EOS to get the guarantees you want (deletes don't play well with at-least-once for your use case). At-least-once really only guarantees that Kafka Streams processes every input record at least once. Updates to state stores and writes into output topics are not really covered, and it's up to your own Processor code to ensure that processing each input record is done "correctly".
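
For reference, enabling EOS is just a config change (exactly_once_v2 requires brokers 2.5 or newer):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// with EOS, output records, changelog writes, and input offset commits
// are committed atomically in a single transaction
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);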
