Tumbling Window giving incorrect aggregated results

A ksqlDB tumbling window is used to aggregate an incoming stream of data. The window size is 5 minutes, with a grace period of 30 minutes (note: the scripts below actually specify `GRACE PERIOD 35 MINUTES`).
The ksqlDB scripts used for this use case are included below. When the aggregation runs continuously for long periods, the aggregated values for a few windows, apparently chosen at random, are calculated incorrectly.

A data set for reproducing the issue is attached. It is prepared and loaded as follows:
Files of 10K records each are loaded over a period of 40 minutes, with one file loaded every 20 seconds. That is 15 files per 5 minutes, so 15 × 10,000 = 150,000 records are loaded in every 5-minute interval. The data is prepared so that all 150,000 records loaded in a given 5-minute interval fall within a single 5-minute event-time window (e.g. 10:00–10:05). The dimension used for aggregation has 10 distinct values, so each 5-minute window is expected to produce 10 aggregated rows.

When the aggregation runs, the aggregated column values are incorrect for a few partitions, seemingly at random: the aggregated values are lower than the expected values.

ksqlDB scripts for the aggregation flow
-- Raw SMS / SIP / GTP event records read from the Avro topic 'sms.data'.
-- The ksqlDB record timestamp of every row is taken from EVENT_TIME, parsed
-- with the pattern 'yyyy-MM-dd HH:mm:ss.SSS'.
-- NOTE(review): the pattern carries no zone field, so ksqlDB presumably
-- interprets EVENT_TIME as UTC here — confirm this matches the producer.
-- String literals must use ASCII single quotes; typographic quotes (‘ ’)
-- are rejected by the ksqlDB parser.
CREATE STREAM RT_SMS_RAW_1 (
    EVENT_TIME STRING,
    IMSI STRING,
    MSISDN STRING,
    IMEI STRING,
    MCC STRING,
    MNC STRING,
    SAC_ID BIGINT,
    CELL_ID BIGINT,
    LAC BIGINT,
    CLEAR_CODE_ID BIGINT,
    DELIVERY_TIME STRING,
    RADIO_ACCESS_INFO BIGINT,
    SMS_TYPE BIGINT,
    SMS_LENGTH BIGINT,
    ORIG_CALLING_NUM STRING,
    CALLED_NUMBER STRING,
    INCOMING_TIME STRING,
    SOURCE_ID STRING,
    SMS_SERV_CENTER_ADDR_NUMBER STRING,
    SMS_MSC_TYPE BIGINT,
    SMS_BSC_ID STRING,
    SMS_RNC_ID STRING,
    SMS_MSC_ID STRING,
    STAGE_TYPE STRING,
    MVNO_ID BIGINT,
    SMS_START_TIME STRING,
    SMS_RP_ORIGINATING_ADDRESS STRING,
    SMS_RP_ORIGINATING_ADDRESS_NOA BIGINT,
    SMS_RP_ORIGINATING_ADDRESS_NP BIGINT,
    SMS_RP_DESTINATION_ADDRESS STRING,
    SMS_RP_DESTINATION_ADDRESS_NOA BIGINT,
    SMS_RP_DESTINATION_ADDRESS_NP BIGINT,
    SMS_TP_ORIGINATING_ADDRESS STRING,
    SMS_TP_ORIGINATING_ADDRESS_NOA BIGINT,
    SMS_TP_ORIGINATING_ADDRESS_NP BIGINT,
    SMS_TP_DESTINATION_ADDRESS STRING,
    SMS_TP_DESTINATION_ADDRESS_NOA BIGINT,
    SMS_TP_DESTINATION_ADDRESS_NP BIGINT,
    SMS_MESSAGE_REFERENCE_NUMBER BIGINT,
    SMS_IE_IDENTIFIER STRING,
    SMS_CONCATENATED_REFERENCE BIGINT,
    SMS_CONCATENATED_MAX BIGINT,
    SMS_CONCATENATED_SEQ BIGINT,
    SMS_MORE_MESSAGES_TO_SEND BIGINT,
    SMS_RP_CAUSE BIGINT,
    SMS_CP_CAUSE BIGINT,
    SMS_TP_CAUSE BIGINT,
    SMS_SMSC_TIME STRING,
    SMS_RP_ACK_TIME STRING,
    PCSCF_ID STRING,
    ENB_ID STRING,
    UUID STRING,
    PROCESS_SEQUENCE_NUMBER STRING,
    SIP_START_TIME STRING,
    SIP_END_TIME STRING,
    CALL_ID STRING,
    PROBE_SESSION_ID STRING,
    REPORT_REASON STRING,
    USER_AGENT STRING,
    EXPIRATION BIGINT,
    MCC_MNC_IMSI STRING,
    DEVICE_TAC STRING,
    RETRANSMISSION_COUNT BIGINT,
    LAST_200_OK_TIME STRING,
    FROM_URI_USERNAME STRING,
    TO_URI_USERNAME STRING,
    FROM_URI_HOST STRING,
    TO_URI_HOST STRING,
    FROM_URI_TELEPHONE_SUBSCRIBER STRING,
    TO_URI_TELEPHONE_SUBSCRIBER STRING,
    LAST_REASON_PHRASE STRING,
    SETUP_DURATION BIGINT,
    STATUS BIGINT,
    SOURCE_IP_ADDRESS STRING,
    SOURCE_PORT BIGINT,
    DESTINATION_IP_ADDRESS STRING,
    DESTINATION_PORT BIGINT,
    CSEQ BIGINT,
    VERSION STRING,
    P_PREFERRED_USER_ID STRING,
    VIA_BRANCH STRING,
    ROUTE STRING,
    INTERFACE_CODE BIGINT,
    GTP_CAUSE_CODE BIGINT,
    GTP_RECORD_TYPE BIGINT,
    GTP_SESSION_ID STRING,
    GTP_SIGNALLING_START_TIME STRING,
    GTP_SOURCE_IP_ADDRESS STRING,
    GTP_DESTINATION_IP_ADDRESS STRING,
    ACCESS_TYPE_FIRST_PANI_A STRING,
    ACCESS_TYPE_FIRST_PANI_B STRING,
    FIRST_TRYING_TIME STRING,
    MO_MT_TYPE BIGINT,
    P_ACCESS_NETWORK_INFO_FIRST_A STRING,
    P_ACCESS_NETWORK_INFO_FIRST_B STRING,
    P_ASSERTED_ID STRING,
    P_CHARGING_VECTOR_ICID STRING,
    P_CHARGING_VECTOR_ORIGIN_IOI STRING,
    P_CHARGING_VECTOR_TERM_IOI STRING,
    P_CHARGING_VECTOR_TRANSIT_IOI STRING,
    FIRST_REASON STRING,
    REASON_TEXT STRING,
    SIP_REASON_HEADER_CODE BIGINT,
    VLAN_PCP BIGINT,
    VLAN_CFI BIGINT,
    VLAN_VID BIGINT,
    WLAN_NODE_ID_FIRST_PANI_A STRING,
    WLAN_NODE_ID_FIRST_PANI_B STRING
) WITH (
    KAFKA_TOPIC = 'sms.data',
    VALUE_FORMAT = 'AVRO',
    TIMESTAMP = 'EVENT_TIME',
    TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss.SSS'
);

