Hi there,
I have a problem sinking data from Kafka into an RDBMS: the topic carries JSON messages with string keys and no schema.
Kafka message:
{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageSchemaId": null,
  "messageSchema": null,
  "message": {
    "data": {
      "ORDER_ID": 2,
      "PRODUCT_ID": 222,
      "QUANTITY": 2,
      "PRICE": "22.50",
      "CUSTOMER_ID": 2000,
      "ORDER_DATE": "2024-01-22"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT",
      "changeSequence": 20240121182656000000000000000000005,
      "timestamp": "1970-01-01T00:00:00.000",
      "streamPosition": "01000000000009DE710000000000352A5E|020000000000000000000000002CBB3331",
      "transactionId": "00000000000000007C95270000000000",
      "changeMask": "3F",
      "columnMask": "3F",
      "externalSchemaId": null,
      "transactionEventCounter": 1,
      "transactionLastEvent": true
    }
  }
}
And this is the JDBC Sink connector configuration:
{
  "name": "SID_OLSHOP_MWN-v03",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "transforms": "ExtractFieldMessage, ExtractFieldData",
    "transforms.ExtractFieldMessage.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldMessage.field": "message",
    "transforms.ExtractFieldData.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldData.field": "data",
    "fields.whitelist": "ORDER_ID, PRODUCT_ID, QUANTITY, PRICE, CUSTOMER_ID, ORDER_DATE",
    "topics": "SID.OLSHOP",
    "table.name.format": "SID_OLSHOP_DEV",
    "connection.url": "jdbc:postgresql://10.10.10.12:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "ORDER_ID",
    "auto.create": "true",
    "auto.evolve": "true",
    "db.timezone": "Asia/Jakarta",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
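To make the question concrete, here is a minimal Python sketch of my understanding; it is not connector code. It mimics what the two ExtractField transforms leave as the record value, then wraps that value in the schema/payload envelope that JsonConverter reads when value.converter.schemas.enable is true. The helper names (extract_field, connect_type, with_schema_envelope), the schema name, and the int64/float64/string type mapping are my own assumptions for illustration.

```python
import json

# Trimmed copy of the message value from the topic (only the parts used here).
raw_value = json.dumps({
    "magic": "atMSG",
    "type": "DT",
    "message": {
        "data": {
            "ORDER_ID": 2,
            "PRODUCT_ID": 222,
            "QUANTITY": 2,
            "PRICE": "22.50",
            "CUSTOMER_ID": 2000,
            "ORDER_DATE": "2024-01-22",
        },
        "beforeData": None,
        "headers": {"operation": "INSERT"},
    },
})


def extract_field(value, field):
    """Hypothetical stand-in for ExtractField$Value on a schemaless value."""
    return None if value is None else value.get(field)


def connect_type(v):
    """Assumed mapping from a JSON value to a Connect schema type name."""
    if isinstance(v, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(v, int):
        return "int64"
    if isinstance(v, float):
        return "float64"
    if isinstance(v, str):
        return "string"
    # Nulls and nested values are not handled in this sketch.
    raise TypeError(f"unsupported value: {v!r}")


def with_schema_envelope(row, name):
    """Wrap a flat row in a {"schema": ..., "payload": ...} envelope."""
    fields = [
        {"field": k, "type": connect_type(v), "optional": True}
        for k, v in row.items()
    ]
    schema = {"type": "struct", "name": name, "optional": False, "fields": fields}
    return {"schema": schema, "payload": row}


value = json.loads(raw_value)
# Same order as the transforms chain: first "message", then "data".
row = extract_field(extract_field(value, "message"), "data")
envelope = with_schema_envelope(row, "SID_OLSHOP")
print(json.dumps(envelope, indent=2))
```

In this sketch PRICE and ORDER_DATE stay strings because that is how they arrive in the message; whether they should map to decimal or date columns is part of what I am unsure about.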
Is there an SMT I need to use, such as Append Schema, or do I need to register a schema for the topic?
Thanks
Regards,
MWN