Hello
In my lightweight C# .NET app I have to visualise the data of a Kafka topic in a simple live chart. Below I have listed the requirements of my use case. Based on these, I am looking for an easy way to consume this “native-live” data from the Kafka topic in my C# app.
- Approx. 40 producers send data to the same Kafka topic (with 3 partitions) cyclically, once every second. Each producer's JSON telegram contains 15 keys. I have to visualise the values of 4-5 of these keys.
- When my app starts, I only have to show data that arrives after the connection to Kafka is established (historical data is not needed).
- My live chart shall be updated every 1-2 seconds.
- If communication with Kafka is broken and then re-established, the data missed in between is not needed.
- A data transmission delay of up to 500 ms from Kafka to my app is acceptable.
- Our company's standard data architecture always sends data to Kafka first, so I have to get the data via Kafka; I have no way to get data directly from the producers.
As a starting point, I have written the following code, based on your sample code, with Confluent Kafka to consume data from the Kafka topic. I have noticed that when I run this code continuously, my consuming speed cannot keep up with the speed of the producers. (In the log file I compared the message timestamp with my current date and time.) The speed ratio is approx. 4/5: after 50 seconds of running the code, the last consumed data I see is data from 10 seconds ago. Of course this difference grows over time.
How should I proceed in this case? Which methods/tools of Confluent Kafka can/should I use to realise this undemanding use case?
using System;
using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public async Task Read_data_from_Kafka()
{
    var config = new ConsumerConfig
    {
        BootstrapServers = "servers",
        GroupId = "foo3",
        // Start from the newest messages when the group has no committed offset.
        AutoOffsetReset = AutoOffsetReset.Latest,
        EnableAutoCommit = false,
    };
    using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
    {
        c.Subscribe("my_topic");
        CancellationTokenSource cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => {
            e.Cancel = true; // prevent the process from terminating.
            cts.Cancel();
        };
        try
        {
            while (true)
            {
                try
                {
                    // Blocks until a message arrives or the token is cancelled.
                    var cr = c.Consume(cts.Token);
                    // The log file is opened and closed once per consumed message.
                    using (StreamWriter sw = File.AppendText("D:\\test\\kafka_messages.txt"))
                    {
                        // Log the payload together with the current time (UTC+3).
                        sw.WriteLine("Kafka message: " + cr.Message.Value + " " +
                            DateTime.UtcNow.AddHours(3).ToString("yyyy-MM-dd HH:mm:ss.fff", CultureInfo.InvariantCulture));
                    }
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Error occurred: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Ensure the consumer leaves the group cleanly and final offsets are committed.
            c.Close();
        }
    }
}
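
The lag described above could also be measured in code instead of by comparing log lines. Here is a minimal sketch, assuming the producers or the broker set the Kafka message timestamp and that the clocks of the hosts are roughly in sync. `LagProbe` and `LagOf` are hypothetical names of mine, not part of Confluent.Kafka.

```csharp
using System;

// Hypothetical helper (not part of Confluent.Kafka): how far behind "now" a message is.
public static class LagProbe
{
    public static TimeSpan LagOf(DateTime messageUtc, DateTime nowUtc)
    {
        // Clamp at zero so small clock skew between hosts does not yield a negative lag.
        var lag = nowUtc - messageUtc;
        return lag < TimeSpan.Zero ? TimeSpan.Zero : lag;
    }
}

// Inside the consume loop, assuming the message timestamp is populated:
//   var lag = LagProbe.LagOf(cr.Message.Timestamp.UtcDateTime, DateTime.UtcNow);
//   Console.WriteLine($"lag: {lag.TotalMilliseconds:F0} ms");
```

With the numbers from my test, a message stamped 10 seconds before it is consumed would give a lag of 10 seconds, which is far above the 500 ms I can accept.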