Generating a new ID for incoming data: timing issue

Hi,

Given a stream `dataStream (id BIGINT, data VARCHAR)`, how can we generate a new ID (`newId`) for each `id`?

The only solution we found is a CTAS that groups by `id` and generates the new ID inside `EARLIEST_BY_OFFSET`, so each `id` keeps the first UUID it was assigned.

The next problem we ran into is a timing issue. If we CSAS from the incoming `dataStream` and join it to the mapping table, the message for a previously unseen `id` is never emitted, because the table has not yet been updated when the CSAS query processes that message. We also cannot use `JOIN ... WITHIN`, because `WITHIN` is not supported in stream-table joins.
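A minimal sketch of the stream-table join that runs into this race, assuming a mapping table named `IdMappingTable` keyed by `id` with a `newId` column (as in the statements below); the output stream name `dataWithNewId` is hypothetical. Both queries read `dataStream` independently, so when a record with a new `id` reaches the join, the CTAS may not yet have written the matching row to the table's topic, and the inner join has nothing to match.

```sql
-- Hypothetical output stream; assumes IdMappingTable (id as key, newId) already exists.
-- Inner stream-table join: each dataStream record is matched against the table
-- state at the moment it is processed, so a first-seen id may find no row yet
-- and produce no output.
CREATE STREAM dataWithNewId AS
    SELECT
        d.id AS id,
        m.newId AS newId,
        d.data AS data
    FROM dataStream d
    JOIN IdMappingTable m ON d.id = m.id
    EMIT CHANGES;
```

Switching to `LEFT JOIN` would emit the record, but with a NULL `newId` for that first message, so it does not remove the race.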

We partially solved the problem by including the data payload in the mapping table and then creating a stream on the table's changelog topic:

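```sql
-- One row per id: the first generated UUID is kept, plus the latest payload.
CREATE TABLE IdMappingTable
    WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping')
    AS SELECT
        id,
        EARLIEST_BY_OFFSET(uuid()) AS newId,
        LATEST_BY_OFFSET(data) AS data
    FROM dataStream
    GROUP BY id;

-- id is written to the Kafka record key of the changelog topic, so declare it as KEY.
CREATE STREAM IdMappingStream(id BIGINT KEY, newId VARCHAR, data VARCHAR)
    WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping');
```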

I have some concerns about this solution:

  1. Is creating a stream on the changelog topic safe?

  2. This pattern persists the entire data payload just to work around a timing issue. Our data has about 2.5 million unique IDs, so the latest payload for every one of them will be stored in the table.

Question:
Is there a better way to solve this problem that persists only the key mapping rather than the entire data payload, and that avoids or handles the race condition?

I just realized that if the mapping table holds only the IDs and we still create a stream on its changelog, we can CSAS from `dataStream` with a `JOIN ... WITHIN` against that changelog stream, because it is then a stream-stream join.
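A sketch of that ID-only variant, used instead of the payload-carrying table above. The output stream name `dataWithNewId` and the `WITHIN 5 MINUTES` window are assumptions for illustration, not tested values. Because it is a stream-stream join, records from both sides are buffered for the window, so the mapping record can arrive after the data record and still match.

```sql
-- 1. Mapping table holding only id -> newId (no payload is stored).
CREATE TABLE IdMappingTable
    WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping')
    AS SELECT
        id,
        EARLIEST_BY_OFFSET(uuid()) AS newId
    FROM dataStream
    GROUP BY id;

-- 2. Stream over the table's changelog topic; id is read from the record key.
CREATE STREAM IdMappingStream(id BIGINT KEY, newId VARCHAR)
    WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='IdMapping');

-- 3. Stream-stream join (hypothetical output name and window size).
--    WITHIN lets a data record match a mapping record that arrives slightly later.
CREATE STREAM dataWithNewId AS
    SELECT
        d.id AS id,
        m.newId AS newId,
        d.data AS data
    FROM dataStream d
    JOIN IdMappingStream m WITHIN 5 MINUTES ON d.id = m.id
    EMIT CHANGES;
```

One thing to watch: a stream-stream join pairs each `dataStream` record with every `IdMappingStream` record for the same `id` inside the window. Since the CTAS can write a changelog record for each new message on an `id`, an `id` that receives several messages within the window may produce more than one output row per message.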

I will try it this way, but it still feels hacky to create a stream on the changelog just to get around a timing problem.
