Hi i am using debezium mongo as a source and elastic sink connector. My inserts are working fine , but for updates since mongo only sends a patch with message structure as given below with only the columns that has changed , the elastic connector replaces the entire documented with only the fields in patch.
So if my initial json has 10 fields and if the updates comes with just 2 fields , the entire document gets replaced with 2 fields and rest of then are lost.
Is there a way to update the original document just for the 2 fields and leave the rest of the field as it is.
{“after”: null, “patch”: “{"$v": 1,"$set": {"orderMode": "ab"}}”, “filter”: “{"_id": "38815002_so_us"}”, “source”: {“version”: “1.6.0.Final”, “connector”: “mongodb”, “name”: “cdc_ordersdb”, “ts_ms”: 1633042350000, “snapshot”: “false”, “db”: “orders”, “sequence”: null, “rs”: “s0”, “collection”: “test”, “ord”: 2, “h”: 1924224555192216145, “tord”: null, “stxnid”: null}, “op”: “u”, “ts_ms”: 1633042350198, “transaction”: null}
Here is. my configuration
{
“name”: “sink_order_orderhistorydb_elastic_v2.test”,
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "cdc_ordersdb.orders.test",
"tasks.max": "1",
"connection.url": "xxx",
"connection.username": "xxx",
"connection.password": "xxx",
"transforms": "unwrap,extractKey,dropPrefix",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop",
"transforms.unwrap.array.encoding": "document",
"transforms.dropPrefix.regex": "(.*)\\.(.*)\\.(.*)",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.replacement": "cdc_$3",
"transforms.extractKey.type" :"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field" :"id",
"errors.log.enable": "true",
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "__deadletter.sink_order_orderhistorydb_elastic_v2.test",
"errors.deadletterqueue.topic.replication.factor": "3",
"errors.deadletterqueue.context.headers.enable": "true",
"value.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "xxx",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "xxx",
"behavior.on.null.values": "DELETE",
"behavior.on.malformed.documents": "WARN",
"write.method": "UPSERT",
"key.ignore": "false",
"schema.ignore": "true",
"consumer.override.max.partition.fetch.bytes":"20971520",
"read.timeout.ms": "60000",
"batch.size": "2000"
}