How to improve performance for a JDBC sink connector attempting INSERT IGNORE operations on RDS?

Hello everyone, I hope you are doing well.

I am using a JDBC sink connector to sync 500 million JSON records into MySQL, spread across roughly 40 topics (50 partitions each) & 40 tables.

Requirement: All the tables have a single primary key (called ssn_id), & I am supposed to insert a record into a table only if its ssn_id value does not already exist in that table. If a record with that ssn_id does exist, I am supposed to ignore the change; no update operation should be performed on the existing row. I need to perform this operation on a combined total of 500 million records daily.

Here's the configuration I am using:

{
  "connection.url": "jdbc:mysql://[endpoint]/[DB]?useServerPrepStmts=false&rewriteBatchedStatements=true",
  "connection.password": "[password]",
  "connection.user": "[user]",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "errors.tolerance": "all",
  "insert.mode": "insert",
  "pk.fields": "ssn_id",
  "pk.mode": "record_value",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "tasks.max": 50,
  "auto.create": "true",
  "auto.evolve": "true",
  "batch.size": "10000",
  "topics.regex": "[regex]"
}

My thought process for the configuration: insert.mode set to insert ensures that we only ever issue INSERT statements against the table. pk.mode set to record_value & pk.fields set to ssn_id ensure that the ssn_id from the record value is used as the primary key, so an insert only succeeds when that ssn_id value is new to the table.

Problem: The throughput of this connector is around 2% of the same connector running in UPSERT mode.

For example, while benchmarking with a pre-existing lag of 100 million records, we got insertion rates of 2.1 million records per minute (on average) when insert.mode was set to UPSERT. With insert.mode set to INSERT, the insertion rate came down to roughly 50,000 records per minute.
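To make the gap concrete, here is a minimal JDBC sketch (my own illustration, not connector code) of the two statement shapes I believe the connector's MySQL dialect produces in the two modes. The endpoint, credentials & the payload column are placeholders; the table & ssn_id column are from my test setup. In upsert mode the ON DUPLICATE KEY UPDATE clause can never violate the primary key, so the whole batch goes through in one round trip, while in insert mode a single duplicate ssn_id makes executeBatch() throw:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Minimal sketch, not connector code: the statement shapes I understand the
// MySQL dialect to generate for insert.mode=insert vs insert.mode=upsert.
// [endpoint], [DB], [user], [password] and the "payload" column are placeholders.
public class InsertVsUpsertSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://[endpoint]/[DB]?rewriteBatchedStatements=true",
                "[user]", "[password]")) {

            // insert.mode=insert: plain INSERT. One duplicate ssn_id anywhere in the
            // batch makes executeBatch() throw a BatchUpdateException.
            String plainInsert =
                "INSERT INTO TEST_INSERT_IGNORE (ssn_id, payload) VALUES (?, ?)";

            // insert.mode=upsert: ON DUPLICATE KEY UPDATE never violates the primary
            // key, so the whole batch succeeds in one round trip -- but it rewrites
            // existing rows, which my requirement forbids.
            String upsert =
                "INSERT INTO TEST_INSERT_IGNORE (ssn_id, payload) VALUES (?, ?)"
              + " ON DUPLICATE KEY UPDATE payload = VALUES(payload)";

            try (PreparedStatement ps = conn.prepareStatement(plainInsert)) {
                ps.setString(1, "sample_ssn"); ps.setString(2, "{}"); ps.addBatch();
                ps.setString(1, "sample_ssn"); ps.setString(2, "{}"); ps.addBatch(); // duplicate key
                ps.executeBatch(); // fails: duplicate entry 'sample_ssn' for the primary key
            }
        }
    }
}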

My Assessment of the slowdown: I think the root cause is the presence of duplicate ssn_id values within a batch of records to be inserted, or of ssn_id values that already exist in the RDS table. A single such ssn_id prevents the batched insert from executing successfully & forces the connector to fall back to inserting the records one by one, which I think is the slowest form of insertion.

Example exception:

[2024-09-28 18:38:49,793] WARN [HOME_BASE_JDBC_SINK_TEST_INSERT_IGNORE_CONNECTOR_003|task-0] Write of 10 records failed, remainingRetries=0 (io.confluent.connect.jdbc.sink.JdbcSinkTask:101)
java.sql.BatchUpdateException: Duplicate entry 'sample_ssn' for key 'TEST_INSERT_IGNORE.PRIMARY'
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
at com.mysql.cj.util.Util.handleNewInstance(Util.java:192)
at com.mysql.cj.util.Util.getInstance(Util.java:167)
at com.mysql.cj.util.Util.getInstance(Util.java:174)
at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:816)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:418)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
at io.confluent.connect.jdbc.sink.BufferedRecords.executeUpdates(BufferedRecords.java:196)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:186)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'sample_ssn' for key 'TEST_INSERT_IGNORE.PRIMARY'
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:916)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1061)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchSerially(ClientPreparedStatement.java:795)
... 17 more

The log above points to a SQLIntegrityConstraintViolationException causing the batch insert to fail. That is, the batch update of 10 records failed because the duplicate entry 'sample_ssn' was found for the column ssn_id, which is the primary key of the table TEST_INSERT_IGNORE.

The call to executeBatch failed, so the connector will now roll back the operation [1] & then attempt to re-write each record in the batch one by one, performing commits along the way [2] (a simplified sketch of this fallback follows the references below).

[1] https://github.com/confluentinc/kafka-connect-jdbc/blob/5c8bacdcb82109d6b1930668d595f37579facf2e/src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
[2] https://github.com/confluentinc/kafka-connect-jdbc/blob/5c8bacdcb82109d6b1930668d595f37579facf2e/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java
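
For illustration, here is a simplified sketch of that fallback pattern as I read it from [1] & [2]. This is my own approximation of the behaviour, not the connector's actual code, & the table/column names are the ones from my test:

import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

// Simplified sketch of the fallback described above (my reading of [1] & [2]),
// not the connector's actual code: try one batched round trip, and on a
// duplicate-key failure roll back and fall through to row-by-row writes.
public class BatchFallbackSketch {
    static void write(Connection conn, List<String> ssnIds) throws SQLException {
        String sql = "INSERT INTO TEST_INSERT_IGNORE (ssn_id) VALUES (?)";
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (String ssn : ssnIds) {
                ps.setString(1, ssn);
                ps.addBatch();
            }
            ps.executeBatch();          // fast path: one round trip when every ssn_id is new
            conn.commit();
        } catch (BatchUpdateException e) {
            conn.rollback();            // a duplicate key aborts the whole batch
            // Slow path: re-insert row by row, committing as we go and skipping
            // the duplicates individually -- roughly one round trip per record.
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                for (String ssn : ssnIds) {
                    try {
                        ps.setString(1, ssn);
                        ps.executeUpdate();
                        conn.commit();
                    } catch (SQLException duplicate) {
                        conn.rollback(); // ignore the duplicate and carry on
                    }
                }
            }
        }
    }
}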

I think this is why insert/ignore operations against a table with a primary-key constraint are so much slower on MySQL than upserts against the same table.

My Question:
Is there a more performance-efficient way to do insert/ignore operations with the JDBC sink connector than the configuration I described above?

I have a feeling that I might have missed something, maybe some combination of connector properties that performs better. I have tried doing INSERT/IGNORE with MongoDB & the performance drop there is around 30%, not nearly as drastic.

If there are any questions, please let me know.

Thank you.