I want to read just the last (newest) message in a Kafka topic

Hello
I want to read just the latest (newest) data (a JSON message) from a Kafka topic. I will do that periodically and draw a live chart with this data. I use the Confluent.Kafka library for .NET in C#.
I am using the following code, but it starts reading data from somewhere in the past (at least 16 hours ago).

// Requires: using System; using System.IO; using System.Threading; using Confluent.Kafka;
public void Read_from_Kafka()
{
    try
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = kafka_URI,
            GroupId = "group",
            AutoOffsetReset = AutoOffsetReset.Latest,
            SecurityProtocol = SecurityProtocol.Ssl,
            SslCaLocation = "path1",
            SslCertificateLocation = "path2",
            SslKeyLocation = "path3",
            SslKeyPassword = "password",
        };

        CancellationTokenSource source = new CancellationTokenSource();
        CancellationToken cancellationToken = source.Token;

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            consumer.Subscribe(topic_name);
            while (!cancellationToken.IsCancellationRequested)
            {
                // Blocks until the next message arrives.
                var consumeResult = consumer.Consume(cancellationToken);
                Kafka_message_total = consumeResult.Message.Value;

                // Append the message and a timestamp to the log file.
                using (StreamWriter sw = File.AppendText(json_log_file))
                {
                    sw.WriteLine("JSON: " + Kafka_message_total + " " + Convert.ToString(DateTime.Now));
                }
                System.Threading.Thread.Sleep(2000);
            }
            consumer.Close();
        }
    }
    catch (Exception ex)
    {
        using (StreamWriter sw = File.AppendText(error_log))
        {
            sw.WriteLine("Kafka Read Error: " + ex + " " + Convert.ToString(DateTime.Now));
        }
    }
}

I’ve removed my previous answer because it wasn’t correct. All you need to do is call the consumer's Seek method with Offset.End to move to the end of the topic partition.

It would look like this:

consumer.Seek(new TopicPartitionOffset("foo", new Partition(1), Offset.End));
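
One caveat when combining this with Subscribe: partitions are assigned asynchronously, and as far as I know Seek only works on partitions the consumer already has assigned. A minimal sketch of one alternative is to return Offset.End from the partitions-assigned handler, so every assigned partition starts at its end. The broker address and the topic name "foo" are placeholders, not values from the original post. Note that Offset.End points just past the last existing message, so the first message consumed is the next one produced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Confluent.Kafka;

class SeekToEndSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: placeholder broker address
            GroupId = "group",
            AutoOffsetReset = AutoOffsetReset.Latest
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                // Override any committed offset: start each assigned partition at its end.
                IEnumerable<TopicPartitionOffset> startOffsets =
                    partitions.Select(tp => new TopicPartitionOffset(tp, Offset.End));
                return startOffsets;
            })
            .Build())
        {
            // Ctrl+C cancels the loop instead of killing the process.
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            consumer.Subscribe("foo"); // assumption: placeholder topic name
            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);
                    Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled.
            }
            finally
            {
                consumer.Close(); // leave the group cleanly
            }
        }
    }
}
```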

HTH

Hello, thanks for reaching out.

I’m not actually sure if I fully understand the question. Are you trying to only read the last 16 hours of data? Or are you trying to read only the latest data and actually seeing data from at least 16 hours ago?

In any case, I have a few comments that may help depending on what you are trying to achieve.

The setting AutoOffsetReset.Latest applies only when your consumer connects and has no committed offset for its consumer group, so it doesn’t know where to start. Once the group starts committing offsets, the consumer uses those to determine where to start. Imagine that you connect the consumer, read data, commit offsets, and then disconnect it for 16 hours. When you reconnect, it will start from the last committed offset (i.e. 16 hours ago) rather than from the latest message. If that’s not what you want, then you will need to either change how you commit offsets or use a technique like the one @bbejeck suggested to seek to a specific location.
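
As a rough illustration of the "change how you commit offsets" option, the configuration sketch below never commits and uses a group id that has no committed offsets, so AutoOffsetReset.Latest applies on every start. The broker address, the helper name BuildConfig, and the "chart-" prefix are assumptions for illustration only. A fresh group id is used because offsets already committed under an old group id (such as "group") would otherwise still be picked up. The trade-off is that each run leaves a new, empty group on the broker until it expires.

```csharp
using System;
using Confluent.Kafka;

static class LatestOnlyConfigSketch
{
    // Hypothetical helper: builds a config for a consumer that always starts at the latest message.
    public static ConsumerConfig BuildConfig()
    {
        return new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // assumption: placeholder broker address
            GroupId = "chart-" + Guid.NewGuid(),        // new group each run: no committed offsets exist
            EnableAutoCommit = false,                   // never commit, so later runs also have none
            AutoOffsetReset = AutoOffsetReset.Latest    // applies because no committed offset is found
        };
    }
}
```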

I don’t see a value for EnableAutoCommit in your config. That means that it will be using the default (which is true). So a background thread will be running to periodically commit offsets. This means that you don’t really control when your offsets are being committed. If you are not worried about consuming every message, that may not be a problem. However, if you were expecting to consume every message then this setting by itself is insufficient. You would want to manually store your offsets (see ConsumerConfig.EnableAutoOffsetStore and IConsumer.StoreOffset) or manually commit your offsets (see IConsumer.Commit).
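
If you do want every message to be accounted for, a minimal sketch of the manual offset-store approach could look like the following. It keeps the background auto-commit but sets EnableAutoOffsetStore to false, so an offset only becomes eligible for commit after StoreOffset is called once the message has been handled. The broker address and topic name are placeholders, and the Console.WriteLine call stands in for whatever processing you actually do.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class StoreOffsetSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: placeholder broker address
            GroupId = "group",
            EnableAutoCommit = true,             // background thread still commits periodically...
            EnableAutoOffsetStore = false        // ...but only offsets stored explicitly below
        };

        using (var cts = new CancellationTokenSource())
        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

            consumer.Subscribe("foo"); // assumption: placeholder topic name
            try
            {
                while (true)
                {
                    var result = consumer.Consume(cts.Token);

                    // Placeholder for real processing (e.g. updating the chart).
                    Console.WriteLine(result.Message.Value);

                    // Mark this message as done; the next auto-commit will include it.
                    consumer.StoreOffset(result);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when the token is cancelled.
            }
            finally
            {
                consumer.Close(); // commits stored offsets and leaves the group
            }
        }
    }
}
```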

Hopefully that helps, but if you need more assistance, let us know.
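
For the original goal of periodically fetching only the newest existing message, one possible sketch (a different technique from the ones above, not something either answer proposed) is to query the partition's high watermark and assign the consumer to the offset just before it. This assumes a single-partition, non-transactional topic named "foo" and a placeholder broker address. On transactional or compacted topics the offset before the high watermark may not hold a readable message, in which case Consume simply times out.

```csharp
using System;
using Confluent.Kafka;

class ReadLastMessageSketch
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // assumption: placeholder broker address
            GroupId = "latest-reader",           // assumption: any id; offsets are never committed
            EnableAutoCommit = false
        };

        // Assumption: the topic has a single partition, 0.
        var partition = new TopicPartition("foo", new Partition(0));

        using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
        {
            // High is the offset that the next produced message will receive.
            var watermarks = consumer.QueryWatermarkOffsets(partition, TimeSpan.FromSeconds(10));
            if (watermarks.High.Value <= watermarks.Low.Value)
            {
                Console.WriteLine("Partition is empty.");
                return;
            }

            // Start at the last existing message instead of using group offsets.
            var lastOffset = new Offset(watermarks.High.Value - 1);
            consumer.Assign(new TopicPartitionOffset(partition, lastOffset));

            // Consume returns null if nothing arrives within the timeout.
            var result = consumer.Consume(TimeSpan.FromSeconds(10));
            Console.WriteLine(result == null ? "Timed out." : result.Message.Value);

            consumer.Close();
        }
    }
}
```

Calling this from a timer (rather than sleeping inside a consume loop) would give one fresh value per refresh without the consumer falling behind the producer.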