Messages processed more than once in an exactly_once Kafka Streams application

Hello everybody. I've recently been having problems with Kafka Streams in my application. After upgrading Kafka Streams from 2.8.1 to 3.4.0, I am seeing messages being processed more than once, even though my application has always used exactly_once. With version 2.8.1 or 3.0.0 the problem does not seem to happen. It also only seems to happen when there is a large volume of messages being processed and several instances of Kafka Streams running.

The problem occurs after a Kafka transaction timeout followed by a rebalance. At the end of processing I can see that some messages were processed more than once; the more rebalances happen, the more often the problem occurs. For testing purposes I reduced the transaction timeout to simulate occasional processing timeouts.

I upgraded from 2.8.1 to 3.0.0 with exactly_once and then to 3.4.0 using exactly_once_v2, restarting the Kafka Streams services with each update but not the Kafka brokers.

Reverting from exactly_once_v2 to exactly_once had no effect. I also tested with versions 3.5.0 and 3.7.0 and the problem persists.

This application has been in production for years and we have never had any problem like this. I would appreciate help understanding what could be happening.

I’m currently investigating the problem in a test Kubernetes environment with 3 Kafka brokers and Kafka Streams configured as follows:

{DEFAULT_KEY_SERDE_CLASS_CONFIG      (class (Serdes/String))
 DEFAULT_VALUE_SERDE_CLASS_CONFIG    (class (JsonSerde.))
 NUM_STREAM_THREADS_CONFIG           4
 PROCESSING_GUARANTEE_CONFIG         StreamsConfig/EXACTLY_ONCE_V2
 REPLICATION_FACTOR_CONFIG           3
 SECURITY_PROTOCOL_CONFIG           "PLAINTEXT"
 ;; Consumer
 AUTO_OFFSET_RESET_CONFIG           "earliest"
 CLIENT_DNS_LOOKUP_CONFIG           "use_all_dns_ips"
 ;; Producer
 COMPRESSION_TYPE_CONFIG            "zstd"
 CLIENT_DNS_LOOKUP_CONFIG           "use_all_dns_ips"
 MAX_BLOCK_MS_CONFIG                "300000"
 TRANSACTION_TIMEOUT_CONFIG         "25000" ;; reduced for tests
 MAX_REQUEST_SIZE_CONFIG            "10400000"
}
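
For reference, the map is turned into Properties and passed to KafkaStreams roughly like this (a simplified sketch; the application id, bootstrap servers, and topology function below are placeholders, not our real values, and the remaining keys from the map above go in the same way):

(ns example.streams
  (:import (java.util Properties)
           (org.apache.kafka.streams KafkaStreams StreamsBuilder StreamsConfig)))

(def streams-config
  ;; Placeholder application id and bootstrap servers.
  {StreamsConfig/APPLICATION_ID_CONFIG       "my-app"
   StreamsConfig/BOOTSTRAP_SERVERS_CONFIG    "broker-0:9092,broker-1:9092,broker-2:9092"
   StreamsConfig/PROCESSING_GUARANTEE_CONFIG StreamsConfig/EXACTLY_ONCE_V2
   StreamsConfig/NUM_STREAM_THREADS_CONFIG   (int 4)
   StreamsConfig/REPLICATION_FACTOR_CONFIG   (int 3)})

(defn start-streams!
  "Builds the topology via `build-topology!` and starts a KafkaStreams instance."
  [build-topology!]
  (let [builder (StreamsBuilder.)
        _       (build-topology! builder)
        props   (doto (Properties.) (.putAll streams-config))]
    (doto (KafkaStreams. (.build builder) props)
      (.start))))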

I would appreciate any help, thanks.

I am seeing messages being processed more than once

Can you elaborate a little bit more? In general, EOS does not mean that input messages are not re-read and retried after an error. EOS guarantees that if there was an error, the corresponding output messages are not committed as part of the transaction. Thus, a downstream consumer would also need to be configured with READ_COMMITTED to not see aborted messages.

Are you saying you have duplicate messages in the output topic, returned by a consumer in READ_COMMITTED mode? Or what exact observations do you make?
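
One way to answer that is a plain consumer on the output topic with read_committed (a rough sketch; the broker address, group id, and String deserializers are assumptions you would adjust to your setup):

(ns example.verify-output
  (:import (java.time Duration)
           (java.util Properties)
           (org.apache.kafka.clients.consumer ConsumerConfig KafkaConsumer)
           (org.apache.kafka.common.serialization StringDeserializer)))

(defn committed-keys
  "Polls `topic` once with read_committed isolation, so records from aborted
   transactions are never returned. Returns the keys of the committed records."
  [topic]
  (let [props (doto (Properties.)
                (.put ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG "broker-0:9092") ;; placeholder
                (.put ConsumerConfig/GROUP_ID_CONFIG "duplicate-check")        ;; placeholder
                (.put ConsumerConfig/ISOLATION_LEVEL_CONFIG "read_committed")
                (.put ConsumerConfig/AUTO_OFFSET_RESET_CONFIG "earliest")
                (.put ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG "false")
                (.put ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG (.getName StringDeserializer))
                (.put ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG (.getName StringDeserializer)))]
    (with-open [consumer (KafkaConsumer. props)]
      (.subscribe consumer [topic])
      (mapv #(.key %) (.poll consumer (Duration/ofSeconds 5))))))

If duplicates show up there, they were committed; if not, they were only part of aborted transactions.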

Hi, thanks for the reply. We always use isolation.level “read_committed”; in our application scenario we specifically need exactly_once. Here are some of the configurations of our Kafka consumers:

"max.poll.records"   500
"isolation.level"    "read_committed"
"auto.offset.reset"  "earliest"
"enable.auto.commit" "false"

Regarding duplicate messages: no, we are not having exactly duplicate messages in the output topics, we are having messages being processed more than once in some topics. We have internal control ids for operations and output files based on other topics, and when a message is reprocessed, an extra message is passed to the output topic, which can be rejected by our internal control or executed and generate an extra operation that should not exist; it all depends on whether it was reprocessed at the beginning or at the end of the flow. But we can know for sure whether and how many duplicate reprocessings occurred.

I hope I explained it a little better.

we are not having exactly duplicate messages in the output topics

For this case, Kafka Streams works as designed. That’s a relief…

we are having messages being processed more than once in some topics

Well, as I said: if there is an error and a TX is aborted, we roll back and retry, so this is expected. Off the top of my head I am not sure why this seems to happen more often after the upgrade. I guess you need to read some logs to find the root cause and change config parameters afterwards to stabilize the system (e.g., tx-timeout, commit interval, max poll records, and max poll interval are all good candidates to double-check).
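
For example, these are the kinds of overrides to start experimenting with (illustrative values only, not recommendations; they have to be tuned against your actual workload):

(ns example.tuning
  (:import (org.apache.kafka.clients.consumer ConsumerConfig)
           (org.apache.kafka.clients.producer ProducerConfig)
           (org.apache.kafka.streams StreamsConfig)))

(def stability-overrides
  {;; How often Streams commits; under EOS this is also the transaction commit interval (default 100 ms).
   StreamsConfig/COMMIT_INTERVAL_MS_CONFIG                                   "100"
   ;; Give transactions more headroom than the 25 s used for testing above.
   (StreamsConfig/producerPrefix ProducerConfig/TRANSACTION_TIMEOUT_CONFIG)  "60000"
   ;; Fewer records per poll means less work has to fit into one commit interval.
   (StreamsConfig/consumerPrefix ConsumerConfig/MAX_POLL_RECORDS_CONFIG)     "200"
   ;; Maximum time between polls before the member is kicked out of the group.
   (StreamsConfig/consumerPrefix ConsumerConfig/MAX_POLL_INTERVAL_MS_CONFIG) "300000"})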

and when a message is reprocessed, an extra message is passed to the output topic

Not sure what you mean by “extra message”: if there is input record R1, and it’s processed the first time and produces output message O1, then O1 should be part of the TX. Thus, if we abort the TX, O1 should not be seen by any downstream system, and when we retry R1, we write O1 again and hopefully commit on the retry.

which can be rejected by our internal control or executed and generate an extra operation that should not exist,

So your internal control should never see O1 and no extra operation should be triggered.

it all depends on whether it was reprocessed at the beginning or at the end of the flow.

Not sure what this means?

But that’s exactly what is happening. In most error cases, “validation-topic-by-id” receives a message and writes its id to a Kafka Streams state store. Then the exact same message is reprocessed and rejected because its id is no longer unique. The message is processed until the end, because it reaches our output topics. I don’t think this has to do with uncommitted messages: before version 3.4.0 we already had uncommitted messages caused by instability, but never anything that violated exactly_once.
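
To make the flow concrete, the validation step is conceptually something like the sketch below (simplified and not our actual code; the store name "seen-ids" and the processor wiring are made up for illustration):

(ns example.validation
  (:import (org.apache.kafka.streams.processor.api Processor ProcessorContext)
           (org.apache.kafka.streams.state KeyValueStore)))

(defn validation-by-id-processor
  "Illustrative only: rejects a record whose id (the record key) was already seen.
   Assumes a changelogged KeyValueStore named \"seen-ids\" is attached to this processor."
  []
  (let [ctx   (volatile! nil)
        store (volatile! nil)]
    (reify Processor
      (init [_ context]
        (vreset! ctx context)
        (vreset! store (.getStateStore ^ProcessorContext context "seen-ids")))
      (process [_ record]
        (let [^KeyValueStore s @store
              id (.key record)]
          (if (some? (.get s id))
            nil                          ;; duplicate id -> rejected, nothing forwarded
            (do (.put s id id)           ;; remember the id
                (.forward ^ProcessorContext @ctx record)))))
      (close [_]))))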

It is related to the path that messages take through our system: if reprocessing occurs before a certain validation, we get a rejected message; if it occurs after, we get a success, but duplicated. Not really important to the question.

I did a little more research and we upgraded from 2.8.1 to 3.4.0 without using the upgrade.from configuration. This could be the problem.
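
If that’s the issue, my understanding of the documented two-bounce rolling upgrade is roughly the following (a sketch; I’m assuming “2.8” is accepted as an upgrade.from value by the newer versions):

(ns example.upgrade
  (:import (org.apache.kafka.streams StreamsConfig)))

;; First rolling bounce: deploy the new jars but tell Streams we are upgrading from 2.8.
(def first-bounce-overrides
  {StreamsConfig/UPGRADE_FROM_CONFIG "2.8"})

;; Second rolling bounce: once every instance runs the new version,
;; remove upgrade.from again and bounce each instance one more time.
(def second-bounce-overrides
  {})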

That is interesting, and worrisome. If you process an input message and update the store, and there is an error, we should revert the store to its old state before we retry under EOS. Can you check if this is happening correctly? After an EOS error, we should wipe out the state stores and rebuild them from the changelog to put them back into their old state.
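
If digging through logs is tedious, a global state restore listener will show whether stores are actually being rebuilt from the changelog after such an error (a sketch; the println output is just a placeholder for your own logging):

(ns example.restore-listener
  (:import (org.apache.kafka.streams KafkaStreams)
           (org.apache.kafka.streams.processor StateRestoreListener)))

(defn logging-restore-listener
  "Prints a line whenever a state store starts or finishes restoring from its changelog."
  []
  (reify StateRestoreListener
    (onRestoreStart [_ topic-partition store-name starting-offset ending-offset]
      (println "restore start" store-name (str topic-partition) starting-offset "->" ending-offset))
    (onBatchRestored [_ topic-partition store-name batch-end-offset num-restored]
      nil)
    (onRestoreEnd [_ topic-partition store-name total-restored]
      (println "restore end" store-name (str topic-partition) "records restored:" total-restored))))

;; Must be registered before calling .start on the KafkaStreams instance:
;; (.setGlobalStateRestoreListener ^KafkaStreams streams (logging-restore-listener))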

I did a little more research and we upgraded from 2.8.1 to 3.4.0 without using the upgrade.from configuration. This could be the problem.

:thinking: I did not go back to the docs. Can you elaborate?