SMT error with the Debezium MySQL CDC connector

I am trying to capture simple CDC data for a MySQL table with three columns, and I want to use the id field as the key of each record. The records will ultimately go into Apache Ignite, which requires the key to be a simple type rather than a complex Struct or JSON object. I first tried StringConverter as the key converter, but it produces keys in struct form, such as "Struct{id=1}". So I switched to an SMT, but it fails with a "field does not exist" error on my JSON CDC data.

Please suggest a solution for this. I am using Confluent Community Edition 6.1.1 for this POC.

[2021-06-25 07:42:23,239] ERROR WorkerSourceTask{id=mysql-cdc-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        ... 11 more

Worker Configuration:

bootstrap.servers=kafka1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=2000
plugin.path=/usr/share/java,/home/ec2-user/confluent-6.1.1/share/java/kafka

Connector Configuration:

name=mysql-cdc-connect
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=mypassword
database.server.id=184054
database.server.name=OPTest9
database.include.list=mysql_db
database.history.kafka.bootstrap.servers=kafka1:9092
database.history.kafka.topic=dbhistory.Try
include.schema.changes=false
tasks.max=1
table.include.list=mysql_db.MyTest
column.include.list=mysql_db.MyTest.id,mysql_db.MyTest.name
database.allowPublicKeyRetrieval=true
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
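
For context, this is the transform chain I am considering trying next. It is only a sketch and I have not verified it yet: it assumes that Debezium's io.debezium.transforms.ExtractNewRecordState SMT can flatten the change-event envelope so that id becomes a top-level field before ValueToKey runs:

```properties
# Unverified sketch: unwrap the Debezium envelope first, then build the key.
transforms=unwrap,createKey,extractInt
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
```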

My CDC JSON record:

{
  "before": {
    "id": 3,
    "salary": 20000
  },
  "after": {
    "id": 3,
    "salary": 2000
  },
  "source": {
    "version": "1.5.0.Final",
    "connector": "mysql",
    "name": "haha",
    "ts_ms": 1624433940000,
    "snapshot": "false",
    "db": "mysql_db",
    "sequence": null,
    "table": "EmpSalary",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000011",
    "pos": 383,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "u",
  "ts_ms": 1624433940709,
  "transaction": null
}
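
To illustrate my understanding of the failure (a toy Python sketch, not the actual Kafka Connect internals): ValueToKey seems to look for the named field at the top level of the record value, but in the Debezium envelope above, id is nested inside before and after, so the lookup fails.

```python
# Toy illustration of why ValueToKey reports "Field does not exist: id"
# for a Debezium change event. This is NOT Kafka Connect code.

debezium_value = {
    "before": {"id": 3, "salary": 20000},
    "after": {"id": 3, "salary": 2000},
    "source": {"db": "mysql_db", "table": "EmpSalary"},
    "op": "u",
}

def value_to_key(value, fields):
    """Mimics the top-level field lookup that (as I understand it)
    org.apache.kafka.connect.transforms.ValueToKey performs."""
    for f in fields:
        if f not in value:
            raise ValueError(f"Field does not exist: {f}")
    return {f: value[f] for f in fields}

# Fails: "id" only exists nested under "before"/"after".
try:
    value_to_key(debezium_value, ["id"])
except ValueError as e:
    print(e)  # Field does not exist: id

# Succeeds once the envelope is unwrapped to just the "after" state.
print(value_to_key(debezium_value["after"], ["id"]))  # {'id': 3}
```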

Please suggest a solution to this problem.
Thanks in advance.
