Akka Kafka consumer processing rate drops drastically when there is lag in our Kafka partitions

We are facing a scenario where the processing rate of our akka-stream-kafka consumers drops whenever there is lag in the partitions. When we start them with no lag in the partitions, the processing rate is as high as expected.

MSK cluster - 10 topics - 40 partitions each => 400 total leader partitions

To achieve high throughput and parallelism in the system, we implemented akka-stream-kafka consumers that each subscribe to a single topic-partition, resulting in a 1:1 mapping between consumers and partitions.

Here is the consumer setup:

  1. Number of EC2 service instances - 7
  2. Each service instance spins up 6 consumers for each of the 10 topics, resulting in 60 consumers per service instance.
  3. Total consumers = Number of instances (7) * Number of consumers on each service instance (60) = 420

So, in total we start 420 consumers spread across the instances. With the RangeAssignor partition assignment strategy (the default), each partition is assigned to a different consumer: 400 consumers own the 400 partitions and the remaining 20 consumers stay idle. We have verified this allocation and it looks correct.
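The arithmetic behind this allocation can be sketched as follows. This is a simplified model of how a range-style assignor splits one topic's partitions among the consumers subscribed to it, not the Kafka implementation itself. It assumes that each consumer subscribes to exactly one topic (so 7 * 6 = 42 consumers compete for each topic's 40 partitions); the object and method names are hypothetical.

```scala
// Simplified model of range assignment for a single topic (illustrative only).
object RangeAssignmentSketch {

  /** Number of partitions each consumer receives, in sorted-consumer order.
    * The first (numPartitions % numConsumers) consumers get one extra partition.
    */
  def partitionsPerConsumer(numPartitions: Int, numConsumers: Int): Vector[Int] = {
    val base  = numPartitions / numConsumers
    val extra = numPartitions % numConsumers
    Vector.tabulate(numConsumers)(i => if (i < extra) base + 1 else base)
  }

  def main(args: Array[String]): Unit = {
    val instances                    = 7
    val consumersPerTopicPerInstance = 6
    val topics                       = 10
    val partitionsPerTopic           = 40

    // Assumption: every consumer subscribes to exactly one topic.
    val consumersPerTopic = instances * consumersPerTopicPerInstance
    val perTopic          = partitionsPerConsumer(partitionsPerTopic, consumersPerTopic)
    val idlePerTopic      = perTopic.count(_ == 0)

    println(s"consumers per topic: $consumersPerTopic")           // 42
    println(s"idle consumers per topic: $idlePerTopic")           // 2
    println(s"idle consumers in total: ${idlePerTopic * topics}") // 20
  }
}
```

Under that assumption, 40 consumers per topic own one partition each and 2 per topic own none, which matches the 400 active and 20 idle consumers we observe.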

Instance Type used: c5.xlarge

MSK Config:

Apache Kafka version - 2.4.1.1

Total number of brokers - 9 (spread across 3 AZs)

Broker Type: kafka.m5.large

Broker per Zone: 3

auto.create.topics.enable=true

default.replication.factor=3

min.insync.replicas=2

num.io.threads=8

num.network.threads=5

num.partitions=40

num.replica.fetchers=2

replica.lag.time.max.ms=30000

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

socket.send.buffer.bytes=102400

unclean.leader.election.enable=true

zookeeper.session.timeout.ms=18000

log.retention.ms=259200000

This is the configuration we use for each consumer:

akka.kafka.consumer {
 kafka-clients {
  bootstrap.servers = "localhost:9092"
  client.id = "consumer1"
  group.id = "consumer1"
  auto.offset.reset="latest"
 }
 aws.glue.registry.name="Registry1"
 aws.glue.avroRecordType = "GENERIC_RECORD"
 aws.glue.region = "region"
 kafka.value.deserializer.class="com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer"

 # Settings for checking the connection to the Kafka broker. Connection checking uses `listTopics` requests with the timeout
 # configured by `consumer.metadata-request-timeout`
 connection-checker {

  #Flag to turn on connection checker
  enable = true

  # Amount of attempts to be performed after a first connection failure occurs
  # Required, non-negative integer
  max-retries = 3

  # Interval for the connection check. Used as the base for exponential retry.
  check-interval = 15s

  # Check interval multiplier for backoff interval
  # Required, positive number
  backoff-factor = 2.0
 }
}

akka.kafka.committer {

 # Maximum number of messages in a single commit batch
 max-batch = 10000

 # Maximum interval between commits
 max-interval = 5s

 # Parallelism for async committing
 parallelism = 1500

 # API may change.
 # Delivery of commits to the internal actor
 # WaitForAck: Expect replies for commits, and backpressure the stream if replies do not arrive.
 # SendAndForget: Send off commits to the internal actor without expecting replies (experimental feature since 1.1)
 delivery = WaitForAck

 # API may change.
 # Controls when a `Committable` message is queued to be committed.
 # OffsetFirstObserved: When the offset of a message has been successfully produced.
 # NextOffsetObserved: When the next offset is observed.
 when = OffsetFirstObserved
}


akka.http {
 client {
  idle-timeout = 10s
 }
 host-connection-pool {
  idle-timeout = 10s
  client {
   idle-timeout = 10s
  }
 }
}

consumer.parallelism=1500
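
To put the `consumer.parallelism` value in context, here is a rough sketch of the upper bound on concurrently running calls to our processing function. It assumes that each of the 60 consumers on an instance runs as its own stream with its own `mapAsync(parallelism)` stage; the object and method names are hypothetical.

```scala
// Upper bound on in-flight futures created by mapAsync (illustrative only).
object InFlightSketch {

  /** mapAsync(parallelism) allows at most `parallelism` futures in flight per stream. */
  def maxInFlight(streams: Int, parallelism: Int): Long =
    streams.toLong * parallelism

  def main(args: Array[String]): Unit = {
    val streamsPerInstance = 60   // consumers per service instance
    val parallelism        = 1500 // consumer.parallelism
    val instances          = 7

    val perInstance = maxInFlight(streamsPerInstance, parallelism)
    println(s"max in-flight calls per instance: $perInstance")           // 90000
    println(s"max in-flight calls overall: ${perInstance * instances}")  // 630000
  }
}
```

This is only a ceiling; the number actually reached depends on how fast records are fetched and how long each call takes.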

We are using the code below to materialise the flow from Kafka to an empty sink:

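import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, Subscriptions}
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.scaladsl.Consumer.DrainingControl
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import scala.concurrent.Future

// conf, consumerSettings, committerSettings and f (the per-record handler, which returns a Future) are defined elsewhere
override implicit val actorSystem = ActorSystem("Consumer1")
override implicit val materializer = ActorMaterializer()
override implicit val ec = actorSystem.dispatcher
val topicsName = "Set of Topic Names" // placeholder for the actual topic name(s)
val parallelism = conf.getInt("consumer.parallelism")

// Resume on any failure: the failing element is dropped and the stream continues
val supervisionDecider: Supervision.Decider = {
 case _ => Supervision.Resume
}

val committer = committerSettings.getOrElse(CommitterSettings(actorSystem))
val supervisionStrategy = ActorAttributes.supervisionStrategy(supervisionDecider)
Consumer
 .committableSource(consumerSettings, Subscriptions.topics(topicsName))
 .mapAsync(parallelism) {
  msg =>
   f(msg.record.key(), msg.record.value())
    .map(_ => msg.committableOffset)
    // Pass the offset on for committing even when processing fails
    .recoverWith {
     case _ => Future.successful(msg.committableOffset)
    }
 }
 .toMat(Committer.sink(committer).withAttributes(supervisionStrategy))(DrainingControl.apply)
 .withAttributes(supervisionStrategy)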

Library versions used in the code:

"com.typesafe.akka" %% "akka-http"         % "10.1.11",
"com.typesafe.akka" %% "akka-stream-kafka" % "2.0.3",
"com.typesafe.akka" %% "akka-stream"       % "2.5.30"

Our observations are as follows:

  1. In any given interval of, say, 1 hour, only some of the consumers are actively consuming the lag and processing at the expected rate.
  2. In the next hour, some other consumers become active, consume from their partitions, and then stop processing.
  3. For each partition, the whole lag gets cleared in a single shot, as observed in the offsetLag graph.

We want all the consumers to run in parallel and process the messages in real time. This 3-day processing lag is causing major downtime for us. I looked at the issue linked below, but we are already on a version that includes its fix:
[Consumer fetches duplicate records when lagging, resulting in high network use, but low throughput · Issue #549 · akka/alpakka-kafka · GitHub]

Can anyone help us identify what we are missing in the consumer configuration, or whether some other issue is causing this?