kSQL Data Streaming - grouping a stream's data on timestamp as string

Dear all,

I hope to get some help here in the community forum with a question concerning ksqlDB in Confluent.
I am new to Confluent and to kSQL, and I am using both for an academic project. I already have some experience with MS SQL Server, and kSQL seems quite similar in most cases. Of course, some things aren't as easy as in SQL Server (due to the beta status of kSQL).

Here we go with my use case:
My team and I are pulling some cryptocurrency data (price, market share, percent change, etc.) from an open source API, and we want to aggregate the data (especially the price data) per minute.

The message from the API contains a timestamp (BIGINT format) in milliseconds, and we converted the timestamp to string format within our streams.

As the last step, we want to group avg(priceusd) by the timestamp (string) to get the average price per timestamp (shown as date and time per minute).
But the table always produces more than one entry per timestamp (which is in the GROUP BY clause). This happens especially when setting auto.offset.reset to "latest" while streaming live data.
Using auto.offset.reset "earliest" to load old data from the underlying topic works fine (one entry per timestamp, as it should be with GROUP BY).

Here we go with the code for the final table including the group by clause:

CREATE TABLE COINCAP_Table WITH (KAFKA_TOPIC='Coincap_Table', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON') AS SELECT
data->symbol+','+TIMESTAMP_FORMATTED as TIMESTAMP_SYMBOL_KEY,
AVG(data->priceusd) as AVG_priceusd,
AVG(data->volumeusd24hr) as AVG_volumeusd24hr,
AVG(data->CHANGEPERCENT24HR) as AVG_CHANGEPERCENT24HR,
AVG(data->marketcapusd) as AVG_marketCapUsd
FROM COINCAP_STREAM2
GROUP BY data->symbol+','+TIMESTAMP_FORMATTED
EMIT CHANGES;

PS: We have combined two columns into one (symbol and timestamp) to use the combined attribute for a join later. But that's not the point here.

TIMESTAMP_FORMATTED was converted from BIGINT (in milliseconds) to STRING as follows:
TIMESTAMPTOSTRING(TIMESTAMP, 'yyyy-MM-dd ''at'' HH:mm') as TIMESTAMP_FORMATTED,

Does anyone know a solution to this issue, so that we only get a single row per key in the GROUP BY clause? Why does kSQL produce more than one row (sometimes 2 or 3) per key attribute in the GROUP BY column?

Thanks for your help.

Best,
Sebastian

some things aren't as easy as in SQL Server (due to the beta status of kSQL)

ksqlDB is not in beta status, even if we have not reached 1.0 yet. Also note that ksqlDB is not a relational database system, and thus it's quite different from SQL Server by design: we call it a "Streaming Database" for a reason 🙂

To your question: As a streaming database, ksqlDB follows a continuous update model, and there is no such thing as a "final result": inputs are streams (either event streams or changelog streams in the TABLE case) and are thus conceptually infinite. The result of a query is continuously updated, and the corresponding result changelog stream is written to the result table. Thus, ksqlDB does not produce duplicates, but refinements.
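To illustrate the refinement model with a small sketch (the symbol, timestamp, and price values below are made up, not taken from your data):

```sql
-- Hypothetical input records arriving on COINCAP_STREAM2:
--   symbol='BTC', priceusd=42000, formatted timestamp '2021-05-04 at 13:07'
--   symbol='BTC', priceusd=42200, formatted timestamp '2021-05-04 at 13:07'
--
-- Result changelog for key 'BTC,2021-05-04 at 13:07':
--   AVG_priceusd = 42000.0   -- emitted after the first record
--   AVG_priceusd = 42100.0   -- refinement after the second record, SAME key
```

Both rows refer to the same key; the second simply replaces the first, which is exactly the "2 or 3 rows per key" you observed in the output topic.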

Internally, ksqlDB uses caches that buffer updates for the same key, so by default you don't get every update/refinement of the result table in the output topic (and because caching is non-deterministic, you may see a different number of refinements in the output topic). If you disable caching (by setting the cache size to zero), you will see an output record for each input record.
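For example, the cache size can be set to zero from the ksqlDB CLI before starting the query (this is the Kafka Streams record-cache setting, exposed through ksqlDB; depending on your version, the property may also be set server-side with the `ksql.streams.` prefix):

```sql
-- Disable the record cache so every refinement is written to the output topic:
SET 'cache.max.bytes.buffering' = '0';
```

Note that this gives you *more* output records per key, not fewer; it just makes the refinement behavior deterministic and visible.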

ksqlDB is not built to run a query that terminates when the (current) end of the topic is reached: queries run forever, and they update the result when new input data arrives. As I mentioned at the beginning, ksqlDB is not a relational database system and works quite differently by design.
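As a side note, for "average per minute" the idiomatic ksqlDB approach is a windowed aggregation over the original BIGINT timestamp, rather than grouping on a formatted string. A sketch, assuming COINCAP_STREAM2 was created with the TIMESTAMP column declared as its event time:

```sql
CREATE TABLE COINCAP_AVG_PER_MINUTE AS
  SELECT data->symbol,
         AVG(data->priceusd) AS AVG_priceusd
  FROM COINCAP_STREAM2
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY data->symbol
  EMIT CHANGES;
```

Each window is still refined as records arrive, but recent ksqlDB versions also support EMIT FINAL on windowed aggregations, which emits only a single result per window after the window closes.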

