Elasticsearch sink connector issue

I am using the Confluent Elasticsearch sink connector to sink the Kafka messages of a topic (X) to Elasticsearch.
The topic (X) gets messages from a join of other topics.
I have observed that many messages in the topic (X) have the same (ROWKEY, ROWTIME), e.g.

rk, rt, p1
rk, rt, p2

Here rk and rt are the rowkey and rowtime, and p1 and p2 are the payloads. The messages are in offset order.
So my expectation is that p2 ends up as the final state of the Elasticsearch document for rk, but in practice the outcome is random.
Please suggest how to ensure that p2 is always the value written in such a scenario.
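To make the expectation concrete, here is a sketch of the indexing requests I would expect the connector to issue, in offset order (the index name and field names are illustrative, not my real setup):

```
# First record for key rk: creates/updates the document whose _id is rk
PUT /topic-x/_doc/rk
{ "payload": "p1" }

# Later record for the same key, at a higher offset: this write should
# end up as the final state of the document.
PUT /topic-x/_doc/rk
{ "payload": "p2" }
```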

Hi @joshi, welcome to the forum.

I’m struggling a bit to understand your question. Can you elaborate with specific record examples from the topic and what you see on Elasticsearch? And can you share your sink connector configuration please?

My goal is to flatten MySQL tables in Kafka using KSQL and then sink the data into Elasticsearch.
After a stream-stream join I see records having the same ROWTIME and ROWKEY.
In the Elasticsearch sink connector I am using key.ignore=false to make sure messages with the same ROWKEY update/insert the same Elasticsearch document.
I was expecting the latest record to be the one that ends up in Elasticsearch, but this is random.
For example, messages in the Kafka topic after the stream-stream join:

+---------------------------------+---------------------------------+---------------------------------+
|ROWTIME                          |ROWKEY                           |RS_STATUS_NAME                   |
+---------------------------------+---------------------------------+---------------------------------+
|1623816125045                    |3288234                          |[integ] build in progress        |
|1623816125045                    |3288234                          |[integ] build finished           |
|1623816125045                    |3288234                          |[integ] regression started       |
|1623816125045                    |3288234                          |[integ] regression failures found|
|1623816125045                    |3288234                          |integ requested                  |
|1623816125045                    |3288234                          |approved                         |
|1623816125045                    |3288234                          |checked in                       |
|1623816125045                    |3288234                          |checked in                       |

But the document in Elasticsearch for id 3288234 ends up with RS_STATUS_NAME "integ requested" instead of "checked in".
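For reference, a minimal sketch of the kind of configuration in play, submitted via the Kafka Connect REST API (the connector name, topic, and connection URL are placeholders):

```
PUT /connectors/es-sink-x/config
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "X",
  "connection.url": "http://localhost:9200",
  "key.ignore": "false",
  "schema.ignore": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```

With key.ignore=false the record key (ROWKEY) becomes the Elasticsearch document _id, which is why I expect repeated writes for the same key to update the same document.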

I have gone through the sink connector code and found that setting key.ignore to false makes Kafka and Elasticsearch a strongly coupled system:
if Kafka is freshly launched, then Elasticsearch also needs a fresh launch.

@rmoff adding to my previous comment,
I am seeing that Elasticsearch is using version_type=internal, and this causes a race condition on record updates.
Can you please suggest how I can configure version_type to external, so that the Elasticsearch document version is set to the record's kafkaOffset and the update applied in Elasticsearch is always the last one?
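For illustration, this is the Elasticsearch behaviour I am after (the index name, document id, and version numbers are only examples):

```
# With version_type=external, Elasticsearch only applies a write whose
# version is strictly higher than the stored one. Using the Kafka offset
# as the version would make the highest-offset record win deterministically.
PUT /topic-x/_doc/3288234?version=12&version_type=external
{ "RS_STATUS_NAME": "checked in" }

# A write carrying a lower version (an earlier offset arriving late) is
# rejected with version_conflict_engine_exception instead of overwriting.
PUT /topic-x/_doc/3288234?version=11&version_type=external
{ "RS_STATUS_NAME": "approved" }
```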
