Hi all, I am currently implementing Kafka Streams processing for some factory data, and I am having a hard time understanding the SessionStore, because my fetch operations take upwards of 5 seconds, even for a very small store. I receive HbwEvents that hold the data for a station, which has two light sensors that are important to me. I want to see how long they stay broken at a time. Hence, a session window made the most sense to me.
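To make the session idea concrete, here is a simplified, dependency-free model of inactivity-gap sessionization for a single key, using the same 3-second gap as the topology. It is not Kafka's implementation: it assumes events arrive in timestamp order and that an event exactly one gap after the previous one still joins the same session. `GapSessionizer` and `Session` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Simplified model of inactivity-gap sessionization for ONE key.
// Assumption: timestamps are already sorted; Kafka Streams additionally
// handles out-of-order records and merges sessions, which is ignored here.
final class GapSessionizer {

    // A session is just the first and last event time it covers.
    record Session(Instant start, Instant end) {
        Duration length() {
            return Duration.between(start, end);
        }
    }

    static List<Session> sessionize(List<Instant> sortedTimes, Duration gap) {
        List<Session> sessions = new ArrayList<>();
        Instant start = null;
        Instant end = null;
        for (Instant t : sortedTimes) {
            if (start == null) {
                // First event opens the first session.
                start = t;
                end = t;
            } else if (Duration.between(end, t).compareTo(gap) <= 0) {
                // Within the inactivity gap: extend the current session.
                end = t;
            } else {
                // Gap exceeded: close the current session and open a new one.
                sessions.add(new Session(start, end));
                start = t;
                end = t;
            }
        }
        if (start != null) {
            sessions.add(new Session(start, end));
        }
        return sessions;
    }
}
```

With a 3-second gap, events at 0 s, 2 s, 4 s and 10 s yield two sessions lasting 4 s and 0 s, so an isolated event produces a zero-length session.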
1st. I created a new stream that only contains the events where at least one light sensor is broken:
```java
KStream<String, HbwEvent> lightBarrierBrokenHBW = hbwTypedStream.filterNot((k, v) ->
        v.getData().isI1_light_barrier() && v.getData().isI4_light_barrier());
```
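Because `filterNot` drops the records for which its predicate is true, this filter keeps every event in which at least one barrier reports `false`. A sketch of that equivalence, with hypothetical helper names and assuming `true` means the barrier is intact:

```java
// Hypothetical helpers mirroring the predicate passed to filterNot(...).
// Assumption: the isI1/isI4_light_barrier() flags are true while the
// barrier is intact and false while it is broken.
final class BarrierFilter {

    // filterNot drops records for which this returns true (both barriers intact).
    static boolean bothIntact(boolean i1, boolean i4) {
        return i1 && i4;
    }

    // Equivalent positive form (De Morgan): keep the record if either barrier is broken.
    static boolean anyBroken(boolean i1, boolean i4) {
        return !i1 || !i4;
    }
}
```

In the stream itself, `filter((k, v) -> !v.getData().isI1_light_barrier() || !v.getData().isI4_light_barrier())` should therefore behave the same as the `filterNot` call.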
2nd. I created the SessionWindowedKStream by grouping the events by light sensor and then windowing them:
```java
SessionWindowedKStream<String, HbwEvent> sessionizedHbwEvent = lightBarrierBrokenHBW
        .groupBy((k, v) -> {
            String sensorKey = "unknown";
            if (!v.getData().isI4_light_barrier()) {
                sensorKey = "i4_light_sensor";
            } else if (!v.getData().isI1_light_barrier()) {
                sensorKey = "i1_light_sensor";
            }
            System.out.println("GroupBy: " + k + " -> " + sensorKey);
            return sensorKey;
        }, Grouped.with(Serdes.String(), hbwEventSerdes))
        .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(3)));
```
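The key selector can be checked in isolation by pulling it out as a pure function. `sensorKey` is a hypothetical helper that takes the two flags directly, under the same assumption that `false` means broken. Because the `i4` branch is checked first, an event in which both barriers are broken only extends the `i4_light_sensor` session.

```java
// Hypothetical pure-function version of the key selector passed to groupBy(...),
// taking the two barrier flags instead of an HbwEvent.
// Assumption: false means the barrier is broken.
final class SensorKeySelector {

    static String sensorKey(boolean i1Intact, boolean i4Intact) {
        if (!i4Intact) {
            return "i4_light_sensor"; // checked first, so it wins when both are broken
        } else if (!i1Intact) {
            return "i1_light_sensor";
        }
        return "unknown"; // not reachable for records that passed the filterNot
    }
}
```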
3rd. I aggregated the sessions with a custom Aggregator:
```java
sessionizedHbwEvent.aggregate(
        TimeDifferenceAggregation::new,
        TimeDifferenceAggregation::add,
        TimeDifferenceAggregation::merge,
        Materialized.<String, TimeDifferenceAggregation, SessionStore<Bytes, byte[]>>as("lightSensor")
                .withKeySerde(Serdes.String())
                .withValueSerde(timeDifferenceAggregationSerde)
                .withCachingEnabled()
);
```
The TimeDifferenceAggregation class looks like this:
```java
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;

import java.time.Instant;

@Setter
@Getter
public class TimeDifferenceAggregation {
    @SerializedName("FirstTimestamp")
    private Instant firstTimestamp;
    @SerializedName("LastTimestamp")
    private Instant lastTimestamp;

    // Initializer: empty aggregate for a new session
    public TimeDifferenceAggregation() {
        this.firstTimestamp = null;
        this.lastTimestamp = null;
    }

    public TimeDifferenceAggregation(Instant firstTimestamp, Instant lastTimestamp) {
        this.firstTimestamp = firstTimestamp;
        this.lastTimestamp = lastTimestamp;
    }

    // Aggregator: widen the [first, last] range with the event's timestamp
    public static TimeDifferenceAggregation add(String key, HbwEvent event, TimeDifferenceAggregation agg) {
        Instant newTime = Instant.parse(event.getTime());
        Instant newFirstTimestamp = (agg.firstTimestamp == null || newTime.isBefore(agg.firstTimestamp)) ? newTime : agg.firstTimestamp;
        Instant newLastTimestamp = (agg.lastTimestamp == null || newTime.isAfter(agg.lastTimestamp)) ? newTime : agg.lastTimestamp;
        return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
    }

    // Merger: combine two sessions into one covering both ranges
    public static TimeDifferenceAggregation merge(String key, TimeDifferenceAggregation agg1, TimeDifferenceAggregation agg2) {
        Instant newFirstTimestamp;
        Instant newLastTimestamp;
        if (agg1.firstTimestamp != null && agg2.firstTimestamp != null) {
            newFirstTimestamp = agg1.firstTimestamp.isBefore(agg2.firstTimestamp) ? agg1.firstTimestamp : agg2.firstTimestamp;
        } else if (agg1.firstTimestamp != null) {
            newFirstTimestamp = agg1.firstTimestamp;
        } else {
            newFirstTimestamp = agg2.firstTimestamp;
        }
        if (agg1.lastTimestamp != null && agg2.lastTimestamp != null) {
            newLastTimestamp = agg1.lastTimestamp.isAfter(agg2.lastTimestamp) ? agg1.lastTimestamp : agg2.lastTimestamp;
        } else if (agg1.lastTimestamp != null) {
            newLastTimestamp = agg1.lastTimestamp;
        } else {
            newLastTimestamp = agg2.lastTimestamp;
        }
        return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
    }
}
```
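The add/merge logic can be sanity-checked without Kafka, Lombok or Gson. The class below is a hypothetical stripped-down model (`TimeSpan`, with a hypothetical `duration()` helper) that takes the already-parsed event time instead of an `HbwEvent`. It uses null-safe min/max, which should give the same results as the if/else chains in `merge`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, dependency-free model of TimeDifferenceAggregation.
// add() takes the parsed event time directly; duration() is a hypothetical helper.
final class TimeSpan {
    final Instant first; // earliest event time seen, null for an empty aggregate
    final Instant last;  // latest event time seen, null for an empty aggregate

    TimeSpan(Instant first, Instant last) {
        this.first = first;
        this.last = last;
    }

    static TimeSpan empty() {
        return new TimeSpan(null, null);
    }

    // Adding an event is merging with a zero-length span at that time.
    static TimeSpan add(Instant t, TimeSpan agg) {
        return merge(agg, new TimeSpan(t, t));
    }

    // Null-safe min of the starts and max of the ends.
    static TimeSpan merge(TimeSpan a, TimeSpan b) {
        return new TimeSpan(min(a.first, b.first), max(a.last, b.last));
    }

    // How long the barrier stayed broken in this session (zero if empty).
    Duration duration() {
        return (first == null) ? Duration.ZERO : Duration.between(first, last);
    }

    private static Instant min(Instant x, Instant y) {
        if (x == null) return y;
        if (y == null) return x;
        return x.isBefore(y) ? x : y;
    }

    private static Instant max(Instant x, Instant y) {
        if (x == null) return y;
        if (y == null) return x;
        return x.isAfter(y) ? x : y;
    }
}
```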
In my topology I am also doing some separate stream processing, where I materialize a normal KeyValueStore; querying that store only takes a couple of milliseconds, which is what I expected.
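When comparing the two stores, it helps to time each query in exactly the same way. A hypothetical helper for that is sketched below. Since `ReadOnlySessionStore.fetch(key)` returns a `KeyValueIterator` (whereas a key-value store's `get` returns the value directly), the supplier for the session store should iterate over and close the result, so the timing is likely to cover the full lookup rather than only iterator creation.

```java
import java.util.function.Supplier;

// Hypothetical helper that runs a query once and reports its wall-clock time.
final class QueryTimer {

    record Timed<T>(T result, long elapsedMillis) { }

    static <T> Timed<T> time(Supplier<T> query) {
        long start = System.nanoTime();
        T result = query.get(); // the supplier should do ALL the work, including iteration
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        return new Timed<>(result, elapsedMillis);
    }
}
```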
Does anyone have an idea why my SessionStore is so slow?