Hi,
I try to use kafka connect to save data to mongodb. I also use schema registry to de/serialize avro messages from kafka. It’s works fine in my java app but when I try to create MongoSinkConnector i run on this error
[2022-06-13 16:29:47,136] ERROR Unable to process record SinkRecord{kafkaOffset=863932, timestampType=CreateTime} ConnectRecord{topic='zzz', kafkaPartition=1, key=null, keySchema=null, value=java.nio.HeapByteBuffer[pos=0 lim=164 cap=164], valueSchema=Schema{BYTES}, timestamp=1655136458764, headers=ConnectHeaders(headers=[ConnectHeader(key=scst_partition, value=1, schema=Schema{INT8}), ConnectHeader(key=contentType, value=application/vnd.networkpacket.v1+avro, schema=Schema{STRING}), ConnectHeader(key=spring_json_header_types, value={scst_partition=java.lang.Integer, contentType=java.lang.String}, schema=Schema{MAP})])} (com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
... 27 more
[2022-06-13 16:29:47,136] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
... 27 more
[2022-06-13 16:29:47,137] ERROR WorkerSinkTask{id=mongo-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Could not convert value `java.nio.HeapByteBuffer[pos=0 lim=164 cap=164]` into a BsonDocument.
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:169)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.containsKey(LazyBsonDocument.java:83)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.shouldAppend(DocumentIdAdder.java:68)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.lambda$process$0(DocumentIdAdder.java:51)
at java.base/java.util.Optional.ifPresent(Optional.java:183)
at com.mongodb.kafka.connect.sink.processor.DocumentIdAdder.process(DocumentIdAdder.java:49)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$1(MongoProcessedSinkRecordData.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.lambda$buildWriteModel$2(MongoProcessedSinkRecordData.java:90)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.tryProcess(MongoProcessedSinkRecordData.java:105)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.buildWriteModel(MongoProcessedSinkRecordData.java:85)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.createWriteModel(MongoProcessedSinkRecordData.java:81)
at com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData.<init>(MongoProcessedSinkRecordData.java:51)
at com.mongodb.kafka.connect.sink.MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(MongoSinkRecordProcessor.java:45)
at com.mongodb.kafka.connect.sink.StartedMongoSinkTask.put(StartedMongoSinkTask.java:75)
at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
... 10 more
Caused by: org.apache.kafka.connect.errors.DataException: No converter present due to unexpected object type: java.nio.HeapByteBuffer
at com.mongodb.kafka.connect.sink.converter.SinkConverter.getRecordConverter(SinkConverter.java:92)
at com.mongodb.kafka.connect.sink.converter.SinkConverter.lambda$convert$1(SinkConverter.java:60)
at com.mongodb.kafka.connect.sink.converter.LazyBsonDocument.getUnwrapped(LazyBsonDocument.java:166)
... 27 more
[2022-06-13 16:29:47,137] INFO Stopping MongoDB sink task (com.mongodb.kafka.connect.sink.MongoSinkTask)
[2022-06-13 16:29:47,139] INFO [Consumer clientId=connector-consumer-mongo-sink-0, groupId=connect-mongo-sink] Revoke previously assigned partitions zzz-1 pzzz-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Part of my configuration:
connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"connection.uri":"mongodb://mongodb:27017",
"database":"xxx",
"collection":yyy",
"topics":"zzz",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",