Retrying join-operation for records

I have a Kafka Streams application responsible for joining each stream record with a corresponding entry in a KTable. The application works in the following way:

Records (known as values) are consumed into a KStream. Records from another topic (known as keys) are consumed into a KTable. Each value in the KStream is joined with the corresponding key in the KTable via a leftJoin operation, and the result of the join is sent to an output topic.
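For concreteness, a minimal sketch of that topology, assuming String serdes throughout; the topic names and the join logic are placeholders, and the `UNMATCHED:` marker is just one hypothetical way to tag records whose table side was null:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinTopology {

    // Topic names and String types are placeholders for illustration.
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> values = builder.stream("values-topic");
        KTable<String, String> keys = builder.table("keys-topic");

        // leftJoin passes null for the table side when no entry exists yet,
        // which is the "value arrived before its key" case described below.
        KStream<String, String> joined = values.leftJoin(
                keys,
                (value, tableEntry) -> tableEntry == null
                        ? "UNMATCHED:" + value       // no key seen yet
                        : tableEntry + "|" + value); // hypothetical join logic

        joined.to("output-topic");
        return builder.build();
    }
}
```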

Occasionally, a value is received before its corresponding key, resulting in an unsuccessful join. In the current solution, these non-joined records are discarded, resulting in data loss. I have experimented with splitting the KStream after the join operation into two branches. Branch 1, consisting of joined records, produces the records to the output topic. Branch 2, consisting of non-joined records, stores the records in a state store, scheduled to produce them back to the application's input topic at 1-second intervals.
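A rough sketch of that experiment, continuing from the topology above. The store name, the `UNMATCHED:` marker, and the use of the older `Transformer` API (deprecated in recent releases but still common) are all illustrative assumptions:

```java
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class RetryBranches {

    // "joined" is the stream coming out of the leftJoin sketched earlier.
    public static void wire(StreamsBuilder builder, KStream<String, String> joined) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("retry-store"),
                Serdes.String(), Serdes.String()));

        Map<String, KStream<String, String>> branches = joined
                .split(Named.as("join-"))
                .branch((k, v) -> !v.startsWith("UNMATCHED:"), Branched.as("matched"))
                .defaultBranch(Branched.as("unmatched"));

        // Branch 1: successfully joined records go straight to the output topic.
        branches.get("join-matched").to("output-topic");

        // Branch 2: buffer non-joined records in the state store and re-emit
        // them to the application's own input topic once per second.
        branches.get("join-unmatched")
                .mapValues(v -> v.substring("UNMATCHED:".length()))
                .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                    private KeyValueStore<String, String> store;

                    @Override
                    public void init(ProcessorContext context) {
                        store = context.getStateStore("retry-store");
                        context.schedule(Duration.ofSeconds(1),
                                PunctuationType.WALL_CLOCK_TIME, ts -> {
                                    try (KeyValueIterator<String, String> it = store.all()) {
                                        while (it.hasNext()) {
                                            KeyValue<String, String> kv = it.next();
                                            context.forward(kv.key, kv.value);
                                            store.delete(kv.key);
                                        }
                                    }
                                });
                    }

                    @Override
                    public KeyValue<String, String> transform(String key, String value) {
                        store.put(key, value); // buffer; forward nothing right now
                        return null;
                    }

                    @Override
                    public void close() {}
                }, "retry-store")
                .to("values-topic"); // back to the input topic for another attempt
    }
}
```

Note that keying the buffer by the record key means a second value arriving for the same key before the punctuation fires would overwrite the first; a composite store key would be needed to hold several in-flight values per key.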

What are some solutions to retry/delay key-value-pairs that are not successfully joined?
How can I make sure that values are joined with the corresponding key even though the key is delayed?

I’ve made a simplified illustration of my “experiment”:


Hi @HGodal - what you have presented here is something I’ve seen done before.
Another approach could be to use a KStream for your “keys” and place the incoming records in a state store. For the KStream representing the “values”, you'd place those records in another state store. Then, using a punctuation, you could iterate over the “values” state store looking for the corresponding key in the “keys” state store, and when a join occurs, delete the entry from both stores. But if what you have currently works for you, I would be inclined to stick with it.
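In case it helps, a hedged sketch of that dual-store idea, under the same assumptions as before (String serdes, placeholder names, the older `Transformer`/`AbstractProcessor` APIs); it's one possible reading of the approach, not a definitive implementation:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class DualStoreJoin {

    public static void wire(StreamsBuilder builder) {
        for (String name : new String[] {"keys-store", "values-store"}) {
            builder.addStateStore(Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(name),
                    Serdes.String(), Serdes.String()));
        }

        // "Keys" are consumed as a KStream and parked in their own store.
        builder.<String, String>stream("keys-topic")
                .process(() -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> keysStore;

                    @Override
                    public void init(ProcessorContext context) {
                        super.init(context);
                        keysStore = context.getStateStore("keys-store");
                    }

                    @Override
                    public void process(String key, String value) {
                        keysStore.put(key, value);
                    }
                }, "keys-store");

        // "Values" are parked too; a punctuation scans them once per second,
        // joins against the keys store, and cleans up both stores on a match.
        builder.<String, String>stream("values-topic")
                .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                    private KeyValueStore<String, String> valuesStore;
                    private KeyValueStore<String, String> keysStore;

                    @Override
                    public void init(ProcessorContext context) {
                        valuesStore = context.getStateStore("values-store");
                        keysStore = context.getStateStore("keys-store");
                        context.schedule(Duration.ofSeconds(1),
                                PunctuationType.WALL_CLOCK_TIME, ts -> {
                                    try (KeyValueIterator<String, String> it = valuesStore.all()) {
                                        while (it.hasNext()) {
                                            KeyValue<String, String> kv = it.next();
                                            String keyRecord = keysStore.get(kv.key);
                                            if (keyRecord != null) {
                                                context.forward(kv.key, keyRecord + "|" + kv.value);
                                                valuesStore.delete(kv.key);
                                                keysStore.delete(kv.key);
                                            }
                                        }
                                    }
                                });
                    }

                    @Override
                    public KeyValue<String, String> transform(String key, String value) {
                        valuesStore.put(key, value); // wait for the punctuation
                        return null;
                    }

                    @Override
                    public void close() {}
                }, "values-store", "keys-store")
                .to("output-topic");
    }
}
```

This assumes the two topics are co-partitioned, so a key and its values always land in the same task.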

One of our biggest priorities is to minimize processing time in the application. Because of this, I think a “forced” punctuation delay only works as a last resort.

Unfortunately, the current solution does not work for us, as I do not want the application to produce records to its own input topic, thereby creating duplicate records.

I like your suggestion of implementing another state store. Maybe an option could be to somehow insert a state store before the join operation in the “values” stream, so that “values” would still flow directly from the input topic to the join operation, and if the join is unsuccessful, the “value” would be sent to the new state store. This probably means I need another step before the join operation that allows the stream from the input topic and the punctuated stream from the state store to be combined somehow. Hmm…
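For what it's worth, here is one rough shape that idea could take, under the same illustrative assumptions as before: a “gate” transformer in front of the join re-injects parked values via a punctuation, and a small post-join processor writes unsuccessful values into the same shared store, so nothing is ever produced back to the input topic. All names and the 1-second interval are placeholders:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PreJoinRetry {

    public static void wire(StreamsBuilder builder) {
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("pending-store"),
                Serdes.String(), Serdes.String()));

        KTable<String, String> keys = builder.table("keys-topic");

        // Gate in front of the join: fresh values pass straight through,
        // parked values are re-injected into the join once per second.
        KStream<String, String> values = builder.<String, String>stream("values-topic")
                .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
                    private KeyValueStore<String, String> pending;

                    @Override
                    public void init(ProcessorContext context) {
                        pending = context.getStateStore("pending-store");
                        context.schedule(Duration.ofSeconds(1),
                                PunctuationType.WALL_CLOCK_TIME, ts -> {
                                    try (KeyValueIterator<String, String> it = pending.all()) {
                                        while (it.hasNext()) {
                                            KeyValue<String, String> kv = it.next();
                                            context.forward(kv.key, kv.value); // retry the join
                                            pending.delete(kv.key); // re-added below if it fails again
                                        }
                                    }
                                });
                    }

                    @Override
                    public KeyValue<String, String> transform(String key, String value) {
                        return KeyValue.pair(key, value); // pass through unchanged
                    }

                    @Override
                    public void close() {}
                }, "pending-store");

        KStream<String, String> joined = values.leftJoin(keys,
                (value, tableEntry) -> tableEntry == null
                        ? "UNMATCHED:" + value
                        : tableEntry + "|" + value);

        joined.filterNot((k, v) -> v.startsWith("UNMATCHED:")).to("output-topic");

        // Unsuccessful joins are written into the shared store instead of
        // being produced back to the input topic.
        joined.filter((k, v) -> v.startsWith("UNMATCHED:"))
                .mapValues(v -> v.substring("UNMATCHED:".length()))
                .process(() -> new AbstractProcessor<String, String>() {
                    private KeyValueStore<String, String> pending;

                    @Override
                    public void init(ProcessorContext context) {
                        super.init(context);
                        pending = context.getStateStore("pending-store");
                    }

                    @Override
                    public void process(String key, String value) {
                        pending.put(key, value); // park until the next punctuation
                    }
                }, "pending-store");
    }
}
```

Since a state store can be connected to several processors, this keeps the retry loop inside the topology and avoids duplicates on the input topic; the earlier caveat about multiple in-flight values per key applies here as well.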

For additional context: one key can match multiple values, but not the other way around. The input topic for the keys rarely receives updates, while the topic for the values receives approximately 5000 new records per minute.