Dear all,
I hope to get some help here in the community forum with a question about ksqlDB in Confluent.
I am new to both Confluent and ksqlDB and am using them for an academic project. I already have some experience with MS SQL Server, and ksqlDB seems quite similar in most cases. Of course, some things are not as easy as in SQL Server (possibly because ksqlDB is still a fairly young product).
Here is my use case:
My team and I are collecting cryptocurrency data (price, market share, percent change, etc.) from an open-source API, and we want to aggregate the data (especially the price data) per minute.
Each message from the API contains a timestamp in milliseconds (BIGINT), which we convert to a STRING within our streams.
In the last step, we want to compute AVG(priceusd) grouped by the formatted timestamp (STRING), so that we get the average price per timestamp (shown as date and time, to the minute).
However, the table often produces more than one entry per timestamp (which is in the GROUP BY clause). This happens especially when auto.offset.reset is set to “latest” while streaming live data.
Using auto.offset.reset “earliest” to load old data from the underlying topic works fine (one entry per timestamp, as it should be with GROUP BY).
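For reference, auto.offset.reset is a session property. In the ksqlDB CLI it can be set before issuing the query, roughly as follows (a sketch of the standard SET syntax, shown only for completeness):

```sql
-- Read only records that arrive after the query starts (live data).
SET 'auto.offset.reset' = 'latest';

-- Alternatively, re-read the underlying topic from the beginning.
SET 'auto.offset.reset' = 'earliest';
```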
Here is the code for the final table, including the GROUP BY clause:
CREATE TABLE COINCAP_Table WITH (KAFKA_TOPIC='Coincap_Table', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON') AS SELECT
data->symbol+','+TIMESTAMP_FORMATTED AS TIMESTAMP_SYMBOL_KEY,
AVG(data->priceusd) AS AVG_priceusd,
AVG(data->volumeusd24hr) AS AVG_volumeusd24hr,
AVG(data->CHANGEPERCENT24HR) AS AVG_CHANGEPERCENT24HR,
AVG(data->marketcapusd) AS AVG_marketCapUsd
FROM COINCAP_STREAM2
GROUP BY data->symbol+','+TIMESTAMP_FORMATTED
EMIT CHANGES;
PS: We have combined two columns (symbol and timestamp) into one so that we can use the combined attribute for a join later. But that is not the point here.
TIMESTAMP_FORMATTED was converted from BIGINT (in milliseconds) to STRING as follows:
TIMESTAMPTOSTRING(TIMESTAMP, 'yyyy-MM-dd ''at'' HH:mm') AS TIMESTAMP_FORMATTED,
Does anyone know how to solve this issue and get only a single row per key in the GROUP BY clause? Why does ksqlDB produce more than one row (sometimes 2 or 3) for each key in the GROUP BY column?
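To make the goal concrete, here is an untested sketch of a windowed variant we are considering instead of grouping by the formatted string. It assumes that COINCAP_STREAM2 uses the API timestamp as its event time (for example via the TIMESTAMP property in its WITH clause). The table name COINCAP_TABLE_WINDOWED and the 10-second grace period are placeholders of ours. As far as we understand, EMIT FINAL is meant to emit one result per key and window once the window has closed, whereas EMIT CHANGES emits every intermediate update of the aggregate.

```sql
-- Untested sketch: one-minute tumbling windows per symbol.
-- COINCAP_TABLE_WINDOWED and the grace period are placeholder choices.
CREATE TABLE COINCAP_TABLE_WINDOWED WITH (VALUE_FORMAT='JSON') AS SELECT
  data->symbol AS SYMBOL,
  AVG(data->priceusd) AS AVG_PRICEUSD
FROM COINCAP_STREAM2
WINDOW TUMBLING (SIZE 1 MINUTE, GRACE PERIOD 10 SECONDS)
GROUP BY data->symbol
EMIT FINAL;
```

We are not sure whether this is the recommended approach, so corrections are welcome.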
Thanks for your help.
Best,
Sebastian