-- Enriches each raw SMS record with its clear-code status from
-- RT_DIM_CLEAR_CODE_1 (defined elsewhere) and derives per-record
-- failure/success flags and delivery times for the window aggregations.
--   * EVENT_TIME is re-parsed, converted from 'LOCAL_TZ' to 'UTC', and
--     re-formatted; the converted value becomes this stream's record
--     timestamp (TIMESTAMP = 'EVENT_TIME'), which drives window assignment.
--     NOTE(review): 'LOCAL_TZ' looks like a placeholder for a real zone ID
--     (e.g. 'Europe/Berlin') — confirm.
--   * LEFT JOIN keeps records with no matching clear code. For those rows
--     E.CLEAR_CODE_STATUS is NULL, so every CASE below takes its ELSE branch.
--   * Delivery time = (DELIVERY_TIME - INCOMING_TIME) in epoch milliseconds
--     (UNIX_TIMESTAMP) divided by 1000. Both operands are BIGINT, so this is
--     integer division: each record is truncated to whole seconds before the
--     downstream SUM, which makes ROUND(SUM(...)) there a no-op.
--     NOTE(review): if fractional seconds matter, divide by a DOUBLE instead
--     (this changes the column type of this stream to DOUBLE).
--   * All source columns are qualified with S. / E. so the projection stays
--     unambiguous if the dimension table gains same-named columns.
CREATE STREAM RT_SMS_DENORM_1 WITH (
    KAFKA_TOPIC = 'rt_sms_denorm_1_t',
    TIMESTAMP = 'EVENT_TIME',
    TIMESTAMP_FORMAT = 'yyyy-MM-dd HH:mm:ss.SSS',
    PARTITIONS = 25
) AS
SELECT
    FORMAT_TIMESTAMP(
        CONVERT_TZ(
            PARSE_TIMESTAMP(S.EVENT_TIME, 'yyyy-MM-dd HH:mm:ss.SSS'),
            'LOCAL_TZ',
            'UTC'
        ),
        'yyyy-MM-dd HH:mm:ss.SSS'
    ) AS EVENT_TIME,
    S.CLEAR_CODE_ID AS CLEAR_CODE_ID,
    S.PCSCF_ID AS PCSCF_ID,
    S.SMS_SERV_CENTER_ADDR_NUMBER AS SMS_SERV_CENTER_ADDR_NUMBER,
    S.SMS_TYPE AS SMS_TYPE,
    S.RADIO_ACCESS_INFO AS RADIO_ACCESS_INFO,
    CASE WHEN E.CLEAR_CODE_STATUS = 'fail' THEN 1 ELSE 0 END AS NUMBER_OF_SMS_FAILURES,
    CASE
        WHEN S.SMS_TYPE = 0 AND E.CLEAR_CODE_STATUS = 'success' THEN
            (UNIX_TIMESTAMP(PARSE_TIMESTAMP(S.DELIVERY_TIME, 'yyyy-MM-dd HH:mm:ss.SSS'))
             - UNIX_TIMESTAMP(PARSE_TIMESTAMP(S.INCOMING_TIME, 'yyyy-MM-dd HH:mm:ss.SSS'))) / 1000
        ELSE NULL
    END AS MO_DELIVERY_TIME,
    CASE
        WHEN S.SMS_TYPE = 2 AND E.CLEAR_CODE_STATUS = 'success' THEN
            (UNIX_TIMESTAMP(PARSE_TIMESTAMP(S.DELIVERY_TIME, 'yyyy-MM-dd HH:mm:ss.SSS'))
             - UNIX_TIMESTAMP(PARSE_TIMESTAMP(S.INCOMING_TIME, 'yyyy-MM-dd HH:mm:ss.SSS'))) / 1000
        ELSE NULL
    END AS MT_DELIVERY_TIME,
    CASE WHEN S.SMS_TYPE = 0 AND E.CLEAR_CODE_STATUS = 'success' THEN 1 ELSE 0 END AS MO_TOTAL_SUCCESS_SMS,
    CASE WHEN S.SMS_TYPE = 2 AND E.CLEAR_CODE_STATUS = 'success' THEN 1 ELSE 0 END AS MT_TOTAL_SUCCESS_SMS
