We are currently investigating a data loss in our Kafka Streams application.
For a single message, both the state-store update and the outbound topic record were lost (we detected this by pure chance, and yes, only one message was affected), which should not be possible in a Kafka Streams application.
Here’s a small excerpt of our Kotlin application, which does some deduplication work after a simple data transformation (the transformation is not shown here):
override fun process(record: Record<K, V>) {
    // Skip the record if its value matches the last value we forwarded for this key.
    val lastDownstreamed = downstreamedStore[record.key()]
    if (record.value() == lastDownstreamed) {
        return
    }
    // Remember the value in the state store and forward the record downstream.
    downstreamedStore.put(record.key(), record.value())
    context.forward(record)
}
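In case it helps, the dedup rule above boils down to the following plain-Kotlin sketch, with a MutableMap standing in for the state store and a list standing in for context.forward() (the names here are illustrative, not our real code):

```kotlin
// Plain-Kotlin stand-in for the processor: a map replaces the state store
// and a list replaces context.forward(), so the dedup rule runs anywhere.
class DedupSketch {
    private val downstreamedStore = mutableMapOf<String, String>()
    val forwarded = mutableListOf<Pair<String, String>>()

    fun process(key: String, value: String) {
        // Drop the record if its value equals the last one forwarded for this key.
        if (downstreamedStore[key] == value) return
        downstreamedStore[key] = value
        forwarded += key to value
    }
}
```

So two identical values for the same key forward once, and a changed value forwards again.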
We are using Kafka Streams with the default configuration, which means that at-least-once processing is active. acks is set to all (-1). Our topic replication factor is 3 and our brokers’ min.insync.replicas is 2. Producer retries are set to max int. Both input and output topics use the “compact” cleanup policy.
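For reference, the durability-related settings boil down to roughly this (a sketch using the plain string property names; our real configuration lives elsewhere and is spread across broker, topic, and client configs):

```kotlin
import java.util.Properties

// Sketch of the settings described above, grouped by where they apply.
val producerConfig = Properties().apply {
    put("acks", "all")                           // same as -1: wait for all in-sync replicas
    put("retries", Integer.MAX_VALUE.toString()) // producer retries set to max int
}
val topicAndBrokerConfig = Properties().apply {
    put("replication.factor", "3")
    put("min.insync.replicas", "2")
    put("cleanup.policy", "compact")             // both input and output topics
}
```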
We are not using any other features besides state stores; thus, caching etc. should not affect us.
We are wondering how this data loss can even happen. All the configuration seems to be okay, which means that Kafka Streams should only commit the consumed offsets after the underlying producer has flushed.
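Our understanding of the at-least-once commit cycle, sketched in plain Kotlin (the function names are illustrative stand-ins, not real client API calls):

```kotlin
// Stand-ins that only record the order of operations. Under at-least-once,
// Streams flushes the producer before committing consumer offsets, so a
// committed offset should imply the corresponding outputs were acked.
val events = mutableListOf<String>()

fun flushProducer() { events += "flush" }   // all pending sends acknowledged
fun commitOffsets() { events += "commit" }  // read offsets committed

fun commitCycle() {
    flushProducer()
    commitOffsets()
}
```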
And no, the error is not in the equality check. Resetting the offsets on the input topic did not drop the message in a second run.
Timing-wise, two messages were produced to the input topic in short succession (800 ms apart), and only the earlier message was consumed, transformed, and written to the state store and the outgoing topic. The second, newer message was simply “dropped” with respect to our outputs. Then the offset of the newer message was committed, which yielded the outlined data loss.
This is really strange and makes us uncomfortable, as this kind of data loss is hard to impossible to detect - we found it by pure chance.
Interestingly, we are having broker issues with said service. These did not happen at the time of the data loss, but I still wanted to attach the errors we got in case they matter. Sadly, our infrastructure team cannot find the root cause of the broker issue, as everything looks okay metric-wise (it’s an AWS-hosted MSK cluster). Also, no cluster maintenance was happening, and no broker restarted in the given timeframe:
org.apache.kafka.common.errors.TimeoutException: Topic redacted-changelog not present in metadata after 60000 ms. The broker is either slow or in bad state (like not having enough replicas) in responding the request, or the connection to broker was interrupted sending the request or receiving the response. Consider overwriting `max.block.ms` and /or `delivery.timeout.ms` to a larger value to wait longer for such scenarios and avoid timeout errors
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_8] are corrupted and hence need to be re-initialized
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:447) ~[kafka-streams-3.9.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:297)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1108) ~[kafka-clients-3.9.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:993)
    at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:268) ~[kafka-streams-3.9.1.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:262)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.logChange(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.log(ChangeLoggingKeyValueBytesStore.java:142)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:79)
These logs revolve around the state-store changelog topics, which does not explain any “data loss” on the output topic.
Thank you for your time!