Using a source topic from a main MySQL table to sink into an audit table

I have a table “books” in the database motor. This is my source, and for the source connector I created a topic “mysql-books”. So far so good: I am able to see messages on the Confluent Platform UI. Now I want to sink these messages into another database called motor-audit, so that in the audit database I can see all the changes that happened to the table “books”. I have specified the topic “mysql-books” in the curl for my sink connector, since changes are being published to this topic, but it is not coming up and I am getting errors like:

  1. This one is with respect to the basic streams example you showed: Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 3

  2. This error is from my actual objective:

[2021-04-28 18:14:48,231] ERROR WorkerSinkTask{id=sink-jdbc-mysql-02-json-0} Error converting message value in topic 'mysql-books' partition 0 at offset 0 and timestamp 1619602271548: Converting byte[] to Kafka Connect data failed due to serialization error:  (org.apache.kafka.connect.runtime.WorkerSinkTask:547)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
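The raw records on the topic can be inspected with the plain console consumer to see what format they actually are in (assuming a broker on localhost:9092); Avro-serialized records start with a 0x00 magic byte followed by a 4-byte schema id, while JSON records are plain readable text:

kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-books \
  --from-beginning \
  --max-messages 5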

I am pasting my curl commands for the source and sink connectors below.

Source -

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_06",
        "config": {
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "connection.url": "jdbc:mysql://localhost:3306/motor",
                "connection.user": "yagnesh",
                "connection.password": "yagnesh123",
                "topic.prefix": "mysql-",
                "poll.interval.ms" : 3600000,
                "catalog.pattern" : "motor"
                }
        }'

Sink -

curl -X PUT http://localhost:8083/connectors/sink-jdbc-mysql-02-json/config \
     -H "Content-Type: application/json" -d '{
    "connector.class"               : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                : "jdbc:mysql://mysql:3306/motor",
    "topics"                        : "mysql-books",
    "key.converter"                 : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter"               : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connection.user"               : "yagnesh",
    "connection.password"           : "yagnesh123",
    "auto.create"                   : true,
    "auto.evolve"                   : true,
    "insert.mode"                   : "insert"
}'

I have tried both Avro and JSON in the converters, but no luck.

You need to use the same converter on both source and sink. I would recommend Avro, Protobuf, or JSON Schema.

Add/replace this config in both source and sink connectors:

"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

Replace http://schema-registry:8081 with wherever your Schema Registry is located.
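Once both connectors have been recreated with these converters, it is worth sanity-checking what is actually on the topic before starting the sink; records produced before the converter change stay in their old format. A rough check, assuming a broker on localhost:9092 and Schema Registry on localhost:8081 (drop print.key=true if the key is not Avro):

kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic mysql-books \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true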

I have changed my config as below. This helped me resolve the existing errors, but even though my Kafka source message has a key, I am facing this error upon launching my sink:

io.confluent.rest.exceptions.RestNotFoundException: Subject 'mysql-books-key' not found.
at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.subjectNotFoundException(Errors.java:69)

My source config -

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
        "name": "jdbc_source_mysql_001",
        "config": {
                "value.converter.schema.registry.url": "http://0.0.0.0:8081",
                "key.converter.schema.registry.url": "http://0.0.0.0:8081",
                "name": "jdbc_source_mysql_001",
                "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                "key.converter": "io.confluent.connect.avro.AvroConverter",
                "value.converter": "io.confluent.connect.avro.AvroConverter",
                "connection.url": "jdbc:mysql://localhost:3306/motor",
                "connection.user": "yagnesh",
                "connection.password": "yagnesh123",
                "catalog.pattern": "motor",
                "mode": "bulk",
                "poll.interval.ms": "10000",
                "topic.prefix": "mysql-",
                "transforms": "createKey,extractInt",
                "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
                "transforms.createKey.fields": "id",
                "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
                "transforms.extractInt.field": "id"
                }
        }'

Sink config -

curl -X PUT http://localhost:8083/connectors/jdbc_sink_mysql_001/config \
     -H "Content-Type: application/json" -d '{
      "value.converter.schema.registry.url": "http://0.0.0.0:8081",
      "value.converter.schemas.enable": "true",
      "key.converter.schema.registry.url": "http://0.0.0.0:8081",
      "name": "jdbc_sink_mysql_001",
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "key.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "topics": "mysql-books",
      "connection.url": "jdbc:mysql://mysql:3306/motor",
      "connection.user": "yagnesh",
      "connection.password": "yagnesh123",
      "insert.mode": "insert",
      "auto.create": "true",
      "auto.evolve": "true"
}'
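As a side note, when the sink task fails, its state and full stack trace can also be retrieved from the Connect REST API (assuming the worker is on localhost:8083), which helps confirm where the error is coming from:

curl -s http://localhost:8083/connectors/jdbc_sink_mysql_001/status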

This is how my topic looks on the Confluent Platform UI:


The keys are shown as bytes, but even if I use either AvroConverter or StringConverter for the key and keep it the same in both source and sink, I still face the same error.

Maybe you could try debugging the Schema Registry state using the REST API: Schema Registry API Reference | Confluent Documentation

I would investigate the subjects to ensure that the key subject is being created as you expect before launching the sink connector.

The command would be similar to:
curl -XGET http://localhost:8081/subjects
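If mysql-books-value appears in that list but mysql-books-key does not, then the key is never being registered as Avro; the key subject is only created when the key is actually serialized with the Avro serializer. Assuming the Schema Registry is on localhost:8081 as in your configs, you can also inspect whatever is registered for each subject:

curl -s http://localhost:8081/subjects/mysql-books-key/versions/latest
curl -s http://localhost:8081/subjects/mysql-books-value/versions/latest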
