Adding a primary key to a JSON Kafka message

I am creating a JSON message inside my app and sending it to Kafka. Once in Kafka, the sink connector picks it up and writes it to PostgreSQL. Inserts and updates work great as long as my connector settings are pk.mode=record_value and delete.enabled=false.

However, I now want to delete records from PostgreSQL, so I’ve changed my settings to pk.mode=record_key and delete.enabled=true. When I try to add a record I get this message:
Sink connector 'local-jdbc-sink' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='bin',partition=0,offset=7,timestamp=1647557863533) with a null key and string key schema

I’ve been searching for how to add the key to the JSON message and have not found any examples. Since I’m new at this, I’m not sure where to find good documentation on the JSON structure with the key, if that’s even the issue. I’ve seen this documentation: https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html, but it doesn’t give any examples. Or am I missing something else that is the real fix?

Below are my sink connector properties and a sample JSON message that works when pk.mode is set to record_value. Any direction, or a JSON sample showing how to add the key (which is “prrowid” in the JSON), would be very much appreciated!

name=local-jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.url=jdbc:postgresql://postgres:5432/sports?useSSL=false
connection.user=postgres
connection.password=********

topics=bin
insert.mode=upsert
dialect.name=PostgreSqlDatabaseDialect
pk.mode=record_key
pk.fields=prrowid
delete.enabled=true
auto.create=true
key.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

JSON:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "prrowid"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "WarehouseNum"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "Itemnum"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "Qty"
      },
      {
        "type": "int32",
        "optional": true,
        "field": "BinNum"
      },
      {
        "type": "string",
        "optional": true,
        "field": "BinName"
      },
      {
        "type": "string",
        "optional": false,
        "field": "pro2srcpdb"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "pro2created"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "pro2modified"
      }
    ],
    "optional": false,
    "name": "Bin"
  },
  "payload": {
    "WarehouseNum": 1,
    "Itemnum": 1,
    "Qty": 9304,
    "BinNum": 716,
    "BinName": "452917-D",
    "prrowid": "0x0000000000000068",
    "pro2srcpdb": "mydb",
    "pro2created": 58848401,
    "pro2modified": 58848401
  }
}

The error message explains the problem, if you parse out some of the text:

requires records with a non-null key … but found record … with a null key

The tl;dr is that when your app produces data to Kafka, it needs to set a key on each record. At the moment it looks like it’s only setting a value.
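With your current key.converter=org.apache.kafka.connect.storage.StringConverter, the simplest approach is to produce the prrowid value itself as the record key: for a primitive key, pk.fields=prrowid just names the database column it maps to. If you instead switch to the commented-out JsonConverter for the key (with key.converter.schemas.enable=true), the key gets its own schema/payload envelope, built the same way as your value. A sketch of what that key message could look like (the struct name “BinKey” is just an illustration):

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "prrowid"
      }
    ],
    "optional": false,
    "name": "BinKey"
  },
  "payload": {
    "prrowid": "0x0000000000000068"
  }
}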
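On the producer side, setting the key is a matter of using the key-bearing ProducerRecord constructor instead of the value-only one. A minimal sketch, assuming a plain Java client with String serializers and a broker at localhost:9092 (adjust both to match your app):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BinProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: your broker address
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      String key = "0x0000000000000068"; // the prrowid for this row
      String value = "{ \"schema\": { }, \"payload\": { } }"; // placeholder: the schema/payload JSON you already build

      // Insert/update: send the key alongside the value
      producer.send(new ProducerRecord<>("bin", key, value));

      // Delete: send the same key with a null value (a tombstone);
      // with delete.enabled=true the sink turns this into a DELETE in PostgreSQL
      producer.send(new ProducerRecord<>("bin", key, null));
    }
  }
}

One thing to watch: the key of a delete must match the key of the original insert exactly, or the sink won’t find the row to remove.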

