Produce Avro messages to a Kafka topic with the Debezium connector on Linux

Platform details:

Zookeeper: 3 nodes
Broker: 3 nodes
Schema registry: running
Kafka Connect: running
Connector: Debezium


Connector configuration file:

name=mysql-connector-1
connector.class=io.debezium.connector.mysql.MySqlConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://node1:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://node1:8081
database.hostname=10.10.10.x
database.port=330x
database.user=debezium
database.password=debezium134
database.server.id=1
database.server.name=mysql
database.connectionTimeZone=America/New_York
database.include.list=test
table.include.list=test.poc1,test.poc3
database.history.kafka.bootstrap.servers=node1:9092
database.history.kafka.topic=kafka_poc
include.schema.changes=true
tombstones.on.delete=true
value.converter.schemas.enable=false
key.converter.schemas.enable=false
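
For reference, the same settings can be submitted to the Kafka Connect REST API (POST to the /connectors endpoint) when Connect runs in distributed mode. This is only a sketch; the Connect REST host/port (node1:8083) is an assumption, and the property values are copied from the file above:

```json
{
  "name": "mysql-connector-1",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://node1:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://node1:8081",
    "database.hostname": "10.10.10.x",
    "database.port": "330x",
    "database.user": "debezium",
    "database.password": "debezium134",
    "database.server.id": "1",
    "database.server.name": "mysql",
    "database.connectionTimeZone": "America/New_York",
    "database.include.list": "test",
    "table.include.list": "test.poc1,test.poc3",
    "database.history.kafka.bootstrap.servers": "node1:9092",
    "database.history.kafka.topic": "kafka_poc",
    "include.schema.changes": "true",
    "tombstones.on.delete": "true",
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}
```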

On Kafka we can see two topics created:
1> kafka_poc
2> mysql.test.poc3

We can consume messages from “mysql.test.poc3” using kafka-avro-console-consumer:

/data/confluent-kafka/confluent-7.0.1/bin/kafka-avro-console-consumer --bootstrap-server node1:9092 --from-beginning --topic mysql.test.poc3 --property schema.registry.url=node1:8081

We are not able to consume messages with kafka-avro-console-consumer from topic “kafka_poc”, which contains the DDL of our table schema as captured from MySQL by Debezium.

/data/confluent-kafka/confluent-7.0.1/bin/kafka-avro-console-consumer --topic kafka_poc --bootstrap-server node1:9092 --property schema.registry.url=node1:8081 --property print.key=true --from-beginning
null Processed a total of 1 messages
[2022-10-27 14:27:32,777] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:43)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
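
The “Unknown magic byte!” error means the Avro deserializer expected the Schema Registry wire format, in which every message starts with a magic byte 0x00 followed by a 4-byte schema ID, but found something else. A minimal sketch of that check, simulating one Avro-framed payload and one plain-JSON payload locally (the byte values are illustrative; against the real topics you would pipe consumer output into od instead):

```shell
# Schema Registry wire format: [0x00 magic byte][4-byte schema id][Avro data].
# Simulate an Avro-framed record and a plain-JSON record and compare their
# first bytes with od.
avro_first=$(printf '\000\000\000\000\001{"a":1}' | od -An -tx1 | awk 'NR==1{print $1}')
json_first=$(printf '{"op":"c"}' | od -An -tx1 | awk 'NR==1{print $1}')
echo "Avro-framed first byte: $avro_first"   # 00 -> wire-format magic byte
echo "Plain-JSON first byte:  $json_first"   # 7b -> '{' , i.e. plain JSON
```

This would explain the failure on kafka_poc: as far as the Debezium documentation describes it, the database history topic is internal to the connector and is always written as plain JSON, bypassing the configured converters, so its records never carry the magic byte.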

To consume the messages from topic “kafka_poc” we have to invoke the plain kafka-console-consumer instead:

/data/confluent-kafka/confluent-7.0.1/bin/kafka-console-consumer --topic kafka_poc --bootstrap-server node1:9092 --property schema.registry.url=node1:8081 --property print.key=true --from-beginning

How can we plug the Kafka Avro serializer and deserializer into our connector so that messages from both topics, “kafka_poc” and “mysql.test.poc3”, can be consumed with kafka-avro-console-consumer?
Any other suggestions are welcome as well.

Don’t be confused by (schema.registry.url=node1:8081) in the commands above; I dropped the http:// prefix due to the new-user link limitation. In each consumer invocation the property actually looks like this:

/data/confluent-kafka/confluent-7.0.1/bin/kafka-avro-console-consumer --bootstrap-server node1:9092 --from-beginning --topic mysql.test.poc3 --property schema.registry.url=http://node1:8081
