Copying Avro-serialized topic data between clusters

Hi Kafkateers!

I need to copy data from Topic A in Cluster 1 to Topic B in Cluster 2.
Topic A's key is a string and its value is Avro-serialized.

So, I am currently doing the copying as a three-step operation.

Step 1. Download topic value schema to a file

curl -Ss --cert $KAFKA_KEY_LOCATION:$KAFKA_KEY_PASSPHRASE \
-X GET https://$KAFKA_API_KEY:$KAFKA_API_SECRET@$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT/subjects/my-topic-value/versions/latest/schema \
> ./my-topic-value-schema.json
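
(If I read the Schema Registry REST API right, the same endpoint family can also list every registered version of the subject and fetch a specific version instead of latest, which becomes relevant further down. A sketch, assuming the standard /subjects/<subject>/versions endpoints and the same credentials as above:)

# list all registered versions of the subject, e.g. [1,2,3]
curl -Ss --cert $KAFKA_KEY_LOCATION:$KAFKA_KEY_PASSPHRASE \
https://$KAFKA_API_KEY:$KAFKA_API_SECRET@$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT/subjects/my-topic-value/versions

# fetch the schema of one specific version (here: version 1)
curl -Ss --cert $KAFKA_KEY_LOCATION:$KAFKA_KEY_PASSPHRASE \
https://$KAFKA_API_KEY:$KAFKA_API_SECRET@$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT/subjects/my-topic-value/versions/1/schema \
> ./my-topic-value-schema-v1.json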

Step 2. Download topic messages to a file

./bin/kafka-avro-console-consumer \
--bootstrap-server $KAFKA_BROKER_HOST:$KAFKA_BROKER_PORT \
--topic my-topic \
--property print.key=true \
--property print.value=true \
--property key.separator=":" \
--key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
--value-deserializer "io.confluent.kafka.serializers.KafkaAvroDeserializer" \
--property schema.registry.url=https://$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT \
--property schema.registry.basic.auth.user.info="$KAFKA_API_KEY:$KAFKA_API_SECRET" \
--property basic.auth.credentials.source=USER_INFO \
--consumer-property security.protocol=SASL_SSL \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$KAFKA_API_KEY\" password=\"$KAFKA_API_SECRET\";" \
--consumer-property ssl.truststore.location=$KAFKA_TRUSTSTORE_LOCATION \
--consumer-property ssl.truststore.password=$KAFKA_TRUSTSTORE_PASSPHRASE \
--consumer-property ssl.keystore.location=$KAFKA_KEYSTORE_LOCATION \
--consumer-property ssl.keystore.password=$KAFKA_KEYSTORE_PASSPHRASE \
--consumer-property ssl.key.password=$KAFKA_KEY_PASSPHRASE \
--consumer-property ssl.truststore.type=JKS \
--consumer-property ssl.keystore.type=JKS \
--from-beginning \
--timeout-ms 20000 \
> ./my-topic-data.json

Step 3. Upload the source data to the destination topic in a local sandbox environment

./bin/kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--topic my-topic-copy \
--property schema.registry.url=http://localhost:8081 \
--property key.serializer="org.apache.kafka.common.serialization.StringSerializer" \
--property value.schema.file=./my-topic-value-schema.json \
--property parse.key=true \
--property key.separator=":" \
< ./my-topic-data.json

This approach works perfectly for topics whose schema didn't change over time. But unfortunately that's not the case for me now.

This means that the topic value has several versions of the schema, so there are messages which do not comply with the latest schema version.

This of course leads to an error: when I try to write the messages to my destination topic, the console producer crashes on the old messages.
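
One thing that might help me diagnose it (a sketch, assuming the Avro formatter in my version supports the print.schema.ids / schema.id.separator properties) is to re-run the Step 2 consumer so that each dumped line also records the id of the schema the message was written with:

# same consumer as in Step 2, plus the two schema-id properties, writing to a separate file
./bin/kafka-avro-console-consumer \
--bootstrap-server $KAFKA_BROKER_HOST:$KAFKA_BROKER_PORT \
--topic my-topic \
--property print.key=true \
--property print.value=true \
--property key.separator=":" \
--property print.schema.ids=true \
--property schema.id.separator="|" \
--key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
--value-deserializer "io.confluent.kafka.serializers.KafkaAvroDeserializer" \
--property schema.registry.url=https://$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT \
--property schema.registry.basic.auth.user.info="$KAFKA_API_KEY:$KAFKA_API_SECRET" \
--property basic.auth.credentials.source=USER_INFO \
--consumer-property security.protocol=SASL_SSL \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$KAFKA_API_KEY\" password=\"$KAFKA_API_SECRET\";" \
--consumer-property ssl.truststore.location=$KAFKA_TRUSTSTORE_LOCATION \
--consumer-property ssl.truststore.password=$KAFKA_TRUSTSTORE_PASSPHRASE \
--consumer-property ssl.keystore.location=$KAFKA_KEYSTORE_LOCATION \
--consumer-property ssl.keystore.password=$KAFKA_KEYSTORE_PASSPHRASE \
--consumer-property ssl.key.password=$KAFKA_KEY_PASSPHRASE \
--consumer-property ssl.truststore.type=JKS \
--consumer-property ssl.keystore.type=JKS \
--from-beginning \
--timeout-ms 20000 \
> ./my-topic-data-with-schema-ids.json

That output wouldn't be directly replayable by Step 3 (because of the extra schema-id field), but it would at least let me see and split the messages per writer schema.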

Do you know how to properly overcome this situation? Is there a way to consume only the messages with a particular schema version? Or are there maybe better options?
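
For the record, the workaround I've been considering (no idea whether it's the right direction) is to split the dump by writer schema and then run Step 3 once per schema version, pointing value.schema.file at the matching version downloaded via the versions endpoint above. The file names below (my-topic-value-schema-v1.json, my-topic-data-v1.json) are hypothetical, i.e. a per-version split I'd have to produce myself first:

# replay only the messages written with schema version 1
./bin/kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--topic my-topic-copy \
--property schema.registry.url=http://localhost:8081 \
--property key.serializer="org.apache.kafka.common.serialization.StringSerializer" \
--property value.schema.file=./my-topic-value-schema-v1.json \
--property parse.key=true \
--property key.separator=":" \
< ./my-topic-data-v1.json

But that feels clumsy and error-prone, so I'd still appreciate pointers to a cleaner approach.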

So no ideas at all?

Are there maybe better CLI options to do the job?