I recently started working with the managed OpenSearch sink connector. The sink consumes a topic that is the output of a ksqlDB table. Every message in the topic has a defined key. When the OpenSearch sink connector consumes the data, it creates a new document in the index with a random _id, even for messages that share the same key. How can I:
- Set the _id of each document to the topic key?
- If I can’t set the _id myself, ensure that the same key always produces the same _id?
What happens if you explicitly set an _id field in the value? Either in ksqlDB (see AS_VALUE) or, if you have a document ID already in the value, you can add one of the ReplaceField$Value SMTs (Apache Kafka, Confluent).
I tried SMTs as well, and I did set the _id explicitly, but that didn’t help either. The connector still created a random _id for the OpenSearch document.
Attaching an image for reference:
What index{#}.batch.size is in the connector config? It looks like the default of 1 might lead to the issue you are seeing, but anything greater ought to work as long as _id is in the value.
It was indeed 1, but I tried bumping it to 10 and still had no luck.
Also, adding _id to the message value leads to this error:
"{\"took\":7,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"order_index_test\",\"_id\":\"t4bzk5QBiA06QGNGvcah\",\"status\":400,\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse field [_id] of type [_id] in document with id 't4bzk5QBiA06QGNGvcah'. Preview of field's value: '123'\",\"caused_by\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.\"}}}}]}"
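For anyone reading along, the error says that _id has to travel in the request metadata rather than in the document body. Below is a minimal Python sketch of what a Bulk API payload with a caller-supplied _id looks like, assuming the usual newline-delimited format (one action line, then one source line). The function name `bulk_index_lines` and the sample `status` field are made up for illustration. This is not what the managed connector does internally, just the request shape the error message is pointing at.

```python
import json


def bulk_index_lines(index, docs_by_key):
    """Build an NDJSON bulk payload that uses each record key as the document _id.

    Hypothetical helper for illustration only. The _id goes in the action
    line; any _id field inside the document body is dropped, because
    OpenSearch rejects metadata fields inside the source.
    """
    lines = []
    for key, doc in docs_by_key.items():
        # Strip _id from the body so it does not trigger mapper_parsing_exception.
        body = {k: v for k, v in doc.items() if k != "_id"}
        # Action/metadata line: this is where the document ID belongs.
        lines.append(json.dumps({"index": {"_index": index, "_id": str(key)}}))
        # Source line: the document itself.
        lines.append(json.dumps(body))
    # The bulk format requires the payload to end with a newline.
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    payload = bulk_index_lines(
        "order_index_test",
        {"123": {"_id": "123", "status": "NEW"}},
    )
    print(payload, end="")
```

Indexing the same key twice with a payload like this overwrites the existing document instead of creating a second one with a generated ID.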
Sorry – I took a closer look at the connector source and the OpenSearch REST API and don’t believe that this is currently supported. Confirming that.
Ahh, too bad. I think a custom connector would be the way to go here then (unless you think otherwise). Thank you for the responses though.
That, or the other thing you can try is the HTTP Sink connector to use the PUT <index>/_doc/<_id> API. Both the v1 and v2 connectors support template params so you should be able to tack ${key} onto the URL to serve as the document ID and upsert. I haven’t tested this out but it’s worth a shot.
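To make the idea concrete, here is a rough Python sketch of the request that a PUT <index>/_doc/<_id> call amounts to, with the record key substituted into the URL. The base URL `http://localhost:9200`, the helper name `build_upsert_request`, and the sample document are assumptions for illustration. It only builds the request and does not send it, and it says nothing about how the HTTP Sink connector is actually configured.

```python
import json
import urllib.parse
import urllib.request


def build_upsert_request(base_url, index, key, doc):
    """Build (but do not send) a PUT <index>/_doc/<_id> request.

    Hypothetical helper: the record key becomes the document ID in the URL,
    so repeated keys overwrite the same document.
    """
    # Percent-encode the key so characters like '/' cannot change the path.
    doc_id = urllib.parse.quote(str(key), safe="")
    url = f"{base_url.rstrip('/')}/{index}/_doc/{doc_id}"
    return urllib.request.Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


if __name__ == "__main__":
    # Assumed local endpoint, for illustration only.
    req = build_upsert_request(
        "http://localhost:9200", "order_index_test", "123", {"status": "NEW"}
    )
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```

Since the ID is part of the URL, the document body does not need (and should not contain) an _id field.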