Why are my Kafka producer and consumer so slow?

We have multiple microservices written in Go exchanging messages over the Kafka message bus. A microservice writes to a Kafka topic with a partition count of 3 and a replication factor of 2. We use AWS MSK as the Kafka broker. Here is my producer implementation:

package kf

import (
	"fmt"
	"github.com/Shopify/sarama"
	"github.com/segmentio/kafka-go"
	"net"
	"strconv"
)

// Producer publishes flow events to a single Kafka topic using a
// synchronous sarama producer (each SendMessage blocks until the broker acks).
type Producer struct {
	flowEventProducer sarama.SyncProducer // blocking producer created in newFlowWriter
	topic             string              // destination topic, fixed at init time
}

// InitProducer ensures the topic exists (via the first broker) and returns a
// Producer bound to that topic. It panics if the sarama producer cannot be
// created, matching the package's fail-fast initialization style.
func InitProducer(brokers []string, topic string) *Producer {
	CreateKafkaTopic(brokers[0], topic)
	prod, err := newFlowWriter(brokers)
	if err != nil {
		// Include the underlying error so the real failure cause is visible
		// in the panic message (the original discarded it).
		panic(fmt.Sprintf("failed to connect to producer: %v", err))
	}
	return &Producer{
		flowEventProducer: prod,
		topic:             topic,
	}
}

// CreateKafkaTopic ensures `topic` exists on the cluster reachable through
// kafkaURL. It dials the given broker, discovers the cluster controller, and
// asks the controller to create the topic with 3 partitions and a replication
// factor of 2. Any failure panics (fail-fast initialization).
func CreateKafkaTopic(kafkaURL, topic string) {
	conn, err := kafka.Dial("tcp", kafkaURL)
	if err != nil {
		panic(err.Error())
	}
	// Register the close immediately: in the original, this defer was placed
	// at the very end, so the connection leaked if any earlier step panicked.
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}

	// Topic creation must be sent to the cluster controller, not to an
	// arbitrary broker.
	controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     3,
			ReplicationFactor: 2,
		},
	}
	if err := controllerConn.CreateTopics(topicConfigs...); err != nil {
		panic(err.Error())
	}
}

// newFlowWriter builds a synchronous sarama producer: hash-partitioned by
// message key, acked by the partition leader only (WaitForLocal), with up to
// 10 in-flight requests per broker connection.
func newFlowWriter(brokers []string) (sarama.SyncProducer, error) {
	kafkaVer, err := sarama.ParseKafkaVersion("2.6.2")
	if err != nil {
		// The signature already returns an error — propagate it instead of
		// panicking (the original panicked and discarded err).
		return nil, fmt.Errorf("parsing kafka version: %w", err)
	}

	config := sarama.NewConfig()
	config.Version = kafkaVer
	config.Producer.Partitioner = sarama.NewHashPartitioner
	config.Net.MaxOpenRequests = 10
	config.Producer.RequiredAcks = sarama.WaitForLocal
	// SyncProducer requires Return.Successes to be true.
	config.Producer.Return.Successes = true

	return sarama.NewSyncProducer(brokers, config)
}

// WriteMessage publishes data to the producer's topic, keyed by uuid (the
// hash partitioner maps equal keys to the same partition). It blocks until
// the broker acknowledges the message and returns any send error.
func (p *Producer) WriteMessage(uuid string, data []byte) error {
	msg := &sarama.ProducerMessage{
		Topic: p.topic,
		Key:   sarama.ByteEncoder(uuid),
		Value: sarama.ByteEncoder(data),
	}

	part, off, err := p.flowEventProducer.SendMessage(msg)
	if err != nil {
		return err
	}
	// Fixed the "wriiten" typo and added the missing trailing newline so
	// successive log lines no longer run together.
	fmt.Printf("message written on part:%d and offset: %d\n", part, off)
	return nil
}

And here is the consumer implementation:

package kf

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
)

