JDBC Sink connector SMTs in upsert mode

This is my first time working with Kafka Connect, and I am trying to achieve the use case described below. Can someone please help me improve the connector configuration?

Data sink: Kafka topic -> SQL Server database

The Kafka topic has the following fields:

  • method
  • personNumber
  • personId
  • pmPersonNumber
  • Approved
  • ApprovedDate
  • comments
  • modifiedBy

The following SQL shows what we are trying to achieve through the sink:

UPDATE db.AdminDetail
SET AB_Approved = event.Approved, AB_Approved_On = event.ApprovedDate, UpdatedBy = event.personId, UpdatedOn = date(), comments = event.comments
WHERE Admin = event.personNumber;

The UpdatedOn column should be set to the date and time at which the sink write happens.
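Note that the SQL above only changes rows that already exist, whereas the configuration uses "insert.mode": "upsert", which also inserts a row when no row matches the key (the JDBC sink's insert.mode also accepts update, which may match an UPDATE-only intent more closely). The sketch below illustrates that difference using Python's built-in sqlite3 module as a stand-in for SQL Server. This is an assumption for illustration only: the column types are simplified, and SQLite's INSERT ... ON CONFLICT syntax differs from the SQL the connector generates for SQL Server. The function rows_after and the sample event are hypothetical.

```python
import sqlite3

# SQLite stands in for SQL Server; columns mirror the UPDATE statement above.
SCHEMA = """
CREATE TABLE AdminDetail (
    Admin          TEXT PRIMARY KEY,
    AB_Approved    INTEGER,
    AB_Approved_On TEXT,
    UpdatedBy      TEXT,
    UpdatedOn      TEXT,
    comments       TEXT
)
"""

# UPDATE-only: rows whose Admin value is unknown are left untouched.
UPDATE_ONLY = """
UPDATE AdminDetail
SET AB_Approved = :Approved, AB_Approved_On = :ApprovedDate,
    UpdatedBy = :personId, UpdatedOn = CURRENT_TIMESTAMP, comments = :comments
WHERE Admin = :personNumber
"""

# Upsert: insert when the key is new, otherwise update the existing row.
UPSERT = """
INSERT INTO AdminDetail (Admin, AB_Approved, AB_Approved_On, UpdatedBy, UpdatedOn, comments)
VALUES (:personNumber, :Approved, :ApprovedDate, :personId, CURRENT_TIMESTAMP, :comments)
ON CONFLICT(Admin) DO UPDATE SET
    AB_Approved = excluded.AB_Approved,
    AB_Approved_On = excluded.AB_Approved_On,
    UpdatedBy = excluded.UpdatedBy,
    UpdatedOn = excluded.UpdatedOn,
    comments = excluded.comments
"""


def rows_after(statement: str, event: dict) -> list:
    """Run one statement against a fresh table that already holds admin 'A100'."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    conn.execute("INSERT INTO AdminDetail (Admin) VALUES ('A100')")
    conn.execute(statement, event)
    rows = conn.execute("SELECT Admin FROM AdminDetail ORDER BY Admin").fetchall()
    conn.close()
    return [row[0] for row in rows]


# Hypothetical event for an Admin value that is NOT yet in the table.
event = {"personNumber": "A200", "Approved": 1, "ApprovedDate": "2021-01-01",
         "personId": "P1", "comments": "ok"}

print(rows_after(UPDATE_ONLY, event))  # ['A100']
print(rows_after(UPSERT, event))       # ['A100', 'A200']
```

If events can arrive for Admin values that are not already in AdminDetail, upsert will create new rows for them, which the UPDATE statement would not do.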

Kafka JDBC Connector Configuration:

{
  "name": "abc.persondata.jdbcsink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "insert.mode": "upsert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://ab-1111111-001.sdi.corp.anyorg.com:8081",
    "connection.url": "jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC",
    "connection.user": "**",
    "connection.password": "**",
    "kafka.bootstrap.servers": "ab-1111111-001.sdi.corp.anyorg.com:9095",
    "dialect.name": "SqlServerDatabaseDialect",
    "errors.tolerance": "none",
    "table.name.format": "AdminDetail",
    "pk.mode": "record_value",
    "pk.fields": "Admin",
    "transforms": "renameFields, selectFields, insertMessageDate, convertDate",
    "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameFields.renames": "Approved:AB_Approved, ApprovedDate:AB_Approved_On, personId:UpdatedBy, personNumber:Admin",
    "transforms.selectFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.selectFields.whitelist": "AB_Approved,AB_Approved_On,UpdatedBy,comments",
    "transforms.insertMessageDate.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertMessageDate.timestamp.field": "UpdatedOn",
    "transforms.convertDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertDate.format": "yyyy-MM-dd HH:mm:ss",
    "transforms.convertDate.field": "UpdatedOn",
    "transforms.convertDate.target.type": "Timestamp",
    "topics": "abc.persondata"
  }
}

Questions about the last two transforms:

  • insertMessageDate: how can I get the UpdatedOn column value updated on the database side?
  • convertDate: I am not sure whether this transform is needed. yyyy-MM-dd HH:mm:ss is the format I need for the UpdatedOn value.
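Kafka record timestamps, which InsertField$Value writes into the field named by timestamp.field, are milliseconds since the Unix epoch. TimestampConverter's format takes a Java SimpleDateFormat pattern, in which year-month-day plus 24-hour time is written yyyy-MM-dd HH:mm:ss (in that syntax YYYY is the week-based year, DD is the day of the year, and hh is the 12-hour clock). The following is a rough Python equivalent of that rendering, assuming UTC; format_record_timestamp is a hypothetical helper, not part of Kafka Connect.

```python
from datetime import datetime, timezone

# Python strftime equivalent of the Java SimpleDateFormat pattern
# "yyyy-MM-dd HH:mm:ss": 4-digit year, month, day of month, 24-hour clock.
PATTERN = "%Y-%m-%d %H:%M:%S"


def format_record_timestamp(epoch_millis: int) -> str:
    """Render a Kafka record timestamp (ms since epoch) as a UTC string."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    return moment.strftime(PATTERN)


print(format_record_timestamp(0))                  # 1970-01-01 00:00:00
print(format_record_timestamp(1_600_000_000_000))  # 2020-09-13 12:26:40
```

TimestampConverter only uses format when producing or parsing a string, so with target.type set to Timestamp the pattern has no effect on the value written to the database.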

To do the UPDATE, what you’ve described looks mostly correct.

You’ve set "insert.mode": "upsert", "pk.mode": "record_value", and "pk.fields": "Admin", and you’ve renamed the source personNumber field to Admin with ReplaceField$Value. However, you should also add Admin to the selectFields whitelist, because as it stands that transform excludes it.
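To make the whitelist point concrete, here is a small Python model of the two chained ReplaceField$Value transforms applied to a record value held in a plain dict. The helpers (parse_renames, rename_fields, select_fields) and the sample values are hypothetical: they only mimic renaming followed by whitelisting on a single record and are not how Kafka Connect is implemented. The renames use the AB_ prefix so that they line up with the whitelist.

```python
# Hypothetical helpers mimicking chained ReplaceField$Value transforms on a dict.

def parse_renames(spec: str) -> dict:
    """Turn 'a:b, c:d' into {'a': 'b', 'c': 'd'}."""
    pairs = (item.strip().split(":") for item in spec.split(",") if item.strip())
    return {old: new for old, new in pairs}


def rename_fields(value: dict, spec: str) -> dict:
    """Rename keys listed in spec; leave all other keys unchanged."""
    renames = parse_renames(spec)
    return {renames.get(key, key): val for key, val in value.items()}


def select_fields(value: dict, whitelist: str) -> dict:
    """Keep only keys named in the comma-separated whitelist."""
    keep = {name.strip() for name in whitelist.split(",")}
    return {key: val for key, val in value.items() if key in keep}


RENAMES = ("Approved:AB_Approved, ApprovedDate:AB_Approved_On, "
           "personId:UpdatedBy, personNumber:Admin")

# Hypothetical record value using the topic's field names.
record = {"method": "PUT", "personNumber": "A100", "personId": "P1",
          "pmPersonNumber": "PM9", "Approved": True,
          "ApprovedDate": "2021-01-01", "comments": "ok", "modifiedBy": "x"}

renamed = rename_fields(record, RENAMES)
without_key = select_fields(renamed, "AB_Approved,AB_Approved_On,UpdatedBy,comments")
with_key = select_fields(renamed, "Admin,AB_Approved,AB_Approved_On,UpdatedBy,comments")

print("Admin" in without_key)  # False: pk.fields=Admin has nothing to read
print("Admin" in with_key)     # True
```

Because selectFields runs after renameFields, its whitelist has to use the renamed names (Admin, AB_Approved, ...), not the original topic field names.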

If it’s not working, check out the article "Kafka Connect JDBC Sink deep-dive: Working with Primary Keys".

For UpdatedOn, it depends on which timestamp you want to use.

  • In your sample SQL you use date(), which implies the time at which the operation occurs.
  • For that, you could add a database trigger to set it on the RDBMS side.
  • If you want the timestamp of the Kafka message, then the InsertField$Value Single Message Transform that you currently have is the correct route. IIRC it should set the data type correctly, so you shouldn't need the convertDate transform that you’ve also specified.
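The trigger option can be sketched with Python's built-in sqlite3 module standing in for SQL Server. This is an assumption for illustration only: SQL Server trigger syntax differs (for example, it reads changed rows from the inserted pseudo-table rather than NEW), and the trigger name AdminDetail_set_updated_on is hypothetical. The point it shows is that the sink's UPDATE never mentions UpdatedOn, yet the column is still stamped with the time of the write.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE AdminDetail (
    Admin       TEXT PRIMARY KEY,
    AB_Approved INTEGER,
    UpdatedOn   TEXT
);

-- Hypothetical trigger: stamp UpdatedOn whenever AB_Approved is updated.
-- It only fires on AB_Approved, so its own UPDATE of UpdatedOn does not re-fire it.
CREATE TRIGGER AdminDetail_set_updated_on
AFTER UPDATE OF AB_Approved ON AdminDetail
BEGIN
    UPDATE AdminDetail SET UpdatedOn = CURRENT_TIMESTAMP WHERE Admin = NEW.Admin;
END;

INSERT INTO AdminDetail (Admin, AB_Approved) VALUES ('A100', 0);
""")

# The kind of statement the sink would issue; it does not set UpdatedOn itself.
conn.execute("UPDATE AdminDetail SET AB_Approved = 1 WHERE Admin = 'A100'")

updated_on = conn.execute(
    "SELECT UpdatedOn FROM AdminDetail WHERE Admin = 'A100'"
).fetchone()[0]
print(updated_on)  # current UTC time, formatted as YYYY-MM-DD HH:MM:SS
```

A database-side timestamp like this records when the row was written, which is different from the Kafka message timestamp that InsertField$Value provides.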

If either of these things isn’t working, please provide the error that you get (or the behaviour that you’re seeing) to help us troubleshoot further.


Thank you, Robin, for verifying the configuration for me.

After multiple tests, I found that if I use a Windows Active Directory username and password for the Kafka connector, the connector fails with "Login for user failed".

"connection.url": "jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC",
"connection.user": "",
"connection.password": "",
"dialect.name": "SqlServerDatabaseDialect",

As a workaround, I used SQL Server Login credentials as the username and password for the same SQL Server, and I was able to get the data sink working.

Which dialect.name value can be used with a Windows Active Directory username and password?

I don’t know that, sorry. As far as I know, you need to use the generic JDBC URL to specify credentials.

Sorry, I failed to explain the issue that I came across.

If I try to connect to SQL Server with a Windows Authentication username and password, the connector fails and throws a login failure error.

However, the connector works fine and performs the data sink if a SQL Server Login username and password is used.

I am looking for a way to use a Windows Authentication username and password in the connector to connect to SQL Server.

This is more of a question for your DBA really; it’s specific to SQL Server.

Based on a quick look at StackOverflow, you could try adding ;integratedSecurity=true to the JDBC URL:

"connection.url": "jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC;integratedSecurity=true",
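The SQL Server JDBC URL carries driver properties as ;name=value pairs after the host and port, which is why integratedSecurity=true is simply appended. If connector configs are generated from a script, a small helper along these lines can add or override such properties. with_url_properties is a hypothetical helper, and it assumes no property value itself contains a semicolon (the driver's {braced} escaping is not handled).

```python
def with_url_properties(url: str, **props: str) -> str:
    """Append or override ;name=value properties on a SQL Server JDBC URL.

    Assumes no value contains ';' (braced values are not handled).
    """
    base, *existing = url.split(";")
    merged = dict(item.split("=", 1) for item in existing if item)
    merged.update(props)  # new keys go last; existing keys keep their position
    return ";".join([base] + [f"{key}={value}" for key, value in merged.items()])


url = "jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC"
print(with_url_properties(url, integratedSecurity="true"))
# jdbc:sqlserver://xxxx-xx-xxx.anyorg.com:15000;databaseName=ABC;integratedSecurity=true
```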

I am still getting errors, so I will contact our DBA about it.
Thank you so much for your quick replies and support.
