SMT for Nested JSON record

How can I access nested fields of JSON record to put id field as a key of the record?
Suppose my JSON data is :

{
“before”: null,
“after”: {
“id”: 4,
“salary”: 5000
},
“source”: {
“version”: “1.5.0.Final”,
“connector”: “mysql”,
“name”: “Try-”,
“ts_ms”: 1623834752000,
“snapshot”: “false”,
“db”: “mysql_db”,
“sequence”: null,
“table”: “EmpSalary”,
“server_id”: 1,
“gtid”: null,
“file”: “binlog.000004”,
“pos”: 374,
“row”: 0,
“thread”: null,
“query”: null
},
“op”: “c”,
“ts_ms”: 1623834752982,
“transaction”: null
}

I want to set my id field in after section of this JSON as Key of the record. When I am trying to set it with ValuetoKey and ExtractField it’s not recognizing id. But it’s recognizing fields like op,ts_ms,transaction. Can you please help me in this case?

transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=id

These configurations are giving error that id field is not found.

Thanks in advance.

Is id the primary key of your database record? If so, it should get set as the Kafka message key automagically by Debezium

Yes, it is. What should I set key.converter for getting it in simple string format not struct? Does every connector set primary key as a key of message? @rmoff

What should I set key.converter for getting it in simple string format not struct?

org.apache.kafka.connect.storage.StringConverter probably, although maybe you’ll need a org.apache.kafka.connect.transforms.ExtractField$Key Single Message Transform applied to it.

Does every connector set primary key as a key of message?

No, it’s down to the specific connector to implement it

@rmoff Thank you so much. It helped.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.