Kafka Connect Debezium: source connector for Atlas MongoDB

Hi,

I’m trying to use Kafka Connect for CDC.
My connector source is Atlas MongoDB.

Here is my connector configuration:

{
    "name": "testmdia-connector",
    "config": {
        "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
        "tasks.max" : "1",
        "mongodb.hosts" : "mongodb-testmdia.ehnb8.mongodb.net",
        "mongodb.name" : "testmdia",
        "mongodb.user" : "",
        "mongodb.password" : "",
        "database.include.list" : "testmdia",
        "collection.include.list": "customer",
        "heartbea.topics.prefix" : "testmdia_my_topic",
        "poll.interval.ms" : 500,
        "connect.max.attempts" : 16
    }
}

But when I try to create the connector, I get this error message:

com.mongodb.MongoSocketException: mongodb-testmdia.ehnb8.mongodb.net
	at com.mongodb.ServerAddress.getSocketAddresses(ServerAddress.java:211)
	at com.mongodb.internal.connection.SocketStream.initializeSocket(SocketStream.java:75)
	at com.mongodb.internal.connection.SocketStream.open(SocketStream.java:65)
	at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:165)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.lookupServerDescription(DefaultServerMonitor.java:195)
	at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:151)
	at java.base/java.lang.Thread.run(Thread.java:829)

Can someone help me, please?
DIABY

Hello Diaby :wave:

One thing that jumps out to me is the need to set mongodb.ssl.enabled to true for connecting to Atlas. Give that a shot and see if it gets any closer. You may also want to explicitly set port 27017 for sanity’s sake, though I don’t think that’s required.
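For reference, the relevant additions might look like this (hostname kept from your original post; the explicit port is illustrative, not required):

```json
"mongodb.hosts" : "mongodb-testmdia.ehnb8.mongodb.net:27017",
"mongodb.ssl.enabled" : "true"
```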

(Unrelated to the exception you’re hitting, there’s a typo in heartbea.topics.prefix; it should be heartbeat.topics.prefix.)

Hope this helps! Please let us know if this gets you past that error.

Dave


Hi Dave,

Thanks for your help.
That error is resolved now, but I’m getting another error with the Avro value converter.

Below is my connector config:

{
  "name" : "connector-debezium-mongo-source",
  "config" : {
    "connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",

    "mongodb.hosts" : "mongdbHost",
    "mongodb.user" : "user",
    "mongodb.password" : "password",
    "mongodb.name" : "database",
    "database.include.list" : "database",
    "mongodb.ssl.enabled" : "true",
    "collection.include.list" : "collectionName",

    "transforms.unwrap.type" : "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",

    "auto.register.schemas" : "true",
    "key.converter.auto.register.schemas" : "true",
    "value.converter.schemas.enable" : "true",
    "key.converter.schemas.enable" : "true",

    "value.converter" : "io.confluent.connect.avro.AvroConverter",
    "key.converter" : "io.confluent.connect.avro.AvroConverter",

    "key.converter.schema.registry.url" : "URL_SCHEMA_REGISTRY",
    "value.converter.schema.registry.url" : "URL_SCHEMA_REGISTRY",
    "key.converter.basic.auth.credentials.source" : "USER_INFO",
    "key.converter.basic.auth.user.info" : "",

    "errors.log.enable" : "true",
    "errors.log.include.messages" : "true",
    "errors.retry.delay.max.ms" : 60000,
    "errors.retry.timeout" : 0,
    "errors.tolerance" : "all"
  }
}

Here is the error message:

org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic audienceSharing.audienceSharing.customer :
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving latest version of Avro schema{"type":"record","name":"Envelope","namespace":"audienceSharing.audienceSharing.customer","fields":[{"name":"after","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"patch","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"filter","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"updateDescription","type":["null",{"type":"record","name":"updatedescription","namespace":"io.debezium.connector.mongodb.changestream","fields":[{"name":"removedFields","type":["null",{"type":"array","items":"string"}],"default":null},{"name":"updatedFields","type":["null",{"type":"string","connect.version":1,"connect.name":"io.debezium.data.Json"}],"default":null},{"name":"truncatedArrays","type":["null",{"type":"array","items":{"type":"record","name":"truncatedarray","fields":[{"name":"field","type":"string"},{"name":"size","type":"int"}],"connect.name":"io.debezium.connector.mongodb.changestream.truncatedarray"}}],"default":null}],"connect.name":"io.debezium.connector.mongodb.changestream.updatedescription"}],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.mongo","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"rs","type":"string"},{"name":"collection","type":"string"},{"name":"ord","type":"int"},{"name":"h","ty
pe":["null","long"],"default":null},{"name":"tord","type":["null","long"],"default":null},{"name":"stxnid","type":["null","string"],"default":null},{"name":"lsid","type":["null","string"],"default":null},{"name":"txnNumber","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.mongo.Source"}},{"name":"op","type":["null","string"],"default":null},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"audienceSharing.audienceSharing.customer.Envelope"}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
	... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'audienceSharing.audienceSharing.customer-value' not found.; error code: 40401
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:893)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:884)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:469)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:222)
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupLatestVersion(AbstractKafkaSchemaSerDe.java:206)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:123)
	... 17 more
[2022-06-15 12:40:26,073] ERROR Error encountered in task . Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={rs=atlas-sc0h29-shard-0, server_id=audienceSharing}, sourceOffset={sec=1655296817, ord=1, initsync=true, h=null}} ConnectRecord{topic='audienceSharing.audienceSharing.customer', kafkaPartition=null, key=Struct{id={"$oid": "62a761f9f004b8106d055f0c"}}, keySchema=Schema{audienceSharing.audienceSharing.customer.Key:STRUCT}, value=Struct{after={"_id": {"$oid": "62a761f9f004b8106d055f0c"},"buCode": "LMIT","clientNumber": "1016307","audiences": [{"audience": {"audienceId": "45012","name": "OFFER_Marketing_Cible_Parcours_Jardin_SCORE","createdDate": null,"updatedDate": null,"maximumNumberOfUseDuringMembership": 1,"membershipDuration": 1,"category": "LOYALTY","description": "","buCode": "LMIT"},"startDate": {"$date": 1655136776815},"remainingNumberOfUse": 1,"lastEntryDate": {"$date": 1655136776815}}]},source=Struct{version=1.9.2.Final,connector=mongodb,name=audienceSharing,ts_ms=1655296817000,snapshot=true,db=audienceSharing,rs=atlas-sc0h29-shard-0,collection=customer,ord=1},op=r,ts_ms=1655296819852}, valueSchema=Schema{audienceSharing.audienceSharing.customer.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter)
	(same stack trace as above)

Thanks,

“Subject not found”: query the /subjects endpoint of your Schema Registry to see which subjects exist and which ones match the data you’re trying to use.
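For example, assuming your registry is reachable at a placeholder URL and secured with basic auth (swap in your own credentials, or drop `-u` if it’s unsecured):

```shell
# List every subject registered in the Schema Registry
curl -s -u "$SR_USER:$SR_PASSWORD" "$SCHEMA_REGISTRY_URL/subjects"

# Inspect the latest schema registered under the subject the converter is looking up
curl -s -u "$SR_USER:$SR_PASSWORD" \
  "$SCHEMA_REGISTRY_URL/subjects/audienceSharing.audienceSharing.customer-value/versions/latest"
```

If the first call returns subjects but none named `audienceSharing.audienceSharing.customer-value`, that lines up with the 40401 in your stack trace.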

You may need to set a different subject naming strategy on the Avro converter than the default, which appends “-value” to the topic name.
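A sketch of what that could look like in the connector config, using `RecordNameStrategy` as one alternative (whether it suits your setup depends on how your subjects are organized). Note that converter-level settings must carry the `value.converter.` or `key.converter.` prefix to reach the converter; a bare `auto.register.schemas` at the connector level is not passed through:

```json
"value.converter.value.subject.name.strategy" : "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"value.converter.auto.register.schemas" : "true",
"key.converter.key.subject.name.strategy" : "io.confluent.kafka.serializers.subject.RecordNameStrategy"
```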