FROM RT_SMS_RAW_1 S
LEFT JOIN RT_DIM_CLEAR_CODE_1 E
    ON S.CLEAR_CODE_ID = E.CLEAR_CODE_ID
EMIT CHANGES;

-- 5-minute tumbling-window aggregate of SMS records with SMS_TYPE 0 or 2 and
-- RADIO_ACCESS_INFO = 5, grouped per P-CSCF. A NULL PCSCF_ID is bucketed
-- under the literal 'NULL IN SOURCE'.
--   * Key columns: NETWORK_ELEMENT_TYPE (constant 'P-CSCF') and
--     NETWORK_ELEMENT_ID. AS_VALUE copies them into the value as NE_TYPE /
--     NE_ID because RT_SMS_PCSCF_TS_4G_1_5MIN reads only the value.
--   * The COALESCE expression must be identical in SELECT and GROUP BY.
--   * Records that arrive after stream time has passed their window's end
--     plus the GRACE PERIOD (35 min) are dropped. Window state is kept for
--     RETENTION (6 h).
--   * EMIT CHANGES writes every intermediate update of a window to
--     rt_sms_pcscf_4g_1_t, not one final row per window. A consumer that does
--     not keep only the latest record per (window, key) will see partial, i.e.
--     lower, totals. NOTE(review): on ksqlDB versions that support it,
--     EMIT FINAL emits a single record per window once its grace period closes.
--   * SUM over the INTEGER 0/1 flags returns INTEGER; the sums are cast to
--     BIGINT to match the columns declared for this topic in
--     RT_SMS_PCSCF_TS_4G_1_5MIN.
CREATE TABLE RT_SMS_PCSCF_4G_1_5MIN WITH (
    KEY_FORMAT = 'AVRO',
    KAFKA_TOPIC = 'rt_sms_pcscf_4g_1_t',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 25
) AS
SELECT
    CAST(WINDOWSTART AS STRING) AS WINDOW_START_TIME,
    'P-CSCF' AS NETWORK_ELEMENT_TYPE,
    COALESCE(PCSCF_ID, 'NULL IN SOURCE') AS NETWORK_ELEMENT_ID,
    AS_VALUE('P-CSCF') AS NE_TYPE,
    AS_VALUE(COALESCE(PCSCF_ID, 'NULL IN SOURCE')) AS NE_ID,
    CAST(SUM(NUMBER_OF_SMS_FAILURES) AS BIGINT) AS NUMBER_OF_SMS_FAILURES,
    COUNT(*) AS NUMBER_OF_SMS,
    ROUND(SUM(MO_DELIVERY_TIME)) AS MO_DELIVERY_TIME,
    ROUND(SUM(MT_DELIVERY_TIME)) AS MT_DELIVERY_TIME,
    CAST(SUM(MO_TOTAL_SUCCESS_SMS) AS BIGINT) AS MO_TOTAL_SUCCESS_SMS,
    CAST(SUM(MT_TOTAL_SUCCESS_SMS) AS BIGINT) AS MT_TOTAL_SUCCESS_SMS
