Hi,
Given a stream dataStream (id BIGINT, data VARCHAR), how can a new ID (newId) be generated for each id?
The only solution we found is a CTAS (CREATE TABLE AS SELECT) that groups by id and generates the new ID inside EARLIEST_BY_OFFSET.
The next problem we ran into is one of timing. If we CSAS (CREATE STREAM AS SELECT) from the incoming dataStream and join to the mapping table, the message is never emitted, because the table has not yet been updated by the time the CSAS query processes the record. We also cannot use a WITHIN clause in a stream-table join.
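For reference, the join that hits this timing problem can be sketched roughly as follows. This is a reconstruction, not our exact statements: the names IdOnlyMappingTable, IdOnlyMapping and EnrichedStream are placeholders, and the table here holds only the key mapping, with no data payload.

```sql
-- Hypothetical key-only mapping table: one generated newId per id.
CREATE TABLE IdOnlyMappingTable
  WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdOnlyMapping')
  AS SELECT
    id,
    EARLIEST_BY_OFFSET(UUID()) AS newId
  FROM dataStream
  GROUP BY id;

-- Stream-table join: each dataStream record is matched against the table's
-- state at the moment the record is processed. With an inner join, a record
-- whose id has no row in the table yet produces no output.
CREATE STREAM EnrichedStream
  WITH (VALUE_FORMAT='JSON')
  AS SELECT
    s.id AS id,
    m.newId AS newId,
    s.data AS data
  FROM dataStream s
  JOIN IdOnlyMappingTable m ON s.id = m.id
  EMIT CHANGES;
```

Both queries consume dataStream independently, so the first record for a given id can reach the join before the aggregation has written that id's row to the table.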
We partly solved the problem by including the data payload in the mapping table and then creating a stream on the table's changelog topic:
CREATE TABLE IdMappingTable
  WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping')
  AS SELECT
    id,
    EARLIEST_BY_OFFSET(uuid()) AS newId,
    LATEST_BY_OFFSET(data) AS data
  FROM dataStream
  GROUP BY id;

CREATE STREAM IdMappingStream (id BIGINT, newId VARCHAR, data VARCHAR)
  WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping');
I have some concerns about this solution:
- Is creating a stream on the changelog topic safe?
- This pattern creates a persistent store of the entire data payload to work around a timing issue. The data we are working with has about 2.5 million unique IDs, so the latest data for all of them will be stored just for that workaround.
Question:
Is there a better way to solve this problem that persists only the key mapping rather than the entire data payload, and that avoids or handles the race condition?