Hi again, I’ve got a further question which I think is related closely enough to this that it’s worth asking here instead of beginning a new thread.
How would you detect reaching the end of a topic?
I am currently working on a demo application which shows how to build an exactly-once stateful processor. (It performs a very simple operation: calculating a rolling mean value.)
A question has occurred to me while writing it.
As part of the startup operation, the process should read the entire changelog topic history. (Although the topic might be configured as “compacted”.)
What would the typical way be to detect the end of the topic being reached? Without knowing this there is no way to guarantee that all messages have been consumed and that the internal state is now up to date with the changelog topic. (Unless I perhaps misunderstand how this should be done?)
One possible approach could be to set the poll timeout to a “reasonably large” value, and assume the end of the topic is reached if poll returns “None”. (Or whatever indicates “no message” in other languages.)
In principle the poll timeout is dependent on network conditions such as latency and reliability (packet loss rate).
A sensible value might be between 30 seconds and 1 minute. That should cover all but the most unreliable of networks. For applications on local networks in datacenters, a few seconds is probably more than sufficient.
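To make the timeout heuristic concrete, here is a minimal sketch. It assumes `consumer` behaves like a confluent_kafka.Consumer that is already assigned to the changelog partitions and positioned at their start; `read_until_idle` and the `apply` callback are hypothetical names used only for illustration.

```python
def read_until_idle(consumer, apply, idle_timeout=30.0):
    """Consume until one poll() call returns no message within idle_timeout.

    `consumer` is assumed to behave like confluent_kafka.Consumer.
    `apply` is a hypothetical callback that folds one record into local state.
    Returns the number of records applied.
    """
    applied = 0
    while True:
        msg = consumer.poll(idle_timeout)
        if msg is None:
            # Heuristic only: silence for idle_timeout seconds is *treated*
            # as end of topic. A stalled network looks exactly the same.
            return applied
        if msg.error() is not None:
            raise RuntimeError(f"consumer error: {msg.error()}")
        apply(msg)
        applied += 1
```

The function cannot tell "no more data" apart from "data delayed for longer than idle_timeout", so the returned count is only a best guess.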
My concern is that it would be possible to trick a consumer into thinking that it has reached the end of a changelog topic by disconnecting a network cable at the right time and then reconnecting it after the call to consumer.poll() has timed out.
Of course, there might be other more likely failure cases. A router or switch power-cycling might produce the same effect.
Just a couple of points from my experience so far:
Some time ago, I wrote some software which interfaced with Kafka from Rust. (The Rust crate uses the C/C++ librdkafka library.) Assuming I didn’t miss something, there wasn’t an easy way to detect an end of topic condition using this library.
More recently, I have been interfacing with Kafka from Python. According to this reference it should be possible to detect an end of topic condition using message.error().
However, I haven’t been able to get this to work. (I just see the “message is None” condition, not an error() condition.) The Python code from that reference is the only example code I know of which indicates it should be possible to detect an end of topic condition.
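One likely explanation, as far as I can tell: since librdkafka 1.0 (which the Python client wraps), partition EOF events are off by default and only appear when the consumer is configured with `enable.partition.eof` set to true. The sketch below shows how the error() check might then look. `consume_until_eof` and `apply` are hypothetical names, and the EOF code is passed in so that the sketch itself needs no third-party import.

```python
# With confluent-kafka installed, the setup would look roughly like this
# (kept as comments so the sketch has no third-party dependency):
#   from confluent_kafka import Consumer, KafkaError
#   consumer = Consumer({..., "enable.partition.eof": True})
#   eof_code = KafkaError._PARTITION_EOF

def consume_until_eof(consumer, partition_count, eof_code, apply,
                      poll_timeout=1.0):
    """Consume until every assigned partition has reported partition EOF.

    `partition_count` is the number of partitions assigned to the consumer.
    `apply` is a hypothetical callback that folds one record into state.
    Returns the set of (topic, partition) pairs that reported EOF.
    """
    finished = set()
    while len(finished) < partition_count:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            continue  # a plain timeout says nothing about end of topic
        err = msg.error()
        if err is not None:
            if err.code() == eof_code:
                # EOF event: carries topic and partition, but no payload
                finished.add((msg.topic(), msg.partition()))
                continue
            raise RuntimeError(f"consumer error: {err}")
        apply(msg)
    return finished
```

Note that a partition can report EOF more than once if new records arrive afterwards, which is why the sketch tracks finished partitions in a set.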
In some ways this makes sense. Since Kafka is designed to support streaming applications, asking the question “have I reached the end of a stream?” is a bit weird… because streams never really end. On the other hand, in this case, we need some mechanism to know when all the changelog topic data has been read.
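One mechanism that does not depend on timeouts would be to snapshot each partition's end offset (high watermark) at startup and read until the consumer has reached it. The following is only a sketch under assumptions: `consumer` behaves like a confluent_kafka.Consumer assigned to `partitions` from the beginning, each partition object exposes `topic` and `partition` attributes (as TopicPartition does), and `restore_to_end_offsets` and `apply` are hypothetical names.

```python
def restore_to_end_offsets(consumer, partitions, apply, poll_timeout=1.0):
    """Read each partition up to the end offset observed at startup.

    Returns the number of records applied. Records written after the
    snapshot are left for normal processing.
    """
    remaining = {}
    for tp in partitions:
        low, high = consumer.get_watermark_offsets(tp, timeout=10.0)
        if high > low:  # skip empty partitions: nothing to restore
            remaining[(tp.topic, tp.partition)] = high

    applied = 0
    while remaining:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            continue  # timeout: keep waiting, the target is not reached yet
        if msg.error() is not None:
            raise RuntimeError(f"consumer error: {msg.error()}")
        apply(msg)
        applied += 1
        key = (msg.topic(), msg.partition())
        # The next offset to read is msg.offset() + 1
        if key in remaining and msg.offset() + 1 >= remaining[key]:
            del remaining[key]
    return applied
```

A caveat: with transactional producers, the last offset in a partition can be a transaction marker that is never delivered to the application, in which case this comparison would never be satisfied. The sketch does not handle that case.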
I also had a look at the JavaDoc for Consumer and ConsumerRecord but didn’t find any information which specified how an end of topic condition might differ from a regular poll timeout which returns an “empty record”.
I tried to have a search around in the Kafka Streams code but didn’t make much progress here. Do you know how Kafka Streams manages this problem?