Elasticsearch Sink Connector, unrecoverable exception

Hi,

I am new to Kafka, but I managed to set up an instance of Kafka on my local machine. I have one process which adds a message to a topic once per second. I have also set up confluentinc-kafka-connect-elasticsearch-11.0.4, which forwards the messages to Elasticsearch.

I tried to see what happens if I disconnect from the Internet. I see the following error:

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Bulk request failed.

After I reconnect, no more messages are sent to Elasticsearch; the connector has to be terminated (Ctrl+C) and started again, and then it runs fine. This is my connector configuration:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=quickstart-events
key.ignore=true
connection.url=https://abcd.eu-west-1.es.amazonaws.com:443
connection.username=user
connection.password=pass
type.name=kafka-connect
transforms=TimestampRouter
transforms.TimestampRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.TimestampRouter.topic.format=logstash-test-${timestamp}
transforms.TimestampRouter.timestamp.format=YYYY.MM.dd
drop.invalid.message=true
behavior.on.malformed.documents=WARN
max.retries=1000
retry.backoff.ms=100
connection.compression=false
max.connection.idle.time.ms=60000
connection.timeout.ms=1000 
read.timeout.ms=15000

Is there some recovery mechanism that could be used? I would like to avoid killing and restarting the connector whenever there is a connection problem for a while.

Full log: JustPaste.it

Thank you, Martin.

@MPeli you should be able to adjust max.retries and retry.backoff.ms in order to increase the outage tolerance.

https://docs.confluent.io/kafka-connect-elasticsearch/current/index.html#automatic-retries
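For example, something along these lines (hypothetical values, just to illustrate; as far as I understand, the connector also backs off between attempts, so pick values that cover the longest outage you want to ride out):

# retry each failed bulk request up to 10 times,
# with a base backoff of 5 s between attempts
max.retries=10
retry.backoff.ms=5000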

Thank you very much for your answer. I will try changing max.retries and retry.backoff.ms.

I wrote a PowerShell script which checks for failed tasks every 5 minutes and restarts them (the script is included below). I was inspired by this post: Automatically restarting failed Kafka Connect tasks

Even though the script was running, the Elasticsearch Service Sink Connector stopped sending data to Elasticsearch (its task was still reported as RUNNING). Once I restarted Kafka Connect, data was sent to Elasticsearch again.

I was wondering how you deal with such problems. Is it possible to check programmatically whether the Elasticsearch Service Sink Connector has to be restarted? Is the sink connector able to restart on its own?

Thank you.

I found the following lines in server.log and connect.log.

server.log

[2021-06-30 13:30:40,863] INFO [GroupCoordinator 0]: Member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 in group connect-elasticsearch-sink has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-06-30 13:30:40,870] INFO [GroupCoordinator 0]: Preparing to rebalance group connect-elasticsearch-sink in state PreparingRebalance with old generation 19 (__consumer_offsets-28) (reason: removing member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-06-30 13:30:40,873] INFO [GroupCoordinator 0]: Group connect-elasticsearch-sink with generation 20 is now empty (__consumer_offsets-28) (kafka.coordinator.group.GroupCoordinator)

connect.log

[2021-06-30 13:25:30,484] WARN Bulk request 77736 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.ConnectException: Timeout connecting to [xxx.eu-west-1.es.amazonaws.com/xxx.xxx.xxx.xxx:xxx]
	at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
	at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:628)
	at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894)
	at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
	at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
	at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
	at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
	at java.base/java.lang.Thread.run(Thread.java:831)
[2021-06-30 13:30:31,353] INFO [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 sending LeaveGroup request to coordinator host.docker.internal:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1042)
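If I read that last message correctly, the retries keep the sink task busy for longer than max.poll.interval.ms, so the consumer gets kicked out of the group. One thing I might try (just a sketch; consumer.override.* settings only take effect if the worker allows client overrides) is to give the connector's consumer more time between polls:

# worker config (e.g. connect-standalone.properties): allow per-connector client overrides
connector.client.config.override.policy=All

# connector config: allow up to 10 minutes between poll() calls (illustrative value)
consumer.override.max.poll.interval.ms=600000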

The script:

$connector = "http://localhost:8083/"

While($true) {
    $tasks = try { 
        Invoke-RestMethod "$($connector)connectors/elasticsearch-sink/tasks" -ErrorAction Stop
    } catch [System.Net.WebException] { 
        Write-Host "An exception was caught: $($_.Exception.Message)"
        $_.Exception.Response
    }

    Foreach($task in $tasks)
    {
        if ($task.PSobject.Properties.name -match "id") {
            $id = $task.id.task;
            $status = try { 
                Invoke-RestMethod "$($connector)connectors/elasticsearch-sink/tasks/$($id)/status" -ErrorAction Stop
            } catch [System.Net.WebException] { 
                Write-Host "An exception was caught: $($_.Exception.Message)"
                $_.Exception.Response
            }

            if ($status.PSobject.Properties.name -match "state") {
                if ($status.state -eq "FAILED")
                {
                    # POST .../tasks/<id>/restart returns no body on success,
                    # so only report the restart when the call does not throw
                    try {
                        Invoke-RestMethod -Method Post -Uri "$($connector)connectors/elasticsearch-sink/tasks/$($id)/restart" -ContentType "application/json" -ErrorAction Stop
                        Write-Host "task $($id) has been restarted"
                    } catch [System.Net.WebException] {
                        Write-Host "An exception was caught: $($_.Exception.Message)"
                    }
                }
            }
        }
    }

    Start-Sleep -Seconds 300
}

This issue looks exactly the same: Connector shows up as "RUNNING" after it stopped working and entered unending rebalance · Issue #507 · confluentinc/kafka-connect-elasticsearch · GitHub

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.