I am creating a json message inside my app and sending it to kafka. Once in kafka the sink connector is picking it up and sending it to postgresql. Everything is working great with insert and update as long as my connector settings are pk.mode=record_value
and delete.enabled=false
.
However, I now want to delete records from postgresql and I’ve changed my settings to pk.mode=record_key
and delete.enabeled=true
. When I try and add a record I get this message:
“Sink connector 'local-jdbc-sink' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='bin',partition=0,offset=7,timestamp=1647557863533) with a null key and string key schema
”
I’ve been searching for how to add the key to the json message and have not found any examples. Since I’m new at this, I’m not sure where to go to get good documentation on the json structure with the key, if that’s the issue. I’ve seen this documentation: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html, but it doesn’t give any examples. Or, am I missing something that is the real fix?
Below is my sink connector properties and a sample json that works when set to record_value. Any direction or json sample on how to add the key, which is “prrowid
” in the json, would be very much appreciated!
name=local-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://postgres:5432/sports?useSSL=false
connection.user=postgres
connection.password=********
topics=bin
insert.mode=upsert
dialect.name=PostgreSqlDatabaseDialect
pk.mode=record_key
pk.fields=prrowid
delete.enabled=true
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
Json:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "prrowid"
},
{
"type": "int32",
"optional": true,
"field": "WarehouseNum"
},
{
"type": "int32",
"optional": true,
"field": "Itemnum"
},
{
"type": "int32",
"optional": true,
"field": "Qty"
},
{
"type": "int32",
"optional": true,
"field": "BinNum"
},
{
"type": "string",
"optional": true,
"field": "BinName"
},
{
"type": "string",
"optional": false,
"field": "pro2srcpdb"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "pro2created"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "pro2modified"
}
],
"optional": false,
"name": "Bin"
},
"payload": {
"WarehouseNum": 1,
"Itemnum": 1,
"Qty": 9304,
"BinNum": 716,
"BinName": "452917-D",
"prrowid": "0x0000000000000068",
"pro2srcpdb": "mydb",
"pro2created": 58848401,
"pro2modified": 58848401
}
}