Currently I am using following code with Confluent-Kafka library. But with this code I can read the newest message from the defined partition. But the Kafka server is writing the values of my topic each time to a different partition(Partition 0,1,2). So the last (newest) message in the partition is not always the newest message that has been sent to Kafka from the source side.
How can I adapt my code for three partitions? Is there a simple function for that? Or do I have to read each time from all partitions the message with Offset.End,check the timestamp, and decide which one is the newest one?
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("My_Topic");
//while (!cancellationToken.IsCancellationRequested)
while (var_true)
{
TopicPartitionOffset tps = new TopicPartitionOffset(new TopicPartition("My_Topic", 1),Offset.End);
consumer.Assign(tps);
var consumeResult = consumer.Consume(cancellationToken);
Kafka_message_total = consumeResult.Message.Value;
// additional code to send the message value to an application
System.Threading.Thread.Sleep(2000);
}
consumer.Close();
}