Hi everyone, I am trying to replicate data via CDC from a Postgres database to Cassandra.
I have a PM table and an address table in Postgres that are merged into a single table in Cassandra.
Moreover, the primary key in Postgres is a technical key, whereas in Cassandra the primary key is the functional key.
To summarize, I must first perform a join and then rekey the result of that join. I want to benefit from primary-key table-table joins (left join), so I did this:
```sql
CREATE TABLE pm (
  `id` VARCHAR PRIMARY KEY,     -- Technical id from the Postgres PK
  `reference_pm_bytel` VARCHAR, -- Functional id for the Cassandra PK
  `reference_pm_oi` VARCHAR,
  `etat_deploiement` VARCHAR,
  ...
  `date_creation` VARCHAR,
  `date_modification` VARCHAR
)
WITH (
  kafka_topic = 'testcdcdb.public.pm',
  value_format = 'avro'
);

CREATE TABLE adresse_pm (
  `pm_id` VARCHAR PRIMARY KEY,
  `hexacle` VARCHAR,
  `rivoli` VARCHAR,
  `ville` VARCHAR,
  `code_postal` VARCHAR,
  ...
  `date_creation` VARCHAR,
  `date_modification` VARCHAR
)
WITH (
  kafka_topic = 'testcdcdb.public.adresse_pm',
  value_format = 'avro'
);
```
Then I did the left join:
```sql
CREATE TABLE PM_WITH_ADRESSE WITH (VALUE_FORMAT='AVRO') AS SELECT
  P.`id`,
  P.`reference_pm_bytel`,
  P.`reference_pm_oi`,
  P.`etat_deploiement`,
  ...
  P.`date_creation`,
  P.`date_modification`,
  STRUCT(
    `hexacle` := A.`hexacle`,
    `rivoli` := A.`rivoli`,
    `ville` := A.`ville`,
    `code_postal` := A.`code_postal`,
    ...
    `date_creation` := A.`date_creation`,
    `date_modification` := A.`date_modification`) AS `adresse_pm`
FROM pm P
LEFT JOIN adresse_pm A
  ON P.`id` = A.`pm_id`;
```
Finally, I rekey. To do so, I first need to create a stream over the join topic:
```sql
CREATE STREAM PM_WITH_ADRESSE_STREAM WITH (KAFKA_TOPIC='PM_WITH_ADRESSE', VALUE_FORMAT='AVRO');
```
then the PARTITION BY:
```sql
CREATE STREAM PM_WITH_ADRESSE_BY_REFERENCE_PM
WITH (
  value_format = 'avro'
) AS SELECT * FROM PM_WITH_ADRESSE_STREAM PARTITION BY `reference_pm_bytel`;
```
This works well for inserts and updates, which are replicated correctly to Cassandra, but not for deletes.
In the join topic I get a correct tombstone:
`key: 1 value: null`
but in the rekeyed (repartitioned) topic I get:
`key: null value: null`
I suppose this is because the new key, `reference_pm_bytel`, is read from the record value, which is null for a tombstone.
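For anyone trying to reproduce this, here is a minimal sketch of how both topics can be inspected from the ksqlDB CLI with `PRINT`: start the first statement, delete a row in `pm`, then stop it and run the second one to compare the key/value shown for the tombstone. It assumes the rekeyed stream writes to its default topic, which ksqlDB names after the stream because no `kafka_topic` was set in the `CREATE STREAM ... AS SELECT` above.

```sql
-- Join output topic: a delete in pm should appear as key=<id>, value=null
PRINT 'PM_WITH_ADRESSE' FROM BEGINNING;

-- Rekeyed output topic (assumption: default topic name = stream name)
PRINT 'PM_WITH_ADRESSE_BY_REFERENCE_PM' FROM BEGINNING;
```

Each `PRINT` keeps streaming until it is cancelled (Ctrl+C), so run them one after the other.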
Is there a possible workaround to replicate the delete in this case?