I am deploying a Kafka Connect cluster on our local Kubernetes infrastructure using the confluentinc/cp-kafka-connect image, and then creating a MirrorSourceConnector through the REST API. However, no data is written to the target cluster specified by target.cluster.bootstrap.servers in my request parameters. The connector logs complained that a topic named target.cluster.alias#topic did not exist, so I created that topic in the Kafka cluster that stores the Kafka Connect metadata. After that, the relevant messages did show up in that metadata cluster, but still no messages reached the target cluster I actually want to mirror to. Why is this?
The request body looks like this:
{
  "name": "kafka2kafka-mirror-source-connector-20250310-to-vm-kafka",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "tasks.max": "3",
    "topics": "paas_base_db_sit-paas_manager_cronjob_schema-cronjob_service_instance",
    "source.cluster.bootstrap.servers": "10.57.10.24:9092,10.57.10.23:9092,10.57.10.21:9092",
    "source.cluster.alias": "gauss-db-vm-kafka-cluster-vm",
    "source.cluster.security.protocol": "SASL_PLAINTEXT",
    "source.cluster.sasl.mechanism": "SCRAM-SHA-512",
    "source.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"password\";",
    "consumer.override.auto.offset.reset": "earliest",
    "consumer.group.id": "xxxx-yyyy-zzzz-vm",
    "target.cluster.alias": "gaussdb-container-kafka-cluster",
    "target.cluster.bootstrap.servers": "10.57.10.26:9092,10.57.10.22:9092,10.57.10.27:9092",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "SCRAM-SHA-512",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"password\";",
    "sync.topic.acls.enabled": "true",
    "sync.topic.configs.enabled": "true",
    "sync.group.offsets.enabled": "true",
    "errors.log.enable": "false",
    "refresh.topics.interval.seconds": "10",
    "refresh.groups.interval.seconds": "10",
    "emit.heartbeats.enabled": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "producer.override.max.request.size": "31457280",
    "consumer.override.fetch.max.bytes": "31457280",
    "producer.override.linger.ms": "100"
  }
}
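
For completeness, this is roughly how I create the connector and then check the target cluster. The Connect service URL (http://kafka-connect:8083) and the client properties file (scram.properties, which holds the security.protocol, sasl.mechanism, and sasl.jaas.config settings) are placeholders, not my real values:

# Create the connector; connector.json contains the request body shown above.
curl -s -X POST -H "Content-Type: application/json" \
  --data @connector.json \
  http://kafka-connect:8083/connectors

# Confirm the connector and its tasks report RUNNING.
curl -s http://kafka-connect:8083/connectors/kafka2kafka-mirror-source-connector-20250310-to-vm-kafka/status

# List topics on the target cluster to look for the mirrored topic;
# nothing relevant appears there.
kafka-topics.sh --bootstrap-server 10.57.10.26:9092 \
  --command-config scram.properties --list

The status endpoint shows the connector and all three tasks as RUNNING, which is why the missing data on the target cluster is so confusing.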