This is my first time working with Kafka connector.
I am trying to achieve the following usecase:
Can someone please help me improve the connector configuration?
Data Sink : Kafka Topic - > SQL Server Database
Kafka topic has the following fields:
method
personNumber
personId
pmPersonNumber
Approved
ApprovedDate
comments
modifiedBy
SQL query explaining what we are trying to achieve through the sink:
UPDATE db.AdminDetail
SET AB_Approved = event.Approved, AB_Approved_On = event.ApprovedDate, UpdatedBy = event.personId, UpdatedOn = date(), comments = event.comments
WHERE Admin = event.personNumber;
“UpdatedOn” column gets updated with the value of date and time on which the sink happened.
Kafka JDBC Connector Configuration:
{
"name": "abc.persondata.jdbcsink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"insert.mode": "upsert",
"key.converter": “org.apache.kafka.connect.storage.StringConverter” ,
"value.converter": “io.confluent.connect.avro.AvroConverter” ,
"value.converter.schema.registry.url":"http://ab-1111111-001.sdi.corp.anyorg.com:8081",
"connection.url": "jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC",
"connection.user": "**",
"connection.password": "**",
"kafka.bootstrap.servers":"ab-1111111-001.sdi.corp.anyorg.com:9095",
"dialect.name": "SqlServerDatabaseDialect",
"errors.tolerance": "none",
"table.name.format":" AdminDetail",
"pk.mode":"record_value",
"pk.fields":"Admin",
"transforms": "renameFields, selectFields, insertMessageDate, convertDate",
"transforms.renameFields.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.renameFields.renames":"Approved:AM_Approved, ApprovedDate:AM_Approved_On, personId:UpdatedBy, personNumber:Admin",
"transforms.selectFields.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.selectFields.whitelist":"AB_Approved,AB_Approved_On,UpdatedBy,comments",
"transforms.insertMessageDate.type:"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertMessageDate.timestamp.field":"UpdatedOn", * how can I get this column value updated on database side?*
"transforms.convertDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", * **not sure** *
"transforms.convertDate.format":"YYYY-MM-DD hh:mm:ss", * **format I need for UpdatedOn value** *
"transforms.convertDate.field":"UpdatedOn",
"transforms.convertDate.target.type":"TimeStamp"
"topics": " abc.persondata"
}