i have a sink connector ( Kafka → sink jdbc → Postgres )running which was configured with the following.
insert.mode =insert, pk.mode =record_key
I now changed the sink connector to mode = upsert and pk.mode = record_value
pk.fields=spaceId,invitationId,statusId
After change ,getting the following error.
Batch entry 0 INSERT INTO “StatusHistory” (“spaceId”,“invitationId”,“statusId”,“timestamp”) VALUES (980,43149990,1,‘2022-01-18 13:47:59.403+00’) ON CONFLICT (“spaceId”,“invitationId”,“statusId”)
DO UPDATE SET “timestamp”=EXCLUDED.“timestamp” was aborted:
ERROR: there is no unique or exclusion constraint matching the ON CONFLICT specification
What we understood is that we need to give a unique key constraint (combination of 3 keys ) in the “StatusHistory” table also .Although we have specified the pk-fields in sink connector properties( “pk.fields -spaceId,invitationId,statusId”)
(Setting Unique key constraint to the existing table is a huge task , since table has millions of duplicate rows)
My question, is this the correct approach or is there a way to handle this in sink connector ?
why pk_fields scpedified is not working as expected ?