Hi there,
While doing a POC for the Elasticsearch sink connector (in standalone mode), we are facing the following issue. Kindly guide us.
System details:
OS - Red Hat Enterprise Linux 8.9 (Ootpa)
Kafka flavor - Apache Kafka
Kafka version - 2.8.2 (Scala 2.12)
Elasticsearch version - 8.6.2
Purpose of POC - sending data from a Kafka topic to Elasticsearch via the Elasticsearch sink connector
Elasticsearch connector version - confluentinc-kafka-connect-elasticsearch-14.0.15
Mode - Sink
#######################################################
Configuration files
- connect-standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schemas.enable=true
#value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/etc/kafka-connect/confluentinc-kafka-connect-elasticsearch-14.0.15/
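(For context: with the two schemas.enable lines commented out, JsonConverter falls back to its default of schemas.enable=true, so the worker expects every record to carry a schema/payload envelope. Note also that the second plugin.path line overrides the first, since later keys in a properties file win.) For illustration, a record in the envelope format a schema-enabled JsonConverter accepts would look like this (field names and values are made up):

{
  "schema": { "type": "string", "optional": false },
  "payload": "hello from test-topic1"
}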
#######################################################
- quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=test-topic1
key.ignore=true
connection.url=http://localhost:9200
type.name=_doc
schemas.enable=false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
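As far as we understand, the key.converter.schemas.enable / value.converter.schemas.enable lines here only take effect if the connector config also overrides the converter classes themselves; otherwise the worker-level converters and their defaults apply (and the bare schemas.enable line is not a recognized connector property at all). If that is right, the connector-level override would presumably need to look like this (sketch, untested):

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false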
#######################################################
Connect logs
INFO ElasticsearchSinkConnectorConfig values:
batch.size = 2000
behavior.on.malformed.documents = FAIL
behavior.on.null.values = FAIL
bulk.size.bytes = 5242880
compact.map.entries = true
connection.compression = false
connection.password = null
connection.timeout.ms = 1000
connection.url = [http://localhost:9200]
connection.username = null
data.stream.dataset =
data.stream.timestamp.field =
data.stream.type = NONE
drop.invalid.message = false
elastic.https.ssl.cipher.suites = null
elastic.https.ssl.enabled.protocols = [TLSv1.2]
elastic.https.ssl.endpoint.identification.algorithm = https
elastic.https.ssl.engine.factory.class = null
elastic.https.ssl.key.password = null
elastic.https.ssl.keymanager.algorithm = SunX509
elastic.https.ssl.keystore.certificate.chain = null
elastic.https.ssl.keystore.key = null
elastic.https.ssl.keystore.location = null
elastic.https.ssl.keystore.password = null
elastic.https.ssl.keystore.type = JKS
elastic.https.ssl.protocol = TLSv1.2
elastic.https.ssl.provider = null
elastic.https.ssl.secure.random.implementation = null
elastic.https.ssl.trustmanager.algorithm = PKIX
elastic.https.ssl.truststore.certificates = null
elastic.https.ssl.truststore.location = null
elastic.https.ssl.truststore.password = null
elastic.https.ssl.truststore.type = JKS
elastic.security.protocol = PLAINTEXT
external.version.header =
flush.synchronously = false
flush.timeout.ms = 180000
kerberos.keytab.path = null
kerberos.user.principal = null
key.ignore = true
linger.ms = 1
log.sensitive.data = false
max.buffered.records = 20000
max.connection.idle.time.ms = 60000
max.in.flight.requests = 5
max.retries = 5
proxy.host =
proxy.password = null
proxy.port = 8080
proxy.username =
read.timeout.ms = 3000
retry.backoff.ms = 100
schema.ignore = false
topic.key.ignore =
topic.schema.ignore =
write.method = INSERT
[2024-06-05 11:57:15,484] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:190)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:497)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:541)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:497)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 13 more
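As the trace shows, the converter rejects the records because they are plain JSON without the schema/payload envelope. A plain-JSON record like the one below reproduces the error for us (the payload is illustrative; the console producer is just one way to send it):

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic1
>{"field1":"value1","field2":"value2"}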
After querying the connector status:
curl localhost:8083/connectors/elasticsearch-sink/status | jq
{
  "name": "elasticsearch-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "127.0.0.1:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:497)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:474)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:541)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:497)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\n"
    }
  ],
  "type": "sink"
}
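As the log notes, the failed task "will not recover until manually restarted"; restarting it without bouncing the worker is possible via the Connect REST API (default port 8083, matching the status output above):

curl -X POST localhost:8083/connectors/elasticsearch-sink/tasks/0/restart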