Writing to a SQL Server database schema from the JDBC Sink Connector

Greetings All,

I am trying to write to a SQL Server 2019 database whose tables are organized into several SQL Server schemas.

I can write to a test table in the dbo schema just fine (the implicit default), but when I try to write to a table within a non-default schema, an error is thrown in the connector. As I’ve configured it, the connector interprets the schema as the database name. E.g. from the logs:

(org.apache.kafka.connect.runtime.WorkerSinkTask)
edipa-sinks-connect    | org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain:
edipa-sinks-connect    | com.microsoft.sqlserver.jdbc.SQLServerException: Database 'en' does not exist. Make sure that the name is entered correctly.
edipa-sinks-connect    |
edipa-sinks-connect    |        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
edipa-sinks-connect    |        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
edipa-sinks-connect    |        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
edipa-sinks-connect    |        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
edipa-sinks-connect    |        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
edipa-sinks-connect    |        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
edipa-sinks-connect    |        at java.base/java.lang.Thread.run(Thread.java:829)
edipa-sinks-connect    | Caused by: java.sql.SQLException: Exception chain:
edipa-sinks-connect    | com.microsoft.sqlserver.jdbc.SQLServerException: Database 'en' does not exist. Make sure that the name is entered correctly.

I’m using the JDBC Sink connector. From the connector-plugins endpoint:

{
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "5.5.2"
  },

And here’s the config:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.password": "HackMe",
  "topics": "test01,en.test01",
  "transforms": "addSome",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "auto.evolve": "true",
  "connection.user": "sa",
  "transforms.addSome.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "name": "memberapi-connector",
  "auto.create": "true",
  "connection.url": "jdbc:sqlserver://memberapi:1433;databaseName=MemberApiDb;",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "insert.mode": "upsert",
  "transforms.addSome.timestamp.field": "RECORD_TS",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "pk.mode": "record_key",
  "pk.fields": "MESSAGE_KEY"
}

I checked the Confluent docs for the JDBC sink properties, but nothing is immediately obvious. Maybe some magic with table.name.format?

Any responses appreciated.

Cheers
_T

I would have a look at the table.name.format config and perhaps fully qualify the target (db.schema.table), e.g.

table.name.format=mydb.${topic}

Alternatively, if the topic name can’t simply be appended to a database prefix to form a valid table name, look at a Single Message Transform to manipulate the name, e.g. with RegexRouter.
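
As a rough sketch of the Single Message Transform route: the fragment below chains Kafka Connect's built-in RegexRouter (org.apache.kafka.connect.transforms.RegexRouter) after the existing addSome transform. The alias "route", the topic name memberapi-en-test01 and the regex are made up for illustration and are not from the question; only the class name and its regex/replacement properties are real. The idea is that a topic called memberapi-en-test01 would be renamed to en.test01 before table.name.format is applied, so the target would resolve to MemberApiDb.en.test01.

```json
{
  "topics": "memberapi-en-test01",
  "transforms": "addSome,route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "memberapi-(.*)-(.*)",
  "transforms.route.replacement": "$1.$2",
  "table.name.format": "MemberApiDb.${topic}"
}
```

The regex has to match the whole topic name for the rename to happen; topics that don't match pass through unchanged. The addSome settings from the original config stay as they are.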

Wonderful, table.name.format=mydb.${topic} worked perfectly on this issue.

I created the underlying stream with a topic named en.test01 (i.e. schema.table), so the JDBC connector was able to parse the fully qualified table name correctly (i.e. MemberApiDb.en.test01).
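
For anyone landing here later, the relevant part of the working config would presumably look something like the fragment below. This is a sketch pieced together from the posts above, not a copy of the actual config: it assumes the mydb placeholder was replaced with MemberApiDb (the database named in connection.url) and shows only the keys that matter here. The unqualified test01 topic from the question is left out, since with this format it would expand to a two-part name.

```json
{
  "topics": "en.test01",
  "connection.url": "jdbc:sqlserver://memberapi:1433;databaseName=MemberApiDb;",
  "table.name.format": "MemberApiDb.${topic}"
}
```

With the topic en.test01, ${topic} expands so that the target becomes MemberApiDb.en.test01, i.e. database.schema.table.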

Thanks Robin, you’re epic :))
