I am trying to implement a near-realtime reporting system based on Kafka Streams. I am doing a series of windowed aggregations from KStreams into KTables, and the end goal is to use Interactive Queries to let clients read these aggregations. The system works fine until the application is shut down and restarted, at which point it no longer finds the previous aggregations it used to return. It (most probably) just reads from the last committed offset onward and returns that data. Is there something I can do to avoid this? (Since it is a reporting system, I would like every request for past data to return consistent results.) Is there a configuration I can use, or are Kafka Streams and/or Interactive Queries simply not the right tool for the job, meaning I should instead collect the data from the aggregated KTables and dump it into a third-party system that I can query more easily?
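For context, here is a minimal sketch of the kind of topology I mean (the topic name `events`, the store name `counts-store`, and the serdes are placeholders, not my actual code):

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

public class WindowedAggregationSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Windowed count per key, materialized into a named store so it
        // can be reached via Interactive Queries later.
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
               .count(Materialized.as("counts-store"));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reporting-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Interactive Query against the materialized window store; in the
        // real application this runs behind a REST endpoint, and it only
        // succeeds once the instance has reached RUNNING state.
        ReadOnlyWindowStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType(
                        "counts-store", QueryableStoreTypes.windowStore()));
        try (KeyValueIterator<Windowed<String>, Long> all = store.all()) {
            all.forEachRemaining(kv -> System.out.println(kv.key + " -> " + kv.value));
        }
    }
}
```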
Can you elaborate on what you mean by this? Maybe give a concrete example with data?
Sure. It can be reproduced with (probably) one of the best-known repositories for people starting to learn Kafka:
You can clone this repository and follow the steps in the Readme file to start the environment with docker-compose, produce test data as described in the Readme, and then start the application using Gradle.
If you then call `curl localhost:7000/bpm/all`, it will output `{"[1@1606122180000/1606122240000]":14,"[1@1606122120000/1606122180000]":120}`, as presented in the Readme and as expected.
If you then stop the Java application and restart it (using the same Gradle command), calling the same `curl localhost:7000/bpm/all` will output `{}`. Likewise, trying a ranged query (`curl localhost:7000/bpm/range/1/1606122120000/1606122180000`) will output `[]`.
If you then repeat the "Produce test data" steps to produce to the two topics, querying the REST API behaves as expected again. This is why I believe the streams application somehow stores the last read offset and, after a restart, just starts reading from that offset onward but not before it (which is probably a good thing for a streaming application, but not a good thing if you want to query the history again).
Let me know if you need more information. Thank you!
Looking into the code, it seems to be correct. What should happen after a restart is that Kafka Streams reloads its previous state into the state store. However, as long as the state store is rebuilding, it cannot be queried; by default, you can only query it after recovery has finished.
A couple of questions:
- How long did you wait after restart before you queried?
- Did you inspect the application logs for errors? (For example, `all()` throws an exception if the state store is not fully rebuilt yet.)
- Did you monitor the `KafkaStreams` client state? As long as it is not in `RUNNING` state, you cannot query the stores (at least not by default; you would need to tell the system that you want to allow querying anyway, which could also mean that it might return stale data). Kafka Streams logs state transitions and also exposes them as JMX metrics. You can also access the state programmatically if you want (e.g., `KafkaStreams#state()` or `KafkaStreams#setStateListener(...)`); see the sketch after this list.
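A minimal sketch of the programmatic approach, assuming `streams` is the application's `KafkaStreams` instance and has not been started yet (the listener must be registered before `start()`):

```java
import org.apache.kafka.streams.KafkaStreams;

public class StateGuard {

    public static void startAndWaitUntilRunning(KafkaStreams streams)
            throws InterruptedException {
        // Log every state transition (the same transitions are also
        // exposed as JMX metrics).
        streams.setStateListener((newState, oldState) ->
                System.out.println("State transition: " + oldState + " -> " + newState));
        streams.start();

        // Do not query state stores before the client reports RUNNING;
        // while stores are still restoring, lookups throw by default.
        while (streams.state() != KafkaStreams.State.RUNNING) {
            Thread.sleep(100);
        }
    }
}
```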
Hello,
Thank you for taking the time to reply. To answer your questions:
1. I waited 1 minute, and I also waited 15 minutes. The results were the same (no past data could be read from the state store).
2 + 3. I wanted to attach two logs: one from the first run of the application, when it behaves correctly, and one from the second run (after application stop + restart), when it does not find the previous values. The KafkaStreams client state is RUNNING before I try to query. (Apparently I can only attach pictures, so I will post two pastebin links with the logs.)
Let me know what other information you would need. Also, if you have any sample application that behaves as expected after a restart, I would be happy to test it.
Thank you.
Correct run log: [pastebin link]
Incorrect run log: [pastebin link]
The logs for both runs say:
Finished restoring changelog […] with a total number of 0 records
For the first run, it’s expected, but not for the second.
It seems the data is no longer in the changelog. I dug around a little more in the GitHub repo, and I think the issue is the input data in combination with how data retention works on Kafka brokers (which is different from how Kafka Streams handles it).
The data encodes timestamps in the payload, and a custom timestamp extractor is used. The extracted timestamps are old, leading to the issue.
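For illustration, such a payload-based extractor looks roughly like this (the `Heartbeat` value type and its field are hypothetical stand-ins for the repo's actual classes):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical value type standing in for the repo's actual record class.
class Heartbeat {
    final long timestamp;  // event time embedded in the payload
    Heartbeat(long timestamp) { this.timestamp = timestamp; }
}

public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // The returned value becomes the record timestamp of everything the
        // topology writes downstream, including the changelog topics. If it
        // is far in the past, brokers may delete those changelog records
        // right after they are written.
        if (record.value() instanceof Heartbeat) {
            return ((Heartbeat) record.value()).timestamp;
        }
        // Fall back to the highest timestamp seen so far on this partition.
        return partitionTime;
    }
}
```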
On your first run, input data is consumed, processed, and the state stores are updated. At the same time, Kafka Streams writes into the changelog topics. For these writes, it uses the extracted (old) timestamps from the payload, which are set as the record timestamps. Here the issue starts: brokers compare record timestamps to current wall-clock time to determine whether data is expired. Because the timestamps are older than the retention time, the broker deletes the data quickly after it was written. At the same time, the state stores still hold on to the data, so you can query it. Kafka Streams does not take wall-clock time into account to delete data, but compares event-time to "stream-time" (stream-time = max(event-time seen)).
After a restart, you lose your local state stores, and Kafka Streams tries to recover them from the changelogs. However, the changelog data has already expired, and thus you end up with empty state stores.
If you modify the input data to use recent timestamps in the payload, it should avoid the issue.
The issue does not apply to the input topic, because there the old timestamps are only in the payload, while the record timestamps are current.
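As a sketch, regenerating the test data with fresh timestamps could look like this (the topic name and JSON layout are assumptions; adapt them to the repo's actual schema):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FreshTestData {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Embed the current wall-clock time in the payload so the
            // extracted timestamps stay well within the changelog retention.
            long now = System.currentTimeMillis();
            producer.send(new ProducerRecord<>("heartbeats",  // topic name is an assumption
                    "1", "{\"id\":1,\"timestamp\":" + now + "}"));
        }
    }
}
```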
Thank you so much for the detailed explanation. I changed the timestamps in the input data, and the application now works as you described.
Also, thank you for the insight into how Kafka Streams and the broker behave in this situation; it is a reply that really helps me learn more.