We have multiple microservices written in GO exchanging messages over the Kafka message bus. A microservice writes on a Kafka topic with a partition count of 3 with a replica factor of 2. We use AWS MSK for kafka brooker. Here are my producer -
package kf
import (
"fmt"
"github.com/Shopify/sarama"
"github.com/segmentio/kafka-go"
"net"
"strconv"
)
type Producer struct {
flowEventProducer sarama.SyncProducer
topic string
}
func InitProducer(brokers []string, topic string) *Producer {
CreateKafkaTopic(brokers[0], topic)
p := &Producer{}
prod, err := newFlowWriter(brokers)
if err != nil {
panic("failed to connect to producer")
}
p.flowEventProducer = prod
p.topic = topic
return p
}
func CreateKafkaTopic(kafkaURL, topic string) {
conn, err := kafka.Dial("tcp", kafkaURL)
if err != nil {
panic(err.Error())
}
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 3,
ReplicationFactor: 2,
},
}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
defer conn.Close()
}
func newFlowWriter(brokers []string) (sarama.SyncProducer, error) {
config := sarama.NewConfig()
version := "2.6.2"
kafkaVer, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic("failed to parse kafka version, producer will not run")
}
config.Producer.Partitioner = sarama.NewHashPartitioner
config.Net.MaxOpenRequests = 10
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Return.Successes = true
config.Version = kafkaVer
producer, err := sarama.NewSyncProducer(brokers, config)
return producer, err
}
func (p *Producer) WriteMessage(uuid string, data []byte) error {
msg := &sarama.ProducerMessage{
Topic: p.topic,
Key: sarama.ByteEncoder(uuid),
Value: sarama.ByteEncoder(data),
}
part, off, err := p.flowEventProducer.SendMessage(msg)
if err != nil {
return err
} else {
fmt.Printf("message wriiten on part:%d and offset: %d", part, off)
}
return nil
}
and consumer implementation
package kf
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
)
type Consumer struct {
flowEventReader sarama.ConsumerGroup
topic string
brokerUrls []string
}
type data struct {
Name string `json:"name"`
Employee string `json:"employee"`
}
func InitConsumer(brokers []string, topic string) *Consumer {
c := &Consumer{}
c.topic = topic
c.brokerUrls = brokers
var (
err error
)
conf := createSaramaKafkaConf()
c.flowEventReader, err = sarama.NewConsumerGroup(c.brokerUrls, "myconf", conf)
if err != nil {
panic("failed to create consumer group on kafka cluster")
}
return c
}
type KafkaConsumerGroupHandler struct {
Cons *Consumer
}
func (c *Consumer) HandleMessages() {
// Consume from kafka and process
for {
var err = c.flowEventReader.Consume(context.Background(), []string{c.topic}, &KafkaConsumerGroupHandler{Cons: c})
if err != nil {
fmt.Println("FAILED")
continue
}
}
}
func (*KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (l *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
l.Cons.logMessage(msg)
sess.MarkMessage(msg, "")
}
return nil
}
func (c *Consumer) logMessage(msg *sarama.ConsumerMessage) {
d := &data{}
err := json.Unmarshal(msg.Value, d)
if err != nil {
fmt.Println(err)
}
fmt.Printf("messages: key: %s and val:%+v", string(msg.Key), d)
}
func createSaramaKafkaConf() *sarama.Config {
conf := sarama.NewConfig()
version := "2.6.2"
kafkaVer, err := sarama.ParseKafkaVersion(version)
if err != nil {
panic("failed to parse kafka version, executor will not run")
}
conf.Version = kafkaVer
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
return conf
}
I am facing severe delay in message delivery(order of 25 seconds) if the message load is increased to ~500 events per second, where each events size is order of less than 1Kb.
I want message exchange should take less than a second from production to consumption. Despite multiple efforts, I failed to get any solution. Any help in this regard is much appreciated.