I have a list of offsets with their corresponding partition and I need to commit them manually.
To do so I am looping through the list and assigning partition to the consumer and then seeking to a particular offset.
then I am consuming the message and passing the ConsumerBulider to commit method.
Sometimes it executes smoothly but sometimes it throws “Local:Waiting for Coordinator” exception.
But in both the cases , when I try consuming messages afterwards I re-consume the same series of messages I already have committed or should I say I tried committing. Which means I never really could commit them
foreach(var item in cmdparamslist)
{
Partition p = new Partition(Int16.Parse(item.PartitionID));
TopicPartition tp = new TopicPartition(configuration.GetSection(“KafkaSettings”).GetSection(“Topic”).Value, p);
Offset o = new Offset(long.Parse(item.Offset));
TopicPartitionOffset tpo = new TopicPartitionOffset(tp,o);
//Ltpo.Add(tpo);
try
{
KafkaConsumer.Assign(tpo);
await Task.Delay(TimeSpan.FromSeconds(1));
KafkaConsumer.Seek(tpo);
var cr = KafkaConsumer.Consume(cts.Token);
try
{
KafkaConsumer.Commit(cr);
}
catch (TopicPartitionOffsetException e1)
{
Console.WriteLine("exception "+e);
}
catch (KafkaException e)
{
Console.WriteLine("exception "+e);
}
}
catch (KafkaException e)
{
Console.WriteLine("exception "+e);
}
}
KafkaConsumer.Close();
}
catch(Exception e)
{
Console.WriteLine("exception "+e);
}
}
Consumer / Client configuration:
var conf = new ConsumerConfig
{
GroupId = Guid.NewGuid().ToString(),
BootstrapServers = configuration.GetSection(“KafkaSettings”).GetSection(“RemoteServers”).Value,
AutoOffsetReset = AutoOffsetReset.Earliest,
SaslMechanism = SaslMechanism.Gssapi,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
EnableAutoCommit = false
//EnableAutoOffsetStore = false
};
I am using Confluent.Kafka 1.6.2 version
Could someone please help me ?