Greetings All,
I am trying to write to a SQL Server 2019 database whose tables are organized into several SQL Server schemas.
I can write to a test table in the dbo schema (the implicit default) just fine, but when I try to write to a table in a non-default schema, the connector throws an error. As I’ve configured it, the connector interprets the schema name (en, taken from the topic en.test01) as the database name. E.g. from the logs:
(org.apache.kafka.connect.runtime.WorkerSinkTask)
edipa-sinks-connect | org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
edipa-sinks-connect | com.microsoft.sqlserver.jdbc.SQLServerException: Database 'en' does not exist. Make sure that the name is entered correctly.
edipa-sinks-connect |
edipa-sinks-connect | at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
edipa-sinks-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
edipa-sinks-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
edipa-sinks-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
edipa-sinks-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
edipa-sinks-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
edipa-sinks-connect | at java.base/java.lang.Thread.run(Thread.java:829)
edipa-sinks-connect | Caused by: java.sql.SQLException: Exception chain:
edipa-sinks-connect | com.microsoft.sqlserver.jdbc.SQLServerException: Database 'en' does not exist. Make sure that the name is entered correctly.
I’m using the JDBC Sink connector. From the connector-plugins endpoint:
{
  "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "type": "sink",
  "version": "5.5.2"
},
And here’s the config:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.password": "HackMe",
  "topics": "test01,en.test01",
  "transforms": "addSome",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "auto.evolve": "true",
  "connection.user": "sa",
  "transforms.addSome.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "name": "memberapi-connector",
  "auto.create": "true",
  "connection.url": "jdbc:sqlserver://memberapi:1433;databaseName=MemberApiDb;",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "insert.mode": "upsert",
  "transforms.addSome.timestamp.field": "RECORD_TS",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "pk.mode": "record_key",
  "pk.fields": "MESSAGE_KEY"
}
I checked the Confluent docs for the JDBC sink properties, but nothing stood out as an obvious fix. Maybe some magic with table.name.format?
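For instance, this is the kind of override I have in mind. It is an untested sketch: it assumes the connector would parse a three-part name as database.schema.table, which I have not confirmed. I also suspect the plain test01 topic would then need its own connector, since the prefix would apply to every topic in the list. Only the changed properties are shown; everything else would stay as in the config above.

```json
{
  "topics": "en.test01",
  "table.name.format": "MemberApiDb.${topic}"
}
```

With the default table.name.format of ${topic}, the table name is just the topic name, en.test01, which seems to be where the 'en' in the error comes from.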
Any responses appreciated.
Cheers
_T