In my use case the data source (a CNC machine) sends data (the flow value of a liquid) to a Kafka topic at a one-second interval. A mechanism on the source side ensures that the data is sent to Kafka in the correct order. The Kafka topic has three partitions (that is a mandatory standard in our company), and Kafka decides by itself which partition each incoming message is written to (this I also cannot change).
With my C# code I have to draw a live chart of the flow data (I use the Confluent.Kafka library). For that I always try to read the most recent data that the CNC machine has sent to Kafka. Since Kafka writes the data to a different partition each time, I do the following every second:
1) From each of the three partitions P0, P1 and P2 I read the last incoming (newest) message using Offset.End (a small sketch of what I mean by "newest message" follows this list).
2) I compare the timestamps of these "newest" messages from P0, P1 and P2.
3) I take the one with the greatest timestamp and pass its value to my live chart.
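To clarify what I mean by "newest message" in step 1: per partition I want the most recent record that is already stored in that partition. As far as I understand, that on its own could also be expressed with QueryWatermarkOffsets instead of Offset.End, roughly like the sketch below (only an illustration of the goal, not my production code; GetLastMessage is a made-up helper name and the timeouts are arbitrary):

// Sketch only: read the last already-written message of one partition.
// Assumes an existing IConsumer<Ignore, string>; helper name and timeouts are placeholders.
static ConsumeResult<Ignore, string> GetLastMessage(IConsumer<Ignore, string> consumer, string topic, int partition)
{
    var tp = new TopicPartition(topic, partition);

    // The high watermark is the offset of the next message that will be written,
    // so the newest existing message sits at High - 1.
    WatermarkOffsets wm = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(5));
    if (wm.High.Value <= wm.Low.Value)
        return null; // partition is still empty

    consumer.Assign(new TopicPartitionOffset(tp, wm.High.Value - 1));
    return consumer.Consume(TimeSpan.FromSeconds(5));
}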
But I think I am doing something wrong in my code, because one complete reading cycle from Kafka takes 7-8 seconds on average. (I have marked the part that takes 2-3 seconds for each partition.)
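For the timing I simply wrapped the marked block in a Stopwatch, roughly like this (illustration only, the Stopwatch is not in my real code):

var sw = System.Diagnostics.Stopwatch.StartNew();
consumer.Assign(tps_0);                                    // the marked block from the code below
var consumeResult_0 = consumer.Consume(cancellationToken);
sw.Stop();
// sw.ElapsedMilliseconds is typically 2000-3000 per partition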
Is there a mistake in my code, or is my strategy wrong? I can definitely say that there is no network or Kafka performance problem, because other consumers can consume data from the same Kafka server without any issues.
My code:
public void Read_from_Kafka()
{
    try
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = server_uri,
            GroupId = "foo",
            AutoOffsetReset = AutoOffsetReset.Latest,
            SecurityProtocol = SecurityProtocol.Ssl,
            SslCaLocation = path_1,
            SslCertificateLocation = path_2,
            SslKeyLocation = path_3,
            SslKeyPassword = password_string,
            EnableAutoCommit = false
        };

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe("My_Topic");
            while (var_true) // class-level flag that keeps the reading loop running
            {
                // Reading newest data from partition 0
                TopicPartitionOffset tps_0 = new TopicPartitionOffset(new TopicPartition("My_Topic", 0), Offset.End);
                // this part of code takes 2-3 seconds - start *****
                consumer.Assign(tps_0);
                var consumeResult_0 = consumer.Consume(cancellationToken);
                // this part of code takes 2-3 seconds - stop *****
                Kafka_message_P0 = consumeResult_0.Message.Value;
                TimeStamp_dateTime_P0 = consumeResult_0.Message.Timestamp.UtcDateTime; // timestamp used for the comparison below (taken here from the Kafka message metadata)
                // Reading newest data from partition 1
                TopicPartitionOffset tps_1 = new TopicPartitionOffset(new TopicPartition("My_Topic", 1), Offset.End);
                // this part of code takes 2-3 seconds - start *****
                consumer.Assign(tps_1);
                var consumeResult_1 = consumer.Consume(cancellationToken);
                // this part of code takes 2-3 seconds - stop *****
                Kafka_message_P1 = consumeResult_1.Message.Value;
                TimeStamp_dateTime_P1 = consumeResult_1.Message.Timestamp.UtcDateTime;
                // Reading newest data from partition 2
                TopicPartitionOffset tps_2 = new TopicPartitionOffset(new TopicPartition("My_Topic", 2), Offset.End);
                // this part of code takes 2-3 seconds - start *****
                consumer.Assign(tps_2);
                var consumeResult_2 = consumer.Consume(cancellationToken);
                // this part of code takes 2-3 seconds - stop *****
                Kafka_message_P2 = consumeResult_2.Message.Value;
                TimeStamp_dateTime_P2 = consumeResult_2.Message.Timestamp.UtcDateTime;
                // Comparing the timestamps of the newest message of each partition to find the most recent value.
                if (TimeStamp_dateTime_P0 > TimeStamp_dateTime_P1 && TimeStamp_dateTime_P0 > TimeStamp_dateTime_P2)
                {
                    newest_Kafka_value = Kafka_message_P0;
                }
                if (TimeStamp_dateTime_P1 > TimeStamp_dateTime_P0 && TimeStamp_dateTime_P1 > TimeStamp_dateTime_P2)
                {
                    newest_Kafka_value = Kafka_message_P1;
                }
                if (TimeStamp_dateTime_P2 > TimeStamp_dateTime_P1 && TimeStamp_dateTime_P2 > TimeStamp_dateTime_P0)
                {
                    newest_Kafka_value = Kafka_message_P2;
                }
                // send this data to live chart
                System.Threading.Thread.Sleep(1000);
            }

            consumer.Close();
        }
    }
    catch (Exception ex)
    {
        using (StreamWriter sw = File.AppendText(error_log))
        {
            sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
        }
    }
}