Performance Optimization of Realtime Kafka Streams Processing

Title: Kafka Streams Performance Issues with High Message Volume

Hello Kafka Community,

I am currently working on a Kafka Streams application that processes Netflow data. While the application is functional, I have been facing performance issues, particularly with the time it takes to consume and process messages from the Kafka topic.

Setup Details:

  • Message Volume: The simulator sends a total of 100,000 messages in 200 seconds (500 messages per second).
  • Processing Time: The application takes around 6 to 7 minutes to process these messages and send them to the target topic.

Problems:

  1. High Latency: The processing time is significantly longer than the message ingestion time.
  2. Message Consumption Rate: I need to measure and optimize how many messages are consumed and processed per second.

Key Concerns:

  • Is it normal for a Kafka Streams application to take this long to process 100,000 messages?

  • What could be the potential reasons for this high processing time?
  • How can I improve the performance and reduce the latency?

Kafka Streams can in general process tens of thousands of messages per second per thread, so 500 messages per second is not high volume.

Of course, it always depends on what your computation does… Stateless filtering/mapping is lightweight, while aggregations, joins, and windowing are heavyweight and yield lower throughput.

What could be the potential reasons for this high processing time?

You should start by looking into the metrics (Monitor Kafka Streams Applications in Confluent Platform | Confluent Documentation) to get a sense of where your time is spent. You might also consider consumer and producer metrics to check whether your reads/writes to Kafka are performant.
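
As a starting point, you can also read the metrics programmatically instead of attaching JConsole. A minimal sketch, assuming a running KafkaStreams instance named streams; which metric names matter depends on your topology:

import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

// Dump throughput/latency-related Kafka Streams metrics to stdout.
public static void dumpMetrics(final KafkaStreams streams) {
  for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    final MetricName name = entry.getKey();
    // Metrics such as process-rate and process-latency-avg indicate where time is spent.
    if (name.name().startsWith("process-") || name.name().startsWith("poll-")) {
      System.out.printf("%s %s = %s%n", name.name(), name.tags(), entry.getValue().metricValue());
    }
  }
}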


Thank you for your response. Here is my topology:
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [ipfix_router, ipfix_olt])
      → KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      → KSTREAM-FILTER-0000000002
      ← KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000002 (stores: [])
      → KSTREAM-KEY-SELECT-0000000003
      ← KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000003 (stores: [])
      → KSTREAM-FILTER-0000000007
      ← KSTREAM-FILTER-0000000002
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      → KSTREAM-SINK-0000000006
      ← KSTREAM-KEY-SELECT-0000000003
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition)
      ← KSTREAM-FILTER-0000000007

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition])
      → KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000004])
      → KTABLE-SUPPRESS-0000000009
      ← KSTREAM-SOURCE-0000000008
    Processor: KTABLE-SUPPRESS-0000000009 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000010])
      → KTABLE-TOSTREAM-0000000011
      ← KSTREAM-AGGREGATE-0000000005
    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])
      → KSTREAM-FLATMAPVALUES-0000000012
      ← KTABLE-SUPPRESS-0000000009
    Processor: KSTREAM-FLATMAPVALUES-0000000012 (stores: [])
      → KSTREAM-SINK-0000000013
      ← KTABLE-TOSTREAM-0000000011
    Sink: KSTREAM-SINK-0000000013 (topic: enrichedNetflow)
      ← KSTREAM-FLATMAPVALUES-0000000012

Is this topology optimized?

Thanks. You should for sure focus on the aggregation and suppress() operations. Did you inspect the metrics? The aggregation might need some RocksDB tuning to avoid hitting disk and to keep enough data in memory (the same applies to the Kafka Streams internal cache).
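
For reference, the RocksDB stores can be tuned via a custom RocksDBConfigSetter, and the Kafka Streams record cache via cache.max.bytes.buffering. A minimal sketch, not a recommendation – the sizes are placeholder values you should derive from measurements:

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
  // Shared block cache across all stores; 64 MB is a placeholder value.
  private static final org.rocksdb.Cache CACHE = new LRUCache(64 * 1024 * 1024L);

  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
    tableConfig.setBlockCache(CACHE);     // keep more blocks in memory
    options.setTableFormatConfig(tableConfig);
    options.setMaxWriteBufferNumber(3);   // allow more memtables before flushing to disk
  }

  @Override
  public void close(final String storeName, final Options options) {
    // The cache is shared across stores, so it is deliberately not closed per store.
  }
}

// Wire it up, and give the record cache more room (default is 10 MB per instance):
final Properties props = new Properties();
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 100 * 1024 * 1024L);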

For the latency: given that you use suppress(), higher latency is expected, because suppress() holds back the emission of intermediate results.
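
If latency matters more than emitting exactly one final result per window, a time-bounded suppression is a possible alternative to untilWindowCloses() – it caps how long results are held back, at the cost of emitting intermediate (non-final) updates. A sketch, where windowedTable stands for the result of your aggregate() call:

import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;

// Emit at most one update per key per minute instead of waiting for the window to close.
windowedTable
    .suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(1),
        Suppressed.BufferConfig.maxRecords(10_000)))
    .toStream();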

HTH.


Thanks for your reply. When using JConsole, I don’t get all the metrics; for the most part, I am getting NaN. I’ll share my topology-building logic. Can you suggest improvements?
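
One likely cause of the NaN values: the per-processor and per-store metrics are only recorded when metrics.recording.level is raised from its default of INFO to DEBUG. A minimal sketch of the setting:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
// Default is INFO; DEBUG enables processor-node and state-store level metrics.
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");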

private Topology buildMdCongestionTopology() throws IOException {
    try {
      String[] classifierOrder = trafficClassifierOrderString.split(";");
      Map<String, ITrafficClassifier> classifierMap = new HashMap<>();
      classifierMap.put("DNS", new DNSProxyTrafficClassifier(redisHost));
      classifierMap.put("ASN", new ASNTrafficClassifier());
      classifierMap.put("SERVICE", new IANALookupTrafficClassifier());
      classifierMap.put("APPLICATION", new ApplicationIdTrafficClassifier());

      final StreamsBuilder builder = new StreamsBuilder();
      Serde<NetflowPOJO> netflowSerdes = createSerdes(NetflowPOJO.class);
      Serde<EnrichedNetflowPOJO> enrichedNetflowSerdes = createSerdes(EnrichedNetflowPOJO.class);
      Serde<OutputPOJO> outputSerdes = createSerdes(OutputPOJO.class);
      Serde<SubscriberFlows> subscriberFlowSerdes = createSerdes(SubscriberFlows.class);
      WindowExporterKeySerdes windowExporterKeySerdes = new WindowExporterKeySerdes();
      Serde<Windowed<String>> windowedExporterSerdes = Serdes.serdeFrom(windowExporterKeySerdes, windowExporterKeySerdes);

      // Updated TimestampExtractor
      TimestampExtractor netflowTimestampExtractor = new TimestampExtractor() {
        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
          NetflowPOJO obj = (NetflowPOJO) record.value();
          if (obj == null || obj.getFlow() == null || (obj.getFlow().getFlowEndMilliseconds() == null && obj.getFlow().getFlowEndSysUpTime() == null)) {
            return System.currentTimeMillis();
          }
          if (obj.getFlow().getFlowEndMilliseconds() != null) {
            return obj.getFlow().getFlowEndMilliseconds().toEpochMilli();
          } else {
            return obj.getFlow().getFlowEndSysUpTime().toEpochMilli();
          }
        }
      };

      KStream<String, NetflowPOJO> netflowStream = builder
              .stream(usageTopics, Consumed.with(Serdes.String(), netflowSerdes, netflowTimestampExtractor, AutoOffsetReset.LATEST));

      // Processor: Enrich Netflow Records
      KStream<String, EnrichedNetflowPOJO> enrichedNetflowStream = netflowStream.mapValues(value -> {
        try {
          EnrichedNetflowPOJO enriched = new EnrichedNetflowPOJO(value);
          DIRECTION direction = getDirection(enriched);
          enriched.setDirection(direction);
          if (AGGREGATE_BY_SUBSCRIBER) {
            populateSubscriber(enriched);
          }
          if (AGGREGATE_BY_APPLICATION) {
            enriched.getApplicationName(enriched.getFlow().getApplicationId());
          }
          if (CLASSIFY_TRAFFIC) {
            boolean found = false;
            for (String classifierName : classifierOrder) {
              ITrafficClassifier classifier = classifierMap.get(classifierName);
              if (classifier != null) {
                found = classifier.lookup(enriched);
                if (found) {
                  break;
                }
              }
            }
            if (!found) {
              if (enriched.getDestinationClassification() == null) {
                enriched.setDestinationClassification("UNKNOWN");
              }
              if (enriched.getSourceClassification() == null) {
                enriched.setSourceClassification("UNKNOWN");
              }
            }
          }
          return enriched;
        } catch (NullPointerException | IOException e) {
          SafeLog.error("Error enriching Netflow record: %s", e.getMessage());
          return null;
        }
      }).filter((key, value) -> value != null); // Filter out null records immediately

      if (enrichedNetflowStream != null) {
        // Processor: Filter and rekey by Subscriber or Application
        enrichedNetflowStream = enrichedNetflowStream
                .filter((key, value) -> value != null &&
                        (AGGREGATE_BY_SUBSCRIBER ? !StringUtils.isBlank(value.getSubscriber()) : true) &&
                        (AGGREGATE_BY_APPLICATION ? value.getApplicationName() != null &&
                                !StringUtils.isBlank(value.getApplicationName()) &&
                                !value.getApplicationName().equals("N/A") : true))
                .selectKey((key, value) -> AGGREGATE_BY_SUBSCRIBER ? value.getSubscriber() : (AGGREGATE_BY_APPLICATION ? value.getApplicationName() : "General"));

        // Processor: Aggregate data
        enrichedNetflowStream.groupByKey(Grouped.with(Serdes.String(), enrichedNetflowSerdes))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(windowLengthMins))
                        .grace(Duration.ofMinutes(windowGracePeriodMins))
                        .advanceBy(Duration.ofMinutes(windowLengthMins)))
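                // Note: advanceBy() equal to the window size gives tumbling windows,
                // which TimeWindows.of() already produces by default, so this call is redundant.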
                .aggregate(SubscriberFlows::new,
                        (key, value, aggregate) -> {
                          if (value == null) {
                            return null;
                          }
                          try {
                            if (value.getDirection() == DIRECTION.UP) {
                              aggregate.addUpstream(value.getDestinationClassification(), value.getFlow().getOctetTotalCount());
                            } else if (value.getDirection() == DIRECTION.DOWN) {
                              aggregate.addDownstream(value.getSourceClassification(), value.getFlow().getOctetTotalCount());
                            }
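                            // Note: logging every record inside the aggregator runs on the hot
                            // path and can noticeably reduce throughput; consider sampling it.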
                            SafeLog.aud0("subflow value: %s", value.toString());
                          } catch (Exception ex) {
                            SafeLog.error("Error generated while aggregation: %s", ex.getMessage());
                          }
                          return aggregate;
                        }, Materialized.with(Serdes.String(), subscriberFlowSerdes))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .flatMapValues((key, subFlow) -> {
                  List<OutputPOJO> flows = new ArrayList<>();
                  for (Map.Entry<String, Usage> entry : subFlow.getUsage().entrySet()) {
                    SafeLog.aud0("key value: %s", key.toString());
                    OutputPOJO obj = new OutputPOJO();
                    obj.setStart(key.window().startTime());
                    obj.setEnd(key.window().endTime());
                    obj.setIdentifier(key.key());
                    obj.setClassification(entry.getKey());
                    obj.setUs(entry.getValue().getUpstream());
                    obj.setDs(entry.getValue().getDownstream());
                    flows.add(obj);
                  }
                  return flows;
                })
                .to(PRODUCING_TOPIC, Produced.with(windowedExporterSerdes, outputSerdes));
      }

      return builder.build();
    } catch (Exception ex) {
      SafeLog.error("Error generated while topology creation: %s", ex.getMessage());
    }
    return null;
  }

Performance tuning first requires measurement – otherwise it’s a blind trial-and-error game.

I would also assume that it’s most likely a config question, rather than a code question.

Sorry that I cannot be more specific, but performance tuning is difficult, and thus it’s hard to just say “change config A from value X to Y” – especially without measurements that tell us where the time is spent…


OK, thank you. Is there any way to achieve parallelism in Kafka Streams without partitioning the source topic?

Not really – of course, you could use KStream#repartition() and shuffle the data through a repartition topic with a higher number of partitions to allow for more parallelism downstream. But that’s essentially the same thing.

The unit of parallel work in Kafka Streams is the task. For each sub-topology of your program, tasks are created based on the number of partitions of that sub-topology’s input topics: Kafka Streams Architecture for Confluent Platform | Confluent Documentation
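
For illustration, a minimal sketch of that repartition approach – the name and partition count are arbitrary example values, and within a single instance you also need enough stream threads to actually run the additional tasks in parallel:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

// Shuffle through an internal repartition topic with more partitions, so the
// downstream sub-topology gets more tasks (example name and partition count).
KStream<String, String> widened = inputStream.repartition(
    Repartitioned.<String, String>as("wide")   // creates <application.id>-wide-repartition
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String())
        .withNumberOfPartitions(12));

// Match the thread count to the number of tasks you want this instance to run.
final Properties props = new Properties();
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 12);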


Thank you for your support and instant replies. I have attached the metrics of the application in this doc: Metrics of Streams Application - Google Docs

Could you please look into it and provide me with some insights?

Subject: Seeking Advice: Apache Flink vs. Kafka Streams for Large-Scale IPFIX Processing

I am seeking advice on whether Apache Flink or Kafka Streams would be more suitable for our specific use case involving large-scale IPFIX message processing. Here are the details of our use case:

Use Case Overview

We have an IPFIX collector that gathers IPFIX messages from approximately 12 million router devices. Our IPFIX enricher application needs to:

  1. Classify Traffic: Based on ASN, DNS, service, or application.
  2. Aggregate Traffic: By application, calculating upstream (us) and downstream (ds) data.
  3. Windowed Processing: Perform these operations within fixed 15-minute windows.
  4. Output: Send aggregated results to a Kafka topic (enrichedNetflow).

Current Considerations

We are currently evaluating both Apache Flink and Kafka Streams for this purpose.

Key Questions

  1. Given our use case, which involves processing a high volume of messages with complex classification and aggregation logic, would Apache Flink offer significant advantages over Kafka Streams in terms of performance and scalability?
  2. Are there specific features in Flink that would provide a clear benefit for handling large-scale stateful processing and ensuring low latency?
  3. What are the operational trade-offs when moving from Kafka Streams to Flink, especially in terms of managing a separate Flink cluster?

That’s a difficult question. As a rule of thumb, I would say Flink can be more performant, but managing a Flink cluster is more difficult than running a Kafka Streams application.


What would be the approximate throughput of a single stream-thread application solely performing filtering, complex aggregation, and stream joining?

@Thayaruban Are you asking this question about Kafka Streams or Flink?