ExtractField$Key SMT fails after processing entire table

I’m creating a source connector that uses Debezium to capture all changes made to a DB table. I’m attempting to use org.apache.kafka.connect.transforms.ExtractField$Key to convert the message key format to a single Long representing the “id” field of the table, so that I can easily join tables together by their ID fields.

I’m creating the connector with:

curl --location --request PUT 'http://127.0.0.1:8083/connectors/test_connector/config' \
--header 'Content-Type: application/json; charset=UTF-8' \
--data '{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.user": "redacted",
  "database.dbname": "redacted",
  "tasks.max": "1",
  "database.history.kafka.bootstrap.servers": "redacted:9092",
  "database.history.kafka.topic": "dbhistory.redacted",
  "database.server.name": "redacted",
  "database.port": "5432",
  "plugin.name": "pgoutput",
  "heartbeat.interval.ms": "10000",
  "table.include.list": "public.mytable",
  "database.hostname": "localhost",
  "database.password": "redacted",
  "name": "test_connector",
  "transforms": "extractIdForKey",
  "transforms.extractIdForKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractIdForKey.field": "id",
  "key.converter": "org.apache.kafka.connect.converters.LongConverter",
  "slot.name": "test_connector",
  "publication.name": "test_connector",
  "publication.autocreate.mode": "filtered"
}'

When I create the connector, it correctly remaps the “id” field of each row as a Long and produces a message for every row in the table. After it has correctly processed every row, the connector outputs this error and dies:

[ld-kafka-connect-con] [2022-03-31 16:56:25,061] ERROR WorkerSourceTask{id=test_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:354)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: id
	at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 11 more
[2022-03-31 16:56:25,061] INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask)

I’ve tested with CP 6.2.2 and 7.0.1, Postgres 11 and 14, and all Debezium versions between 1.2 and 1.8 inclusive; this behavior is consistent across all of these versions.

When I create the connector without the transformation, it doesn’t produce any more messages than it does with the transformation. It also wouldn’t make sense for the offending messages to be tombstones, since a tombstone without a valid key is pointless. After a couple of days of research, I haven’t been able to find any information about these “ghost messages” that have no “id” field.
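One thing I haven’t ruled out is that the failing records come from a topic other than the table topic. If that were the case, the SMT could in principle be restricted to the table topic using a Kafka Connect predicate (available since Apache Kafka 2.6). This is only a sketch: the `pattern` below assumes Debezium’s default `serverName.schemaName.tableName` topic naming with `database.server.name` set to `redacted`, and the predicate name `isTableTopic` is my own invention:

```json
"transforms": "extractIdForKey",
"transforms.extractIdForKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractIdForKey.field": "id",
"transforms.extractIdForKey.predicate": "isTableTopic",
"predicates": "isTableTopic",
"predicates.isTableTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isTableTopic.pattern": "redacted\\.public\\.mytable"
```

With this in place, records on any other topic would pass through the transformation chain untouched, so a key without an “id” field would no longer be fatal.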

Is there something obviously wrong with my connector or SMT config, or is this possibly a bug in Kafka Connect or Debezium?

One other possibly relevant piece of information: when I instead implement this with a separate function that rekeys the messages based on the “id” field, written in such a way that it would fail if the “id” field didn’t exist, there are no errors. It seems as though Kafka Connect attempts to process a message that doesn’t exist when it hits the end of the topic, rather than sitting idle until another message shows up. The separate function is usable for now, but it feels far inferior to using the ExtractField SMT, since it adds an extra processing step and an extra conversion.
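For reference, the core of that separate rekeying function is conceptually something like the sketch below. Names and types are simplified and hypothetical (`Rekeyer`, `extractId`); the real version operates on the fully deserialized record value rather than a plain `Map`:

```java
import java.util.Map;

public class Rekeyer {

    // Extract the "id" field from a deserialized record value so it can be
    // used as the new message key. Deliberately throws when "id" is missing,
    // mirroring the ExtractField SMT's "Unknown field" failure mode.
    public static long extractId(Map<String, Object> value) {
        Object id = value.get("id");
        if (id == null) {
            throw new IllegalArgumentException("Unknown field: id");
        }
        return ((Number) id).longValue();
    }

    public static void main(String[] args) {
        long key = extractId(Map.of("id", 42L, "name", "example"));
        System.out.println(key); // prints 42
    }
}
```

Unlike the SMT, this runs outside the Connect transformation chain, so it only ever sees real data records, which is presumably why it never hits the missing-field case.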
