How can I reset the topic offset while consuming from a Kafka topic?

Hello,
I start consuming from the last message of a Kafka topic with the code below.
What I want to do is:

  1. Consume the last 50 messages.
  2. Wait for 5 seconds, then consume the last 50 messages again.
    (The data that arrives during those 5 seconds should be discarded.)
    This should repeat cyclically in the while loop.

Here is my code. Currently, when I consume 50 messages, wait for 5 seconds and then continue consuming, consumption resumes from where it was paused.
I have marked the line below where I would like to reset the offset to the last message.

    public async Task Read_Status_from_Kafka_test2()
    {
        Int32 counter = 0;
        var config = new ConsumerConfig
        {
            BootstrapServers = "servers",
            GroupId = "foo3",
            AutoOffsetReset = AutoOffsetReset.Latest,
            EnableAutoCommit = false,
        };
        using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            c.Subscribe("myTopic");
            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        counter += 1;
                        Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                        if (counter >= 50)
                        {
                            counter = 0;
                            // here I want to set the offset again to the last message !!!!!!!!!                        
                            Thread.Sleep(5000);
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }

    }

Try the following to seek to the end (warning: not tested):

    var offsets = consumer.Assignment.Select(partition => new TopicPartitionOffset(partition, Offset.End));
    foreach (var offset in offsets) {
        consumer.Seek(offset);
    }
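
To show where the seek could go in the loop from the question, here is a minimal sketch (also untested against a live broker). The names `ConsumerSeekExtensions`, `SeekToEnd` and `ReadInBatches` are made up for illustration and are not part of Confluent.Kafka. Only `Assignment`, `Seek`, `TopicPartitionOffset` and `Offset.End` come from the library. The seek is placed after the pause on the assumption that messages arriving during the 5 seconds should be skipped.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

static class ConsumerSeekExtensions
{
    // Hypothetical helper: moves every currently assigned partition to the
    // end of its log, so the next Consume call should only return messages
    // produced after this point.
    public static void SeekToEnd<TKey, TValue>(this IConsumer<TKey, TValue> consumer)
    {
        foreach (var partition in consumer.Assignment)
        {
            consumer.Seek(new TopicPartitionOffset(partition, Offset.End));
        }
    }

    // Hypothetical loop: consume batchSize messages, pause, skip ahead, repeat.
    public static void ReadInBatches(
        IConsumer<Ignore, string> consumer, int batchSize, TimeSpan pause, CancellationToken token)
    {
        var counter = 0;
        while (!token.IsCancellationRequested)
        {
            var cr = consumer.Consume(token);
            counter += 1;
            Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");

            if (counter >= batchSize)
            {
                counter = 0;
                Thread.Sleep(pause);
                // Seek after the pause so that anything produced while
                // sleeping is discarded rather than consumed.
                consumer.SeekToEnd();
            }
        }
    }
}
```

With the consumer from the question, the call would look like `ConsumerSeekExtensions.ReadInBatches(c, 50, TimeSpan.FromSeconds(5), cts.Token);`. Note that `Assignment` is empty until the group has assigned partitions, which is why the seek only happens after some messages have already been consumed.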

Hello Dave

I have tested it for a while and it works like a charm! Thank you so much. I thought this would not be possible without stopping the consumer, but I see it is!
