Read messages from the last x seconds of a topic

Let's say 30 producers are sending data to the same Kafka topic every 2 seconds, all at the same time. From my C# code I want to read cyclically (every 2 seconds) only the messages from the last 2 seconds. How can I do that? I have tried a lot but couldn't get it to work. Could you share a very simple code sample?

You can accomplish this by getting the offsets corresponding to the desired timestamp (2 seconds ago) via OffsetsForTimes, and then seeking to those offsets:

consumer.Subscribe(topic);

...

var seekTimestamp = new Timestamp(DateTime.Now.Subtract(TimeSpan.FromSeconds(2)));
var offsets = consumer.OffsetsForTimes(
    consumer.Assignment.Select(partition => new TopicPartitionTimestamp(partition, seekTimestamp)),
    /* insert TimeSpan timeout */);

foreach (var offset in offsets) {
    consumer.Seek(offset);
}

...

while (true) {
    var cr = consumer.Consume(cts.Token);
    ...
}
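
Note that Assignment is only populated once the group rebalance has completed, which happens as a side effect of calling Consume. As a variant, here is a sketch (assuming a ConsumerConfig named config, and that calling OffsetsForTimes from within the handler is acceptable for your setup) that does the lookup in the partitions-assigned handler and returns the looked-up offsets as the positions to start reading from:

var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // Start each newly assigned partition at the first offset whose
        // timestamp is >= 2 seconds ago.
        var seekTimestamp = new Timestamp(DateTime.UtcNow.Subtract(TimeSpan.FromSeconds(2)));
        return c.OffsetsForTimes(
            partitions.Select(tp => new TopicPartitionTimestamp(tp, seekTimestamp)),
            TimeSpan.FromSeconds(10));
    })
    .Build();

consumer.Subscribe(topic);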

Hello, I have completed the code according to your answer, as follows. But I think I am doing something wrong. My goal is to read, every two seconds, the messages of the last two seconds. In the ideal case that would be 30 messages (from the 30 producers that send data at the same time to the same topic). But with my code I get just one message every 2 seconds. The reason is clear: in each pass of the while loop I get one result. What could be wrong in my code?

    public async Task Read_data_from_Kafka()
    {
        await Task.Delay(1);
        string item_value = "";
        string inventar = "";
        string timestamp = "";
        config = new ConsumerConfig()
        {
            BootstrapServers = servers,
            GroupId = "foo",
            AutoOffsetReset = AutoOffsetReset.Latest,                
            EnableAutoCommit = false,
        };
        
        consumer_2 = new ConsumerBuilder<Ignore, string>(config).Build();

        TopicPartition topicPartition_0 = new TopicPartition("my_topic", new Partition(0));
        
        consumer_2.Subscribe("my_topic");

        var seekTimestamp = new Timestamp(DateTime.Now.Subtract(TimeSpan.FromSeconds(2)));
        var offsets = consumer_2.OffsetsForTimes(
            consumer_2.Assignment.Select(partition =>
                new TopicPartitionTimestamp(topicPartition_0, seekTimestamp)),
            TimeSpan.FromMilliseconds(10000));

        foreach (var offset in offsets)
        {
            consumer_2.Seek(offset);
        }            

        while (true)
        {
            var consumeResult = consumer_2.Consume(CancellationToken.None);
            using (StreamWriter sw = File.AppendText("D:\\test\\kafka_messages.txt"))
            {                    
                sw.WriteLine("Kafka message: " + consumeResult.Message.Value + " " + Convert.ToString(DateTime.Now));
            }
            Thread.Sleep(2000);
            //break;
        }
    }

This is expected - in each iteration you're calling Consume, which returns just one result, and then sleeping for two seconds. You'd need to keep consuming. There isn't a single method you can call to get everything within a time window.

To do this and avoid dupes, you'd need a block of code that seeks to two seconds ago and stops once the message timestamps go beyond the given two-second window (sketched below). And then it gets tricky to handle the case where the consumer can't keep up. Are you sure you need to consume in these 2-second windows? Why not drop the windows and just consume events as they come in? Then you wouldn't have to worry about tracking state, carefully handling the window boundaries to avoid duplicates / skipped messages, or handling the case where the consumer falls behind.
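
Roughly, such a block could look like the sketch below (it assumes a single partition and a consumer that already has its assignment; with multiple partitions you'd have to track the window boundary per partition, which is exactly where it gets tricky):

var windowEnd = DateTime.UtcNow;
var windowStart = new Timestamp(windowEnd.AddSeconds(-2));

// Seek every assigned partition to the first offset at or after the window start.
var startOffsets = consumer.OffsetsForTimes(
    consumer.Assignment.Select(tp => new TopicPartitionTimestamp(tp, windowStart)),
    TimeSpan.FromSeconds(10));

foreach (var tpo in startOffsets)
{
    consumer.Seek(tpo);
}

var windowMessages = new List<ConsumeResult<Ignore, string>>();
while (true)
{
    // Timeout overload so the loop doesn't block forever once the window is drained.
    var cr = consumer.Consume(TimeSpan.FromMilliseconds(200));
    if (cr == null)
        break; // nothing more available right now
    if (cr.Message.Timestamp.UtcDateTime >= windowEnd)
        break; // first message past the window boundary (single-partition assumption)
    windowMessages.Add(cr);
}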

Hello Dave,
Thank you for the great suggestion. Sorry, as I have only recently started working with Kafka, may I ask how I could realise event-based consuming in this case? Could you give a very basic example of that? I mean, how will I be informed of an event, so that I can then consume it?

Thanks in advance

Streaming apps often keep running, i.e., you Consume in a loop. The consumer is always "on" and continuously consumes. In that case there's no need to be informed of an event separately and then consume it: you subscribe and consume, and depending on the Consume overload you use, you can either have it return when no result is available within a given time span, or block until canceled.
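
A minimal sketch of that pattern (server and topic names are placeholders):

using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder
    GroupId = "foo",
    AutoOffsetReset = AutoOffsetReset.Latest,
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my_topic"); // placeholder topic

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

try
{
    while (true)
    {
        // Blocks until a message arrives or the token is canceled;
        // each message is handled as soon as it is produced.
        var cr = consumer.Consume(cts.Token);
        Console.WriteLine($"Kafka message: {cr.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    consumer.Close(); // leave the consumer group cleanly
}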

It doesn't always have to be this "run forever" way, though. What kind of application are you working on, and what kind of event produce / consume pattern goes with it?

Hello Dave,

I have explained my use case in the following topic: