I’m using the Confluent.Kafka NuGet package to read Kafka messages in a .NET Core Worker Service.
Here’s my code:
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
    _logger.LogInformation($"Executed background service at: {DateTime.Now}");

    // Create the consumer and subscribe to the configured topic
    using var consumer = _kafkaConsumerService.BuildConsumer(GetConsumerSettings());
    consumer.Subscribe(_settings.SupportedKafkaTopic.TopicName);
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var currentMsg = consumer.Consume(cancellationToken);
                var messageValue = currentMsg.Message.Value;
                _logger.LogInformation($"The message:{messageValue} has been consumed from the topic:{currentMsg.Topic} with offset:{currentMsg.Offset} at:{DateTime.Now}");
                if (await _messageHandler.AddMessage(_settings.SupportedKafkaTopic.ServiceName, messageValue))
                {
                    _logger.LogInformation($"The message:{messageValue} has been processed from the topic:{currentMsg.Topic} and recorded into SQL Server at:{DateTime.Now}");
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, $"Error occurred while consuming from the topic:{_settings.SupportedKafkaTopic.TopicName} at:{DateTime.Now}");
            }
            // TODO: Analyse this delay; is it impacting throughput too much?
            // It's only 50 ms; it helps with cancellation and unit tests.
            await Task.Delay(50, cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected when the host requests shutdown
        _logger.LogInformation($"Cancellation requested; closing the consumer for the kafka topic:{_settings.SupportedKafkaTopic.TopicName}");
        consumer.Close();
    }
}
Basically, I’m saving the message into SQL Server here via the _messageHandler.AddMessage method. My consumer config has EnableAutoCommit = true.
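For reference, GetConsumerSettings builds a standard Confluent.Kafka ConsumerConfig, roughly like this (the broker address and group id below are placeholders, not my real values):

using Confluent.Kafka;

private ConsumerConfig GetConsumerSettings()
{
    return new ConsumerConfig
    {
        BootstrapServers = "localhost:9092", // placeholder
        GroupId = "my-consumer-group",       // placeholder
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnableAutoCommit = true // offsets are committed automatically in the background
    };
}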
I wanted to know: is this the right approach? If anything fails while saving the data into SQL Server, the offset gets committed anyway. How do I handle that scenario? The offset should be committed only when the message has been processed successfully; otherwise it should not be, so that I can process the failed message again.
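The only alternative I’ve thought of so far is to set EnableAutoCommit = false and commit manually after a successful save. A minimal sketch of what the loop body would become (same handler and settings as above):

// consumer built with EnableAutoCommit = false
var currentMsg = consumer.Consume(cancellationToken);
if (await _messageHandler.AddMessage(_settings.SupportedKafkaTopic.ServiceName, currentMsg.Message.Value))
{
    // commit the offset only after the message is safely in SQL Server
    consumer.Commit(currentMsg);
}
// on failure nothing is committed, so the message is redelivered
// after a restart or rebalance

I’ve also seen EnableAutoOffsetStore = false combined with consumer.StoreOffset(currentMsg) suggested, which keeps auto-commit on but only commits offsets that were explicitly stored. Would one of these be the correct way to do it, or is there a better pattern?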