FROM RT_SMS_DENORM_1
WINDOW TUMBLING (SIZE 5 MINUTES, RETENTION 6 HOURS, GRACE PERIOD 35 MINUTES)
WHERE SMS_TYPE IN (0, 2)
  AND RADIO_ACCESS_INFO = 5
GROUP BY
    'P-CSCF',
    COALESCE(PCSCF_ID, 'NULL IN SOURCE')
EMIT CHANGES;

-- Re-reads the output topic of RT_SMS_PCSCF_4G_1_5MIN as a plain stream.
-- No key column is declared, so the windowed Avro key is ignored and only
-- the value columns (including the NE_TYPE / NE_ID copies) are read.
-- Because the source table uses EMIT CHANGES, this stream carries every
-- intermediate update of a window, not one row per window.
-- NOTE(review): the topic is created by RT_SMS_PCSCF_4G_1_5MIN; keep
-- PARTITIONS in line with it (25).
CREATE STREAM RT_SMS_PCSCF_TS_4G_1_5MIN (
    WINDOW_START_TIME STRING,
    NE_TYPE STRING,
    NE_ID STRING,
    NUMBER_OF_SMS_FAILURES BIGINT,
    NUMBER_OF_SMS BIGINT,
    MO_DELIVERY_TIME BIGINT,
    MT_DELIVERY_TIME BIGINT,
    MO_TOTAL_SUCCESS_SMS BIGINT,
    MT_TOTAL_SUCCESS_SMS BIGINT
) WITH (
    KAFKA_TOPIC = 'rt_sms_pcscf_4g_1_t',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 25
);

