Consuming data from a Kafka topic for a lightweight live-chart app in .NET

Hello

In my lightweight C# .NET app I have to visualise the data of a Kafka topic in a simple live chart. Below I have listed the requirements of my use case. Based on these, I am looking for an easy way to consume this live data from the Kafka topic into my C# app.

  1. Approx. 40 producers send data to the same Kafka topic (3 partitions), cyclically, once per second. Each producer's JSON telegram contains 15 keys; I have to visualise 4-5 of these key/value pairs (a sketch of the telegram shape follows this list).
  2. When my app starts, I only have to show data that arrives after the connection to Kafka is established (historical data is not needed).
  3. My live chart shall be updated every 1-2 seconds.
  4. If the connection to Kafka is broken and then re-established, the historical data lost in between is not needed.
  5. A data-transmission delay from Kafka to my app of up to 500 ms is acceptable.
  6. In our company's standard data architecture, data is always sent to Kafka first, so I have to get the data via Kafka; I have no way to get it directly from the producers.
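
For illustration, this is roughly the shape of one telegram reduced to the fields I need for the chart. The key names below are placeholders, not the real ones; a minimal sketch using System.Text.Json:

    using System.Text.Json;
    using System.Text.Json.Serialization;

    // Placeholder model: only the 4-5 keys needed for the chart are mapped,
    // the remaining ~10 keys of the telegram are simply ignored on deserialisation.
    public sealed class Telegram
    {
        [JsonPropertyName("producerId")] public string ProducerId { get; set; }
        [JsonPropertyName("timestamp")]  public DateTime Timestamp { get; set; }
        [JsonPropertyName("value1")]     public double Value1 { get; set; }
        [JsonPropertyName("value2")]     public double Value2 { get; set; }
    }

    // Usage: Telegram t = JsonSerializer.Deserialize<Telegram>(cr.Message.Value);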

As a starting point I have written the following sample code using Confluent.Kafka for consuming data from the Kafka topic. I have noticed that when I run this code continuously, my consuming speed cannot keep up with the producers. (In the log file I have compared the message timestamp with my current date and time.) The speed ratio is approx. 4/5; that is, after the code has run for 50 seconds, the last consumed message is the one produced 10 seconds earlier. Of course this gap grows over time.
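
As a side note, the Kafka record itself also carries a timestamp, so the lag could be computed directly in code instead of by eye from the log file. A sketch, where cr is the ConsumeResult from the code below (my actual comparison uses the timestamp field inside the message payload and the logged wall-clock time):

    // Sketch: compute the lag from the record's broker/producer timestamp.
    TimeSpan lag = DateTime.UtcNow - cr.Message.Timestamp.UtcDateTime;
    Console.WriteLine($"Consumer lag: {lag.TotalSeconds:F1} s");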

How should I proceed in this case? Which methods/tools of Confluent.Kafka can/should I use to realise this low-requirement use case?

    // Requires: using Confluent.Kafka; using System.Globalization; using System.IO;
    public async Task Read_data_from_Kafka()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "servers",
            GroupId = "foo3",
            // Start from the latest offset: no historical data is wanted (requirement 2).
            AutoOffsetReset = AutoOffsetReset.Latest,
            EnableAutoCommit = false,
        };

        using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            c.Subscribe("my_topic");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);

                        // Log each message together with the current wall-clock time
                        // (UTC+3); note that the file is opened and closed again for
                        // every single message.
                        using (StreamWriter sw = File.AppendText("D:\\test\\kafka_messages.txt"))
                        {
                            sw.WriteLine("Kafka message: " + cr.Message.Value + " "
                                + DateTime.UtcNow.AddHours(3).ToString("yyyy-MM-dd HH:mm:ss.fff",
                                    CultureInfo.InvariantCulture));
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly.
                c.Close();
            }
        }
    }
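
For reference, one variant I can think of is to keep the StreamWriter open for the lifetime of the consume loop instead of re-opening the file for every message, in case the per-message file open/close is part of the problem. A minimal sketch of the consume loop only (c and cts as in the code above):

    // Variant: open the log file once and reuse the writer for the whole loop.
    using (var sw = new StreamWriter("D:\\test\\kafka_messages.txt", append: true))
    {
        sw.AutoFlush = true; // flush after each line so the log stays current

        while (true)
        {
            try
            {
                var cr = c.Consume(cts.Token);
                sw.WriteLine("Kafka message: " + cr.Message.Value + " "
                    + DateTime.UtcNow.AddHours(3).ToString("yyyy-MM-dd HH:mm:ss.fff",
                        CultureInfo.InvariantCulture));
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error occurred: {e.Error.Reason}");
            }
        }
    }

Is this the right direction, or is there a more suitable Confluent.Kafka mechanism for this kind of consumer?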