Mongo DB sink connector failing with Avro Convertor

Below is my Kafka Source Connector confiugration

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://127.0.0.1:27017
database=source_db
collection=source_test
topic.prefix=
topic.suffix=avro
startup.mode=copy_existing
output.format.key=json
output.format.value=schema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.id=1

I am trying to use the sink connector to sink the information from topic to mongo db, using the below configuration

name=mongo-sink-connector
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=source_db.source_test.avro
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.id=1
connection.uri=mongodb://127.0.0.1:27017
database=destination_db
collection=avro_system_infos
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

I am getting the below error
org.apache.kafka.connect.errors.DataException: Unexpected documentKey field type, expecting a document but found BsonString{value=‘{“_id”: {“$oid”: “650316254be50b6b73612588”}}’}: {“_id”: “{“_id”: {“$oid”: “650316254be50b6b73612588”}, “copyingData”: true}”, “operationType”: “insert”, “fullDocumentBeforeChange”: null, “fullDocument”: “{“_id”: {“$oid”: “650316254be50b6b73612588”}, “name”: “random_name”, “age”: 36, “address”: “sample test address, street name sample, whatever”, “city”: “BLR”, “country”: “KL”}”, “ns”: {“db”: “source_db”, “coll”: “source_test”}, “to”: null, “documentKey”: “{“_id”: {“$oid”: “650316254be50b6b73612588”}}”, “updateDescription”: null, “clusterTime”: null, “txnNumber”: null, “lsid”: null}

Looks to me the issue is corresponding to change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler

If I am not using it, the data is getting dumped as a complete as in Value part of the topic, instead of just the payload.
With the configuration, it is giving the error related to documentkey as providede above

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.