Exactly-once processing and stateful processors

@abellemare

Hi again, I’ve got a further question which I think is related closely enough to this that it’s worth asking here instead of beginning a new thread.

How would you detect reaching the end of a topic?

I am currently working on a demo application which shows how to build an exactly once stateful processor. (It performs a very simple operation, calculating a rolling mean value.)

A question has occurred to me while writing it.

As part of the startup operation, the process should read the entire changelog topic history. (Though the topic might be configured as compacted.)

What would be the typical way to detect that the end of the topic has been reached? Without knowing this, there is no way to guarantee that all messages have been consumed and that the internal state is now up to date with the changelog topic. (Unless I perhaps misunderstand how this should be done?)

One possible approach could be to set the poll timeout to a “reasonably large” value, and assume the end of the topic has been reached if poll returns “None”. (Or whatever indicates “no message” in other languages.)

In principle the poll timeout is dependent on network conditions such as latency and reliability (packet loss rate).

A sensible value might be between 30 seconds and 1 minute. That should cover all but the most unreliable of networks. For applications on local networks in datacenters, a few seconds is probably more than sufficient.

My thoughts are that it would be possible to trick a consumer into thinking that it has reached the end of a changelog topic by disconnecting a network cable at the right time and then reconnecting it after the call to consumer.poll() has timed out.

Of course, there might be other more likely failure cases. A router or switch power-cycling might produce the same effect.

Just a couple of points from my experience so far:

Some time ago, I wrote some software in Rust which interfaced with Kafka. (The Rust crate wraps the C/C++ librdkafka library.) Assuming I didn’t miss something, there wasn’t an easy way to detect an end-of-topic condition using this library.

More recently, I have been interfacing with Kafka from Python. According to this reference, it should be possible to detect an end-of-topic condition using message.error().

However, I haven’t been able to get this to work. (I just see the “message is None” condition, never an error() condition.) The Python code from that reference is the only example code I know of which suggests it should be possible to detect an end-of-topic condition.

In some ways this makes sense. Since Kafka is designed to support streaming applications, asking “have I reached the end of a stream?” is a bit weird… because streams never really end. On the other hand, in this case, we need some mechanism to know when all the changelog topic data has been read.

I also had a look at the JavaDoc for Consumer and ConsumerRecord, but didn’t find anything specifying how an end-of-topic condition might differ from a regular poll timeout which returns empty records.

I tried to have a search around in the Kafka Streams code but didn’t make much progress here. Do you know how Kafka Streams manages this problem?

How would you detect reaching the end of a topic?

Just read the changelog until you get 0 records back. The changelog partition should NOT be getting updated while you are restoring state from it, since each partition should only be assigned to a single instance. The instance that is reading the changelog to restore state is the only instance that can write to the changelog - thus, if it’s reading from the changelog, it cannot be writing to it.

I believe you won’t get “0 records” back in the event of a network failure - you may get a timeout error, or a connection error, but you should only get 0 records returned when the request succeeds and there are 0 records left.
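
The “read until you get 0 records back” strategy can be sketched roughly as follows. This assumes a confluent-kafka-python style consumer already assigned to the changelog partition; `apply_record` is a hypothetical callback for folding each record into local state, not something from the original posts.

```python
def restore_until_empty(consumer, apply_record, poll_timeout: float = 5.0) -> int:
    """Read a changelog partition until a poll returns no records.

    This is only safe because the restoring instance is the sole writer
    to its changelog partition: nobody can append behind our back, so an
    empty batch means we are caught up. Returns the number of records
    applied.
    """
    restored = 0
    while True:
        # Consumer.consume() returns a (possibly empty) list of messages.
        batch = consumer.consume(num_messages=500, timeout=poll_timeout)
        if not batch:
            return restored  # caught up: nothing left, and no other writers
        for msg in batch:
            if msg.error():
                # Surface broker/transport errors instead of mistaking
                # them for end-of-data.
                raise RuntimeError(str(msg.error()))
            apply_record(msg)
            restored += 1
```

As discussed above, a network outage can also produce empty batches with this client, so the timeout has to be chosen generously for this heuristic to be trustworthy.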

I went to look at the source code and it seems that while what I said used to be true, it now looks like Kafka Streams records the changelog offsets and feeds them in when restoring the data. kafka/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java at bf3f088c944763d8418764064f593d0bf06fbcc3 · apache/kafka · GitHub
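
For what it’s worth, the offset-based idea can be sketched without Kafka Streams: capture the partition’s end offset once at startup, then consume until the consumer’s position reaches it. This is a hypothetical sketch against the confluent-kafka-python Consumer API (`get_watermark_offsets` / `assign` / `poll`); `tp` is a TopicPartition for the changelog and `apply_record` is an assumed callback.

```python
def restore_to_end_offset(consumer, tp, apply_record, poll_timeout: float = 1.0) -> int:
    """Restore state from a changelog partition up to a fixed end offset.

    Because the restoring instance is the only writer to its changelog
    partition, the high watermark captured here cannot advance while we
    read. Returns the number of records applied.
    """
    # Ask the broker for the low/high watermarks once, before consuming.
    low, high = consumer.get_watermark_offsets(tp, timeout=10.0)
    tp.offset = low
    consumer.assign([tp])

    restored = 0
    next_offset = low
    while next_offset < high:
        msg = consumer.poll(poll_timeout)
        if msg is None:
            # A timed-out poll here is transient (e.g. the unplugged-cable
            # scenario above); the loop only exits at the captured offset.
            continue
        if msg.error():
            raise RuntimeError(str(msg.error()))
        apply_record(msg)
        restored += 1
        next_offset = msg.offset() + 1
    return restored
```

Unlike the timeout heuristic, a network failure here stalls the loop rather than silently ending restoration early.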

I’m afraid I’m not going to be able to help you much here, as it would be fairly time-intensive. If you’re looking to replicate changelogs from Kafka Streams, you’re going to have to walk through the code yourself and familiarize yourself with it. You may find it helpful to run Kafka locally and attach a debugger to step through the process of storing and restoring changelogs.

Good luck!

However, you cannot be writing to the changelog partition while you are reading from it, otherwise you will never be able to catch up. If you don’t get 0 records, you aren’t caught up.

Thanks again for a helpful response; I’m very grateful for the effort.

I did some testing with plain consumers.

  • I created a new broker on a separate machine, connected to my network via a single physical cable link
  • I produced 1M ‘helloworld’ messages to the broker
  • I ran a consumer, and while it was consuming I disconnected the network cable (code below)

When I disconnect the network cable between the broker host and the consumer host, I see the following behaviour.

message offset: 131096: b'helloworld'
message offset: 131097: b'helloworld'
message offset: 131098: b'helloworld'
message offset: 131099: b'helloworld'
message offset: 131100: b'helloworld'
message is None
message is None
message is None
message is None
%4|1710877256.793|SESSTMOUT|consume_to_end_test.consumer#consumer-1| [thrd:main]: Consumer group session timed out (in join-state steady) after 45498 ms without a successful response from the group coordinator (broker 1, last error was Success): revoking assignment and rejoining group
%5|1710877259.797|REQTMOUT|consume_to_end_test.consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out HeartbeatRequest in flight (after 45503ms, timeout #0)
%4|1710877259.797|REQTMOUT|consume_to_end_test.consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1710877259.797|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: 192.168.0.63:9092: 1 request(s) timed out: disconnect (after 51513ms in state UP)
message is None
%3|1710877262.861|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: 192.168.0.63:9092: Connect to ipv4#192.168.0.63:9092 failed: No route to host (after 3064ms in state CONNECT)
%3|1710877265.933|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: 192.168.0.63:9092: Connect to ipv4#192.168.0.63:9092 failed: No route to host (after 2890ms in state CONNECT, 1 identical error(s) suppressed)
message is None
%5|1710877273.811|REQTMOUT|consume_to_end_test.consumer#consumer-1| [thrd:192.168.0.63:9092/bootstrap]: 192.168.0.63:9092/1: Timed out FetchRequest in flight (after 61341ms, timeout #0)
%4|1710877273.811|REQTMOUT|consume_to_end_test.consumer#consumer-1| [thrd:192.168.0.63:9092/bootstrap]: 192.168.0.63:9092/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
%3|1710877273.811|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:192.168.0.63:9092/bootstrap]: 192.168.0.63:9092/1: 1 request(s) timed out: disconnect (after 65531ms in state UP)
%3|1710877276.429|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:192.168.0.63:9092/bootstrap]: 192.168.0.63:9092/1: Connect to ipv4#192.168.0.63:9092 failed: No route to host (after 2618ms in state CONNECT)
%3|1710877279.498|FAIL|consume_to_end_test.consumer#consumer-1| [thrd:192.168.0.63:9092/bootstrap]: 192.168.0.63:9092/1: Connect to ipv4#192.168.0.63:9092 failed: No route to host (after 2815ms in state CONNECT, 1 identical error(s) suppressed)
message is None
message is None

I am wondering: is this a bug? Or perhaps I misunderstand something about how to detect the end-of-topic condition? The consumer’s background thread is producing log messages, but these error conditions don’t appear to be propagated back to the main thread through the consumer. (I just see the same “message is None” as in the normal case of having no messages left to consume, i.e. having reached the end of the topic.)

Example code:

#!/usr/bin/env python3

from confluent_kafka import Consumer
from confluent_kafka import KafkaError

def create_bootstrap_servers_string(server_address: str, port: str) -> str:
    return f'{server_address}:{port}'

def create_kafka_consumer(
    config: dict,
) -> Consumer:

    consumer = Consumer(config)
    return consumer

def get_default_kafka_consumer_config(
    bootstrap_servers: str,
    client_id: str,
    group_id: str,
) -> dict:

    return {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'client.id': client_id,
        'enable.auto.commit': False,
        'auto.offset.reset': 'earliest',
        'isolation.level': 'read_committed',
    }

def main():

    address = '192.168.0.63'
    port = '9092'

    bootstrap_servers = create_bootstrap_servers_string(address, port)

    print(f'{bootstrap_servers}')

    consumer = create_kafka_consumer(
        config=get_default_kafka_consumer_config(
            bootstrap_servers=bootstrap_servers,
            client_id='consume_to_end_test.consumer',
            group_id='consume_to_end_test.consumer',
        )
    )

    topic = 'long.topic'
    consumer.subscribe([topic])

    while True:

        message = consumer.poll(10.0)

        if message is None:
            print('message is None')
            continue

        # Check for an error before touching the payload: error events
        # (including _PARTITION_EOF) do not carry a normal message value.
        if message.error():
            if message.error().code() == KafkaError._PARTITION_EOF:
                print('end of topic detected')
                break
            print(f'{message.error()}')
            continue

        print(f'message offset: {message.offset()}: {message.value()}')
        # throw away data

    consumer.unsubscribe()
    consumer.close()


if __name__ == '__main__':
    main()
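
For what it’s worth, one likely explanation for never seeing `_PARTITION_EOF` above: in librdkafka-based clients (confluent-kafka-python included), partition EOF events have been disabled by default since librdkafka 1.0, so they must be switched on explicitly. A minimal sketch of the config change, reusing the settings from the example above:

```python
config = {
    'bootstrap.servers': '192.168.0.63:9092',
    'group.id': 'consume_to_end_test.consumer',
    'client.id': 'consume_to_end_test.consumer',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',
    'isolation.level': 'read_committed',
    # Without this, librdkafka (>= 1.0) never delivers _PARTITION_EOF
    # events; poll() just returns None at the end of the partition.
    'enable.partition.eof': True,
}
```

With this set, each time the consumer catches up to the end of an assigned partition, poll() returns a message whose error().code() is KafkaError._PARTITION_EOF, and the break in the loop above fires. Note it is emitted every time the consumer reaches the end of the partition, not only once.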