I have a table “books” in a MySQL database called motor. This is my source, and for the source connector I created a topic “mysql-books”. So far so good: I can see the messages in the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that the audit database records all the changes that happened to the “books” table. In the curl for my sink connector I set the topic to “mysql-books”, since that is where the changes are published, but the sink is not coming up and throws errors like the following.
This error is from the basic streams example you showed:
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3
And this is the error from my own pipeline:
[2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error: (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
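For reference, one way to check how the messages on the topic were actually serialized, assuming the Confluent CLI tools are installed and Schema Registry runs at the default http://localhost:8081 (both assumptions on my part):

```shell
# Try reading the topic as Avro; if this succeeds, the source connector wrote
# Avro bytes, and a JsonConverter-based sink will fail with exactly the kind of
# serialization error shown above.
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-books \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081
```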
Here are my curl commands for the source and sink connectors.
Source:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_source_mysql_06",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://localhost:3306/motor",
    "connection.user": "yagnesh",
    "connection.password": "yagnesh123",
    "topic.prefix": "mysql-",
    "poll.interval.ms": 3600000,
    "catalog.pattern": "motor"
  }
}'
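Note that this source curl does not set any converters, so the worker defaults apply to whatever it publishes. A sketch of how I can check those defaults, assuming a Confluent Platform install where the worker was started from the connect-avro-distributed.properties file (the path is an assumption for my setup):

```shell
# Show which key/value converters the Connect worker falls back to when a
# connector config does not override them; on the Avro-flavored worker
# properties these typically point at the AvroConverter.
grep -E '^(key|value)\.converter' etc/schema-registry/connect-avro-distributed.properties
```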
Sink:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/motor",
    "topics": "mysql-books",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.user": "yagnesh",
    "connection.password": "yagnesh123",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "insert"
  }'
I have tried both Avro and JSON in the converters, but with no luck.
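For completeness, my Avro attempt on the sink looked roughly like this; the connector name sink-jdbc-mysql-02-avro and the Schema Registry URL http://localhost:8081 are illustrative, not exactly what I ran:

```shell
# Same sink, but with the AvroConverter for values instead of JsonConverter;
# the converter needs the Schema Registry URL to look up schemas by id.
curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-avro/config \
  -H "Content-Type: application/json" -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://mysql:3306/motor",
    "topics": "mysql-books",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.user": "yagnesh",
    "connection.password": "yagnesh123",
    "auto.create": true,
    "auto.evolve": true,
    "insert.mode": "insert"
  }'
```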