How to rekey in ksqlDB when CDC deletes rows and produces null values (tombstones)

Hi everyone, I am trying to replicate data via CDC from a Postgres database to Cassandra.
I have a PM table and an address table in Postgres, which together form a single table in Cassandra.
Moreover, the primary key in Postgres is a technical key, whereas in Cassandra the primary key is the functional key.
To summarize, I must first perform a join and then rekey the result of the join. I want to benefit from primary-key table-table joins (left join), so I did this:

CREATE TABLE pm (
`id` VARCHAR PRIMARY KEY, -- Technical id from postgres PK
`reference_pm_bytel` VARCHAR, -- Functional id for cassandra PK
`reference_pm_oi` VARCHAR,
`etat_deploiement` VARCHAR,
...
`date_creation` VARCHAR,
`date_modification` VARCHAR
)
WITH (
    kafka_topic = 'testcdcdb.public.pm',
    value_format = 'avro'
);
CREATE TABLE adresse_pm (
`pm_id` VARCHAR PRIMARY KEY,
`hexacle` VARCHAR,
`rivoli` VARCHAR,
`ville` VARCHAR,
`code_postal` VARCHAR,
...
`date_creation` VARCHAR,
`date_modification` VARCHAR
)
WITH (
    kafka_topic = 'testcdcdb.public.adresse_pm',
    value_format = 'avro'
);

Then I did the left join:

CREATE TABLE PM_WITH_ADRESSE WITH (VALUE_FORMAT='AVRO') AS SELECT 
    P.`id`,
    P.`reference_pm_bytel`,
    P.`reference_pm_oi`,
    P.`etat_deploiement`,
    ...
    P.`date_creation`,
    P.`date_modification`,
    STRUCT(
    `hexacle` := A.`hexacle`,
    `rivoli` := A.`rivoli`,
    `ville` := A.`ville`,
    `code_postal` := A.`code_postal`,
    ...
    `date_creation` := A.`date_creation`,
    `date_modification` := A.`date_modification`) AS `adresse_pm`
    FROM pm P 
    LEFT JOIN adresse_pm A
    ON P.`id`=A.`pm_id`;

Finally, I rekey. To do that, I first need to create a stream on the join's output topic:

CREATE STREAM PM_WITH_ADRESSE_STREAM WITH (KAFKA_TOPIC='PM_WITH_ADRESSE', VALUE_FORMAT='AVRO');

Then the PARTITION BY:

CREATE STREAM PM_WITH_ADRESSE_BY_REFERENCE_PM 
WITH (
    value_format = 'avro'
) AS SELECT * FROM PM_WITH_ADRESSE_STREAM PARTITION BY `reference_pm_bytel`;

It works very well for inserts and updates, which replicate correctly to Cassandra, but not for deletes.
In the join topic I get a correct tombstone:

key: 1 value: null

but in the rekeyed (repartitioned) topic I get:

key: null value: null

Is there a possible workaround to replicate the delete in this case?


What you observe makes sense. For upstream insert/update records you get <k,v>, so you can extract the new key from v. For a tombstone <k,null>, however, the value is gone, so there is no way to know what the original v was, and therefore no way to create <v,null>.
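To make that reasoning concrete, here is a minimal sketch in plain Python. It is a toy model of a stateless rekey, not ksqlDB code: `rekey_stateless` is a hypothetical helper, and `PM-001` is a made-up sample value for `reference_pm_bytel`.

```python
from typing import Optional, Tuple

# A record is modelled as (key, value); value=None stands for a tombstone.
Record = Tuple[Optional[str], Optional[dict]]


def rekey_stateless(record: Record, new_key_field: str) -> Record:
    """Derive the new key from the value only, with no memory of earlier records."""
    _old_key, value = record
    if value is None:
        # Tombstone: there is no value left to read the new key from.
        return (None, None)
    return (value[new_key_field], value)


upsert = ("1", {"id": "1", "reference_pm_bytel": "PM-001"})  # sample data (assumed)
tombstone = ("1", None)

print(rekey_stateless(upsert, "reference_pm_bytel"))
print(rekey_stateless(tombstone, "reference_pm_bytel"))  # (None, None)
```

The second call has nothing but the old key `1` to work with, which matches the `key: null value: null` record seen in the repartitioned topic.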

Maybe an aggregation using latest_by_offset can help? It allows you to keep the last two records. This way, you would get access to both <k,v> and <k,null>, and could thus emit <v,null>?
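The underlying idea, remembering the previous record per key so that a later tombstone can be translated, can be sketched in plain Python. This is only a toy model under stated assumptions: `StatefulRekeyer` is a hypothetical class, the in-memory dict stands in for whatever state the real pipeline would keep, and `PM-001` is made-up sample data. It does not show that latest_by_offset in ksqlDB actually behaves this way for tombstones.

```python
from typing import Dict, List, Optional, Tuple

# A record is modelled as (key, value); value=None stands for a tombstone.
Record = Tuple[Optional[str], Optional[dict]]


class StatefulRekeyer:
    """Toy model: remember the last functional key seen for each technical key."""

    def __init__(self, new_key_field: str) -> None:
        self.new_key_field = new_key_field
        self._last_new_key: Dict[str, str] = {}  # technical key -> functional key

    def process(self, record: Record) -> List[Record]:
        old_key, value = record
        if old_key is None:
            return []  # nothing to correlate without a key
        if value is None:
            # Tombstone <k,null>: look up the v remembered for k and emit <v,null>.
            previous = self._last_new_key.pop(old_key, None)
            return [] if previous is None else [(previous, None)]
        new_key = value[self.new_key_field]
        out: List[Record] = []
        previous = self._last_new_key.get(old_key)
        if previous is not None and previous != new_key:
            # The functional key changed: retract the row under the old functional key.
            out.append((previous, None))
        self._last_new_key[old_key] = new_key
        out.append((new_key, value))
        return out


rekeyer = StatefulRekeyer("reference_pm_bytel")
events = [("1", {"id": "1", "reference_pm_bytel": "PM-001"}), ("1", None)]
for event in events:
    for emitted in rekeyer.process(event):
        print(emitted)  # the last line printed is ('PM-001', None)
```

The design choice here is that the delete is only translatable because state from the earlier <k,v> is still available when <k,null> arrives; a tombstone for a key that was never seen is dropped.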

Thank you for the answer.
I tried several solutions with latest_by_offset, but without success.
Could you explain in a bit more detail how to emit <v,null> from <k,v> and <k,null>?
I can give you more details about my use case if needed.
