Hi All,
I have a requirement to insert new records into a database table, using a DB sequence to populate the primary key (user_id). I am using the Confluent JDBC sink connector to write data from a Kafka topic into the table. Except for the primary key, all values come from the Avro record.
I believe I need to use "pk.mode":"none", but how do I define the sequence for the primary key on insert?
Example table
-- Example target table for the sink connector.
CREATE TABLE emp_msg_in (
    -- serial: PostgreSQL supplies the value from an implicitly created sequence
    -- whenever an INSERT omits this column.
    user_id    SERIAL        PRIMARY KEY,
    username   VARCHAR(50)   NOT NULL UNIQUE,
    password   VARCHAR(50)   NOT NULL,
    email      VARCHAR(255)  NOT NULL UNIQUE,
    created_on TIMESTAMP     NOT NULL,
    last_login TIMESTAMP     -- nullable
);
Connector:
# Registers the JDBC sink connector through the Kafka Connect REST API.
# Transform chain, applied in order:
#   t1 flattens the nested Avro value, so header fields become "empHeader.<field>";
#   t2 renames the flattened header fields to the target column names;
#   t3 keeps only the listed fields. It runs after t2, so its whitelist must use
#      the post-rename names, otherwise no field matches and all are dropped.
# Each source field may appear only once in t2.renames: EMP_MESSAGE_NUMBER is fed
# from empHeader.messageNumber, ROAD_NUMBER from empHeader.roadNumber.
curl -X "POST" "http://localhost:18083/connectors/" -H "Content-Type: application/json" -d '{"name":"emp-decoder-msg-sink-db-connector","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"input.data.format":"AVRO",
"dialect.name":"PostgreSqlDatabaseDialect",
"connection.url":"jdbc:postgresql://postgres:5432/postgres?",
"connection.port":"5432","connection.user":"postgres",
"connection.password":"password",
"table.name.format":"emp_msg_in",
"topics":"lxa-dev-us-emp-decoder-sink",
"insert.mode":"INSERT",
"db.timezone":"UTC",
"auto.evolve":"true",
"pk.mode":"none",
"tasks.max":"1",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":"true",
"key.converter.schema.registry.url":"http://schema-registry:8081",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"transforms":"t1,t2,t3",
"transforms.t1.type":"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames":"empHeader.messageTypeID:EMP_MSG_TYPE_ID,empHeader.owningScac:LOCO_OWNING_CUSTOMER,empHeader.flags:PROCESSED,empHeader.locoScac:LOCOMOTIVE_INITIAL,empHeader.roadNumber:ROAD_NUMBER,empHeader.source:EMP_SRC_ADDRESS,empHeader.destination:EMP_DEST_ADDRESS,empHeader.messageNumber:EMP_MESSAGE_NUMBER,empHeader.crc:MSG_CRC,empHeader.dataLength:MSG_SIZE",
"transforms.t3.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist":"EMP_MSG_TYPE_ID,LOCO_OWNING_CUSTOMER,PROCESSED,LOCOMOTIVE_INITIAL,ROAD_NUMBER,EMP_SRC_ADDRESS,EMP_DEST_ADDRESS,EMP_MESSAGE_NUMBER,MSG_CRC,MSG_SIZE"}}'
Schema used
{
  "type": "record",
  "name": "EMP",
  "namespace": "com.wabtec.lxa.protocol.spec",
  "fields": [
    {
      "name": "empHeader",
      "type": {
        "type": "record",
        "name": "EMPHeader",
        "fields": [
          {"name": "protocolVersion", "type": "int"},
          {"name": "messageTypeID", "type": "int"},
          {"name": "messageVersion", "type": "int"},
          {"name": "flags", "type": "int"},
          {"name": "dataLength", "type": "int"},
          {"name": "messageNumber", "type": "long"},
          {"name": "messageTimeStamp", "type": "long"},
          {"name": "dateTimeStamp", "type": {"type": "string", "avro.java.string": "String"}, "default": ""},
          {"name": "variableHeaderSize", "type": "int"},
          {"name": "timeToLive", "type": "long"},
          {"name": "routingQos", "type": "int"},
          {"name": "source", "type": {"type": "string", "avro.java.string": "String"}},
          {"name": "destination", "type": {"type": "string", "avro.java.string": "String"}},
          {"name": "crc", "type": "long"},
          {"name": "owningScac", "type": {"type": "string", "avro.java.string": "String"}, "default": ""},
          {"name": "locoScac", "type": {"type": "string", "avro.java.string": "String"}, "default": ""},
          {"name": "roadNumber", "type": "int", "default": 0}
        ]
      }
    },
    {
      "name": "empBody",
      "type": {
        "type": "record",
        "name": "EMPBody",
        "fields": [
          {"name": "payload", "type": {"type": "string", "avro.java.string": "String"}}
        ]
      }
    }
  ]
}
Thanks,
Sumit