Copying Avro-serialized topic data between clusters

Hi Kafkateers!

I need to copy data from Topic A in Cluster 1 to Topic B in Cluster 2.
Topic A's key is a string and its value is Avro-serialized.

So I am currently doing the copying as a three-step operation.

Step 1. Download topic value schema to a file

curl -Ss --cert $KAFKA_KEY_LOCATION:$KAFKA_KEY_PASSPHRASE \
-X GET https://$KAFKA_API_KEY:$KAFKA_API_SECRET@$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT/subjects/my-topic-value/versions/latest/schema \
> ./my-topic-value-schema.json

Step 2. Download topic messages to a file

./bin/kafka-avro-console-consumer \
--bootstrap-server $KAFKA_BROKER_HOST:$KAFKA_BROKER_PORT \
--topic my-topic \
--property print.key=true \
--property print.value=true \
--property key.separator=":" \
--key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
--value-deserializer "io.confluent.kafka.serializers.KafkaAvroDeserializer" \
--property schema.registry.url=https://$KAFKA_SCHEMA_REGISTRY_HOST:$KAFKA_SCHEMA_REGISTRY_PORT \
--property schema.registry.basic.auth.user.info="$KAFKA_API_KEY:$KAFKA_API_SECRET" \
--property basic.auth.credentials.source=USER_INFO \
--consumer-property security.protocol=SASL_SSL \
--consumer-property sasl.mechanism=PLAIN \
--consumer-property sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$KAFKA_API_KEY\" password=\"$KAFKA_API_SECRET\";" \
--consumer-property ssl.truststore.location=$KAFKA_TRUSTSTORE_LOCATION \
--consumer-property ssl.truststore.password=$KAFKA_TRUSTSTORE_PASSPHRASE \
--consumer-property ssl.keystore.location=$KAFKA_KEYSTORE_LOCATION \
--consumer-property ssl.keystore.password=$KAFKA_KEYSTORE_PASSPHRASE \
--consumer-property ssl.key.password=$KAFKA_KEY_PASSPHRASE \
--consumer-property ssl.truststore.type=JKS \
--consumer-property ssl.keystore.type=JKS \
--from-beginning \
--timeout-ms 20000 \
> ./my-topic-data.json

Step 3. Upload the source data to the destination topic in the local sandbox environment

./bin/kafka-avro-console-producer \
--bootstrap-server localhost:9092 \
--topic my-topic-copy \
--property schema.registry.url=http://localhost:8081 \
--property key.serializer="org.apache.kafka.common.serialization.StringSerializer" \
--property value.schema.file=./my-topic-value-schema.json \
--property parse.key=true \
--property key.separator=":" \
< ./my-topic-data.json

And this approach works perfectly for topics whose schema didn’t change over time. But unfortunately that’s not the case for me now.

This means that the topic value has several versions of the schema, hence there are messages which do not comply with the latest schema version.

This of course leads to an error: when I try to write the messages to my destination topic, the console producer crashes on the old messages.

Do you know how to properly overcome this situation? Is there a way to consume only the messages with a particular schema version? Or are there maybe better options?

So no ideas at all?

Perhaps there are better CLI options to do the job?

Hi @whatsupbros, the way to overcome this limitation is to not deserialize the values as you copy them, since deserializing forces the consumer to interpret the byte array.

Instead, I suggest downloading and saving the records as byte arrays. You can do so by not interpreting the messages with org.apache.kafka.common.serialization.StringDeserializer and io.confluent.kafka.serializers.KafkaAvroDeserializer; use org.apache.kafka.common.serialization.ByteArrayDeserializer for both key and value instead, and likewise org.apache.kafka.common.serialization.ByteArraySerializer when sending them to the new topic. This keeps the original records intact.
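In case a tiny client program is easier to reason about than the console tool flags, here is a rough sketch of such a byte-for-byte copy. The broker addresses, group id, and security settings are placeholders you would need to replace with your own, and the topic names are just taken from your example:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class RawTopicCopy {
    public static void main(String[] args) {
        // Source cluster: read keys and values as raw byte arrays, so no schema is ever resolved.
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "source-broker:9092");   // placeholder
        consumerProps.put("group.id", "raw-topic-copy");                // placeholder
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // add the same security.protocol / sasl.* / ssl.* settings you pass to the console consumer

        // Destination cluster: write the same byte arrays back without touching them.
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");       // local sandbox
        producerProps.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {                                              // stop it once the copy has caught up
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // The Avro value still carries the source Schema Registry ID in its first bytes.
                    producer.send(new ProducerRecord<>("my-topic-copy", record.key(), record.value()));
                }
                producer.flush();
            }
        }
    }
}

The important part is that the key and value bytes are never re-interpreted anywhere on the way from the source topic to the destination topic.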

What does this mean? It means the Avro-serialized record will still contain the reference to the schema ID in Schema Registry. If you are using two different schema registries, this might be a problem; I invite you to look into Confluent Replicator or my own project, ccloud-schema-exporter, for schema migration between two schema registries. If the same schema registry is used, then there is no need to worry about schema migration.

I hope this helps, and if I can help clarify anything, please let me know. :slight_smile:


Also, I’m not sure if you are aware, but there has been significant work to simplify this flow: Confluent Replicator and Cluster Linking do it for you, if you want to save yourself a headache :slight_smile:

Hi @Abraham!

Thank you for your answer!

Yes, the problem is that I need to replicate the topic to the local cluster, which uses a local Schema Registry. And, as you mentioned, if I replicate the data without deserialization and serialization, the messages will carry the schema IDs of the source registry within themselves, and those will not be consistent with the local environment.
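Just to illustrate why (this is my own rough snippet, not something from the tooling above): as far as I understand the Confluent wire format, every Avro-serialized value starts with a magic byte followed by the 4-byte ID of the schema in the registry it was produced against, so a raw copy keeps pointing at IDs that my local registry knows nothing about.

import java.nio.ByteBuffer;

public class SchemaIdPeek {
    // Peek at the schema ID embedded in an Avro value copied byte-for-byte from the source cluster.
    public static int schemaId(byte[] avroValue) {
        ByteBuffer buffer = ByteBuffer.wrap(avroValue);
        if (buffer.get() != 0) {                 // leading magic byte of the Confluent wire format
            throw new IllegalArgumentException("Value is not in Confluent wire format");
        }
        return buffer.getInt();                  // 4-byte schema ID from the source Schema Registry
    }
}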

Currently, I don’t know of an easy way to take control over this process.

I had a look at Confluent Replicator, and it seems to be exactly what I need (though it uses the Kafka Connect approach instead of CLI tools), but it is an Enterprise Edition feature, and I am trying to stick to the Community License at the moment, especially in my local sandbox environment.

Do you happen to know of any alternatives, perhaps another Kafka-to-Kafka source connector that is available to Community Edition users? I found one, but it seems to be deprecated, and it looks like Confluent Replicator in fact replaced it…

And I believe this is in fact quite a common use case: developers need to copy a topic from their “real” environment to the local sandbox on their laptop, just to play around and test a couple of things.

Hey! So two points:

  1. There is a community-made one! MirrorMaker is the open-source version (currently at version 2). It comes in every Kafka Connect packaging; if you’d like to learn more about it, I invite you to read the KIP for it. However, MirrorMaker doesn’t help you with the schema migration. In this case, ccloud-schema-exporter can help you migrate your schemas from one Schema Registry to the other.

  2. Confluent Platform is actually free forever on a single-node cluster, according to its developer license here. So if you’d like to try out those features, they are there :slight_smile:

Let me know if you have any questions!


Hi @Abraham! Ha! I totally forgot that the Enterprise Edition of Confluent Platform can be used freely on a single-broker setup. Thanks for the reminder; this is probably the easiest option for me atm.

I will be able to play around with Confluent Replicator probably on Tuesday, so I will definitely come back here to share my results with you. Thank you once again!

Thank you for mentioning MirrorMaker 2.0 as well; I should definitely have a look at it and its capabilities sooner or later.

However, it would be really nice if the standard CLI tools available with Confluent Platform (both Community and Enterprise) supported multiple schema versions when consuming and producing topic data. I will leave this here just as a humble suggestion.


Just to leave this here as additional information that was new to me.

Both MirrorMaker 2.0 and Confluent Replicator can operate in connector mode, which means they can run within an existing Kafka Connect cluster and appear just like a standard connector.

There are also other ways they can be executed, but that is not really relevant to the issue in the original message here.