Duplicate index documents in Elasticsearch connector

Hi,

I am able to push documents from a Kafka topic to OpenSearch on AWS using the Elasticsearch sink connector provided by Confluent. But whenever a message arrives in my topic, it creates a duplicate of the document in the search index. Has anybody encountered this issue?

Any pointers would be greatly appreciated.
The connector configuration is as follows:

name=:CONNECT_GROUP_ID
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1

topics=:TOPIC_NAME
confluent.topic.bootstrap.servers=:BROKER_ENDPOINT
confluent.topic.group.id=:CONNECT_GROUP_ID

connection.url=:ES_CONNECTION_URL
connection.username=:USE_NAME
connection.password=:USE_PASSWORD

type.name=_search
key.ignore=true
schema.ignore=true
batch.size=1
write.method=UPSERT
behavior.on.malformed.documents=IGNORE
drop.invalid.message=true

transforms=flatten,replaceWhitelist

transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

transforms.replaceWhitelist.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.replaceWhitelist.whitelist=address,id,name


Thanks
Arun

You have key.ignore=true set. This tells the connector to ignore the key of the Kafka message, and thus create a new document in the target index each time (with key.ignore=true the document ID is derived from the topic, partition, and offset, so every record indexes as a new document). Set key.ignore=false so the message key is used as the document ID instead.
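
As a minimal sketch of the change (assuming your Kafka message keys carry a stable entity ID, e.g. the id field you are whitelisting), keep the rest of the config as posted and flip this one property:

# Use the Kafka record key as the Elasticsearch document _id,
# so repeated messages for the same key target the same document
key.ignore=false

# You already have this: with a stable _id, UPSERT updates the
# existing document instead of indexing a fresh one each time
write.method=UPSERT

Note that key.ignore=false requires the records to actually have keys; if your producer sends null keys you will need to populate them first (for example with the ValueToKey single message transform).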

Here’s a video that explains more, and an accompanying article.


Thanks a lot @rmoff. It works! 🙂

