Replicator creating topics but not migrating topic messages

What should I check when Replicator is not migrating topic messages from the source to the destination? What I've verified so far:

  • Replicator pod logs don't have any errors.
  • Replicator is creating the topic in the destination cluster from the source cluster.
  • Able to produce messages from a Kafka producer running in the same namespace as Replicator.
  • Checked the __consumer_offsets topic and I can see that the messages are consumed from the source cluster.
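One way to double-check the last point is to describe Replicator's source-side consumer group directly. This is a sketch; the group id comes from the connector config shown later in this thread, and the broker address and client properties file are placeholders:

```shell
# Describe Replicator's consumer group on the SOURCE cluster to compare
# CURRENT-OFFSET vs LOG-END-OFFSET per partition; a non-zero LAG that
# never shrinks means Replicator is not consuming.
# <src broker> and src-client.properties (SSL settings) are placeholders.
kafka-consumer-groups --bootstrap-server <src broker>:9093 \
  --command-config src-client.properties \
  --describe --group replicator-test-source-1-group
```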

Replicator is also finding the topic; the log shows it:

[INFO] 2023-10-30 11:17:21,939 [topic-monitor-thread-replicator] io.confluent.connect.replicator.NewTopicMonitorThread listMatchingTopics - Found matching topics: [replicator-test-source, __consumer_timestamps]

I followed the Replicator README to set up Replicator in Kubernetes.

The Kafka server log also doesn't have errors…

[2023-10-30 08:42:09,331] INFO [Admin Manager on Broker 201]: Updating topic replicator-test-source-1 with new configuration : message.timestamp.type -> CreateTime,segment.bytes -> 100000000,flush.messages -> 10000,unclean.leader.election.enable -> true,min.insync.replicas -> 2 (kafka.server.AdminManager)
[2023-10-30 08:42:09,343] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
[2023-10-30 08:42:09,346] INFO Processing override for entityPath: topics/replicator-test-source-1 with config: Map(message.timestamp.type -> CreateTime, segment.bytes -> 100000000, flush.messages -> 10000, unclean.leader.election.enable -> true, min.insync.replicas -> 2) (kafka.server.DynamicConfigManager)

hey @dasasathyan

you're configuring Replicator from an on-prem cluster to Confluent Cloud, right?

has the topic been created on the dest cluster?

best,
michael

hey @mmuehlbeyer
We are migrating between two different clusters running on-prem, not Confluent Cloud.
Yes. The topic is created in the destination cluster.

any errors in the dest cluster so far?

Nope… It's completely clean.

ok
could you share your replicator config?
is the topic compacted?
what about the connect rest api, any errors there?
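For the Connect REST check, the standard endpoints are the connector status and per-task status; a sketch, assuming the Connect REST service is reachable at `connect:8083` over TLS (host, port, and connector name `replicator` match this thread's setup, the rest are placeholders):

```shell
# Overall connector state plus the state of each task (RUNNING/FAILED).
# A connector can be RUNNING while a task is FAILED, so check both.
curl -sk https://connect:8083/connectors/replicator/status

# Per-task detail; on failure the response includes a "trace" field
# with the stack trace of the task error.
curl -sk https://connect:8083/connectors/replicator/tasks/0/status
```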

Replicator Config
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: replicator
  namespace: destination
spec:
  name: replicator
  class: "io.confluent.connect.replicator.ReplicatorSourceConnector"
  taskMax: 4
  configs:
    topic.whitelist: "replicator-test-source-1"
    confluent.topic.replication.factor: "3"
    dest.kafka.bootstrap.servers: "<dest ip addresses>"
    dest.kafka.security.protocol: "SSL"
    dest.kafka.ssl.truststore.location: "/mnt/sslcerts/ssl-certs/truststore.jks"
    dest.kafka.truststore.password: "<dest truststore password>"
    dest.kafka.sasl.mechanism: "PLAIN"
    dest.kafka.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"<jaas password>\";"
    dest.kafka.reconnect.backoff.ms: "600000"
    dest.kafka.request.timeout.ms: "600000"
    provenance.header.enable: "true"
    key.converter: "io.confluent.connect.replicator.util.ByteArrayConverter"
    src.consumer.group.id: "replicator-test-source-1-group"
    src.kafka.timestamps.topic.replication.factor: "3"
    src.kafka.sasl.mechanism: "PLAIN"
    src.kafka.ssl.truststore.location: "/mnt/sslcerts/ssl-certs/broker.jks"
    src.kafka.truststore.password: "<src trust store password>"
    src.kafka.bootstrap.servers: "<src ip addresses>"
    src.kafka.security.protocol: "SSL"
    value.converter: "io.confluent.connect.replicator.util.ByteArrayConverter"
    ssl.endpoint.identification.algorithm: ""
    src.kafka.ssl.endpoint.identification.algorithm: ""
    dest.kafka.ssl.endpoint.identification.algorithm: ""
  connectRest:
    tls:
      secretRef: ssl-certs

This is just a test topic, so I haven't compacted it.
The Connect REST APIs just show a couple of health checks with 200s.

@mmuehlbeyer Are the source and destination clusters required to be Confluent Kafka, or can they be Apache Kafka? Does this distinction make any difference?
My clusters are on Apache Kafka.

hi @dasasathyan
basically it should work with Apache Kafka as well.

Best,
Michael

Thanks Michael.

Did you by any chance find anything wrongly configured with the replicator?

no obvious errors at all.
did you produce some messages to the topic as well?

best,
michael

Yup. I did…
I produced with kafka-producer-perf-test as per https://github.com/confluentinc/confluent-kubernetes-examples/blob/45331dfab5d08e8513b5532016a9c9b7b7e1553e/hybrid/replicator/secure-producer-app-data.yaml

I produced from the same namespace as Replicator, and the messages are landing in the source topic…
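To confirm whether any records are actually arriving on the other side, one can consume the replicated topic directly on the destination cluster. A sketch; the broker address and client properties file are placeholders:

```shell
# Read the first few records of the replicated topic on the DEST cluster.
# If this returns nothing while the source topic has data, records are
# not being delivered, even though the topic itself was created.
kafka-console-consumer --bootstrap-server <dest broker>:9093 \
  --consumer.config dest-client.properties \
  --topic replicator-test-source-1 \
  --from-beginning --max-messages 10
```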

@mmuehlbeyer any suggestions here?

Also, is there any paid support I can get from Confluent for this particular issue alone?

you might try resetting the consumer group's offset, or setting it to a specific one
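The offset reset suggested above can be done with `kafka-consumer-groups` on the source cluster. A sketch (broker address and properties file are placeholders); note the connector has to be paused or deleted first, because offsets can only be reset while the group is inactive:

```shell
# Preview the reset first with --dry-run (no offsets are changed):
kafka-consumer-groups --bootstrap-server <src broker>:9093 \
  --command-config src-client.properties \
  --group replicator-test-source-1-group \
  --topic replicator-test-source-1 \
  --reset-offsets --to-earliest --dry-run

# Then apply it for real with --execute:
kafka-consumer-groups --bootstrap-server <src broker>:9093 \
  --command-config src-client.properties \
  --group replicator-test-source-1-group \
  --topic replicator-test-source-1 \
  --reset-offsets --to-earliest --execute
```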

Hi @mmuehlbeyer, I have tried it with a brand-new consumer group so that the messages are consumed from offset 0 again. Still no luck.

hey @dasasathyan

another try maybe with demo here: