Creating a new column in the sink database from the source database's primary key (Debezium source, JDBC sink connectors)

I created a CDC setup using Debezium as the source connector and JDBC as the sink connector. The pipeline is working as expected. We now have a use case where data already exists in both databases, but the matching rows have different primary key ids. To handle this, we're planning to add a column (record_id) to the sink table that is populated with the id (PK) from the source and serves as the basis for update and delete operations.

What should I add to the sink connector configuration to achieve this? This is my current configuration:

{
  "name": "test_table-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "test_table",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://host.docker.internal:8081",
    "key.converter.schema.registry.url": "http://host.docker.internal:8081",
    "connection.url": "jdbc:postgresql://db:5432/db?user=user&password=changeme",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "auto.create": "false",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "transforms": "addNewColumn",
    "table.name.format": "${topic}",
    "delete.enabled": true,
    "delete.handling.mode": "none",
    "delete.tombstone.handling.mode": "drop",
    "transforms.addNewColumn.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addNewColumn.static.field": "record_id",
    "transforms.addNewColumn.static.value": "id"
  }
}
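
Note that InsertField's static.value is a literal, so the transform above writes the string "id" into record_id rather than the value of the source id. One direction that might work instead, as an untested sketch: rename the key field from id to record_id with ReplaceField$Key and let the sink use it through pk.fields, so upserts and deletes match on record_id. This assumes several things not shown above: the record key is a struct with a single field named id, the value is already flattened (for example by Debezium's ExtractNewRecordState), the sink table has a unique constraint on record_id, and the sink's own id column has a default. The transform aliases renameKey and dropValueId are made-up names, and the exclude option is called blacklist on older Kafka Connect versions. Only the keys that change are shown; the converter and connection settings would stay as they are.

```json
{
  "name": "test_table-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "test_table",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "record_id",
    "transforms": "renameKey,dropValueId",
    "transforms.renameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.renameKey.renames": "id:record_id",
    "transforms.dropValueId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.dropValueId.exclude": "id"
  }
}
```

Dropping id from the value is only there so the source id does not overwrite the sink's own primary key; leave that transform out if the sink table has no id column. As far as I can tell, delete.handling.mode and delete.tombstone.handling.mode are options of Debezium's ExtractNewRecordState transform rather than of the JDBC sink, so at the top level of the sink config they are probably ignored.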
