I’m creating a source connector that uses Debezium to capture all changes made to a DB table. I’m attempting to use org.apache.kafka.connect.transforms.ExtractField$Key
to convert the message key format to a single Long representing the “id” field of the table, so that I can easily join tables together by their ID fields.
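To make the intent concrete, this is the key remapping I'm after (values invented for illustration; the "before" shape is the default Debezium struct key):

```
before (struct key):  {"id": 42}
after  (Long key):    42
```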
I’m creating the connector with:
curl --location --request PUT 'http://127.0.0.1:8083/connectors/test_connector/config' \
--header 'Content-Type: application/json; charset=UTF-8' \
--data '{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.user": "redacted",
  "database.dbname": "redacted",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "redacted:9092",
  "database.history.kafka.topic": "dbhistory.redacted",
  "database.server.name": "redacted",
  "database.port": "5432",
  "plugin.name": "pgoutput",
  "heartbeat.interval.ms": "10000",
  "table.include.list": "public.mytable",
  "database.hostname": "localhost",
  "database.password": "redacted",
  "name": "test_connector",
  "transforms": "extractIdForKey",
  "transforms.extractIdForKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractIdForKey.field": "id",
  "key.converter": "org.apache.kafka.connect.converters.LongConverter",
  "slot.name": "test_connector",
  "publication.name": "test_connector",
  "publication.autocreate.mode": "filtered"
}'
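After creating the connector I check its state through the standard Connect REST API (same host/port as above; this is how I see the task transition to FAILED):

```shell
# Query connector and task state; the failed task's "trace" field
# contains the stack trace reproduced below.
curl -s http://127.0.0.1:8083/connectors/test_connector/status
```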
When I create the connector, it correctly remaps the "id" field of each row as a Long key and produces a message for every row in the table. After it has processed every row, the connector logs this error and dies:
[ld-kafka-connect-con] [2022-03-31 16:56:25,061] ERROR WorkerSourceTask{id=test_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:354)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: id
	at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 11 more
[2022-03-31 16:56:25,061] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask)
I’ve tested with Confluent Platform 6.2.2 and 7.0.1, Postgres 11 and 14, and every Debezium version from 1.2 through 1.8 inclusive; the behavior is identical across all of them.
When I create the connector without the transformation, it produces exactly the same number of messages as it does with it. It also wouldn’t make sense for the offending messages to be tombstones, because a tombstone without a valid key is pointless. After a couple of days of research, I haven’t found any information about these “ghost messages” that lack an “id” field.
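For reference, this is how I inspect the raw keys when running without the SMT (the topic name follows Debezium's usual {server.name}.{schema}.{table} pattern, so the actual name is redacted here; every key I see this way contains an "id" field):

```shell
# Print each record's key alongside its value to look for any record
# whose key is missing the "id" field.
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic redacted.public.mytable \
  --from-beginning \
  --property print.key=true \
  --property key.separator=' | '
```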
Is there something obviously wrong with my connector or SMT config, or is this possibly a bug in kafka-connect or debezium?