-- Re-keys the P-CSCF window updates by '<window start epoch ms>:<NE_ID>'
-- and adds REPORT_TIME, the window start formatted in 'LOCAL_TZ'.
--   * The same CONCAT expression is used in SELECT and PARTITION BY so the
--     new key column is named COMPOSITE_KEY.
--   * Every intermediate window update from the upstream EMIT CHANGES table
--     is forwarded. Consumers of rt_sms_pcscf_4g_rekey_1_t should keep only
--     the latest record per COMPOSITE_KEY (e.g. upsert) to get final totals.
--   NOTE(review): 'LOCAL_TZ' looks like a placeholder for a real zone ID —
--   confirm.
CREATE STREAM RT_SMS_PCSCF_REKEY_4G_1_5MIN WITH (
    KEY_FORMAT = 'KAFKA',
    KAFKA_TOPIC = 'rt_sms_pcscf_4g_rekey_1_t',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 25
) AS
SELECT
    CONCAT(WINDOW_START_TIME, ':', NE_ID) AS COMPOSITE_KEY,
    FORMAT_TIMESTAMP(
        FROM_UNIXTIME(CAST(WINDOW_START_TIME AS BIGINT)),
        'yyyy-MM-dd HH:mm:ss.SSS',
        'LOCAL_TZ'
    ) AS REPORT_TIME,
    NE_TYPE,
    NE_ID,
    NUMBER_OF_SMS_FAILURES,
    NUMBER_OF_SMS,
    MO_DELIVERY_TIME,
    MT_DELIVERY_TIME,
    MO_TOTAL_SUCCESS_SMS,
    MT_TOTAL_SUCCESS_SMS
FROM RT_SMS_PCSCF_TS_4G_1_5MIN
PARTITION BY CONCAT(WINDOW_START_TIME, ':', NE_ID)
EMIT CHANGES;

-- 5-minute tumbling-window aggregate of SMS records with SMS_TYPE 0 or 2 and
-- RADIO_ACCESS_INFO = 5, grouped per SMS service centre address
-- (SMS_SERV_CENTER_ADDR_NUMBER). A NULL address is bucketed under the
-- literal 'NULL IN SOURCE'.
--   * Key columns: NETWORK_ELEMENT_TYPE (constant 'SMSC') and
--     NETWORK_ELEMENT_ID. AS_VALUE copies them into the value as NE_TYPE /
--     NE_ID because RT_SMS_SMSC_TS_4G_1_5MIN reads only the value.
--   * The COALESCE expression must be identical in SELECT and GROUP BY.
--   * Records that arrive after stream time has passed their window's end
--     plus the GRACE PERIOD (35 min) are dropped. Window state is kept for
--     RETENTION (6 h).
--   * EMIT CHANGES writes every intermediate update of a window to
--     rt_sms_smsc_4g_1_t, not one final row per window. A consumer that does
--     not keep only the latest record per (window, key) will see partial, i.e.
--     lower, totals. NOTE(review): on ksqlDB versions that support it,
--     EMIT FINAL emits a single record per window once its grace period closes.
--   * SUM over the INTEGER 0/1 flags returns INTEGER; the sums are cast to
--     BIGINT to match the columns declared for this topic in
--     RT_SMS_SMSC_TS_4G_1_5MIN.
CREATE TABLE RT_SMS_SMSC_4G_1_5MIN WITH (
    KEY_FORMAT = 'AVRO',
    KAFKA_TOPIC = 'rt_sms_smsc_4g_1_t',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 25
) AS
SELECT
    CAST(WINDOWSTART AS STRING) AS WINDOW_START_TIME,
    'SMSC' AS NETWORK_ELEMENT_TYPE,
    COALESCE(SMS_SERV_CENTER_ADDR_NUMBER, 'NULL IN SOURCE') AS NETWORK_ELEMENT_ID,
    AS_VALUE('SMSC') AS NE_TYPE,
    AS_VALUE(COALESCE(SMS_SERV_CENTER_ADDR_NUMBER, 'NULL IN SOURCE')) AS NE_ID,
    CAST(SUM(NUMBER_OF_SMS_FAILURES) AS BIGINT) AS NUMBER_OF_SMS_FAILURES,
    COUNT(*) AS NUMBER_OF_SMS,
    ROUND(SUM(MO_DELIVERY_TIME)) AS MO_DELIVERY_TIME,
    ROUND(SUM(MT_DELIVERY_TIME)) AS MT_DELIVERY_TIME,
    CAST(SUM(MO_TOTAL_SUCCESS_SMS) AS BIGINT) AS MO_TOTAL_SUCCESS_SMS,
    CAST(SUM(MT_TOTAL_SUCCESS_SMS) AS BIGINT) AS MT_TOTAL_SUCCESS_SMS
