Kafka Streams - bizarre data loss

We are currently investigating a data loss in our Kafka Streams application.
Both the state-store write and the outbound topic message were lost for a single record (we detected this by pure chance, and yes, only one message was lost), which should not be possible in a Kafka Streams application.

Here’s a small excerpt of our Kotlin application which does some deduplication work after a simple data transformation (data transformation not shown here):

override fun process(record: Record<K, V>) {
    val lastDownstreamed = downstreamedStore[record.key()]
    if (record.value() == lastDownstreamed) {
        return
    }

    downstreamedStore.put(record.key(), record.value())
    context.forward(record)
}

We are using Kafka Streams with the default configuration, which means at-least-once processing is active. acks is set to all (-1), the topic replication factor is 3, and the brokers’ min.insync.replicas is set to 2. Producer retries are set to Integer.MAX_VALUE. Both input and output topics use the compact cleanup policy.
We are not using any other features besides state stores; thus caching etc. should not affect us.
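For reference, the setup described above corresponds roughly to the following settings (a sketch reconstructed from the description, not our actual config files; exact keys split across Streams, producer, and topic configs):

```properties
# Kafka Streams / embedded producer (defaults where noted)
processing.guarantee=at_least_once        # Kafka Streams default
acks=all                                  # i.e. -1, wait for all in-sync replicas
retries=2147483647                        # Integer.MAX_VALUE

# Topic / broker side
replication.factor=3
min.insync.replicas=2
cleanup.policy=compact                    # on both input and output topics
```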

We are wondering how this data loss can even happen. The configuration seems to be okay, which means Kafka Streams should only commit the read offset after the underlying producers have flushed.
And no, the error is not in the equality check: resetting the offsets on the input topic did not drop the message in a second run.

Timing-wise, two messages were produced to the input topic in short succession (800 ms apart), and only the earlier message was consumed, transformed, and produced to the state store and the outgoing topic. The second, newer message simply got “dropped” with regard to our outputs. Then the offset of the newer message was committed, which yielded the outlined data loss.

This is really strange and makes us uncomfortable, as this kind of data loss is nearly impossible to detect; catching it was pure chance.

Interestingly, we are having broker issues with this service, but they did not occur at the time of the data loss. I still want to attach the errors we got in case they matter. Sadly, our infrastructure team cannot find the root cause of the broker issue, as everything seems okay metric-wise (it’s an AWS-hosted MSK cluster). Also, no cluster maintenance was happening, and no broker restarted in the given timeframe:

org.apache.kafka.common.errors.TimeoutException: Topic redacted-changelog not present in metadata after 60000 ms. The broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response. Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors

org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_8] are corrupted and hence need to be re-initialized
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:447) ~[kafka-streams-3.9.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:297)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1108) ~[kafka-clients-3.9.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:993)
    at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:268) ~[kafka-streams-3.9.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:262)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.logChange(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:142)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:79)

These logs revolve around the state-store changelog topics, which do not explain any “data loss” on the output topic.

Thank you for your time!

I believe it’s your de-duplication logic, and the way at-least-once works. The important part to understand is that at-least-once only guarantees that every input message is read and processed at least once; it does not guarantee that re-processing a message gives the same result. The problem is potentially dirty state, which may lead to a different result during re-processing: with at-least-once, state is not rolled back after a failure.

Message X comes in, and it’s not a duplicate, so the put() and forward() calls execute. The put() is successful on RocksDB (but not the changelog), and forward() is not able to write the result record into the output topic either. Now Kafka Streams retries. The problem is that the state store still contains the message. Thus, you re-read the same message X, but this time it’s incorrectly detected as a duplicate and dropped on the floor.
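To make the sequence concrete, here is a small self-contained Java toy model of this failure mode (no real Kafka APIs; the map and lists are stand-ins for the state store, the async producer buffer, and the output topic):

```java
import java.util.*;

// Toy model (not real Kafka APIs) of the failure mode described above:
// the local state-store write survives the crash, the buffered async
// producer sends do not, so the retry sees a "false duplicate".
public class DirtyStateDemo {
    static final Map<String, String> store = new HashMap<>();      // state store
    static final List<String> producerBuffer = new ArrayList<>();  // unflushed sends
    static final List<String> outputTopic = new ArrayList<>();     // what lands

    static void process(String key, String value) {
        if (value.equals(store.get(key))) return;  // dedup check
        store.put(key, value);                     // applied immediately
        producerBuffer.add(value);                 // forward(): only buffered
    }

    static List<String> run() {
        process("k", "X");                   // first attempt
        producerBuffer.clear();              // crash: buffered sends lost, store kept
        process("k", "X");                   // retry: falsely detected as duplicate
        outputTopic.addAll(producerBuffer);  // flush + offset commit
        return outputTopic;                  // empty: X never reaches the topic
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints []
    }
}
```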

After you did the offset/app reset, I assume the state was somehow reset to a clean snapshot no longer containing message X, and thus this time X was written to the output during re-processing.

In general, if you need deduplication, you need to use exactly-once; otherwise it’s not possible to write correct code. You either have the potential data-loss problem you may have experienced, or you change your code at the risk of introducing duplicates in the output topic.
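For completeness, enabling exactly-once is a single Kafka Streams config change (a sketch; `exactly_once_v2` requires brokers 2.5 or newer):

```properties
# switch from the default at_least_once
processing.guarantee=exactly_once_v2
```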

We are using in-memory state stores (not RocksDB). But are you telling me that an in-memory put() is not “reverted” when a retry occurs? I was under the impression that a retry would revert in-memory stores to their previous state (containing only committed changes); thinking about it now, that seems hard to implement. It would explain what we are seeing, but the fix would be easy with at-least-once processing, if we are fine with letting duplicates through in such edge cases:

override fun process(record: Record<K, V>) {
    val lastDownstreamed = downstreamedStore[record.key()]
    if (record.value() == lastDownstreamed) {
        return
    }

    context.forward(record)
    // only add to the downstreamedStore *after* sending the message
    downstreamedStore.put(record.key(), record.value())
}

This way, the in-memory downstreamedStore is only modified after a successful forward, right? Or am I still missing things?

How would EOS help here? If the in-memory state store can be “dirty” (i.e., contain uncommitted changes from a previous attempt) after a failed attempt, it would still run into the outlined issue, if I follow correctly.

Thank you again!

I would need to dig more into it. It depends on whether the task is considered corrupted, and whether the task and its state store are closed (which would drop all in-memory state) and re-opened (which would rebuild the state from the changelog). Kafka Streams tries to be smart and avoid closing a task/state store, to preserve in-memory state across errors/rebalances (restoring from the changelog is expensive), so it’s certainly possible that the state survived.

Not sure if you have application logs that would reveal this?

Exactly-once would help because it does a state rollback on error, so it’s guaranteed that reprocessing (assuming no external side effects or non-deterministic business logic) produces the exact same result. I.e., we would force state to be rebuilt from the changelog, and the changelog is guarded by the underlying Kafka transaction (together with the output topics and the committed offsets).

Regarding the updated code you shared, flipping the order of operations: I understand the idea; however, because the underlying producer.send() call is async, when context.forward(…) returns it is not guaranteed that the write into the output topic has already happened. Thus it doesn’t really solve the problem, because the put() could still effectively execute first: the producer’s sender thread might still be buffering the result record.
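A toy Java model of why the flipped order does not help (again stand-ins, not real Kafka APIs): forward() only enqueues the send and returns, so the store update still lands before the record is actually on the topic.

```java
import java.util.*;

// Toy model: forward() only enqueues the send and returns immediately,
// so put() still runs long before the record is acked by the broker.
public class FlippedOrderDemo {
    static final Map<String, String> store = new HashMap<>();
    static final List<String> producerBuffer = new ArrayList<>();

    static void process(String key, String value) {
        if (value.equals(store.get(key))) return;
        producerBuffer.add(value);  // forward(): async, returns right away
        store.put(key, value);      // executes before the send is actually out
    }

    static boolean lostOnRetry() {
        process("k", "X");
        producerBuffer.clear();     // crash before flush: send lost, store kept
        process("k", "X");          // retry: still a false duplicate
        return producerBuffer.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(lostOnRetry());  // prints true
    }
}
```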

Btw: doing de-duplication without exactly-once will never be perfect. Even if we take the “dirty state store” problem out of the picture, and even if you use the idempotent producer, there are still edge cases that could introduce duplicates in the result.

I can follow your arguments, but there’s still an issue: both writes got lost. If your scenario is what happened, I would expect at least a state-store commit (containing the “false duplicate”), which we ruled out by looking at the changelog topic.

I still cannot see any scenario where both writes are lost; at least the state-store write should be committed in the “worst case”. Losing both writes should be impossible, if I understand correctly.

I think what happened is:

  • read message at offset X
  • put into state store
  • try to write to changelog → did not go through yet (async write)
  • try to write to output topic → did not go through (async write)
  • ERROR

So no offset commit happened; we would only commit the offset after both writes into the changelog and output topic were successful. Kafka Streams would flush the producer and wait for all acks to come back before committing the offset. But it failed before it even attempted a commit.

Now Kafka Streams recovers and seeks back to offset X, but the state store stays dirty:

  • re-read message at offset X
  • drop message as duplicate (no writes into the changelog or output topic)
  • flush everything and commit offset
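The two bullet lists above can be sketched as one toy Java timeline (stand-ins, not real Kafka APIs); note that the offset only advances at the very end, after the retry has already dropped the record, which makes the loss permanent:

```java
import java.util.*;

// Toy timeline: attempt 1 fails before any flush or offset commit; the
// retry drops the record as a "false duplicate"; then the flush trivially
// succeeds and the offset commit makes the loss permanent.
public class TimelineDemo {
    static final Map<String, String> store = new HashMap<>();
    static final List<String> buffer = new ArrayList<>();      // async sends
    static final List<String> outputTopic = new ArrayList<>();
    static long committedOffset = -1;

    static void process(long offset, String key, String value) {
        if (!value.equals(store.get(key))) {  // dedup check
            store.put(key, value);
            buffer.add(value);                // changelog/output: buffered only
        }
    }

    static long run() {
        process(42, "k", "X");       // attempt 1: store updated, sends buffered
        buffer.clear();              // ERROR before flush; offset not committed
        process(42, "k", "X");       // recover, seek back, re-read: dropped as dup
        outputTopic.addAll(buffer);  // flush succeeds (nothing left to flush)
        committedOffset = 42;        // offset commit: data loss is now permanent
        return committedOffset;
    }

    public static void main(String[] args) {
        run();
        System.out.println(committedOffset + " " + outputTopic);  // prints "42 []"
    }
}
```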

Sorry, I can’t quite follow here.
In your example, “flush everything and commit offset” should at least flush the pending dirty state-store changes, yet both writes got lost.
Or is the state-store change somehow associated with the rolled-back and retried offset, and thus not tracked by the retry?
That would be very confusing, at least to me.

And would EOS fix this at all? I currently have the impression that in-memory state stores may be dirty when a retry is needed, in all configurations.

No, because the write into the state store and the writes into the topics are independent with at-least-once. So after the ERROR, we don’t know that we had pending writes which did not get flushed.

I.e., we do three writes: the store, the changelog, and the output topic. All three writes are buffered, and after they are buffered we “forget” about them (we would flush all three writes before committing the offset at some future point, but we never get to that point). The write into the store is applied right away (i.e., without an explicit flush). The other two are still pending. Now we crash, and we lose the two writes into the changelog and output topic.

When we restart to recover, we seek back to the last committed offset. At this point, we have no way to know, that we had a successful write into the store, so we cannot redo any write into the topics.

So we now re-process the input record but find the write in the state store, and consider the record as duplicate and drop it on the floor.

EOS would fix it because, in this case, after the error we would drop the state store and rebuild it from the changelog. The writes never make it into the changelog/output topics, as the transaction would be aborted after the crash, before we seek back to the last committed offset. Thus, after we rebuild the store, we have the guarantee that any “uncommitted writes” which were physically applied are no longer there.
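The exactly-once recovery path can be sketched with the same toy Java model (stand-ins, not real Kafka APIs): the dirty store is dropped and rebuilt from the changelog, which by construction holds only committed writes.

```java
import java.util.*;

// Toy model of EOS recovery: after the crash the transaction is aborted,
// the dirty store is dropped and rebuilt from the changelog (committed
// writes only), so the retry is no longer a false duplicate.
public class EosRecoveryDemo {
    static final Map<String, String> changelog = new HashMap<>(); // committed only
    static Map<String, String> store = new HashMap<>();
    static final List<String> output = new ArrayList<>();

    static void process(String key, String value) {
        if (value.equals(store.get(key))) return;
        store.put(key, value);
        output.add(value);  // same transaction as changelog write + offset commit
    }

    static List<String> run() {
        process("k", "X");
        output.clear();                   // crash: transaction aborted, sends gone
        store = new HashMap<>(changelog); // recovery: rebuild store from changelog
        process("k", "X");                // retry succeeds this time
        return output;                    // contains X
    }

    public static void main(String[] args) {
        System.out.println(run());  // prints [X]
    }
}
```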

I see that makes sense, thank you for taking the time to help me!

