Data Discrepancy in Tumbling Windowed Table

Describe the bug
We are seeing a large discrepancy when we create a windowed table from a stream of IoT data in our ksqlDB setup: the aggregated counts differ substantially from the expected results.

The stream includes a DEVICEDATETIME field (epoch milliseconds), which we set as the stream's timestamp column (TIMESTAMP='devicedatetime') so that windows are based on device time.

CREATE STREAM STRTRACKERDATA_TRIM WITH (KAFKA_TOPIC='strtrackerdata_trim', PARTITIONS=50, REPLICAS=1, RETENTION_MS=3600000, TIMESTAMP='devicedatetime', VALUE_FORMAT='JSON') AS
SELECT DEVICEID, REASON, TENANTGROUPUID, RMSEVENTTYPE, DEVICEDATETIME, LATITUDE, LONGITUDE, SPEED, FMSSPEED, COMMAND, UNIQUEID, HDOP, RECORDSTATUS, LISTENERDATETIME, IGNITIONSTATUS, IBUTTON, ODOMETER, FMSODOMETER, CANODOMETER, GPSDIRECTION, GPSFIX
FROM STRTRACKERDATA
EMIT CHANGES;

For the table tblrmsabnormalevent_test, our use case requires a windowed aggregation in which each record within a window can be uniquely identified. We therefore group the data with GROUP BY on REASON, RMSEVENTTYPE, DEVICEID, and TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai').
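The fourth grouping expression buckets each record by its calendar date in Dubai. A rough Python equivalent, for illustration only (dubai_date and group_key are hypothetical names, and Asia/Dubai is modelled as a fixed UTC+4 offset, which holds because the UAE does not use daylight saving time):

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Dubai is modelled as a fixed UTC+4 offset. The UAE does not
# observe daylight saving time, so this matches the real zone for these dates.
DUBAI = timezone(timedelta(hours=4), "Asia/Dubai")


def dubai_date(rowtime_ms: int) -> str:
    """Rough equivalent of TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai')."""
    return datetime.fromtimestamp(rowtime_ms / 1000, tz=DUBAI).strftime("%Y-%m-%d")


def group_key(reason: int, rmseventtype: str, deviceid: str, rowtime_ms: int) -> tuple:
    """Hypothetical helper: the four GROUP BY expressions as one tuple key."""
    return (reason, rmseventtype, deviceid, dubai_date(rowtime_ms))


if __name__ == "__main__":
    # 2024-01-25 20:30 UTC is already 00:30 on 2024-01-26 in Dubai.
    print(dubai_date(1706214600000))  # prints 2024-01-26
```

Because of the +4 hour offset, records stamped from 20:00 UTC onwards are already assigned to the next Dubai date.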

CREATE TABLE tblrmsabnormalevent_test WITH (KEY_FORMAT='JSON') AS
SELECT
DEVICEID,
REASON,
RMSEVENTTYPE,
AS_VALUE(DEVICEID) AS DEVICE_ID,
AS_VALUE(REASON) AS DEVICE_REASON,
AS_VALUE(RMSEVENTTYPE) AS DEVICE_RMSEVENTTYPE,
COUNT(*) AS EVENTCOUNT,
FROM_UNIXTIME(WINDOWSTART) as WINDOW_START,
FROM_UNIXTIME(WINDOWEND) as WINDOW_END,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai') AS WINDOW_HOUR
FROM
STRTRACKERDATA_TRIM
WINDOW TUMBLING (SIZE 1 DAY, GRACE PERIOD 1 HOUR)
GROUP BY REASON, RMSEVENTTYPE, DEVICEID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai')
HAVING (
(reason=1096 AND count(*)>(37524))
OR (reason=1098 AND count(*)>(1024))
OR (reason=1097 AND count(*)>(424))
OR (reason NOT IN(1096,1098,1097) AND count(*)>(8*24))) emit final;
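The interaction between the 1-day tumbling window and the Asia/Dubai date in the GROUP BY can be sketched in Python. This is a simplified model, not ksqlDB code: it assumes, as in Kafka Streams (on which ksqlDB runs), that tumbling windows are aligned to the Unix epoch, so every 1-day window runs from 00:00 to 24:00 UTC (04:00 to 04:00 in Dubai), and it treats Asia/Dubai as a fixed UTC+4 offset. The helper names are hypothetical.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

DAY_MS = 24 * 60 * 60 * 1000
HOUR_MS = 60 * 60 * 1000
# Assumption: Asia/Dubai as a fixed UTC+4 offset (no daylight saving time).
DUBAI = timezone(timedelta(hours=4))


def dubai_date(ts_ms: int) -> str:
    """Dubai calendar date of a timestamp, like the TIMESTAMPTOSTRING key."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=DUBAI).strftime("%Y-%m-%d")


def window_start(ts_ms: int, size_ms: int = DAY_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms.

    Assumption: windows are aligned to the Unix epoch (as in Kafka Streams),
    so a 1-day window always starts at 00:00 UTC, i.e. 04:00 in Dubai.
    """
    return ts_ms - ts_ms % size_ms


def split_counts(timestamps_ms) -> Counter:
    """Count events per (Dubai date, window start), mirroring the windowed GROUP BY key."""
    return Counter((dubai_date(ts), window_start(ts)) for ts in timestamps_ms)


if __name__ == "__main__":
    # One hypothetical event per hour across 2024-01-26 in Dubai
    # (00:00 Dubai = 2024-01-25 20:00 UTC = 1706212800000 ms).
    hourly = [1706212800000 + i * HOUR_MS for i in range(24)]
    for (day, start), n in sorted(split_counts(hourly).items()):
        start_utc = datetime.fromtimestamp(start / 1000, tz=timezone.utc)
        print(day, start_utc.strftime("%Y-%m-%d %H:%M UTC"), n)
```

Under these assumptions, the 24 hourly events for Dubai date 2024-01-26 land under two keys: 4 in the window starting 2024-01-25 00:00 UTC and 20 in the window starting 2024-01-26 00:00 UTC. Each of those rows would then be compared against the HAVING thresholds on its own.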

To Reproduce
Steps to reproduce the behavior:

The version of KSQL: 0.29

Sample source data:
STRTRACKERDATA_TRIM is derived from another base stream (STRTRACKERDATA) and persists its output to a topic. The stream STRTRACKERDATA_TRIM contains more than 10 million (1 crore) records covering 3 days of data.

Sample records from the strtrackerdata_trim topic:

rowtime: 2024/01/25 13:02:31.662 Z, key: , value: {"DEVICEID":"354394490123337","REASON":1096,"TENANTGROUPUID":"86","RMSEVENTTYPE":"none","DEVICEDATETIME":1706187751662,"LATITUDE":25.0851843,"LONGITUDE":55.2314691,"SPEED":7,"FMSSPEED":0,"COMMAND":3,"UNIQUEID":17202,"HDOP":0,"RECORDSTATUS":1,"LISTENERDATETIME":1706187757074,"IGNITIONSTATUS":1,"IBUTTON":"","ODOMETER":62356.159,"FMSODOMETER":62356,"CANODOMETER":0,"GPSDIRECTION":195,"GPSFIX":1}, partition: 33

rowtime: 2024/01/25 08:28:26.366 Z, key: , value: {"DEVICEID":"354394490178067","REASON":1096,"TENANTGROUPUID":"86","RMSEVENTTYPE":"none","DEVICEDATETIME":1706171306366,"LATITUDE":25.1410165,"LONGITUDE":55.141053,"SPEED":11,"FMSSPEED":0,"COMMAND":3,"UNIQUEID":21191,"HDOP":0,"RECORDSTATUS":1,"LISTENERDATETIME":1706171310565,"IGNITIONSTATUS":1,"IBUTTON":"","ODOMETER":39957.55,"FMSODOMETER":39957,"CANODOMETER":0,"GPSDIRECTION":218,"GPSFIX":1}, partition: 33

rowtime: 2024/01/25 12:59:26.000 Z, key: , value: {"DEVICEID":"864004046339377","REASON":1096,"TENANTGROUPUID":"86","RMSEVENTTYPE":"none","DEVICEDATETIME":1706187566000,"LATITUDE":25.0739784,"LONGITUDE":55.3989295,"SPEED":0,"FMSSPEED":0,"COMMAND":3,"UNIQUEID":28145,"HDOP":0,"RECORDSTATUS":1,"LISTENERDATETIME":1706187569258,"IGNITIONSTATUS":1,"IBUTTON":"","ODOMETER":86240.568,"FMSODOMETER":86240,"CANODOMETER":0,"GPSDIRECTION":17,"GPSFIX":1}, partition: 33

rowtime: 2024/01/25 13:28:25.000 Z, key: , value: {"DEVICEID":"864004046276900","REASON":1096,"TENANTGROUPUID":"86","RMSEVENTTYPE":"none","DEVICEDATETIME":1706189305000,"LATITUDE":25.2299365,"LONGITUDE":55.2874107,"SPEED":0,"FMSSPEED":0,"COMMAND":3,"UNIQUEID":63235,"HDOP":0,"RECORDSTATUS":1,"LISTENERDATETIME":1706189305560,"IGNITIONSTATUS":1,"IBUTTON":"","ODOMETER":66513.472,"FMSODOMETER":66513,"CANODOMETER":0,"GPSDIRECTION":29,"GPSFIX":1}, partition: 33
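The rowtime of each sample record equals its DEVICEDATETIME, which is what TIMESTAMP='devicedatetime' is expected to produce. The hedged sketch below converts the four DEVICEDATETIME values to UTC and, assuming the records are read from partition 33 in the order listed, measures how far each one lags the highest timestamp seen so far. This is a simplified stand-in for Kafka Streams stream time (which is tracked per task, and the printed records may not be consecutive); SAMPLE_DEVICEDATETIME, utc_str, and lateness_ms are illustrative names.

```python
from datetime import datetime, timezone

# DEVICEDATETIME values of the four sample records above (partition 33),
# in the order they are listed.
SAMPLE_DEVICEDATETIME = [1706187751662, 1706171306366, 1706187566000, 1706189305000]


def utc_str(ts_ms: int) -> str:
    """Format epoch milliseconds like the rowtime shown in the topic dump."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).strftime("%Y/%m/%d %H:%M:%S")


def lateness_ms(timestamps) -> list:
    """How far each record lags the highest timestamp seen so far (itself included).

    Simplified model of stream time: the maximum timestamp observed so far.
    """
    stream_time = None
    lags = []
    for ts in timestamps:
        stream_time = ts if stream_time is None else max(stream_time, ts)
        lags.append(stream_time - ts)
    return lags


if __name__ == "__main__":
    for ts, lag in zip(SAMPLE_DEVICEDATETIME, lateness_ms(SAMPLE_DEVICEDATETIME)):
        print(utc_str(ts), f"lag={lag / 3_600_000:.2f} h")
```

In this model the second record lags by about 4.6 hours. That is still inside the same 1-day window, so it would be counted; a record whose lag carries it past the end of its window plus the 1-hour grace period would not be.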

Expected behaviour:
When we aggregate with GROUP BY REASON, RMSEVENTTYPE, DEVICEID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai') without windowing, or extract the data and aggregate it in an external application, we get results similar to the following:

CREATE TABLE rmsanalysis_withoutwindowing WITH (KEY_FORMAT='JSON') AS
SELECT
DEVICEID,
REASON,
RMSEVENTTYPE,
AS_VALUE(DEVICEID) AS DEVICE_ID,
AS_VALUE(REASON) AS DEVICE_REASON,
AS_VALUE(RMSEVENTTYPE) AS DEVICE_RMSEVENTTYPE,
COUNT(*) AS EVENTCOUNT,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai') AS WINDOW_HOUR
FROM
STRTRACKERDATA_TRIM
GROUP BY REASON, RMSEVENTTYPE, DEVICEID, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd', 'Asia/Dubai')
HAVING (
(reason=1096 AND count(*)>(37524))
OR (reason=1098 AND count(*)>(1024))
OR (reason=1097 AND count(*)>(424))
OR (reason NOT IN(1096,1098,1097) AND count(*)>(8*24))) emit final;
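The HAVING clause shared by both tables applies a per-REASON threshold to each group's count. A Python restatement, for illustration only (THRESHOLDS, DEFAULT_THRESHOLD, and passes_having are hypothetical names):

```python
# Per-REASON thresholds copied from the HAVING clause; other reasons use 8*24.
THRESHOLDS = {1096: 37524, 1098: 1024, 1097: 424}
DEFAULT_THRESHOLD = 8 * 24


def passes_having(reason: int, count: int) -> bool:
    """True if a group with this REASON and COUNT(*) would satisfy the HAVING clause."""
    return count > THRESHOLDS.get(reason, DEFAULT_THRESHOLD)


if __name__ == "__main__":
    print(passes_having(1098, 41384))  # True: 41384 > 1024
    print(passes_having(1102, 150))    # False: 150 is not > 192
```

In the windowed table the same predicate is evaluated against each window's count, not against a per-day total accumulated across windows.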

select * from rmsanalysis_withoutwindowing where window_hour = '2024-01-26' and eventcount > 10000 emit changes;
+-------+-------------+----------------+------------+----------------+--------------+--------------------+-----------+
|REASON |RMSEVENTTYPE |DEVICEID        |WINDOW_HOUR |DEVICE_ID       |DEVICE_REASON |DEVICE_RMSEVENTTYPE |EVENTCOUNT |
+-------+-------------+----------------+------------+----------------+--------------+--------------------+-----------+
|1102   |"accident"   |869395034886220 |2024-01-26  |869395034886220 |1102          |"accident"          |71840      |
|1096   |none         |864004044727409 |2024-01-26  |864004044727409 |1096          |none                |55813      |
|1096   |none         |868259024679634 |2024-01-26  |868259024679634 |1096          |none                |13476      |
|1098   |none         |359804081928998 |2024-01-26  |359804081928998 |1098          |none                |41384      |
|1096   |none         |864004044727235 |2024-01-26  |864004044727235 |1096          |none                |10258      |
|1096   |none         |864004044732086 |2024-01-26  |864004044732086 |1096          |none                |16627      |
|1098   |none         |359804081279582 |2024-01-26  |359804081279582 |1098          |none                |17590      |

Actual behaviour:
When we run the same query against the windowed table, we get no records with eventcount > 10000 at all:

select * from tblrmsabnormalevent_test where window_hour = '2024-01-26' and eventcount > 10000 emit changes;
+-------+-------------+---------+------------+------------+----------+----------+--------------+--------------------+-----------+-------------+-----------+
|REASON |RMSEVENTTYPE |DEVICEID |WINDOW_HOUR |WINDOWSTART |WINDOWEND |DEVICE_ID |DEVICE_REASON |DEVICE_RMSEVENTTYPE |EVENTCOUNT |WINDOW_START |WINDOW_END |
+-------+-------------+---------+------------+------------+----------+----------+--------------+--------------------+-----------+-------------+-----------+
Query terminated
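EMIT FINAL and the GRACE PERIOD both depend on stream time. As a simplified model of Kafka Streams semantics (hypothetical helper names; stream time taken to be the highest record timestamp observed so far), a window's final row is released only once stream time reaches window end plus grace, and a record whose window has already reached that point is discarded rather than counted.

```python
DAY_MS = 24 * 60 * 60 * 1000
HOUR_MS = 60 * 60 * 1000


def window_close_time(window_start_ms: int, size_ms: int = DAY_MS, grace_ms: int = HOUR_MS) -> int:
    """Stream time at which a tumbling window closes: window end plus grace period."""
    return window_start_ms + size_ms + grace_ms


def final_row_emitted(window_start_ms: int, stream_time_ms: int,
                      size_ms: int = DAY_MS, grace_ms: int = HOUR_MS) -> bool:
    """EMIT FINAL (simplified): the window's row is released once it has closed."""
    return stream_time_ms >= window_close_time(window_start_ms, size_ms, grace_ms)


def dropped_as_late(record_ts_ms: int, stream_time_ms: int,
                    size_ms: int = DAY_MS, grace_ms: int = HOUR_MS) -> bool:
    """A record is discarded, not counted, if its window has already closed."""
    start = record_ts_ms - record_ts_ms % size_ms  # epoch-aligned window start
    return stream_time_ms >= window_close_time(start, size_ms, grace_ms)


if __name__ == "__main__":
    win = 1706227200000  # window starting 2024-01-26 00:00 UTC
    print(final_row_emitted(win, 1706317199999))  # False: just before 2024-01-27 01:00 UTC
    print(final_row_emitted(win, 1706317200000))  # True
```

In this model, the row for the window starting 2024-01-26 00:00 UTC would appear only after a record stamped 2024-01-27 01:00 UTC or later has been processed; from then on, a record stamped 2024-01-26 12:00 UTC would be discarded.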

With eventcount > 2000, we get the output below.

We don't understand where the issue is or why the data differ so much. When we run the windowed aggregation, the aggregated values come out wrong: they appear to be lower than the expected values.
