Hi Dave,
Thanks for your help.
That error is resolved now, but I’m getting another error, this time with the Avro value converter.
Below is my connector config:
{
"name" : "connector-debezium-mongo-source",
"config" : {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts" : "mongdbHost",
"mongodb.user" : "user",
"mongodb.password" : "password",
"mongodb.name" : "database",
"database.include.list" : "database",
"mongodb.ssl.enabled":"true",
"collection.include.list" : "collectionName",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"auto.register.schemas" : "true",
"key.converter.auto.register.schemas": "true",
"value.converter.schemas.enable": "true",
"key.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "URL_SCHEMA_REGISTRY",
"value.converter.schema.registry.url" : "URL_SCHEMA_REGISTRY",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.basic.auth.user.info":"",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.retry.delay.max.ms": 60000,
"errors.retry.timeout": 0,
"errors.tolerance": "all"
}
}
Here is the error message:
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic audienceSharing.audienceSharing.customer :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving latest version of Avro schema{"type":"record","name":"Envelope","namespace":"audienceSharing.audienceSharing.customer","fields":[{"name":"after","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"patch","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"filter","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"updateDescription","type":["null",{"type":"record","name":"updatedescription","namespace":"io.debezium.connector.mongodb.changestream","fields":[{"name":"removedFields","type":["null",{"type":"array","items":"string"}],"default":null},{"name":"updatedFields","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"truncatedArrays","type":["null",{"type":"array","items":{"type":"record","name":"truncatedarray","fields":[{"name":"field","type":"string"},{"name":"size","type":"int"}],"connect.name":"io.debezium.connector.mongodb.changestream.truncatedarray"}}],"default":null}],"connect.name":"io.debezium.connector.mongodb.changestream.updatedescription"}],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mongo","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"rs","type":"string"},{"name":"collection","type":"string"},{"name":"ord","type":"int"},{"name":"h","ty
pe":["null","long"],"default":null},{"name":"tord","type":["null","long"],"default":null},{"name":"stxnid","type":["null","string"],"default":null},{"name":"lsid","type":["null","string"],"default":null},{"name":"txnNumber","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.mongo.Source"}},{"name":"op","type":["null","string"],"default":null},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"audienceSharing.audienceSharing.customer.Envelope"}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'audienceSharing.audienceSharing.customer-value' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:893)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:884)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:469)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:222)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:206)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:123)
... 17 more
[2022-06-15 12:40:26,073] ERROR Error encountered in task . Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={rs=atlas-sc0h29-shard-0, server_id=audienceSharing}, sourceOffset={sec=1655296817, ord=1, initsync=true, h=null}} ConnectRecord{topic='audienceSharing.audienceSharing.customer', kafkaPartition=null, key=Struct{id={"$oid": "62a761f9f004b8106d055f0c"}}, keySchema=Schema{audienceSharing.audienceSharing.customer.Key:STRUCT}, value=Struct{after={"_id": {"$oid": "62a761f9f004b8106d055f0c"},"buCode": "LMIT","clientNumber": "1016307","audiences": [{"audience": {"audienceId": "45012","name": "OFFER_Marketing_Cible_Parcours_Jardin_SCORE","createdDate": null,"updatedDate": null,"maximumNumberOfUseDuringMembership": 1,"membershipDuration": 1,"category": "LOYALTY","description": "","buCode": "LMIT"},"startDate": {"$date": 1655136776815},"remainingNumberOfUse": 1,"lastEntryDate": {"$date": 1655136776815}}]},source=Struct{version=1.9.2.Final,connector=mongodb,name=audienceSharing,ts_ms=1655296817000,snapshot=true,db=audienceSharing,rs=atlas-sc0h29-shard-0,collection=customer,ord=1},op=r,ts_ms=1655296819852}, valueSchema=Schema{audienceSharing.audienceSharing.customer.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic audienceSharing.audienceSharing.customer :
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving latest version of Avro schema{"type":"record","name":"Envelope","namespace":"audienceSharing.audienceSharing.customer","fields":[{"name":"after","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"patch","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"filter","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"updateDescription","type":["null",{"type":"record","name":"updatedescription","namespace":"io.debezium.connector.mongodb.changestream","fields":[{"name":"removedFields","type":["null",{"type":"array","items":"string"}],"default":null},{"name":"updatedFields","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"truncatedArrays","type":["null",{"type":"array","items":{"type":"record","name":"truncatedarray","fields":[{"name":"field","type":"string"},{"name":"size","type":"int"}],"connect.name":"io.debezium.connector.mongodb.changestream.truncatedarray"}}],"default":null}],"connect.name":"io.debezium.connector.mongodb.changestream.updatedescription"}],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mongo","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"rs","type":"string"},{"name":"collection","type":"string"},{"name":"ord","type":"int"},{"name":"h","ty
pe":["null","long"],"default":null},{"name":"tord","type":["null","long"],"default":null},{"name":"stxnid","type":["null","string"],"default":null},{"name":"lsid","type":["null","string"],"default":null},{"name":"txnNumber","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.mongo.Source"}},{"name":"op","type":["null","string"],"default":null},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"audienceSharing.audienceSharing.customer.Envelope"}
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'audienceSharing.audienceSharing.customer-value' not found.; error code: 40401
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:893)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:884)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:469)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:222)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:206)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:123)
... 17 more
Thanks,