MySQL source error trying to read autoincrement field

Hi, Kafka friends

I’m connecting a simple MySQL database to Kafka, but I’ve got the following error when the connector is running:

[2021-04-16 08:50:19,941] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="sys"."sys_config", query='null', topicPrefix='connect-mysql-demo-', incrementingColumn='id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:404)
java.sql.SQLSyntaxErrorException: Unknown column 'sys.sys_config.id' in 'where clause'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1003)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:200)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:159)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:371)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:289)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)

The properties of the standalone worker is the following one:

worker.properties

bootstrap.servers=127.0.0.1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter.schemas.enable=false
offset.storage.file.filename=standalone.offsets
offset.flush.interval.ms=10000

The properties for connector is the following one:

mysql.properties

name=connect-source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/kafkademo?useSSL=false
connection.password=example
connection.user=root
mode=incrementing
incrementing.column.name=id
topic.prefix=connect-mysql-demo-

I’m running the connector using the folllowing command:

sudo connect-standalone.sh /usr/local/kafka/config/worker.properties /usr/local/kafka/config/mysql.properties

The connector is working correctly. I mean, I insert new records in MySQL database and they are capture and recorded to the topic in Kafka. But the above error is showing constantly for each certain frequency (I suppose is when connector is checking MySQL for new records). What is happening?

@rafaelhernamperez Can you refer to this document and ensure the source table is compatible with Incrementing Column mode? I’m not certain but it seems like this configuration will attempt to load all tables (?), and the error Unknown column ‘sys.sys_config.id’ in ‘where clause’ implies to me that the Connect is trying to load the sys_config table, which you may or may not want.

Hi, Rick

You’re right with the link. The “mode” property is mandatory for Kafka Connect. I’ve tried to use ‘bulk’, but this mode is continuously sending all the records of the table to the Kafka Topic. In other hand, I’m not using any timestamp field on my table. For these reasons, I need to use the autoincrement properties, in order to send the new records from MySQL to the topic.

The error seems caused and launched by Kafka Connect, but the process is right. I don’t know how to avoid the error.

Thanks so much for your help.

I’m not at all a Connect expert, but I’m wondering why Connect is trying to load the sys_config table at all. I wonder if it has to do with the credentials you are using to connect to the database with. If that user is a root/admin of the DB, does that mean it can “discover” all tables including system level tables. I’m wondering if you could try a “whitelisting” only the tables you want to capture, or use a different user to isolate the permissions to certain tables.

1 Like

You’ve not specified table.whitelist, so the connector is trying to ingest every table that the user has access to, including sys_config.

Try adding table.whitelist and see if that fixes things.

1 Like

Hi, @rmoff

I’ve tried your suggestion. I’ve added to mysql.properties the following line:

table.whitelist=sys_config

also, I’ve tried the following:

table.whitelist=sys.sys_config

And the result is the exactly the same.

Thank you very much for your support.

Hi, again, @rmoff

I’m sorry. I’m understood your suggestion wrongly.

I’ve added the following:

table.whitelist=employees

And now is working well. I must to specify my table (not the system table that lauched the error).

Thank you very much for your help.

1 Like

Your’re right @rick . My example is for educational purposes, and I’ve used the root user for my database. In a real application, I must to define a database user with their constraints.

I’ve defined the whitelisting properties and works well.

Thanks so much.

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.