My Test:
Produce 200 messages with a 1 KB payload each, using RdKafka::Producer::produce(..)
In a loop (until 200 messages consumed):
KafkaConsumer::consume (timeout = 100 ms)
While retrieving the messages, consume returns ERR__TIMED_OUT 10 times
If I increase the timeout to 2000 ms, I no longer hit the timeout
My question is:
Is this expected behaviour?
Which specific configuration are you experimenting with? max.poll.interval.ms?
With lots of messages in the queue, we are calling the method:
int timeout_ms = 100;
RdKafka::KafkaConsumer::consume(timeout_ms)
Instead of returning a message, it returns the error ERR__TIMED_OUT
Why is that?
max.poll.interval.ms is set to 5 minutes and is not being reached in this test
Got it. Are you sure that messages are available for consumption when you start consuming (since produce is async)? If not, you might try flushing the producer before consuming.
Unfortunately I don’t see much, if any, debug logging in the consume path that might help with this.
FWIW the consume code examples I see tend to pick a poll timeout of half a second or one second. Maybe 100 ms is on the low side.
Do you have a reliable repro for this? If you’re able to package up steps and client code that would help to get eyes on it.
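To illustrate the point about short poll timeouts, here is a minimal sketch of a consume loop that treats a single timed-out poll as "nothing arrived yet" rather than as a failure, and only gives up after an overall deadline. It deliberately does not use librdkafka: `PollResult`, `DrainStats` and `drain` are hypothetical names, and the `poll` callable is an assumed stand-in for a wrapper around RdKafka::KafkaConsumer::consume(timeout_ms) that maps ERR__TIMED_OUT to `PollResult::TimedOut`.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>

// Hypothetical stand-in for the outcome of one consume(timeout_ms) call.
enum class PollResult { Message, TimedOut, Error };

// Hypothetical summary of one drain run.
struct DrainStats {
    std::size_t messages = 0;  // polls that returned a message
    std::size_t timeouts = 0;  // polls that returned nothing in time
    bool failed = false;       // true if a non-timeout error stopped the loop
};

// Polls until `expected` messages have arrived, the overall deadline passes,
// or a non-timeout error occurs. A timed-out poll is counted, not fatal.
inline DrainStats drain(const std::function<PollResult(int)>& poll,
                        std::size_t expected,
                        int poll_timeout_ms,
                        std::chrono::milliseconds overall_deadline) {
    assert(poll && "a poll callable must be supplied");
    DrainStats stats;
    const auto give_up_at = std::chrono::steady_clock::now() + overall_deadline;
    while (stats.messages < expected &&
           std::chrono::steady_clock::now() < give_up_at) {
        switch (poll(poll_timeout_ms)) {
            case PollResult::Message:
                ++stats.messages;
                break;
            case PollResult::TimedOut:
                ++stats.timeouts;  // keep polling until the overall deadline
                break;
            case PollResult::Error:
                stats.failed = true;
                return stats;
        }
    }
    return stats;
}
```

With a loop shaped like this, a handful of ERR__TIMED_OUT results at a 100 ms poll timeout would show up as a non-zero `timeouts` count while all expected messages are still delivered. That makes it easier to tell occasional slow fetches apart from a consumer that never receives anything.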
Hi dtroiano
Thanks for your help
flush is called after every produced message, so producing is effectively serial
In other tests we’ve seen messages in the queue for 24 hours and the consume call is still returning ERR__TIMED_OUT
I may be able to put together a test that demonstrates this issue and send it over
Made progress with this
We have a skipAll method:
skipAll()
{
    std::vector<RdKafka::TopicPartition*> topicPartitions;
    // use query_watermark_offsets to find the highest offset of each partition and add it to topicPartitions
    kafkaConsumer->assign(topicPartitions);
    kafkaConsumer->commitSync(topicPartitions);
}
We found that the call to assign above was essential - commitSync had no effect without it
We were also calling assign when committing offsets after calling kafkaConsumer->consume()
This was causing subsequent calls to kafkaConsumer->consume() to time out
So we have solved our issue
However, it’s not clear to me why the assign is necessary when skipAll is executing
Thanks!
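For anyone following along, the offset selection step inside skipAll can be sketched on its own, without the client library. This is only an illustration under stated assumptions: `PartitionOffset`, `WatermarkQuery` and `offsetsToSkipAll` are hypothetical names, and the `query` callable is an assumed stand-in for a lookup such as query_watermark_offsets. In real code each result would presumably become an RdKafka::TopicPartition created with the same topic, partition and offset.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Hypothetical plain-data stand-in for an RdKafka::TopicPartition entry.
struct PartitionOffset {
    std::string topic;
    std::int32_t partition;
    std::int64_t offset;  // next offset the consumer should read
};

// Hypothetical stand-in for a watermark lookup: fills `low` and `high`
// and returns true on success.
using WatermarkQuery = std::function<bool(const std::string&, std::int32_t,
                                          std::int64_t&, std::int64_t&)>;

// Builds the offsets to commit so that everything currently in the topic
// is skipped. Partitions whose lookup fails are left out of the result.
inline std::vector<PartitionOffset> offsetsToSkipAll(
        const std::string& topic,
        const std::vector<std::int32_t>& partitions,
        const WatermarkQuery& query) {
    assert(query && "a watermark lookup must be supplied");
    std::vector<PartitionOffset> result;
    for (std::int32_t p : partitions) {
        std::int64_t low = 0;
        std::int64_t high = 0;
        if (!query(topic, p, low, high)) {
            continue;  // skip partitions we could not query
        }
        // The high watermark is one past the last message, so committing it
        // makes the next consume start after everything already produced.
        result.push_back({topic, p, high});
    }
    return result;
}
```

The point worth noting is that the committed value is the high watermark itself rather than the high watermark minus one, since a committed offset names the next message to read.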