Hi everyone!
I tried a simple example with elastic-sink connector in Confluent Platform and I observed that if I connect my elastic-sink connector with elasticsearch in local(http://192.168.x.x:9200) it works well, but if I use my elasticsearch cloud the conenctor fails. So as far as I understood the elastic-sink connector does not work with elastic cloud, right? The example is the follow: https://docs.confluent.io/kafka-connect-elasticsearch/current/overview.html
Jessica,
It usually works, but it would help if you could share what is the error you’re getting while connecting with Elastic Cloud.
I followed the example and the connector is correctly created but when I searched for the index into elastic with curl -XGET ‘http://localhost:9200/test-elasticsearch-sink/_search?pretty’, I didn’t find anything and the connector in confluent platform switched to failed. I tried to see the error and I found this:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to create index test-elasticsearch-sink.
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:362)
at io.confluent.connect.elasticsearch.ElasticsearchClient.createIndex(ElasticsearchClient.java:411)
at io.confluent.connect.elasticsearch.ElasticsearchClient.createIndexOrDataStream(ElasticsearchClient.java:193)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.ensureIndexExists(ElasticsearchSinkTask.java:245)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:97)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
... 10 more
Caused by: ElasticsearchStatusException[Unable to parse response body]; nested: ResponseException[method [PUT], host [https://prova.kb.us-west1.gcp.cloud.es.io:9243], URI [/test-elasticsearch-sink?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}];
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1872)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1626)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1598)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1565)
at org.elasticsearch.client.IndicesClient.create(IndicesClient.java:145)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$createIndex$5(ElasticsearchClient.java:415)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:161)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:120)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:355)
... 15 more
Suppressed: ParsingException[Failed to parse object: expecting field with name [error] but found [statusCode]]
at org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
at org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592)
at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:179)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1869)
... 23 more
Caused by: org.elasticsearch.client.ResponseException: method [PUT], host [https://prova.kb.us-west1.gcp.cloud.es.io:9243], URI [/test-elasticsearch-sink?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}
at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:302)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:272)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1613)
... 22 more
Did you created a role with the proper permissions and associated the role to the user being used by the connector?
The connector is failing to create the index on Elasticsearch, likely for a lack of permissions.
I didn’t create any role.
Pelase can you give me the link about the example in the image?
Thank you