About MirrorSourceConnector

I am now deploying a kafka connect cluster using the confluentinc/cp-kafka-connect image in the local infrastructure of k8s, and then using the rest api to create a MirrorSourceConnctor, but I found that the target cluster of target.clusters.bootsrap.servers specified in my request parameters did not write data. However, according to the log requirements, it prompted me that I had no information about target.cluster.alias#topic. Then I created such a topic in the cluster that stores kafka connect metadata. As a result, there were relevant messages in the kafka cluster that stores metadata, but no messages entered the target cluster that I really need to synchronize. Why is this?

The body parameters are like this:

{
“name”: “kafka2kafka-mirror-source-connector-20250310-to-vm-kafka”,
“config”: {
“connector.class”: “org.apache.kafka.connect.mirror.MirrorSourceConnector”,
“tasks.max”: “3”,
“topics”: “paas_base_db_sit-paas_manager_cronjob_schema-cronjob_service_instance”,
“source.cluster.bootstrap.servers”: “10.57.10.24:9092,10.57.10.23:9092,10.57.10.21:9092”,
“source.cluster.alias”: “gauss-db-vm-kafka-cluster-vm”,
“source.cluster.security.protocol”: “SASL_PLAINTEXT”,
“source.cluster.sasl.mechanism”: “SCRAM-SHA-512”,
“source.cluster.sasl.jaas.config”: “org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="password";”,
“consumer.override.auto.offset.reset”: “earliest”,

    "consumer.group.id": "xxxx-yyyy-zzzz-vm",

    "target.cluster.alias": "gaussdb-container-kafka-cluster",
    "target.cluster.bootstrap.servers": "10.57.10.26:9092,10.57.10.22:9092,10.57.10.27:9092",
    "target.cluster.security.protocol": "SASL_PLAINTEXT",
    "target.cluster.sasl.mechanism": "SCRAM-SHA-512",
    "target.cluster.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"password\";",

    "sync.topic.acls.enabled": "true",  
    "sync.topic.configs.enabled": "true", 
    "sync.group.offsets.enabled": "true",  

    
    "errors.log.enable" : "false",

    "refresh.topics.interval.seconds": "10", 
    "refresh.groups.interval.seconds": "10", 
    "emit.hearbeats.enabled": "true", 

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",


    "producer.override.max.request.size": "31457280",
    "consumer.override.fetch.max.bytes": "31457280",
    "producer.override.linger.ms": "100"             
}

}