SQL Server sink connector table name

I am using the Confluent JDBC sink connector and want to insert data into a specific table, TB_TEST_KAFKA, which I have already created, so I set auto.create=false.

I don't know which sink connector property sets the target table name. This is my configuration:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA

But I get this error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
	at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	... 10 more

Is there any way to insert the data into this table?

Did you try table.name.format?

If it (TB_TEST_KAFKA) is a synonym, do I need to set anything else?

mmh, I think table.name.format should be sufficient
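For example, a minimal sketch of how it could be added to the properties shown above (using the table name from the question):

```properties
# Route all records from the configured topics into this table,
# instead of deriving the table name from the topic name.
table.name.format=TB_TEST_KAFKA
```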

I mean, is there a way to insert the data into a table whose name differs from the topic name? And the table is a synonym, e.g.:

topic name = test-topic
synonym = TB_TEST_KAFKA
table name = TB_TEST

Ah, I see. Yes, I did this with the JDBC connector:

    "topics": "mytest_topic",
    "pk.mode":"none",
    "table.name.format": "postgres_test_topic"

Yes, I get the same error even with the method you suggested.
But I found a way:

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA

This way, the data was inserted into the table normally.
However, if the table (TB_TEST_KAFKA) is a synonym, the same error occurs:

[2021-08-11 07:34:41,034] ERROR WorkerSinkTask{id=mssql-sink-connector-02-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
	at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
[2021-08-11 07:34:41,034] ERROR WorkerSinkTask{id=mssql-sink-connector-02-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
	at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)

OK, I see.
I guess the table already exists in the database? :wink:

Just out of curiosity:
what happens if you enable auto-creation for the topic?

Yes, I have already created the table (TB_TEST_KAFKA),

and I set auto.create=false.

Auto-creation for the topic is at its default setting.

I was talking about the auto-create feature of the JDBC connector:
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/sink_config_options.html#ddl-support

If you set auto.create to true, the connector tries to create the table by itself.
It would be nice to see what happens then.
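A minimal sketch of what that would look like in the configuration above (assuming the connection user has DDL permissions on the target database):

```properties
# Let the connector issue CREATE TABLE itself when the table is missing
auto.create=true
# Optionally, also let it ALTER the table when the record schema evolves
auto.evolve=true
```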
