JDBC Sink Connector with JSON message format without schema

Hi there,
I have a problem sinking data into an RDBMS: the Kafka topic carries JSON messages with string keys and no schema.

Kafka message:

{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageSchemaId": null,
  "messageSchema": null,
  "message": {
    "data": {
      "ORDER_ID": 2,
      "PRODUCT_ID": 222,
      "QUANTITY": 2,
      "PRICE": "22.50",
      "CUSTOMER_ID": 2000,
      "ORDER_DATE": "2024-01-22"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT",
      "changeSequence": 20240121182656000000000000000000005,
      "timestamp": "1970-01-01T00:00:00.000",
      "streamPosition": "01000000000009DE710000000000352A5E|020000000000000000000000002CBB3331",
      "transactionId": "00000000000000007C95270000000000",
      "changeMask": "3F",
      "columnMask": "3F",
      "externalSchemaId": null,
      "transactionEventCounter": 1,
      "transactionLastEvent": true
    }
  }
}

And this is the JDBC Sink connector configuration.

{
  "name": "SID_OLSHOP_MWN-v03",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",

    "transforms": "ExtractFieldMessage, ExtractFieldData",

    "transforms.ExtractFieldMessage.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldMessage.field": "message",

    "transforms.ExtractFieldData.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldData.field": "data",

    "fields.whitelist": "ORDER_ID, PRODUCT_ID, QUANTITY, PRICE, CUSTOMER_ID, ORDER_DATE",

    "topics": "SID.OLSHOP",
    "table.name.format": "SID_OLSHOP_DEV",

    "connection.url": "jdbc:postgresql://10.10.10.12:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",

    "insert.mode": "upsert",
    "delete.enabled": "true",

    "pk.mode": "record_key",
    "pk.fields": "ORDER_ID",

    "auto.create": "true",
    "auto.evolve": "true",

    "db.timezone": "Asia/Jakarta",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Is there an SMT I need to apply, such as something to append a schema, or do I need to register a schema for the topic?

Thanks
Regards,
MWN

It would be very helpful if you provided the text form of the stack trace and exception.

The actual message you need to be investigating is this:

Caused by: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct
(java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')

A quick search for “kafka connect json without schema hashmap” turned up this 2021 answer from Robin Moffatt (Kafka Connect without schema, only JSON - Stack Overflow), and a further link with more explanation at Kafka Connect and Schemas.
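
For reference, the second link boils down to this: the JsonConverter can only hand the sink a Struct when "value.converter.schemas.enable" is "true" and every message embeds its schema next to the payload. A minimal sketch of the envelope an (already flattened) record would need, with the field types inferred from the sample message above, so treat them as assumptions:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "ORDER_ID", "type": "int32" },
      { "field": "PRODUCT_ID", "type": "int32" },
      { "field": "QUANTITY", "type": "int32" },
      { "field": "PRICE", "type": "string" },
      { "field": "CUSTOMER_ID", "type": "int32" },
      { "field": "ORDER_DATE", "type": "string" }
    ],
    "optional": false
  },
  "payload": {
    "ORDER_ID": 2,
    "PRODUCT_ID": 222,
    "QUANTITY": 2,
    "PRICE": "22.50",
    "CUSTOMER_ID": 2000,
    "ORDER_DATE": "2024-01-22"
  }
}

Without that envelope (or Avro/Protobuf/JSON Schema plus a Schema Registry), the converter deserializes the value into a schemaless map, which is exactly the HashMap in the ClassCastException above.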

Thanks jmcp,

I don’t think this is the best recommendation for us, because it adds too many processes, which will eat up disk resources on the server.

In this case, the topic holds historical data.

Is there a way for the JDBC sink connector to process nested (struct) JSON data when the producer cannot register a JSON schema for the topic?

At this point, no, there is no facility within the JSON converter to process nested, schemaless data. So you might need to string together a heap of HoistField applications in order to produce a flat structure for the JDBC connector to insert into the db. Or perhaps use ksqlDB as an intermediate step.
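
To make the ksqlDB idea concrete, here is a rough sketch rather than a tested solution: the stream and topic names are made up, the types are guessed from the sample message, and it assumes a Schema Registry is available so the derived topic can be written as Avro, which carries a schema the JDBC sink understands.

-- Hypothetical sketch: declare the nested, schemaless JSON topic
-- with an explicit schema (names/types assumed from the sample message).
CREATE STREAM olshop_raw (
  message STRUCT<
    data STRUCT<
      ORDER_ID INT,
      PRODUCT_ID INT,
      QUANTITY INT,
      PRICE VARCHAR,
      CUSTOMER_ID INT,
      ORDER_DATE VARCHAR>>
) WITH (KAFKA_TOPIC='SID.OLSHOP', VALUE_FORMAT='JSON');

-- Flatten into a new topic whose schema is registered automatically.
CREATE STREAM olshop_flat WITH (KAFKA_TOPIC='SID_OLSHOP_FLAT', VALUE_FORMAT='AVRO') AS
  SELECT
    message->data->ORDER_ID    AS ORDER_ID,
    message->data->PRODUCT_ID  AS PRODUCT_ID,
    message->data->QUANTITY    AS QUANTITY,
    message->data->PRICE       AS PRICE,
    message->data->CUSTOMER_ID AS CUSTOMER_ID,
    message->data->ORDER_DATE  AS ORDER_DATE
  FROM olshop_raw;

The JDBC sink would then read SID_OLSHOP_FLAT with the AvroConverter instead of the JsonConverter.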
