Hi everybody! I'm trying to use Confluent Platform to take input data from an MQTT broker and send it to Elastic Cloud. The MQTT source connector ingests the input data successfully.
The problem is that the Elasticsearch sink connector reports a "failed" status. Here is the config of the Elasticsearch sink connector:
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "ciao",
    "key.ignore": "true",
    "connection.url": "https://prova1.kb.eastus2.azure.elastic-cloud.com:9243",
    "connection.username": "elastic",
    "connection.password": "**************",
    "type.name": "kafka-connect",
    "name": "ciao"
  },
  "tasks": [],
  "type": null
}
I also tried running some simple examples of the Elasticsearch sink connector, but the status is always "failed". I can't understand the problem!
rmoff
5 August 2021 09:26
2
You’ll need to check the logs of the Kafka Connect worker to find the actual error.
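Besides reading the worker log file, the stack trace of a failed task is usually also exposed by the Kafka Connect REST API under `/connectors/<name>/status`. The snippet below is a minimal sketch of that lookup, not a definitive tool. It assumes the worker's REST API is reachable at `http://localhost:8083` (the default port) and uses the connector name `elasticsearch-sink` from the config above; `failed_task_traces` and `print_failures` are hypothetical helper names.

```python
import json
from urllib.request import urlopen

# Assumption: the Connect worker's REST API listens on the default port 8083.
CONNECT_URL = "http://localhost:8083"


def failed_task_traces(status):
    """Return the stack trace of every task whose state is FAILED."""
    return [
        task.get("trace", "")
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def fetch_status(connector, base_url=CONNECT_URL):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    with urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        return json.load(resp)


def print_failures(connector, base_url=CONNECT_URL):
    """Print the trace of each failed task of the given connector."""
    for trace in failed_task_traces(fetch_status(connector, base_url)):
        print(trace)
```

Calling `print_failures("elasticsearch-sink")` against a running worker should print the same kind of trace that appears in the worker log.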
Thank you for your response! I checked the log and this is the error:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to create index ciao.
    at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:371)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.createIndex(ElasticsearchClient.java:189)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.ensureIndexExists(ElasticsearchSinkTask.java:163)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:90)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: ElasticsearchStatusException[Unable to parse response body]; nested: ResponseException[method [PUT], host [https://prova1.kb.eastus2.azure.elastic-cloud.com:9243], URI [/ciao?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}];
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1686)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1446)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1385)
    at org.elasticsearch.client.IndicesClient.create(IndicesClient.java:125)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$createIndex$2(ElasticsearchClient.java:193)
    at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:161)
    at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:120)
    at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:364)
    ... 14 more
    Suppressed: ParsingException[Failed to parse object: expecting field with name [error] but found [statusCode]]
        at org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
        at org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:587)
        at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169)
        at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1706)
        at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1683)
        ... 22 more
Caused by: org.elasticsearch.client.ResponseException: method [PUT], host [https://prova1.kb.eastus2.azure.elastic-cloud.com:9243], URI [/ciao?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}
    at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:260)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:238)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212)
    at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1433)
    ... 21 more
How can I solve it?
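One possible reading of this trace, which nobody in this thread confirmed: the 404 response body starts with `statusCode`, while the suppressed ParsingException shows the Elasticsearch client expected an `error` field, so the reply may not come from Elasticsearch at all. The host in `connection.url` contains the label `kb`, which on Elastic Cloud typically marks the Kibana endpoint, whereas the Elasticsearch endpoint typically carries `es`. The sketch below is a rough heuristic check under that assumption; `looks_like_kibana_endpoint` and `suggest_es_endpoint` are hypothetical helpers, and the suggested URL is only a guess that should be compared with the Elasticsearch endpoint shown in the Elastic Cloud console.

```python
from urllib.parse import urlparse


def looks_like_kibana_endpoint(url):
    """Heuristic: Elastic Cloud Kibana hosts typically carry a 'kb' label."""
    host = urlparse(url.strip()).hostname or ""
    return host.endswith("elastic-cloud.com") and "kb" in host.split(".")


def suggest_es_endpoint(url):
    """Swap the 'kb' host label for 'es'. The result is a guess to verify."""
    parsed = urlparse(url.strip())
    labels = (parsed.hostname or "").split(".")
    host = ".".join("es" if label == "kb" else label for label in labels)
    port = f":{parsed.port}" if parsed.port else ""
    return f"{parsed.scheme}://{host}{port}"


url = "https://prova1.kb.eastus2.azure.elastic-cloud.com:9243"
if looks_like_kibana_endpoint(url):
    print("connection.url may point at Kibana; candidate:", suggest_es_endpoint(url))
```

If the check fires, the value to try in `connection.url` is the Elasticsearch endpoint of the deployment rather than the Kibana one.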