Title: Kafka Streams Performance Issues with High Message Volume
Hello Kafka Community,
I am currently working on a Kafka Streams application that processes Netflow data. While the application is functional, I have been facing performance issues, particularly with the time it takes to consume and process messages from the Kafka topic.
Setup Details:
Message Volume: The simulator sends a total of 100,000 messages in 200 seconds (500 messages per second).
Processing Time: The application takes around 6 to 7 minutes to process these messages and send them to the target topic.
Problems:
High Latency: The processing time is significantly longer than the message ingestion time.
Message Consumption Rate: Need to measure and optimize how many messages are consumed and processed per second.
Key Concerns:
Is it normal for a Kafka Streams application to take this long to process 100,000 messages?
What could be the potential reasons for this high processing time?
How can I improve the performance and reduce the latency?
Kafka Streams can generally process tens of thousands of messages per second per thread, so 500 messages per second is not high volume.
Of course, it always depends on what your computation does… Stateless filtering/mapping is lightweight, while aggregations, joins, and windowing are heavyweight and give lower throughput.
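To illustrate the difference, here is a minimal, hypothetical contrast in the DSL (topic names and serdes are placeholders, and I'm assuming a recent Kafka Streams release for TimeWindows.ofSizeWithNoGrace()):
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class StatelessVsStateful {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Stateless: no state store involved, throughput is mostly CPU/serde bound.
        input.filter((key, value) -> value != null)
             .mapValues(value -> value.toUpperCase())
             .to("mapped-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Stateful: every update goes through a RocksDB window store and its changelog topic,
        // which is usually where the throughput is lost.
        input.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
             .count();

        builder.build(); // the topology would then be passed to new KafkaStreams(...)
    }
}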
What could be the potential reasons for this high processing time?
Thanks. You should definitely focus on the aggregation and suppress() operations. Did you inspect the metrics? The aggregation might need some RocksDB tuning to avoid hitting the disk and to keep enough data in memory (this also applies to the Kafka Streams internal cache).
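For reference, a RocksDBConfigSetter along the lines of the example in the Kafka Streams docs could look like the sketch below; the cache and write-buffer sizes are placeholders to tune against your heap and workload, not recommendations:
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class CustomRocksDBConfig implements RocksDBConfigSetter {
    // Block cache for data, index, and filter blocks (size is a placeholder).
    private final Cache cache = new LRUCache(64 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setTableFormatConfig(tableConfig);
        // Larger/fewer memtables reduce flush and compaction churn (again, placeholder values).
        options.setWriteBufferSize(32 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(3);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close(); // release the native cache when the store is closed
    }
}
You would register it via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, and the Kafka Streams record cache itself is sized via cache.max.bytes.buffering (statestore.cache.max.bytes on newer versions).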
Thanks for your reply. When using JConsole, I don’t get all the metrics. For the most part, I am getting NaN. I’ll share my topology building logic. Can you suggest improvements?
private Topology buildMdCongestionTopology() throws IOException {
try {
String[] classifierOrder = trafficClassifierOrderString.split(";");
Map<String, ITrafficClassifier> classifierMap = new HashMap<>();
classifierMap.put("DNS", new DNSProxyTrafficClassifier(redisHost));
classifierMap.put("ASN", new ASNTrafficClassifier());
classifierMap.put("SERVICE", new IANALookupTrafficClassifier());
classifierMap.put("APPLICATION", new ApplicationIdTrafficClassifier());
final StreamsBuilder builder = new StreamsBuilder();
Serde<NetflowPOJO> netflowSerdes = createSerdes(NetflowPOJO.class);
Serde<EnrichedNetflowPOJO> enrichedNetflowSerdes = createSerdes(EnrichedNetflowPOJO.class);
Serde<OutputPOJO> outputSerdes = createSerdes(OutputPOJO.class);
Serde<SubscriberFlows> subscriberFlowSerdes = createSerdes(SubscriberFlows.class);
WindowExporterKeySerdes windowExporterKeySerdes = new WindowExporterKeySerdes();
Serde<Windowed<String>> windowedExporterSerdes = Serdes.serdeFrom(windowExporterKeySerdes, windowExporterKeySerdes);
// Updated TimestampExtractor
TimestampExtractor netflowTimestampExtractor = new TimestampExtractor() {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
NetflowPOJO obj = (NetflowPOJO) record.value();
if (obj == null || obj.getFlow() == null || (obj.getFlow().getFlowEndMilliseconds() == null && obj.getFlow().getFlowEndSysUpTime() == null)) {
return new Date().getTime();
}
if (obj.getFlow().getFlowEndMilliseconds() != null) {
return obj.getFlow().getFlowEndMilliseconds().toEpochMilli();
} else {
return obj.getFlow().getFlowEndSysUpTime().toEpochMilli();
}
}
};
KStream<String, NetflowPOJO> netflowStream = builder
.stream(usageTopics, Consumed.with(Serdes.String(), netflowSerdes, netflowTimestampExtractor, AutoOffsetReset.LATEST));
// Processor: Enrich Netflow Records
KStream<String, EnrichedNetflowPOJO> enrichedNetflowStream = netflowStream.mapValues(value -> {
try {
EnrichedNetflowPOJO enriched = new EnrichedNetflowPOJO(value);
DIRECTION direction = getDirection(enriched);
enriched.setDirection(direction);
if (AGGREGATE_BY_SUBSCRIBER) {
populateSubscriber(enriched);
}
if (AGGREGATE_BY_APPLICATION) {
enriched.getApplicationName(enriched.getFlow().getApplicationId());
}
if (CLASSIFY_TRAFFIC) {
boolean found = false;
for (String classifierName : classifierOrder) {
ITrafficClassifier classifier = classifierMap.get(classifierName);
if (classifier != null) {
found = classifier.lookup(enriched);
if (found) {
break;
}
}
}
if (!found) {
if (enriched.getDestinationClassification() == null) {
enriched.setDestinationClassification("UNKNOWN");
}
if (enriched.getSourceClassification() == null) {
enriched.setSourceClassification("UNKNOWN");
}
}
}
return enriched;
} catch (NullPointerException | IOException e) {
SafeLog.error("Error enriching Netflow record: %s", e.getMessage());
return null;
}
}).filter((key, value) -> value != null); // Filter out null records immediately
if (enrichedNetflowStream != null) {
// Processor: Filter and rekey by Subscriber or Application
enrichedNetflowStream = enrichedNetflowStream
.filter((key, value) -> value != null &&
(AGGREGATE_BY_SUBSCRIBER ? !StringUtils.isBlank(value.getSubscriber()) : true) &&
(AGGREGATE_BY_APPLICATION ? value.getApplicationName() != null &&
!StringUtils.isBlank(value.getApplicationName()) &&
!value.getApplicationName().equals("N/A") : true))
.selectKey((key, value) -> AGGREGATE_BY_SUBSCRIBER ? value.getSubscriber() : (AGGREGATE_BY_APPLICATION ? value.getApplicationName() : "General"));
// Processor: Aggregate data
enrichedNetflowStream.groupByKey(Grouped.with(Serdes.String(), enrichedNetflowSerdes))
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowLengthMins))
.grace(Duration.ofMinutes(windowGracePeriodMins))
.advanceBy(Duration.ofMinutes(windowLengthMins)))
.aggregate(SubscriberFlows::new,
(key, value, aggregate) -> {
if (value == null) {
return null;
}
try {
if (value.getDirection() == DIRECTION.UP) {
aggregate.addUpstream(value.getDestinationClassification(), value.getFlow().getOctetTotalCount());
} else if (value.getDirection() == DIRECTION.DOWN) {
aggregate.addDownstream(value.getSourceClassification(), value.getFlow().getOctetTotalCount());
}
SafeLog.aud0("subflow value: %s", value.toString());
} catch (Exception ex) {
SafeLog.error("Error generated while aggregation: %s", ex.getMessage());
}
return aggregate;
}, Materialized.with(Serdes.String(), subscriberFlowSerdes))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.flatMapValues((key, subFlow) -> {
List<OutputPOJO> flows = new ArrayList<>();
for (Map.Entry<String, Usage> entry : subFlow.getUsage().entrySet()) {
SafeLog.aud0("key value: %s", key.toString());
OutputPOJO obj = new OutputPOJO();
obj.setStart(key.window().startTime());
obj.setEnd(key.window().endTime());
obj.setIdentifier(key.key());
obj.setClassification(entry.getKey());
obj.setUs(entry.getValue().getUpstream());
obj.setDs(entry.getValue().getDownstream());
flows.add(obj);
}
return flows;
})
.to(PRODUCING_TOPIC, Produced.with(windowedExporterSerdes, outputSerdes));
}
return builder.build();
} catch (Exception ex) {
SafeLog.error("Error generated while topology creation: %s", ex.getMessage());
}
return null;
}
Perf tuning first requires measuring; otherwise it's a blind trial-and-error game.
I would also assume that it’s most likely a config question, rather than a code question.
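If it helps, the configs that usually move the needle for throughput are along these lines; the values below are only illustrative starting points (all config names are standard Kafka Streams / consumer configs):
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ThroughputConfigSketch {
    static Properties throughputProps() {
        Properties props = new Properties();
        // More stream threads = more partitions processed in parallel (bounded by the partition count).
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        // A larger record cache batches more updates in memory before they hit RocksDB and the changelog.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 128 * 1024 * 1024L);
        // Committing less often trades end-to-end latency for throughput.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000);
        // Let the embedded consumer fetch larger batches per poll.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 2000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), 1024 * 1024);
        return props;
    }
}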
Sorry that I cannot be more specific, but perf tuning is difficult, and thus it's hard to just say "change config A from value X to Y", especially without measurements that tell us where the time is spent…
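One concrete way to measure, which may also explain the NaN values you saw in JConsole: many task- and processor-node-level metrics are only recorded at DEBUG level. A rough sketch (application id and bootstrap servers are placeholders):
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class MetricsProbe {
    public static KafkaStreams startWithDebugMetrics(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "netflow-enricher");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Task- and processor-node-level latency/rate metrics are only recorded at DEBUG level.
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();

        // Poll this periodically while the load is running (or watch the same metrics via JMX)
        // to see which part of the topology the time actually goes into.
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            String name = entry.getKey().name();
            if (name.contains("process-rate") || name.contains("process-latency")) {
                System.out.printf("%s / %s = %s%n",
                        entry.getKey().group(), name, entry.getValue().metricValue());
            }
        }
        return streams;
    }
}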
Not really. Of course, you could use KStream#repartition() and shuffle the data through a repartition topic with a higher number of partitions to allow for more parallelism downstream. But it's essentially the same thing.
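For completeness, the repartition() variant (Kafka Streams 2.6+) would slot into your buildMdCongestionTopology() right before the groupByKey(), roughly like this; the name and partition count are placeholders:
import org.apache.kafka.streams.kstream.Repartitioned;

// Shuffle through an internal repartition topic with more partitions so the downstream
// windowed aggregation runs with more parallel tasks; uses the stream and serdes from your snippet.
KStream<String, EnrichedNetflowPOJO> repartitioned = enrichedNetflowStream.repartition(
        Repartitioned.<String, EnrichedNetflowPOJO>with(Serdes.String(), enrichedNetflowSerdes)
                .withName("enriched-netflow-rekeyed")   // placeholder name
                .withNumberOfPartitions(12));           // placeholder partition count
The groupByKey() and windowed aggregation would then read from that repartition topic, so the heavy part of the topology can be spread over more tasks (and thus more threads or instances).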
Subject: Seeking Advice: Apache Flink vs. Kafka Streams for Large-Scale IPFix Processing
I am seeking advice on whether Apache Flink or Kafka Streams would be more suitable for our specific use case involving large-scale IPFix message processing. Here are the details of our use case:
Use Case Overview
We have an IPFix collector that gathers IPFix messages from approximately 12 million router devices. Our IPFix enricher application needs to:
Classify Traffic: Based on ASN, DNS, service, or application.
Aggregate Traffic: By application, calculating upstream (us) and downstream (ds) data.
Windowed Processing: Perform these operations within fixed 15-minute windows.
Output: Send aggregated results to a Kafka topic (enrichedNetflow).
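To make the use case concrete, here is a rough, self-contained sketch of what the core 15-minute windowed aggregation could look like with Flink's DataStream API (Flink 1.x); all class names and the inline sample data are illustrative only, and a real job would read the enriched IPFix records from Kafka and write the per-window results to the enrichedNetflow topic via Flink's Kafka connector:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkUsageAggregationSketch {

    /** One enriched IPFix record: application classification plus up/down byte counts. */
    public static class FlowRecord {
        public String application;
        public long upstreamBytes;
        public long downstreamBytes;
        public long eventTimeMillis;

        public FlowRecord() {} // Flink POJO rules require a no-arg constructor

        public FlowRecord(String application, long up, long down, long ts) {
            this.application = application;
            this.upstreamBytes = up;
            this.downstreamBytes = down;
            this.eventTimeMillis = ts;
        }
    }

    /** Aggregated usage per application for one window. */
    public static class Usage {
        public String application;
        public long upstreamBytes;
        public long downstreamBytes;

        public Usage() {}

        @Override
        public String toString() {
            return application + " us=" + upstreamBytes + " ds=" + downstreamBytes;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Inline sample data stands in for the enriched IPFix stream;
        // a real job would use Flink's Kafka source and sink instead.
        env.fromElements(
                new FlowRecord("video", 10, 500, 1_000),
                new FlowRecord("video", 20, 700, 2_000),
                new FlowRecord("gaming", 5, 50, 3_000))
           .assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlowRecord>forMonotonousTimestamps()
                        .withTimestampAssigner((record, ts) -> record.eventTimeMillis))
           .keyBy(record -> record.application)
           .window(TumblingEventTimeWindows.of(Time.minutes(15)))
           .aggregate(new AggregateFunction<FlowRecord, Usage, Usage>() {
               @Override
               public Usage createAccumulator() {
                   return new Usage();
               }

               @Override
               public Usage add(FlowRecord in, Usage acc) {
                   acc.application = in.application;
                   acc.upstreamBytes += in.upstreamBytes;
                   acc.downstreamBytes += in.downstreamBytes;
                   return acc;
               }

               @Override
               public Usage getResult(Usage acc) {
                   return acc;
               }

               @Override
               public Usage merge(Usage a, Usage b) {
                   if (a.application == null) {
                       a.application = b.application;
                   }
                   a.upstreamBytes += b.upstreamBytes;
                   a.downstreamBytes += b.downstreamBytes;
                   return a;
               }
           })
           .print(); // a real job would sink to the enrichedNetflow Kafka topic instead

        env.execute("ipfix-usage-aggregation-sketch");
    }
}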
Current Considerations
We are currently evaluating both Apache Flink and Kafka Streams for this purpose. Here are some points we are considering:
Key Questions
Given our use case, which involves processing a high volume of messages with complex classification and aggregation logic, would Apache Flink offer significant advantages over Kafka Streams in terms of performance and scalability?
Are there specific features in Flink that would provide a clear benefit for handling large-scale stateful processing and ensuring low latency?
What are the operational trade-offs when moving from Kafka Streams to Flink, especially in terms of managing a separate Flink cluster?
That’s a difficult question. As a rule of thumb, I would say Flink can be more performant, but managing a Flink cluster is more difficult than running a Kafka Streams application.