Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument

Hi,
I try to use kafka connect to save data to mongodb. I also use schema registry to de/serialize avro messages from kafka. It’s works fine in my java app but when I try to create MongoSinkConnector i run on this error

[2022-06-13 16:29:47,136] ERROR Unable to process record SinkRecord{kafkaOffset=863932, timestampType=CreateTime} ConnectRecord{topic='zzz', kafkaPartition=1, key=null, keySchema=null, value=java.nio.HeapByteBuffer[pos=0 lim=164 cap=164], valueSchema=Schema{BYTES}, timestamp=1655136458764, headers=ConnectHeaders(headers=[ConnectHeader(key=scst_partition, value=1, schema=Schema{INT8}), ConnectHeader(key=contentType, value=application/vnd.networkpacket.v1+avro, schema=Schema{STRING}), ConnectHeader(key=spring_json_header_types, value={scst_partition=java.lang.Integer, contentType=java.lang.String}, schema=Schema{MAP})])} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
	... 27 more
[2022-06-13 16:29:47,136] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
	... 27 more
[2022-06-13 16:29:47,137] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
	at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
	at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
	at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
	at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
	... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
	at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
	at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
	... 27 more
[2022-06-13 16:29:47,137] INFO Stopping MongoDB sink task (com.mongodb.kafka.connect.sink.MongoSinkTask)
[2022-06-13 16:29:47,139] INFO [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Revoke previously assigned partitions zzz-1 pzzz-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Part of my configuration:

connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
         "connection.uri":"mongodb://mongodb:27017",
         "database":"xxx",
         "collection":yyy",
         "topics":"zzz",
         "value.converter": "io.confluent.connect.avro.AvroConverter",
         "value.converter.schema.registry.url":"http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
	
        "key.converter.schemas.enable":"false",
        "value.converter.schemas.enable":"false",

You can remove value.converter.schemas.enable. Avro always has a schema and cannot be disabled. This property is only applicable to the json converter.

What is the value for your key converter? Is your producer actually sending HeapByteBuffer data?

Have you tried opening this issue in the Mongo sink github repo?

I try value.converter.schemas.enable with same results :confused:

Key is irrelevant because its empty.

I will create issue on Mongo sink github repo.

Like I said, Avro always has a schema, so that property is not used.

You’ll need to share your producer code