Hello all,
I’m writing a Windows Service to consume certain messages from a Kafka server and I have a problem with it.
I can consume messages (my sample code is below), and it works for about 2-3 hours.
After that time I get errors that the partition is revoked. So I tried using a fixed time in the Consume method (with value 300000) in stead of a cancellation token and then restart after the Consume stops on it’s own after that timer.
I get the “Revoked”, and “Assigned” partition messages after this, but my Consume somehow doesn’t pick up new messages anymore after all this.
I have no clue what i’m doing wrong. Can anyone give suggestions on how to set this up?
Sidenote : I have to consume 5 topics, so I’m actually running each consumer on a thread as you can see in the following code.
Marc
var consumerConfig = new ConsumerConfig
{
BootstrapServers = ConfigurationManager.AppSettings["KafkaServer"],
SecurityProtocol = SecurityProtocol.SaslSsl,
SslCaLocation = ConfigurationManager.AppSettings["KafkaPEMLocation"],
SaslMechanism = SaslMechanism.Plain,
GroupId = "TestInterface",
AutoOffsetReset = AutoOffsetReset.Earliest,
SaslPassword = ConfigurationManager.AppSettings["KafkaPwd"],
SaslUsername = ConfigurationManager.AppSettings["KafkaUser"]
};
var consumerWO = new ConsumerBuilder<Ignore, string>(consumerConfig)
.SetErrorHandler((_, e) =>
{
Console.WriteLine($"Error: {e.Reason}");
return;
})
.SetPartitionsAssignedHandler((c, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
})
.Build();
{
string topics = wotopicWorkOrder;
consumerWO.Subscribe(topics);
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
try
{
Thread t = new Thread(() =>
{
try
{
isRunning = true;
while (!mustStop)
{
var consumeResult = consumerWO.Consume(cancellationToken );
if (consumeResult != null)
{
lock (this)
{
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
}
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
ProcessWO(consumeResult.Message);
}
}
}
}
catch (Exception ex)
{
logger.Error("Error during Consume WO : " + ex.ToString());
}
});
t.Start();
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}