Hi,
I am trying to use ValueToKey SMT for chaning Key of the record, for data coming from MySQL. Connector being used is debezium CDC source connector for MySQL.
When I am running it with SMT it throws error as field does not exist
. But if I remove SMT configs and run for initial load and after inital load if I update same connector config with same SMT configs, it is working fine for other real time transactions. It does not give any error.
This is self managed debezium connector pointing to confluent cloud. But behavior is same for onprem kafka setup as well.
Following are the configs, Kindly help :
{
"name": "New_Connector_123",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "valuetokey, router, extract",
"transforms.valuetokey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.valuetokey.fields": "skuid",
"transforms.router.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.router.topic.regex": "(.*)[.](.*)[.](.*)",
"transforms.router.topic.replacement": "$3",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract.field": "skuid",
"database.hostname": "",
"database.port": "3306",
"database.user": "root",
"database.password": "",
"database.server.name": "new_connector",
"database.history.kafka.bootstrap.servers": "",
"database.history.kafka.topic": "history_topic",
"table.include.list": "sku",
"database.include.list": "events",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";",
"database.history.producer.ssl.endpoint.identification.algorithm": "https",
"database.history.producer.security.protocol": "SASL_SSL",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"database.history.consumer.sasl.mechanism": "PLAIN"
}
}