I am encountering a challenge while using Confluent.Kafka C# library. The consumer is able to consume the first message successfully, but when it comes to the second message on the same topic, it remains in a waiting state and is unable to be consumed. Concurrently, I have confirmed through the console consumer that the message is flowing correctly.
Refer to the code of consumer:
MyConsumer myConsumer = new MyConsumer(consumeTopic);
nirvanaConsumer.Consume(requestResponseObj.RequestID);
using Confluent.Kafka;
namespace KafkaLibrary
{
public class MyConsumer : IDisposable
{
private IConsumer<Null, RequestResponseModel> _kafkaConsumer = null;
private CancellationTokenSource _cancellationTokenSource = null;
internal MyConsumer(string topic)
{
try
{
ConsumerConfig cc = new ConsumerConfig();
cc.BootstrapServers = "localhost:9092";
cc.GroupId = "nirvana-consumer-group" + "-" + topic + "-" + Guid.NewGuid();
cc.HeartbeatIntervalMs = 1000;
cc.SessionTimeoutMs = 20000;
cc.MaxPollIntervalMs = 86400000;
cc.EnableAutoOffsetStore = false;
cc.AutoOffsetReset = AutoOffsetReset.Latest;
_kafkaConsumer = new ConsumerBuilder<Null, RequestResponseModel>(cc).SetValueDeserializer(new KafkaSerializer()).Build();
_kafkaConsumer.Subscribe(topic);
_cancellationTokenSource = new CancellationTokenSource();
}
catch (Exception)
{
throw;
}
}
internal async Task<string> Consume(Guid requestId)
{
try
{
ConsumeResult<Null, RequestResponseModel> consumerResult = null;
return await Task.Factory.StartNew(() =>
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
try
{
consumerResult = _kafkaConsumer.Consume(_cancellationTokenSource.Token);
if (consumerResult != null && consumerResult.Message != null && consumerResult.Message.Value != null)
{
if (consumerResult.Message.Value.RequestID == requestId)
{
_kafkaConsumer.Commit(consumerResult);
_cancellationTokenSource.Cancel();
}
}
}
catch (Exception exp)
{
Console.WriteLine(exp.Message + Environment.NewLine + exp.StackTrace);
throw;
}
}
return consumerResult.Message.Value.Data;
});
}
catch (Exception exp)
{
Console.WriteLine(exp.Message + Environment.NewLine + exp.StackTrace);
throw;
}
}
}
}