Hello
I start to consume from the last message of a Kafka topic with following code.
What I want to do is:
- I want to consume the last 50 messages
- wait for 5 seconds and consume again the last 50 messages
(The data in between for 5 seconds shall be discarded)
This should be done cyclic in the while loop.
I have following code: Currently when consume 50 messages, wait for 5 seconds and proceed with consuming, the consuming will be proceeded where it was paused.
I have marked the line below where I should be able to reset the offser to the last message.
public async Task Read_Status_from_Kafka_test2()
{
Int32 counter = 0;
config = new ConsumerConfig()
{
BootstrapServers = "servers"
GroupId = "foo3",
AutoOffsetReset = AutoOffsetReset.Latest,
EnableAutoCommit = false,
};
using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
{
c.Subscribe("myTopic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
counter += 1;
Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
if (counter>50)
{
counter = 0;
// here I want to set the offset again to the last message !!!!!!!!!
Thread.Sleep(5000);
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}