Need help with EARLIEST_BY_OFFSET and GROUP BY

My objective is to create an alert stream for the condition where speed is non-zero but the geospatial data is not updating. I want to do this by continuously comparing consecutive events. I understand that we can’t do that directly, so I wanted to derive a stream from the main stream that contains both the current record and the previous record (i.e., the last two records) for further analysis.

CREATE STREAM STRGEOALERT_01
WITH (VALUE_FORMAT = 'JSON', RETENTION_MS = 3600000)
AS
SELECT deviceid,
    LATEST_BY_OFFSET(speed, 2) AS speed_,
    LATEST_BY_OFFSET(uniqueid, 2) AS uniqueid_,
    LATEST_BY_OFFSET(devicedatetime, 2) AS devicedatetime_,
    LATEST_BY_OFFSET(latitude, 2) AS latitude_,
    LATEST_BY_OFFSET(longitude, 2) AS longitude_
FROM STRGEOALERT
GROUP BY deviceid
EMIT CHANGES;

But this query is not behaving as expected. How does GROUP BY behave? Is it triggered for every event coming into the stream? How can I achieve the desired behaviour?

By default, the query won’t necessarily emit an update for every input record, because record caching comes into play for performance reasons. Two parameters, commit.interval.ms and cache.max.bytes.buffering, dictate caching behavior (see the ksqlDB docs and the underlying Kafka Streams docs on caching). To get an update in the output for every input record, you’d need to set one or both of these parameters to zero.
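As a rough sketch of what that could look like in a ksqlDB CLI session: the SET statement below disables the record cache for queries started afterwards in the same session. This assumes you are configuring per session rather than in the server properties file, and it only takes effect for queries created after the statement is run.

```sql
-- Disable record caching so each input record can produce an output update.
-- Applies to queries started later in this session (assumption: CLI/session-level config).
SET 'cache.max.bytes.buffering' = '0';
```

Expect more output records and more load on the output topic with caching disabled, since intermediate updates are no longer collapsed.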

You could consider alerting in one shot by using HAVING. For example, to find instances where the speed is non-zero for at least one of the two most recent records and neither latitude nor longitude has changed between them, something like this should work:

SELECT deviceid,
    LATEST_BY_OFFSET(speed, 2) AS speed_,
    LATEST_BY_OFFSET(uniqueid, 2) AS uniqueid_,
    LATEST_BY_OFFSET(devicedatetime, 2) AS devicedatetime_,
    LATEST_BY_OFFSET(latitude, 2) AS latitude_,
    LATEST_BY_OFFSET(longitude, 2) AS longitude_
FROM STRGEOALERT
GROUP BY deviceid
HAVING (LATEST_BY_OFFSET(speed, 2)[1] > 0 OR LATEST_BY_OFFSET(speed, 2)[2] > 0) AND
        LATEST_BY_OFFSET(latitude, 2)[1] = LATEST_BY_OFFSET(latitude, 2)[2] AND
        LATEST_BY_OFFSET(longitude, 2)[1] = LATEST_BY_OFFSET(longitude, 2)[2]
EMIT CHANGES;
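
If the alerts need to be persisted rather than read from a transient push query, keep in mind that an aggregation with GROUP BY produces a table in ksqlDB, so the persistent form would be CREATE TABLE ... AS SELECT rather than CREATE STREAM. A minimal sketch, where the table name STRGEOALERT_STUCK is hypothetical and the columns and source stream are taken from the query in this thread:

```sql
-- Hypothetical table name; a GROUP BY aggregation yields a table, not a stream.
CREATE TABLE STRGEOALERT_STUCK
WITH (VALUE_FORMAT = 'JSON')
AS
SELECT deviceid,
    LATEST_BY_OFFSET(speed, 2) AS speed_,
    LATEST_BY_OFFSET(devicedatetime, 2) AS devicedatetime_,
    LATEST_BY_OFFSET(latitude, 2) AS latitude_,
    LATEST_BY_OFFSET(longitude, 2) AS longitude_
FROM STRGEOALERT
GROUP BY deviceid
-- Keep only devices that report movement while the position is unchanged.
HAVING (LATEST_BY_OFFSET(speed, 2)[1] > 0 OR LATEST_BY_OFFSET(speed, 2)[2] > 0) AND
        LATEST_BY_OFFSET(latitude, 2)[1] = LATEST_BY_OFFSET(latitude, 2)[2] AND
        LATEST_BY_OFFSET(longitude, 2)[1] = LATEST_BY_OFFSET(longitude, 2)[2]
EMIT CHANGES;
```

Until a device has sent two records, the second array element is missing, so the latitude and longitude comparisons should not match and no alert row is expected for that device.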
