JDBC Sink connector using DB Sequence for Primary key

Hi All,

I have a requirement to insert new records into a database, using a DB sequence to populate the primary key (user_id) of the table. I am using the Confluent JDBC sink connector to write data from a Kafka topic. Apart from the primary key, all the values come from Avro.

I believe I need to use "pk.mode":"none", but how do I define the sequence for the primary key on insert?

Example table:

CREATE TABLE emp_msg_in (
	user_id serial PRIMARY KEY,
	username VARCHAR ( 50 ) UNIQUE NOT NULL,
	password VARCHAR ( 50 ) NOT NULL,
	email VARCHAR ( 255 ) UNIQUE NOT NULL,
	created_on TIMESTAMP NOT NULL,
	last_login TIMESTAMP
);
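
For reference, serial means Postgres creates a backing sequence and a column default automatically; as I understand it, the DDL above is roughly equivalent to this (illustrative expansion only):

-- Rough expansion of "user_id serial PRIMARY KEY"
CREATE SEQUENCE emp_msg_in_user_id_seq;

CREATE TABLE emp_msg_in (
	user_id INTEGER NOT NULL DEFAULT nextval('emp_msg_in_user_id_seq') PRIMARY KEY,
	username VARCHAR ( 50 ) UNIQUE NOT NULL,
	password VARCHAR ( 50 ) NOT NULL,
	email VARCHAR ( 255 ) UNIQUE NOT NULL,
	created_on TIMESTAMP NOT NULL,
	last_login TIMESTAMP
);

-- serial also ties the sequence's lifetime to the column
ALTER SEQUENCE emp_msg_in_user_id_seq OWNED BY emp_msg_in.user_id;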

Connector:

curl -X "POST" "http://localhost:18083/connectors/" -H "Content-Type: application/json" -d '{"name":"emp-decoder-msg-sink-db-connector","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"input.data.format":"AVRO",
"dialect.name":"PostgreSqlDatabaseDialect",
"connection.url":"jdbc:postgresql://postgres:5432/postgres?",
"connection.port":"5432","connection.user":"postgres",
"connection.password":"password",
"table.name.format":"emp_msg_in",
"topics":"lxa-dev-us-emp-decoder-sink",
"insert.mode":"INSERT",
"db.timezone":"UTC",
"auto.evolve":"true",
"pk.mode":"none",
"tasks.max":"1",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":"true",
"key.converter.schema.registry.url":"http://schema-registry:8081",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"transforms":"t1,t2,t3",
"transforms.t1.type":"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames":"empHeader.messageTypeID:EMP_MSG_TYPE_ID,empHeader.owningScac:LOCO_OWNING_CUSTOMER,empHeader.flags:PROCESSED,empHeader.locoScac:LOCOMOTIVE_INITIAL,empHeader.roadNumber:ROAD_NUMBER,empHeader.source:EMP_SRC_ADDRESS,empHeader.destination:EMP_DEST_ADDRESS,empHeader.roadNumber:EMP_MESSAGE_NUMBER,empHeader.crc:MSG_CRC,empHeader.dataLength:MSG_SIZE",
"transforms.t3.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist":"messageTypeID,owningScac,flags,locoScac,roadNumber,source,destination,roadNumber,crc,dataLength"}}'

Schema used:

{
  "type": "record",
  "name": "EMP",
  "namespace": "com.wabtec.lxa.protocol.spec",
  "fields": [
    {
      "name": "empHeader",
      "type": {
        "type": "record",
        "name": "EMPHeader",
        "fields": [
          {
            "name": "protocolVersion",
            "type": "int"
          },
          {
            "name": "messageTypeID",
            "type": "int"
          },
          {
            "name": "messageVersion",
            "type": "int"
          },
          {
            "name": "flags",
            "type": "int"
          },
          {
            "name": "dataLength",
            "type": "int"
          },
          {
            "name": "messageNumber",
            "type": "long"
          },
          {
            "name": "messageTimeStamp",
            "type": "long"
          },
          {
            "name": "dateTimeStamp",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "default": ""
          },
          {
            "name": "variableHeaderSize",
            "type": "int"
          },
          {
            "name": "timeToLive",
            "type": "long"
          },
          {
            "name": "routingQos",
            "type": "int"
          },
          {
            "name": "source",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            }
          },
          {
            "name": "destination",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            }
          },
          {
            "name": "crc",
            "type": "long"
          },
          {
            "name": "owningScac",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "default": ""
          },
          {
            "name": "locoScac",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            },
            "default": ""
          },
          {
            "name": "roadNumber",
            "type": "int",
            "default": 0
          }
        ]
      }
    },
    {
      "name": "empBody",
      "type": {
        "type": "record",
        "name": "EMPBody",
        "fields": [
          {
            "name": "payload",
            "type": {
              "type": "string",
              "avro.java.string": "String"
            }
          }
        ]
      }
    }
  ]
}

Thanks,
Sumit

Your table DDL doesn’t match your schema, so I’m not quite sure what you’re trying to do currently. However, in principle, if you omit a field from the schema that the JDBC Sink sends to the target database, the database will fall back on the column’s default value, or on the value provided by the sequence.
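
For example (made-up values, purely to show the shape of the statement): if user_id is absent from the record, the generated INSERT leaves that column out, and Postgres falls back to the sequence-backed default:

INSERT INTO emp_msg_in (username, password, email, created_on)
VALUES ('jdoe', 'secret', 'jdoe@example.com', now());
-- user_id isn't in the column list, so Postgres applies the serial
-- column's DEFAULT, i.e. nextval on its backing sequence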

So in short: ensure that you drop the user_id field from the payload that you send (using the ReplaceField Single Message Transform), and Postgres should use the next sequence value for the inserted record.
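
Something along these lines added to your transform chain should do it (untested sketch; dropPk is just a name I’ve picked, and user_id only needs dropping if it’s actually present in your value; newer Connect versions spell blacklist as exclude):

"transforms": "t1,t2,t3,dropPk",
"transforms.dropPk.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropPk.blacklist": "user_id"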

Thanks for the help. It seems the PK got auto-generated even though I didn’t pass it in the configuration.