Cyclic data reading from topic partitions takes too long (7-8 seconds). What could be the problem?

In my use case the data source (a CNC machine) sends data (the flow value of a liquid) to a Kafka topic at one-second intervals. A mechanism on the source side ensures that the data is sent to Kafka in the correct order. The Kafka topic has three partitions (that's a mandatory standard in our company). Kafka decides itself to which partition each incoming message is written. (This I also cannot change.)

With my C# code I have to draw a live chart of the flow data (I use the Confluent.Kafka library). Therefore I always try to read the latest message that the CNC machine has sent to Kafka. As Kafka writes each message to a different partition, I do the following every second:

1) I read the last incoming (newest) message from each of the three partitions P0, P1, P2 using Offset.End (see the sketch after this list).

2) I compare the timestamps of these "newest" messages from P0, P1 and P2.

3) I take the one with the greatest timestamp and pass this data to my live chart.
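
For illustration, here is a compact sketch of what step 1 is meant to do for a single partition. Note that this variant seeks to the last already-written offset via the partition's high watermark instead of using Offset.End; the helper name ReadNewest and the timeouts are placeholders, not part of my actual code:

    // Sketch (an assumption, not the actual code): fetch the most recent
    // message already stored in one partition by seeking to High - 1.
    // Requires: using Confluent.Kafka;
    static ConsumeResult<Ignore, string> ReadNewest(IConsumer<Ignore, string> consumer, TopicPartition tp)
    {
        // The high watermark is the offset of the *next* message to be written,
        // so High - 1 is the last message that already exists.
        WatermarkOffsets w = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
        if (w.High.Value <= w.Low.Value)
        {
            return null; // partition is empty
        }
        consumer.Assign(new TopicPartitionOffset(tp, w.High.Value - 1));
        return consumer.Consume(TimeSpan.FromSeconds(5)); // returns null on timeout
    }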

But I think I am doing something wrong in my code, because one data reading cycle from Kafka takes on average 7-8 seconds. (I have marked the parts that take 2-3 seconds for each partition.)
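
For reference, the per-section timing can be taken roughly like this (a sketch using System.Diagnostics.Stopwatch; the log line is a placeholder):

    // Sketch: timing one Assign/Consume section with Stopwatch.
    var sw = System.Diagnostics.Stopwatch.StartNew();
    consumer.Assign(tps_0);
    var consumeResult_0 = consumer.Consume(cancellationToken);
    sw.Stop();
    Console.WriteLine($"Partition 0 read took {sw.ElapsedMilliseconds} ms");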

Do I have a mistake in my code? Or is my strategy wrong? I can definitely say that there is no network problem or Kafka performance problem, as other consumers can consume data from the same Kafka server without problems.

My code:

public void Read_from_Kafka()
    {
        try
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = server_uri,
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Latest,
                SecurityProtocol = SecurityProtocol.Ssl,
                SslCaLocation = path_1,
                SslCertificateLocation = path_2,
                SslKeyLocation = path_3,
                SslKeyPassword = password_string,
                EnableAutoCommit = false
            };

            CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken cancellationToken = source.Token;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                consumer.Subscribe("My_Topic");
                while (var_true)
                {
                    //Reading newest data from partition-0
                    TopicPartitionOffset tps_0 = new TopicPartitionOffset(new TopicPartition("My_Topic", 0),Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_0);
                    var consumeResult_0 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
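                    // Note: Assign with Offset.End positions the consumer at the end of
                    // the partition, so Consume only returns once a message arrives after
                    // that point (the same applies to partitions 1 and 2 below).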
                    Kafka_value_P0 = consumeResult_0.Message.Value;
                    TimeStamp_dateTime_P0 = consumeResult_0.Message.Timestamp.UtcDateTime;
                    
                    //Reading newest data from partition-1
                    TopicPartitionOffset tps_1 = new TopicPartitionOffset(new TopicPartition("My_Topic", 1), Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_1);
                    var consumeResult_1 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
                    Kafka_value_P1 = consumeResult_1.Message.Value;
                    TimeStamp_dateTime_P1 = consumeResult_1.Message.Timestamp.UtcDateTime;
                                            
                    //Reading newest data from partition-2
                    TopicPartitionOffset tps_2 = new TopicPartitionOffset(new TopicPartition("My_Topic", 2), Offset.End);
                    // this part of code takes 2-3 seconds-start*****
                    consumer.Assign(tps_2);
                    var consumeResult_2 = consumer.Consume(cancellationToken);
                    // this part of code takes 2-3 seconds-stop*****
                    Kafka_value_P2 = consumeResult_2.Message.Value;
                    TimeStamp_dateTime_P2 = consumeResult_2.Message.Timestamp.UtcDateTime;
                                           
                    // Compare the timestamps of the last written message in each partition and pick the newest (most recent) data.
                    if (TimeStamp_dateTime_P0 > TimeStamp_dateTime_P1 && TimeStamp_dateTime_P0 > TimeStamp_dateTime_P2)
                    {
                        newest_Kafka_value = Kafka_value_P0;
                    }
                    else if (TimeStamp_dateTime_P1 > TimeStamp_dateTime_P0 && TimeStamp_dateTime_P1 > TimeStamp_dateTime_P2)
                    {
                        newest_Kafka_value = Kafka_value_P1;
                    }
                    else if (TimeStamp_dateTime_P2 > TimeStamp_dateTime_P1 && TimeStamp_dateTime_P2 > TimeStamp_dateTime_P0)
                    {
                        newest_Kafka_value = Kafka_value_P2;
                    }

                    // send this data to live chart
                    System.Threading.Thread.Sleep(1000);
                }
                consumer.Close();
            }
        }
        catch(Exception ex)
        {
            using (StreamWriter sw = File.AppendText(error_log))
            {
                sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
            }
        }           
        
    }
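
For comparison, with the ReadNewest sketch from above, the three per-partition blocks and the timestamp comparison would look roughly like this (again only a sketch under the same assumptions, not my current code):

    // Requires: using System.Linq;
    var results = new[]
    {
        ReadNewest(consumer, new TopicPartition("My_Topic", 0)),
        ReadNewest(consumer, new TopicPartition("My_Topic", 1)),
        ReadNewest(consumer, new TopicPartition("My_Topic", 2)),
    };
    // Pick the message with the greatest timestamp among the non-empty partitions.
    var newest = results
        .Where(r => r != null)
        .OrderByDescending(r => r.Message.Timestamp.UtcDateTime)
        .FirstOrDefault();
    if (newest != null)
    {
        newest_Kafka_value = newest.Message.Value;
    }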
