Kafka Connect Elastic Connector issue with updates

Hi, I am using Debezium MongoDB as a source and the Elasticsearch sink connector. My inserts work fine, but for updates MongoDB only sends a patch (message structure given below) containing the fields that have changed, and the Elasticsearch connector replaces the entire document with only the fields in the patch.

So if my initial JSON has 10 fields and the update comes with just 2 fields, the entire document gets replaced with those 2 fields and the rest of them are lost.

Is there a way to update the original document just for the 2 fields and leave the rest of the fields as they are?

{
  "after": null,
  "patch": "{\"$v\": 1, \"$set\": {\"orderMode\": \"ab\"}}",
  "filter": "{\"_id\": \"38815002_so_us\"}",
  "source": {
    "version": "1.6.0.Final",
    "connector": "mongodb",
    "name": "cdc_ordersdb",
    "ts_ms": 1633042350000,
    "snapshot": "false",
    "db": "orders",
    "sequence": null,
    "rs": "s0",
    "collection": "test",
    "ord": 2,
    "h": 1924224555192216145,
    "tord": null,
    "stxnid": null
  },
  "op": "u",
  "ts_ms": 1633042350198,
  "transaction": null
}
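To illustrate, the behavior I am after on the Elasticsearch side is the equivalent of a partial update that merges only the changed fields into the existing document, roughly like this (the index name and document id here are just what I would expect from my routing and key configuration below):

  POST /cdc_test/_update/38815002_so_us
  {
    "doc": {
      "orderMode": "ab"
    }
  }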

Here is my configuration:
{
  "name": "sink_order_orderhistorydb_elastic_v2.test",

  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "cdc_ordersdb.orders.test",
  "tasks.max": "1",

  "connection.url": "xxx",
  "connection.username": "xxx",
  "connection.password": "xxx",


  "transforms": "unwrap,extractKey,dropPrefix",
  "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "drop",
  "transforms.unwrap.array.encoding": "document",
  
  "transforms.dropPrefix.regex": "(.*)\\.(.*)\\.(.*)",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.dropPrefix.replacement": "cdc_$3",

  "transforms.extractKey.type"  :"org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field" :"id",

  "errors.log.enable": "true",
  "errors.tolerance": "all",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "__deadletter.sink_order_orderhistorydb_elastic_v2.test",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.deadletterqueue.context.headers.enable": "true",

  "value.converter.schemas.enable": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "xxx",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schemas.enable": "true",
  "key.converter.schema.registry.url": "xxx",

  "behavior.on.null.values": "DELETE",
  "behavior.on.malformed.documents": "WARN",
  "write.method": "UPSERT",

  "key.ignore": "false",
  "schema.ignore": "true",
  "consumer.override.max.partition.fetch.bytes":"20971520",

  "read.timeout.ms": "60000",
  "batch.size": "2000"

}

Can you please provide an example of what the document in Elasticsearch looks like?

I have the feeling that this is happening because you are sending the documents to Elasticsearch as-is, exactly as Debezium delivers them to the Kafka topic. Since Elasticsearch documents are immutable, indexing the patch-only record replaces the whole document. If this is the case, you might need to implement a transformation layer between Kafka and Elasticsearch, using either SMTs or perhaps ksqlDB, to produce a well-formed document.
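One possible direction (just a sketch, not verified against your setup): since you are on the MongoDB connector, the MongoDB-specific SMT io.debezium.connector.mongodb.transforms.ExtractNewDocumentState is the one intended to unwrap these patch-style update events, and combined with the sink's "write.method": "UPSERT" the resulting partial document should be merged into the existing one instead of replacing it. The relevant part of your sink config might then look something like:

  "transforms": "unwrap,extractKey,dropPrefix",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
  "transforms.unwrap.drop.tombstones": "false",
  "transforms.unwrap.delete.handling.mode": "drop",
  "transforms.unwrap.array.encoding": "document",

  "write.method": "UPSERT",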

