Hi all, I have code that consumes messages from a topic, does something with each message, and then commits it.
I want to consume messages in batches, where a batch ends after a maximum number of messages or a timeout, and then iterate over each message in the batch. When I finish iterating over them, I want to consume another batch.
How can I consume in batches with a maximum number of messages per batch?
c.SubscribeTopics([]string{"topic"}, nil)
run := true
for run {
msg, err := c.ReadMessage(time.Second)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
err = DoSomething(msg.Value)
if err != nil {
fmt.Printf("Failed to DoSomething: %s\n", err)
}
_, err = c.CommitMessage(msg)
if err != nil {
fmt.Printf("Failed to commit message to kafka: %s\n", err)
}
} else if err.(kafka.Error).Code() != kafka.ErrTimedOut {
//add logger error and shut down
}
}