I am trying to capture simple CDC data from a MySQL table that has 3 columns, and I want the id field to be the record key. The records will be written to Ignite, which requires the key to be a simple type rather than a complex one such as a Struct or JSON. I tried StringConverter as the key converter, but it produces the key in struct format, such as "Struct{id=1}". So I decided to use SMTs instead, but they fail with a "Field does not exist: id" error on my JSON CDC data.
I am using Confluent Community Edition 6.1.1 for this POC.
Error log:
[2021-06-25 07:42:23,239] ERROR WorkerSourceTask{id=mysql-cdc-connect-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 11 more
Worker Configuration:
bootstrap.servers=kafka1:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=2000
plugin.path=/usr/share/java,/home/ec2-user/confluent-6.1.1/share/java/kafka
Connector Configuration:
name=mysql-cdc-connect
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=127.0.0.1
database.port=3306
database.user=root
database.password=mypassword
database.server.id=184054
database.server.name=OPTest9
database.include.list=mysql_db
database.history.kafka.bootstrap.servers=kafka1:9092
database.history.kafka.topic=dbhistory.Try
include.schema.changes=false
tasks.max=1
table.include.list=mysql_db.MyTest
column.include.list=mysql_db.MyTest.id,mysql_db.MyTest.name
database.allowPublicKeyRetrieval=true
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id
My CDC JSON record:
{
  "before": {
    "id": 3,
    "salary": 20000
  },
  "after": {
    "id": 3,
    "salary": 2000
  },
  "source": {
    "version": "1.5.0.Final",
    "connector": "mysql",
    "name": "haha",
    "ts_ms": 1624433940000,
    "snapshot": "false",
    "db": "mysql_db",
    "sequence": null,
    "table": "EmpSalary",
    "server_id": 1,
    "gtid": null,
    "file": "binlog.000011",
    "pos": 383,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "u",
  "ts_ms": 1624433940709,
  "transaction": null
}
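To illustrate what I think is going wrong, here is a rough Python sketch (not Kafka Connect code). It assumes that ValueToKey only looks up fields at the top level of the record value. The value_to_key helper is a hypothetical stand-in I wrote for illustration, and the record is a trimmed copy of the one above. In this envelope the id only appears nested under "before" and "after", so a top-level lookup for "id" fails with the same message as in the stack trace.

```python
import json

# Trimmed copy of the CDC record above (only the fields relevant here).
record = json.loads("""
{
  "before": {"id": 3, "salary": 20000},
  "after": {"id": 3, "salary": 2000},
  "op": "u",
  "ts_ms": 1624433940709
}
""")


def value_to_key(value, fields):
    """Hypothetical stand-in for a top-level field lookup.

    This is NOT the real ValueToKey implementation; it only mimics the
    assumed behaviour of checking field names at the top level of the value.
    """
    missing = [f for f in fields if f not in value]
    if missing:
        raise KeyError(f"Field does not exist: {missing[0]}")
    return {f: value[f] for f in fields}


# Field names available at the top level of the envelope.
top_level_fields = sorted(record)

# Looking for "id" at the top level fails, as in the stack trace.
try:
    value_to_key(record, ["id"])
    lookup_error = None
except KeyError as exc:
    lookup_error = exc.args[0]

# The id is only reachable one level down, inside "after".
nested_id = record["after"]["id"]

print(top_level_fields)  # ['after', 'before', 'op', 'ts_ms']
print(lookup_error)      # Field does not exist: id
print(nested_id)         # 3
```

If that assumption holds, the transform would need to see the flattened row (the contents of "after") rather than the full envelope before it could find id.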
Please suggest a solution to this problem.
Thanks in advance.