Concurrent message processing

I am observing strange behavior: I receive two consecutive messages, one with some data and, right after it, one with null.

I see the correct sequence in the output topic.

But in the logs I see the warning:

Detected out-of-order KTable update for in-fix-store, old timestamp=[1748334084071] new timestamp=[1748334084050]. topic=[offer-structure-write-v2x11-offer-out-fixture] partition=[7] offset=[6863].

It looks like my application processes the null much faster than the data, and produces the output in the correct order but with a “wrong” timestamp…

Is it possible?

S.

Hard to say w/o more information.

There are some known bugs about FK joins that could be related – but I'm not sure if you are using FK joins or what version you are on (these bugs got fixed in the 3.9.1 and 4.0.0 releases).

Yes, I know about the bug(s), and this situation is happening after the 3.9.1 update. I will try to investigate and collect more information.

BTW, the output topic (the one with the out-of-order KTable warning) is the input for the next Kafka Streams application (WS), and EACH such warning automatically produces the next set of warnings in the WS application, e.g.:

{"@timestamp":"2025-05-27T05:21:29.933244881Z","log_level":"WARN","application":"offer-structure-eps-write-v2x11","thread":"offer-structure-write-v2x11-websocket-StreamThread-7","logger":"org.apache.kafka.streams.kstream.internals.KTableSource","message":"Detected out-of-order KTable update for store-from-offer-fixture, old timestamp=[1748323289427] new timestamp=[1748323289289]. topic=[offer-structure-write-v2x11-offer-out-fixture] partition=[10] offset=[1768]."}
{"@timestamp":"2025-05-27T05:21:29.947204971Z","log_level":"WARN","application":"offer-structure-eps-write-v2x11","thread":"offer-structure-write-v2x11-websocket-StreamThread-7","logger":"org.apache.kafka.streams.kstream.internals.KTableSource","message":"Detected out-of-order KTable update for in-fix-store, old timestamp=[1748323289427] new timestamp=[1748323289289]. topic=[offer-structure-write-v2x11-offer-out-fixture] partition=[10] offset=[1768]."}
{"@timestamp":"2025-05-27T05:21:30.071290858Z","log_level":"WARN","application":"offer-structure-eps-write-v2x11","thread":"offer-structure-write-v2x11-websocket-StreamThread-7","logger":"org.apache.kafka.streams.kstream.internals.KTableSource","message":"Detected out-of-order KTable update for store-join-fix-home-part, old timestamp=[1748323289427] new timestamp=[1748323289289]. topic=[offer-structure-write-v2x11-websocket-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000103-topic] partition=[10] offset=[289]."}
{"@timestamp":"2025-05-27T05:21:30.096900959Z","log_level":"WARN","application":"offer-structure-eps-write-v2x11","thread":"offer-structure-write-v2x11-websocket-StreamThread-7","logger":"org.apache.kafka.streams.kstream.internals.KTableSource","message":"Detected out-of-order KTable update for store-join-fix-away-part, old timestamp=[1748323289427] new timestamp=[1748323289289]. topic=[offer-structure-write-v2x11-websocket-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000116-topic] partition=[10] offset=[269]."}
{"@timestamp":"2025-05-27T05:21:30.197370803Z","log_level":"WARN","application":"offer-structure-eps-write-v2x11","thread":"offer-structure-write-v2x11-websocket-StreamThread-7","logger":"org.apache.kafka.streams.kstream.internals.KTableSource","message":"Detected out-of-order KTable update for part-join-fix-store, old timestamp=[1748323289427] new timestamp=[1748323289289]. topic=[offer-structure-write-v2x11-websocket-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000116-topic] partition=[10] offset=[269]."}

Is there some simple way to reset the timestamps of the messages in the source topic according to the read time (offsets)? Or maybe from the other side – on the output (sink) of the previous Kafka Streams application?

If you are using FK joins, data is naturally shuffled, and thus out-of-order data can get introduced. – But if this only became an issue after the upgrade to 3.9.1, I am now worried about a potential regression (what was the old version before the upgrade?).

There are many ways to modify timestamps – but doing so would of course change your event-time processing semantics. Record timestamps are data. – Of course (as always), it depends on your application requirements whether changing timestamps would mess with your application and potentially lead to incorrect results.

On a high level, using versioned state stores (instead of the default key-value-with-timestamp store) would help to handle out-of-order data more gracefully.
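As a rough sketch (assuming String keys/values; the store and topic names are taken from your logs, and the 10-minute history retention is just an example value to adjust to your needs), the table could be materialized with a versioned store like this:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedTableExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // A versioned store keeps a history of values per key, so an
        // out-of-order update is applied against the correct version
        // instead of blindly overwriting the latest one.
        KTable<String, String> table = builder.table(
                "offer-structure-write-v2x11-offer-out-fixture",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String>as(
                        Stores.persistentVersionedKeyValueStore(
                                "in-fix-store",            // store name from your logs
                                Duration.ofMinutes(10)))   // example history retention
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()));
    }
}
```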

You could also use a custom TimestampExtractor, or a custom process(...) step, to modify timestamps. (Or, even more “hard core”, switch to log-append time semantics in your topic configuration.)
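For example, a minimal TimestampExtractor that ignores the embedded record timestamp and uses wall-clock time at read (Kafka Streams also ships a built-in WallclockTimestampExtractor with the same effect):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Returns the time the record is read instead of the timestamp embedded in
// the record, so updates are applied in offset order. Note that this changes
// event-time semantics for everything downstream.
public class ReadTimeTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long partitionTime) {
        return System.currentTimeMillis();
    }
}
```

You can register it globally via the default.timestamp.extractor config (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG), or per input via Consumed.with(...).withTimestampExtractor(...). The log-append time alternative would be setting message.timestamp.type=LogAppendTime on the topic (a broker/topic config, not a Streams config).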

No, it happened before the update too. After the update, the out-of-order warning is the only remaining issue.