Indexing record failed

Hello,

I’m using Kafka Connect to send logs to ELK. About once a week, the connector task fails and I get this error message:

Caused by: org.apache.kafka.connect.errors.ConnectException: Indexing record failed → Response status: TOO_MANY_REQUESTS,
 Index: kafka-topic-xxxxxx,
 Document Id: kafka-topic-xxxxxxx+5+40187906
    at io.confluent.connect.elasticsearch.ElasticsearchClient.handleResponse(ElasticsearchClient.java:621)
    at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:426)
    at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:59)
    at org.elasticsearch.action.bulk.BulkRequestHandler$1.onResponse(BulkRequestHandler.java:56)
    at org.elasticsearch.action.ActionListener$RunAfterActionListener.onResponse(ActionListener.java:341)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.finishHim(Retry.java:168)
    at org.elasticsearch.action.bulk.Retry$RetryHandler.onResponse(Retry.java:112)

Any idea what exactly the problem is and how to fix it?
Thank you

This looks like Elasticsearch being overloaded and returning a 429. Some ideas to address this:

  1. Can you find any helpful Elasticsearch logging from around the same time the connector logs this error? Ideally it would give more specific information about what to do on the Elastic side or the connector side.
  2. The connector itself has some configs that may help lighten the load on Elastic; e.g., max.in.flight.requests defaults to 5. You might try lowering it and/or lowering bulk.size.bytes / batch.size (see the first sketch after this list). The relevant source code building the ES client is here and the corresponding Javadoc is here.
  3. This Elastic doc describes the scenarios that may lead to this error (depleted thread pool / high CPU usage) as well as remedies. Some of those relate to the connector config (“spread out bulk requests” = lower max.in.flight.requests) and some are strictly on the Elastic side (scaling the cluster, cancelling resource-intensive searches). The second sketch after this list shows a quick way to check for the thread pool scenario.
  4. Ideally you can trigger this error on demand, so that you can try out remedies in the connector or on the Elastic side and then validate that you can no longer trigger it. That isn’t always possible, though, in which case you may be left with log analysis (on the ES side) plus educated guess and test of the remedy options, where “test” unfortunately means waiting.
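
To make item 2 concrete, here is a minimal sketch of the throttling-related settings in a sink connector properties file. The values are illustrative starting points to experiment with, not tuned recommendations (only the max.in.flight.requests default of 5 is confirmed above):

# Illustrative throttling settings for the Elasticsearch sink connector.
# Values are examples to experiment with, not recommendations.
max.in.flight.requests=2    # default 5; fewer concurrent bulk requests
batch.size=500              # fewer records per bulk request
bulk.size.bytes=1048576     # cap each bulk request at ~1 MB
max.retries=10              # retry rejected bulks more times before failing the task
retry.backoff.ms=1000       # wait longer between retries of a failed bulk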
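
And for item 3, a quick way to check for the depleted-thread-pool scenario is to watch the write thread pool’s rejection counters (a sketch, assuming Elasticsearch is reachable at localhost:9200):

# A climbing "rejected" count on the write thread pool means bulk
# requests are being rejected, which surfaces as 429s to clients:
curl -s 'http://localhost:9200/_cat/thread_pool/write?v&h=node_name,name,active,queue,rejected'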

Hello,

Thank you very much for your help.
I just applied these modifications to the connector config file, and the problem was solved.

batch.size: 100
max.buffered.records: 1000
max.retries: 10
retry.backoff.ms: 1000
flush.timeout.ms: 10000
max.in.flight.requests: 3
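
For reference, in a JSON definition submitted to the Connect REST API these settings go in the config block (the connector name, topic, and connection.url below are placeholders):

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "kafka-topic-xxxxxx",
    "connection.url": "http://elasticsearch:9200",
    "batch.size": "100",
    "max.buffered.records": "1000",
    "max.retries": "10",
    "retry.backoff.ms": "1000",
    "flush.timeout.ms": "10000",
    "max.in.flight.requests": "3"
  }
}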

Best regards
