JDBC Sink Error - Update count did not sum to total number of records inserted

Hi,

I have a JDBC sink connector (with four tasks) that I am using to insert into a SQL Server database. The topic I am reading has about 54M messages. It successfully inserted around 6M rows into the target, and then it started throwing errors similar to the following:

[2021-04-13 08:29:31,272] ERROR WorkerSinkTask{id=cdp_snk_member_coreInsert_0-3} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Update count (499) did not sum up to total number of records inserted (500) (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Update count (499) did not sum up to total number of records inserted (500)
	at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:194)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:79)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Yesterday, I could just restart the failed tasks and the connector would continue for a bit, inserting more rows (10K - 100K) before failing again. I set up a Linux script (inspired by Automatically restarting failed Kafka Connect tasks) that checks for failed tasks periodically and restarts them. Today, it just fails immediately with the same error and does not insert any more rows.
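For reference, the restart script boils down to a loop like the following (a minimal Python sketch of the same idea; the worker URL and connector name are assumptions based on my setup and the log above):

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"      # assumed Connect worker address
CONNECTOR = "cdp_snk_member_coreInsert_0"  # connector name from the log above


def failed_task_ids(status):
    """Return the ids of tasks in FAILED state from a /status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_failed_tasks(base_url, connector):
    """Query the connector's status and restart any FAILED tasks via the REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        status = json.load(resp)
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            f"{base_url}/connectors/{connector}/tasks/{task_id}/restart",
            method="POST",
        )
        urllib.request.urlopen(req)
```

I run `restart_failed_tasks(CONNECT_URL, CONNECTOR)` from cron every few minutes.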

The counts in the error message always appear to be the same:
Update count (**499**) did not sum up to total number of records inserted (**500**)

It is not clear to me what is causing this error. All of the messages in the source topic have unique primary keys. I tried adding a Dead Letter Queue to the connector but that did not make a difference. No messages went to the DLQ topic.

Does anybody have a suggestion?

Thanks!

What version of the connector are you using?

I am using Confluent 6.0.0

It could be worth installing the latest version of the connector (10.0.1) and seeing if that helps - that version also supports DLQ.

My next step would probably be to check the RDBMS logs for errors, and then bump up the Kafka Connect logging to TRACE and see if you can spot the problem in that - but with millions of messages I guess this could get a bit difficult. https://rmoff.dev/kc-dynamic-log-level could help out for targeting specific loggers.
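The dynamic log-level change goes through the worker's `/admin/loggers` REST endpoint; a minimal sketch (the worker URL and the choice of logger are assumptions - you'd target whichever logger is relevant, e.g. the JDBC connector's package):

```python
import json
import urllib.request


def logger_request(base_url, logger, level):
    """Build the PUT request for Kafka Connect's /admin/loggers endpoint."""
    url = f"{base_url}/admin/loggers/{logger}"
    body = json.dumps({"level": level}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def set_log_level(base_url, logger, level):
    """e.g. set_log_level("http://localhost:8083", "io.confluent.connect.jdbc", "TRACE")"""
    with urllib.request.urlopen(logger_request(base_url, logger, level)) as resp:
        return json.load(resp)
```

The nice thing is that this takes effect without a worker restart and without touching log4j config files.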

Good suggestions! We will try them and let you know if we have other problems with it. Thank you!!!

One other question related to this. I noticed that it is inserting 500 rows at a time. I am thinking this is because max.poll.records is at its default, which is 500. I tried making that value larger in the connector config with both ‘max.poll.records’ and ‘consumer.max.poll.records’, but neither worked. How can I change that property for the connector?

Also check out this for useful context from @rhauch: JDBC Sink fails on duplicate violation · Issue #327 · confluentinc/kafka-connect-jdbc · GitHub
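On the max.poll.records question: per-connector consumer settings use the `consumer.override.` prefix, and the worker has to be configured to allow such overrides. A sketch, assuming a distributed worker (property names are from the Connect docs; the value of 1000 is just illustrative):

```
# worker config (e.g. connect-distributed.properties) - allow client overrides
connector.client.config.override.policy=All

# connector config - per-connector override of the sink consumer
consumer.override.max.poll.records=1000
```

Without the override policy set on the worker, connector-level `consumer.override.*` settings are rejected, which would explain why neither of the property names you tried had any effect.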

Update - I recreated the table without the index, and then the connector did not error. The insert speed also seemed much better, which isn’t really a surprise. I then created the index after the initial load was completed.
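The load pattern was roughly the following (a T-SQL sketch; the table and index names are made up for illustration):

```sql
-- drop the index before the bulk load
DROP INDEX IX_member_core ON dbo.member_core;

-- ... let the connector run the initial load here ...

-- recreate the index once the initial load has finished
CREATE INDEX IX_member_core ON dbo.member_core (member_id);
```

Building the index once at the end is generally much cheaper than maintaining it on every batch insert, which would explain the speedup.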

Glad it’s working. From what you describe it sounds like within your data you had some PK violations then?

We had a very small number of PK violations (5 of over 55M rows) because our source was a case-sensitive Oracle table and our target was a case-insensitive SQL Server table. The rows that were causing a problem were actually invalid and were removed before creating the index.
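In case it helps anyone else hitting the same thing: two keys that Oracle treats as distinct compare equal under a case-insensitive SQL Server collation, so they violate the same unique key. The collation name below is just one common case-insensitive example:

```sql
-- under a case-insensitive (CI) collation these compare equal,
-- so 'ABC123' and 'abc123' collide on the same unique key
SELECT CASE
         WHEN 'ABC123' = 'abc123' COLLATE SQL_Latin1_General_CP1_CI_AS
         THEN 'duplicate'
         ELSE 'distinct'
       END;
```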