FROM RT_SMS_DENORM_1
WINDOW TUMBLING (SIZE 5 MINUTES, RETENTION 6 HOURS, GRACE PERIOD 35 MINUTES)
WHERE SMS_TYPE IN (0, 2)
  AND RADIO_ACCESS_INFO = 5
GROUP BY
    'SMSC',
    COALESCE(SMS_SERV_CENTER_ADDR_NUMBER, 'NULL IN SOURCE')
EMIT CHANGES;

-- Re-reads the output topic of RT_SMS_SMSC_4G_1_5MIN as a plain stream.
-- No key column is declared, so the windowed Avro key is ignored and only
-- the value columns (including the NE_TYPE / NE_ID copies) are read.
-- Because the source table uses EMIT CHANGES, this stream carries every
-- intermediate update of a window, not one row per window.
-- NOTE(review): the topic is created by RT_SMS_SMSC_4G_1_5MIN; keep
-- PARTITIONS in line with it (25).
CREATE STREAM RT_SMS_SMSC_TS_4G_1_5MIN (
    WINDOW_START_TIME STRING,
    NE_TYPE STRING,
    NE_ID STRING,
    NUMBER_OF_SMS_FAILURES BIGINT,
    NUMBER_OF_SMS BIGINT,
    MO_DELIVERY_TIME BIGINT,
    MT_DELIVERY_TIME BIGINT,
    MO_TOTAL_SUCCESS_SMS BIGINT,
    MT_TOTAL_SUCCESS_SMS BIGINT
) WITH (
    KAFKA_TOPIC = 'rt_sms_smsc_4g_1_t',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 25
);

-- Re-keys the SMSC window updates by '<window start epoch ms>:<NE_ID>'
-- and adds REPORT_TIME, the window start formatted in 'LOCAL_TZ'.
--   * The same CONCAT expression is used in SELECT and PARTITION BY so the
--     new key column is named COMPOSITE_KEY.
--   * Every intermediate window update from the upstream EMIT CHANGES table
--     is forwarded. Consumers of rt_sms_smsc_4g_rekey_1_t should keep only
--     the latest record per COMPOSITE_KEY (e.g. upsert) to get final totals.
--   NOTE(review): 'LOCAL_TZ' looks like a placeholder for a real zone ID —
--   confirm.
CREATE STREAM RT_SMS_SMSC_REKEY_4G_1_5MIN WITH (
    KEY_FORMAT = 'KAFKA',
    KAFKA_TOPIC = 'rt_sms_smsc_4g_rekey_1_t',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 25
) AS
SELECT
    CONCAT(WINDOW_START_TIME, ':', NE_ID) AS COMPOSITE_KEY,
    FORMAT_TIMESTAMP(
        FROM_UNIXTIME(CAST(WINDOW_START_TIME AS BIGINT)),
        'yyyy-MM-dd HH:mm:ss.SSS',
        'LOCAL_TZ'
    ) AS REPORT_TIME,
    NE_TYPE,
    NE_ID,
    NUMBER_OF_SMS_FAILURES,
    NUMBER_OF_SMS,
    MO_DELIVERY_TIME,
    MT_DELIVERY_TIME,
    MO_TOTAL_SUCCESS_SMS,
    MT_TOTAL_SUCCESS_SMS
FROM RT_SMS_SMSC_TS_4G_1_5MIN
PARTITION BY CONCAT(WINDOW_START_TIME, ':', NE_ID)
EMIT CHANGES;

This topic was automatically closed after 30 days. New replies are no longer allowed.