Can't insert data into Elasticsearch from Confluent Kafka

I've been trying to send topic messages to Elasticsearch indexes, but I keep receiving the same error no matter which parameters I change. When I run this config against an empty topic, it actually creates the index on the Elasticsearch side; but as soon as I write messages to the topic, instead of copying them into the index it fails with the error below. I added parameters to ignore schemas (schema.ignore, key.ignore, etc.) and also tried the built-in value.converter options, but I just can't make it run.

The connector I run:

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "pageviews3",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "pageviews2",
    "connection.url": "http://xxxxxxxxxx:9200",
    "type.name": "kafka-connect",
    "connection.username": "elastic",
    "connection.password": "xxxxxxxxxxxx"
  }
}' http://xxxxx:8083/connectors
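
For reference, the task-level failure reason can also be pulled from the Connect REST API status endpoint (same placeholder host as above; piping through jq is optional and only pretty-prints):

curl -s http://xxxxx:8083/connectors/pageviews3/status | jq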

Another connector, where I ignore the Schema Registry:

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "page-views34",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://xxxx:9200",
    "tasks.max": "1",
    "topics": "pageviews2",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true",
    "connection.username": "elastic",
    "connection.password": "xxxxxx"
  }
}' http://xxxxx:8083/connectors
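
A quick way to check what is actually sitting on the topic is to dump the raw bytes of one record. Avro values serialized through Schema Registry start with a 0x00 magic byte followed by a 4-byte schema ID, while plain JSON starts with '{' (a rough check, assuming xxd is available and the broker is on localhost:9092):

kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic pageviews2 --from-beginning --max-messages 1 | xxd | head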

I used the default Confluent Datagen connector, which also registers a schema for itself:

curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews2",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'
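
No value.converter is set here, so the datagen connector falls back to whatever default converter the Connect worker is configured with. A variant that pins the value format to Avro explicitly would look like this (a sketch only; the Schema Registry URL http://localhost:8081 is an assumption):

curl -i -X PUT http://localhost:8083/connectors/datagen_local_02/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "io.confluent.connect.avro.AvroConverter",
            "value.converter.schema.registry.url": "http://localhost:8081",
            "kafka.topic": "pageviews2",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'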

This is the error output:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic pageviews2 to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:244)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:334)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:202)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
... 17 more
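
The "Unknown magic byte!" cause means the sink's value converter, here the worker's default AvroConverter, expected Confluent-framed Avro bytes but found something else. If the topic does hold Avro, a sink config that pins the converter and Schema Registry explicitly might look like this (a sketch only; the connector name and the registry URL http://localhost:8081 are assumptions):

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "pageviews3-avro",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "pageviews2",
    "connection.url": "http://xxxxxxxxxx:9200",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "type.name": "kafka-connect",
    "connection.username": "elastic",
    "connection.password": "xxxxxxxxxxxx"
  }
}' http://xxxxx:8083/connectors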

Update: the error changed after I created the topic with this command and updated the connector like this:

kafka-console-producer --broker-list localhost:9092 \
  --topic simple.elasticsearch.data2 \
  --property "parse.key=false" \
  --property "key.separator=" \
  --property "value.serializer=org.apache.kafka.common.serialization.StringSerializer" \
  --producer.config /etc/kafka/client.properties
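
Since the connector below uses StringConverter, each value is forwarded to Elasticsearch verbatim, so every line typed into this producer has to be a complete JSON document. Illustrative input lines (field names borrowed from the pageviews quickstart, values made up):

{"viewtime": 1001, "userid": "User_1", "pageid": "Page_1"}
{"viewtime": 1002, "userid": "User_2", "pageid": "Page_2"}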


curl -X POST -H "Content-Type: application/json" --data '{
  "name": "simple-elasticsearch2",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://xxxxx:9200",
    "tasks.max": "1",
    "topics": "simple.elasticsearch.data2",
    "name": "simple-elasticsearch2",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true",
    "connection.username": "elastic",
    "connection.password": "xxxx"
  }
}' http://xxxxx:8083/connectors

Error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:443)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:64)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.ActionListener$RunAfterActionListener.onFailure(ActionListener.java:350)
at org.elasticsearch.action.ActionListener$Delegating.onFailure(ActionListener.java:66)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:123)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:216)
... 5 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes' after 6 attempt(s)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:490)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:210)
... 5 more
Caused by: org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
at org.elasticsearch.common.compress.CompressorFactory.compressor(CompressorFactory.java:42)
at org.elasticsearch.common.xcontent.XContentHelper.createParser(XContentHelper.java:76)
at org.elasticsearch.client.RequestConverters.bulk(RequestConverters.java:226)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:2167)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:2137)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:2105)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:620)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:212)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
... 8 more
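
The NotXContentException indicates that the bulk request body Elasticsearch received was not parseable JSON, which is what happens when a plain, non-JSON string passes through StringConverter. A variant of the same connector that parses values as schemaless JSON instead would look roughly like this (connector name and placeholder hosts are illustrative):

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "simple-elasticsearch2-json",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://xxxxx:9200",
    "tasks.max": "1",
    "topics": "simple.elasticsearch.data2",
    "type.name": "_doc",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "key.ignore": "true",
    "connection.username": "elastic",
    "connection.password": "xxxx"
  }
}' http://xxxxx:8083/connectors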
