sessionStore.fetch(someKey) is very slow

Hi all, I am currently implementing Kafka Stream processing for some factory data and I am having a hard time understanding the sesionStore, becaues my fetch operations take upwards of 5seconds, even for a very small store. I receive HbwEvents, that hold the data for a station, which has 2 light sensors that are of importance to me. And I want to see for how long they are broken at a time. Hence a session window made most sense to me.

1st. I created a new Stream that only had the events where a light sensor was broken:

        KStream<String, HbwEvent> lightBarrierBrokenHBW = hbwTypedStream.filterNot((k, v) ->
                v.getData().isI1_light_barrier() && v.getData().isI4_light_barrier());

2nd. I created the SessionWindowedKStream for the event, grouping it by light sensor and then windowing it:

        SessionWindowedKStream<String, HbwEvent> sessionizedHbwEvent = lightBarrierBrokenHBW
                .groupBy((k, v) -> {
                    String sensorKey = "unknown";
                    if (!v.getData().isI4_light_barrier()) {
                        sensorKey = "i4_light_sensor";
                    } else if (!v.getData().isI1_light_barrier()) {
                        sensorKey = "i1_light_sensor";
                    }
                    System.out.println("GroupBy: " + k + " -> " + sensorKey);
                    return sensorKey;
                }, Grouped.with(Serdes.String(), hbwEventSerdes))
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(3)));

3rd. I aggregated the sessions with a custom Aggregator:

        sessionizedHbwEvent.aggregate(
                TimeDifferenceAggregation::new,
                TimeDifferenceAggregation::add,
                TimeDifferenceAggregation::merge,
                Materialized.<String, TimeDifferenceAggregation, SessionStore<Bytes, byte[]>>as("lightSensor")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(timeDifferenceAggregationSerde)
                        .withCachingEnabled()
        );

The TimeDIfferencAggregation class looks like this:

@Setter
@Getter
public class TimeDifferenceAggregation {
    @SerializedName("FirstTimestamp")
    private Instant firstTimestamp;
    @SerializedName("LastTimestamp")
    private Instant lastTimestamp;

    public TimeDifferenceAggregation() {
        this.firstTimestamp = null;
        this.lastTimestamp = null;
    }

    public TimeDifferenceAggregation(Instant firstTimestamp, Instant lastTimestamp) {
        this.firstTimestamp = firstTimestamp;
        this.lastTimestamp = lastTimestamp;
    }

    public static TimeDifferenceAggregation add(String Key, HbwEvent event, TimeDifferenceAggregation agg) {
        Instant newTime = Instant.parse(event.getTime());

        Instant newFirstTimestamp = (agg.firstTimestamp == null || newTime.isBefore(agg.firstTimestamp)) ? newTime : agg.firstTimestamp;
        Instant newLastTimestamp = (agg.lastTimestamp == null || newTime.isAfter(agg.lastTimestamp)) ? newTime : agg.lastTimestamp;

        return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
    }

    public static TimeDifferenceAggregation merge(String Key, TimeDifferenceAggregation agg1, TimeDifferenceAggregation agg2) {
        Instant newFirstTimestamp;
        Instant newLastTimestamp;

        if (agg1.firstTimestamp != null && agg2.firstTimestamp != null) {
            newFirstTimestamp = agg1.firstTimestamp.isBefore(agg2.firstTimestamp) ? agg1.firstTimestamp : agg2.firstTimestamp;
        } else if (agg1.firstTimestamp != null) {
            newFirstTimestamp = agg1.firstTimestamp;
        } else {
            newFirstTimestamp = agg2.firstTimestamp;
        }

        if (agg1.lastTimestamp != null && agg2.lastTimestamp != null) {
            newLastTimestamp = agg1.lastTimestamp.isAfter(agg2.lastTimestamp) ? agg1.lastTimestamp : agg2.lastTimestamp;
        } else if (agg1.lastTimestamp != null) {
            newLastTimestamp = agg1.lastTimestamp;
        } else {
            newLastTimestamp = agg2.lastTimestamp;
        }

        return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
    }

In my topology I am also doing some seperate stream processing, where I materialize a normal KeyValueStore and there the querying only takes a couple ms, which is the expected time.
Does anyone have an idea as to why my sessionStore is so slow?

I did some additional testing and found that the backwardFetch() operation returns a result immediately. I pasted my test endpoint below.

