Does Elastic-sink connector work with elastic cloud?

Hi everyone!
I tried a simple example with elastic-sink connector in Confluent Platform and I observed that if I connect my elastic-sink connector with elasticsearch in local(http://192.168.x.x:9200) it works well, but if I use my elasticsearch cloud the conenctor fails. So as far as I understood the elastic-sink connector does not work with elastic cloud, right? The example is the follow: Elasticsearch Service Sink Connector for Confluent Platform | Confluent Documentation

Jessica,

It usually works, but it would help if you could share what is the error you’re getting while connecting with Elastic Cloud.

@riferrei

I followed the example and the connector is correctly created but when I searched for the index into elastic with curl -XGET ‘http://localhost:9200/test-elasticsearch-sink/_search?pretty’, I didn’t find anything and the connector in confluent platform switched to failed. I tried to see the error and I found this:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to create index test-elasticsearch-sink.
        at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:362)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.createIndex(ElasticsearchClient.java:411)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.createIndexOrDataStream(ElasticsearchClient.java:193)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.ensureIndexExists(ElasticsearchSinkTask.java:245)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:97)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        ... 10 more
Caused by: ElasticsearchStatusException[Unable to parse response body]; nested: ResponseException[method [PUT], host [https://prova.kb.us-west1.gcp.cloud.es.io:9243], URI [/test-elasticsearch-sink?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}];
        at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1872)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1626)
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1598)
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1565)
        at org.elasticsearch.client.IndicesClient.create(IndicesClient.java:145)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$createIndex$5(ElasticsearchClient.java:415)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:161)
        at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:120)
        at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:355)
        ... 15 more
        Suppressed: ParsingException[Failed to parse object: expecting field with name [error] but found [statusCode]]
                at org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
                at org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592)
                at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:179)
                at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
                at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1869)
                ... 23 more
Caused by: org.elasticsearch.client.ResponseException: method [PUT], host [https://prova.kb.us-west1.gcp.cloud.es.io:9243], URI [/test-elasticsearch-sink?master_timeout=30s&timeout=30s], status line [HTTP/1.1 404 Not Found]
{"statusCode":404,"error":"Not Found","message":"Not Found"}
        at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:302)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:272)
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:246)
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1613)
        ... 22 more

Did you created a role with the proper permissions and associated the role to the user being used by the connector?

The connector is failing to create the index on Elasticsearch, likely for a lack of permissions.

2 Likes

I didn’t create any role.
Pelase can you give me the link about the example in the image?
Thank you

Absolutely. Here it is:

https://docs.confluent.io/kafka-connect-elasticsearch/current/overview.html?_ga=2.162596476.1307680525.1629727917-392858331.1621873920#prerequisites

1 Like