Replication from rds postgres to mssgql

We are exploring an issue with RDS postgres as a source connector and mssql sink connector, we tried both json and non-json format but without luck, any help/hint would be highly appreciated.
Source postgres rds connector:
{
“name”: “inventory-connector”,
“config”: {
“tasks.max”: “1”,
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“plugin.name”: “pgoutput”,
“snapshot.mode”: “always”,
“topic.prefix”: “nonp”,
“database.hostname”: “",
“database.port”: “5432”,
“database.user”: "
",
“database.password”: "
",
“heartbeat.interval.ms”: “1500”,
“database.dbname”: "
",
“table.include.list”: “public.bitestevents”,
“key.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“schema.include”: “public”,
“publication.autocreate.mode”: “filtered”,
“database.server.name”: "
***”
}
}

sink mssql connector:
{
“name”: “sqlsink-connector”,
“config”: {
“connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
“tasks.max”: “1”,
“topics”: “",
“connection.url”: "
",
“connection.user”: "
",
“connection.password”: "
",
“table.name.format”: "
**”,
“heartbeat.interval.ms”: “1500”,
“insert.mode”: “upsert”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“key.converter.schemas.enable”: “false”,
“value.converter.schemas.enable”: “false”,
“pk.mode”: “record_value”,
“pk.fields”: “id”,
“key.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“auto.create”: “true”
}
}

Error on Kafka-connect:
2023-01-20 14:12:14,098 ERROR || WorkerSinkTask{id=sqlsink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NullPointerException

hey @vladeli

did you check the kafka connect logfiles?

best,
michael

yes part of the logs from kafka connect was in my first post, here is complete error from kafka-connect:

2023-01-20 14:12:14,098 ERROR || WorkerSinkTask{id=sqlsink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null [org.apache.kafka.connect.runtime.WorkerSinkTask]
java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
2023-01-20 14:12:14,099 ERROR || WorkerSinkTask{id=sqlsink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:174)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:72)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
… 10 more
2023-01-20 14:12:14,101 INFO || Stopping task [io.confluent.connect.jdbc.sink.JdbcSinkTask]

mmh ok
did you check the status with a curl request to the api?

see

best,
michael

Yes i see two connectors, one for the source which is RDS Postgres and sink connector which is remote MSSQL, we tested replication from Postgres to Postgres and it works also very simple table from Postgres to MSSQL works but MSSQL can’t have PRIMARY KEY with type TEXT so from my point of view it throws this error, we need some help with transformation either in Json or without json if someone had similar experience

ok I see

nevertheless did you check the status of the connector?

so on the postgres side the column you have the pk is text right?

one thing to try is a SMT

id in postgres is uuid and postgres converter converts UUID type to STRING, and the MSSQL converters converts it into TEXT type, but because MSSQL cant have PRMARY KEY with type TEXT it throws this error at least from my understanding. I tried CAST with many options but cannot find proper one still…

never used it with sql server though it should work like this

I would go for a try with int or float instead of converting to string (as stated in the link above)

best,
michael

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.