    public String getTest() {
        Logger logger = LoggerFactory.getLogger(MonitoringRestController.class);

        long fetchStartTime = System.currentTimeMillis();
        KeyValueIterator<Windowed<String>, TimeDifferenceAggregation> range = lightSensorStore.backwardFetch("i4_light_sensor");
        long fetchEndTime = System.currentTimeMillis();

        logger.info("Fetch and process time: {} ms", (fetchEndTime - fetchStartTime)); // Fetch and process time: 1 ms

        long processStartTime = System.currentTimeMillis();

        range.forEachRemaining(n ->
                logger.info("Key: {}, First Timestamp: {}, Last Timestamp: {}", n.key.key(), n.value.getFirstTimestamp(), n.value.getLastTimestamp())); // both expected sessions got logged instantely

        range.close();

        long processEndTime = System.currentTimeMillis(); // Processing time: 9438 ms
        logger.info("Processing time: {} ms", (processEndTime - processStartTime));

        long responseTime = System.currentTimeMillis();
        logger.info("Total response time: {} ms", (responseTime - fetchStartTime)); // Total response time: 9440 ms

        return "success";
    }

The results make me believe, that when the iterator has iterated through the two valid sessions, he does something that I am unaware of?

Hard to say. In general, fetch(key) creates an iterator what is relatively slow, compared to a KeyValueStore.get(key) lookup, which is an efficient point lookup. Because there is no “time boundaries” the search happens across all “segments” (ie, the full timeline/history of the store), which should be up to 3, so internally 3 iterators would be created. Thus, you would get all sessions for a key.

When you say your fetch() takes up to 5 seconds, are you referring to inside a custom Processor or via IQ? From the code I cannot think of a reason why backwardFetch() might be quicker… The difference is just, that it would return the newest session first, while fetch(key) given you the oldest session first.

I didnt crate a custom Processor and am just accessing the store via interactive query. So the sessionStore is made available through a bean:

    @Bean
    public ReadOnlySessionStore<String, TimeDifferenceAggregation> lightSensorStore(KafkaStreams streams) {
        return streams.store(StoreQueryParameters.fromNameAndType("lightSensor", QueryableStoreTypes.sessionStore()));
    }

What i found through testing, is that it is slow to find the last / end of the iteration (if that makes sense). I timed every session that was iterated over through backwardFetch() and the 2 sessions that i knew would be in there were logged immediately. Then it took a couple seconds and the iteration was finished. I think the initial wait time for fetch() is for the same reason, just that in fetch() it needs to do the slow part first?

Is this a wrong way to handle a sessionStore? Im pretty new to kafka so I am not sure what the best practices are. All I know is, that I have a stream that can be split into sessionedWindows and the metric that is most useful, is to report the startTime and endTime of each window, where the timestamps need to be taken from the event objects. And to query this knowledge materializing it and then using the interactive query made the most sense to me.

Anyways thanks already for the input!

If I understand you correctly, you are saying to finish the whole loop before the iterator is closed takes the same amount of time for both fetch() and fetchBackwards()? But for fetch() it takes longer to return the first result?

Given how iterators are implemented internally, this could be the case. In the end, given the RocksDB key-byte layout, we need to scan over many RocksDB records, and might need to filter some that don’t qualify for the key you are looking for (don’t want to go into too much, but in the end, the range bounds on RocksDB are not strict, so we scan more data than we return from the iterator; it’s not possible to make the “search space” smaller unfortunately). If the sessions are at the end of the range we need to scan, it make sense that backward fetch returns data more quickly, while fetch first reads a lot of data and skips over it before it find the first session (and backward fetch would just scan-and-skip after it did return the two sessions it found and returned).

So you don’t do anything wrong. It’s just how internal implementation of Kafka Streams works.

If I understand you correctly, you are saying to finish the whole loop before the iterator is closed takes the same amount of time for both fetch() and fetchBackwards() ? But for fetch() it takes longer to return the first result?

That is exactly what I am seeing. But even if this is how it works internally, I find it hard to believe, that 5seconds is the wanted amount of time for a query haha. Does the fetch() and backwardsfetch() just not get used in the way that I am using it? Should I be using a different method to query the store?

I find it hard to believe, that 5seconds is the wanted amount of time for a query haha.

Well, maybe not “wanted” but I would say it is what it is… Guess you could try to look into RocksDB tuning to make it faster? But in the end, iterators are kinda slow, and as said, we don’t always have “sharp” bound for the search range, and might read a lot more data than the iterator effectively returns.

RocksDB is a key-value store, and thus has limited capabilities what implies that we have to put some tricks into place in Kafka Streams to make it work, what sometimes has negative impact on performance.

Does the fetch() and backwardsfetch() just not get used in the way that I am using it?

I do think it’s getting used just the way you do. How good the search bounds are depends in multiple factors, like key data type an serialization… So perf profile can vary. If you want to dig deeper, you would need to look into internals… It’s complicated: kafka/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java at bb6a042e99a3d011968a75f4e2bfb51d57637a25 · apache/kafka · GitHub

Should I be using a different method to query the store?

Well, if you only have the key for querying, there is no other way. If you could limit the search space by time, you could use findSession(key, ts1, ts2) but not sure if you have this information available.

The other alternative you would try it to switch to in-memory store to see who latency changes.