// Consumer reads flow events from a single Kafka topic as part of a
// consumer group (group id "myconf", set in InitConsumer).
type Consumer struct {
	flowEventReader sarama.ConsumerGroup // group session handle created in InitConsumer
	topic           string               // topic consumed by HandleMessages
	brokerUrls      []string             // bootstrap broker addresses
}

// data is the expected JSON shape of a message payload; logMessage
// unmarshals each message value into it.
type data struct {
	Name     string `json:"name"`
	Employee string `json:"employee"`
}

// InitConsumer joins the "myconf" consumer group on the given brokers and
// returns a Consumer bound to topic. It panics if the group cannot be
// created, matching the package's fail-fast initialization style.
func InitConsumer(brokers []string, topic string) *Consumer {
	c := &Consumer{
		topic:      topic,
		brokerUrls: brokers,
	}

	conf := createSaramaKafkaConf()
	group, err := sarama.NewConsumerGroup(brokers, "myconf", conf)
	if err != nil {
		// Include the underlying error so the real failure cause is visible
		// in the panic message (the original discarded it).
		panic(fmt.Sprintf("failed to create consumer group on kafka cluster: %v", err))
	}
	c.flowEventReader = group

	return c
}

// KafkaConsumerGroupHandler implements sarama.ConsumerGroupHandler; it
// delegates each consumed message to Cons.logMessage.
type KafkaConsumerGroupHandler struct {
	Cons *Consumer // owning consumer, used to process claimed messages
}

// HandleMessages runs the consume loop forever. sarama's Consume returns
// whenever the group rebalances, so it must be called in a loop to keep the
// session alive; errors are logged and the loop retries.
func (c *Consumer) HandleMessages() {
	for {
		err := c.flowEventReader.Consume(context.Background(), []string{c.topic}, &KafkaConsumerGroupHandler{Cons: c})
		if err != nil {
			// Log the actual error — the original printed a bare "FAILED",
			// which made consume failures impossible to diagnose.
			fmt.Println("consumer group session failed:", err)
			continue
		}
	}
}
// Setup runs before the consume loop of a new session starts; no per-session
// initialization is needed here.
func (*KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
// Cleanup runs after a session ends; there is nothing to release.
func (*KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
// ConsumeClaim drains a single partition claim: each received message is
// processed via logMessage and then marked consumed so its offset is
// eligible for commit. It returns when the claim's channel closes
// (session end or partition revocation).
func (l *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	incoming := claim.Messages()
	for {
		message, open := <-incoming
		if !open {
			return nil
		}
		l.Cons.logMessage(message)
		sess.MarkMessage(message, "")
	}
}

// logMessage decodes the message payload into data and prints it together
// with the message key. On a decode failure it logs the error and returns —
// the original fell through and printed a zero-valued struct anyway.
func (c *Consumer) logMessage(msg *sarama.ConsumerMessage) {
	d := &data{}
	if err := json.Unmarshal(msg.Value, d); err != nil {
		fmt.Println(err)
		return
	}
	// Added the missing trailing newline so log lines don't run together.
	fmt.Printf("messages: key: %s and val:%+v\n", string(msg.Key), d)
}

// createSaramaKafkaConf builds the consumer-group configuration: protocol
// version 2.6.2, start from the oldest offset when no committed offset
// exists, and round-robin partition assignment. It panics on a version
// parse failure (fail-fast initialization).
func createSaramaKafkaConf() *sarama.Config {
	kafkaVer, err := sarama.ParseKafkaVersion("2.6.2")
	if err != nil {
		// Include the underlying error so the real failure cause is visible
		// in the panic message (the original discarded it).
		panic(fmt.Sprintf("failed to parse kafka version, executor will not run: %v", err))
	}

	conf := sarama.NewConfig()
	conf.Version = kafkaVer
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	conf.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}

	return conf
}

I am facing severe delays in message delivery (on the order of 25 seconds) when the message load increases to ~500 events per second, where each event is smaller than 1 KB.

I want the message exchange to take less than a second from production to consumption. Despite multiple efforts, I have failed to find a solution. Any help in this regard is much appreciated.