JDBC sink connector not working with Apicurio's ExtJsonConverter

I am pushing data from Kafka to the JDBC sink connector. The data in Kafka looks like this:
{"schemaId":31,"payload":{"id":"1","firstname":"test1","lastname":"test2"}}
I am using Apicurio's ExtJsonConverter in both the source and sink connectors.

The JDBC sink connector throws the following error:

Caused by: java.lang.ClassCastException: class java.util.HashMap cannot be cast to class org.apache.kafka.connect.data.Struct (java.util.HashMap is in module java.base of loader 'bootstrap'; org.apache.kafka.connect.data.Struct is in unnamed module of loader 'app')
at io.confluent.connect.jdbc.sink.PreparedStatementBinder.bindRecord(PreparedStatementBinder.java:86)
at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:183)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:80)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)

Why is this happening, and how can I solve it? Is there an alternative I can use? I first tried the AvroConverter, but Avro has issues when I reorder fields: in my case, if lastname comes before firstname, the sink DB ends up storing the lastname value in the firstname column and vice versa.

Hi Gulati,

I’ve asked your question to an AI chatbot and received this reply:

The error message suggests that the JDBC sink connector is expecting a Kafka record with a Struct value, but it is receiving a record with a HashMap value instead. This might be happening because the Apicurio ExtJsonConverter is not properly converting the JSON message into the expected format.
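
To make the failure concrete, here is a minimal sketch that reproduces the same cast (an illustration only with a hypothetical class name, not the actual Confluent sink code; it needs connect-api on the classpath):

import java.util.HashMap;
import org.apache.kafka.connect.data.Struct;

public class CastDemo {
    public static void main(String[] args) {
        // A schemaless converter hands Connect the record value as a plain Map
        Object value = new HashMap<String, Object>();
        // The JDBC sink's PreparedStatementBinder casts the value to Struct,
        // which produces exactly the ClassCastException in the stack trace above
        Struct struct = (Struct) value;
    }
}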

One solution you can try is to use the JsonConverter provided by Kafka Connect instead of the Apicurio ExtJsonConverter. You can configure the JsonConverter in your Kafka Connect worker configuration file by setting the following properties:


key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

This should ensure that the JSON messages are correctly converted to the expected Struct format by the JDBC sink connector.
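
One caveat: JsonConverter only produces the Struct the sink needs when schemas are enabled (schemas.enable defaults to true) and every message embeds its schema next to the payload. For the record in the question, the message on the wire would have to look roughly like this (a sketch; the schema block is illustrative):

{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id", "type": "string" },
      { "field": "firstname", "type": "string" },
      { "field": "lastname", "type": "string" }
    ],
    "optional": false
  },
  "payload": { "id": "1", "firstname": "test1", "lastname": "test2" }
}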

Alternatively, you can try modifying the JDBC sink connector configuration to use a different value converter that is compatible with the HashMap format. For example, you can try org.apache.kafka.connect.storage.StringConverter, which simply converts the message payload to a string. You can configure the JDBC sink connector to use the StringConverter as follows:


key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

This approach may require some modifications to the JDBC sink connector configuration and the target database schema to accommodate the different data format.
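
For reference, StringConverter would hand the sink the entire message as a single java.lang.String, i.e. the literal text

{"schemaId":31,"payload":{"id":"1","firstname":"test1","lastname":"test2"}}

with no field structure at all, so the JDBC sink would likely still have nothing to bind individual columns from.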

Let me know if that helps; if not, I can take this to a team member here at Confluent.

I do not want to use an embedded schema; with JsonConverter we need to send the schema along with every payload.
Yeah, please take it to a team member; it would be a great help.

Thanks
