Below is my Kafka source connector configuration:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://127.0.0.1:27017
database=source_db
collection=source_test
topic.prefix=
topic.suffix=avro
startup.mode=copy_existing
output.format.key=json
output.format.value=schema
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.id=1
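As a sanity check on the topic name, here is a minimal Python sketch of how I understand it is derived. It assumes the connector joins the non-empty parts (prefix, database, collection, suffix) with the default "." separator and skips empty ones. topic_name is a hypothetical helper for illustration, not the connector's own code:

```python
def topic_name(prefix: str, database: str, collection: str, suffix: str, sep: str = ".") -> str:
    """Hypothetical helper: join the non-empty naming parts with the separator."""
    parts = [prefix, database, collection, suffix]
    return sep.join(p for p in parts if p)


# Values taken from the source connector configuration (topic.prefix is empty).
print(topic_name("", "source_db", "source_test", "avro"))
```

Under that assumption the result is source_db.source_test.avro, which is the value the sink connector subscribes to in its topics setting.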
I am trying to use the sink connector to write the data from that topic to MongoDB, using the configuration below:
name=mongo-sink-connector
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=source_db.source_test.avro
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.id=1
connection.uri=mongodb://127.0.0.1:27017
database=destination_db
collection=avro_system_infos
document.id.strategy=com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy
change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler
I am getting the following error:
org.apache.kafka.connect.errors.DataException: Unexpected documentKey field type, expecting a document but found BsonString{value='{"_id": {"$oid": "650316254be50b6b73612588"}}'}: {"_id": "{"_id": {"$oid": "650316254be50b6b73612588"}, "copyingData": true}", "operationType": "insert", "fullDocumentBeforeChange": null, "fullDocument": "{"_id": {"$oid": "650316254be50b6b73612588"}, "name": "random_name", "age": 36, "address": "sample test address, street name sample, whatever", "city": "BLR", "country": "KL"}", "ns": {"db": "source_db", "coll": "source_test"}, "to": null, "documentKey": "{"_id": {"$oid": "650316254be50b6b73612588"}}", "updateDescription": null, "clusterTime": null, "txnNumber": null, "lsid": null}
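The key detail in the error is that documentKey (like fullDocument) arrives as a string holding JSON text rather than as a nested document. A minimal Python sketch of that mismatch, using a trimmed copy of the event from the error message. check_document_key is hypothetical and only mimics the kind of type check the handler appears to perform; it is not the connector's actual (Java) code:

```python
import json

# Trimmed copy of the change event from the error message: documentKey is a
# string that contains JSON, not a nested document.
event = {
    "operationType": "insert",
    "ns": {"db": "source_db", "coll": "source_test"},
    "documentKey": '{"_id": {"$oid": "650316254be50b6b73612588"}}',
}


def check_document_key(evt: dict) -> dict:
    """Hypothetical check: raise if documentKey is not a document (dict)."""
    key = evt["documentKey"]
    if not isinstance(key, dict):
        raise TypeError(
            f"Unexpected documentKey field type, expecting a document but found {type(key).__name__}"
        )
    return key


try:
    check_document_key(event)
except TypeError as exc:
    print(exc)  # documentKey is a str here, so the check fails

# Parsing the JSON text yields the nested document such a check expects.
parsed = dict(event, documentKey=json.loads(event["documentKey"]))
print(check_document_key(parsed))
```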
It looks to me like the issue is related to change.data.capture.handler=com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler.
If I don't use it, the complete message from the Value part of the topic (the whole change event) is written to MongoDB, instead of just the payload.
With that setting, I get the documentKey error shown above.
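To illustrate the difference between the two outcomes, a minimal Python sketch: without a CDC handler, the whole change event (operationType, ns, documentKey, ...) ends up as the stored document, whereas the goal is to store only the fullDocument payload. The event is a trimmed copy of the one in the error above. extract_payload is a hypothetical helper and assumes fullDocument is serialized JSON text, as it is in that error:

```python
import json

# Trimmed change event as it appears in the topic value (see the error message).
event = {
    "operationType": "insert",
    "ns": {"db": "source_db", "coll": "source_test"},
    "documentKey": '{"_id": {"$oid": "650316254be50b6b73612588"}}',
    "fullDocument": '{"_id": {"$oid": "650316254be50b6b73612588"}, "name": "random_name", "age": 36}',
}


def extract_payload(evt: dict) -> dict:
    """Hypothetical helper: return only the source document from a change event."""
    full = evt["fullDocument"]
    # fullDocument is JSON text here, so parse it into a dict; pass dicts through.
    return json.loads(full) if isinstance(full, str) else full


print(sorted(event))          # fields stored today: the whole change event
print(extract_payload(event))  # what should be stored: just the payload
```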