Hi, Kafka friends
I’m connecting a simple MySQL database to Kafka, but I’ve got the following error when the connector is running:
[2021-04-16 08:50:19,941] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="sys"."sys_config", query='null', topicPrefix='connect-mysql-demo-', incrementingColumn='id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:404)
java.sql.SQLSyntaxErrorException: Unknown column 'sys.sys_config.id' in 'where clause'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1003)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:200)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:159)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:371)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
The properties of the standalone worker is the following one:
worker.properties
bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000
The properties for connector is the following one:
mysql.properties
name=connect-source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/kafkademo?useSSL=false
connection.password=example
connection.user=root
mode=incrementing
incrementing.column.name=id
topic.prefix=connect-mysql-demo-
I’m running the connector using the folllowing command:
sudo connect-standalone.sh /usr/local/kafka/config/worker.properties /usr/local/kafka/config/mysql.properties
The connector is working correctly. I mean, I insert new records in MySQL database and they are capture and recorded to the topic in Kafka. But the above error is showing constantly for each certain frequency (I suppose is when connector is checking MySQL for new records). What is happening?