I am new to Kafka, but I managed to set up a Kafka instance on my local machine. One process writes a message to a topic once per second. I have also set up confluentinc-kafka-connect-elasticsearch-11.0.4, which forwards the messages to Elasticsearch.
I tried to see what happens when I disconnect from the Internet, and I see this error:
Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Bulk request failed.
When I reconnect, no more messages are sent to Elasticsearch; the connector has to be terminated (Ctrl+C) and started again, after which it runs fine.
Is there some recovery mechanism that could be used? I would like to avoid killing and restarting the connector whenever there is a temporary connection problem.
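So far the only knobs I have found are the connector's own retry settings. This is a sketch of what I am experimenting with in the connector properties file; the topic name is a placeholder from my setup, and the values are examples rather than recommendations:

```properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=my-topic
connection.url=https://xxx.eu-west-1.es.amazonaws.com

# How many times a failed bulk request is retried before the task is killed
max.retries=10
# Initial back-off between retries (the connector backs off further on each attempt)
retry.backoff.ms=1000
# Client-side timeouts for the Elasticsearch HTTP client
connection.timeout.ms=5000
read.timeout.ms=30000
```

With higher `max.retries` and `retry.backoff.ms` the task should survive a longer outage before giving up, but once it has exhausted its retries it still dies and stays dead.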
Even though the process was running, the Elasticsearch Service Sink Connector stopped sending data to Elasticsearch (a task was still listed as running). Once I restarted Kafka Connect, data was sent to Elasticsearch again.
I was wondering how you deal with such problems. Is it possible to check programmatically whether the Elasticsearch Service Sink Connector has to be restarted? Can the sink connector restart on its own?
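The best workaround I can think of is a small watchdog against the Kafka Connect REST API (GET `/connectors/{name}/status`, POST `/connectors/{name}/tasks/{id}/restart` are standard endpoints). A minimal sketch, assuming the default REST port 8083 and my connector name `elasticsearch-sink`:

```python
"""Watchdog sketch: poll the Kafka Connect REST API and restart FAILED tasks.

The Connect URL and connector name below are assumptions from my local setup.
"""
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"   # assumption: default Connect REST port
CONNECTOR = "elasticsearch-sink"        # assumption: my connector name


def failed_task_ids(status):
    """Return the ids of tasks reported as FAILED in a connector status payload."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def check_and_restart():
    """Fetch the connector status and restart any FAILED task."""
    with urllib.request.urlopen(f"{CONNECT_URL}/connectors/{CONNECTOR}/status") as r:
        status = json.load(r)
    for task_id in failed_task_ids(status):
        req = urllib.request.Request(
            f"{CONNECT_URL}/connectors/{CONNECTOR}/tasks/{task_id}/restart",
            method="POST",
        )
        urllib.request.urlopen(req)
```

Running `check_and_restart()` every minute (e.g. from cron or a loop) would recover from the "task killed, will not recover until manually restarted" state without restarting the whole worker, but it feels like something the framework should do for me.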
Thank you.
I found the following lines in server.log and connect.log:
server.log
[2021-06-30 13:30:40,863] INFO [GroupCoordinator 0]: Member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 in group connect-elasticsearch-sink has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-06-30 13:30:40,870] INFO [GroupCoordinator 0]: Preparing to rebalance group connect-elasticsearch-sink in state PreparingRebalance with old generation 19 (__consumer_offsets-28) (reason: removing member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-06-30 13:30:40,873] INFO [GroupCoordinator 0]: Group connect-elasticsearch-sink with generation 20 is now empty (__consumer_offsets-28) (kafka.coordinator.group.GroupCoordinator)
connect.log
[2021-06-30 13:25:30,484] WARN Bulk request 77736 failed. Retrying request. (io.confluent.connect.elasticsearch.ElasticsearchClient:335)
java.net.ConnectException: Timeout connecting to [xxx.eu-west-1.es.amazonaws.com/xxx.xxx.xxx.xxx:xxx]
at org.apache.http.nio.pool.RouteSpecificPool.timeout(RouteSpecificPool.java:169)
at org.apache.http.nio.pool.AbstractNIOConnPool.requestTimeout(AbstractNIOConnPool.java:628)
at org.apache.http.nio.pool.AbstractNIOConnPool$InternalSessionRequestCallback.timeout(AbstractNIOConnPool.java:894)
at org.apache.http.impl.nio.reactor.SessionRequestImpl.timeout(SessionRequestImpl.java:183)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processTimeouts(DefaultConnectingIOReactor.java:210)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:155)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.base/java.lang.Thread.run(Thread.java:831)
[2021-06-30 13:30:31,353] INFO [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Member connector-consumer-elasticsearch-sink-0-01f75d48-a21a-4423-b9ca-c46c35bc82a1 sending LeaveGroup request to coordinator host.docker.internal:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1042)
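Based on that last log line, I also considered raising the consumer poll interval so that a long retry period does not get the consumer evicted from the group. A sketch, assuming the Connect worker's override policy permits client overrides (the property names are real, but I am not sure this is the right fix):

```properties
# In the worker config (e.g. connect-standalone.properties),
# if overrides are not already allowed:
connector.client.config.override.policy=All

# In the connector config: allow up to 15 minutes between poll() calls
consumer.override.max.poll.interval.ms=900000
```

Is that the intended way to handle this, or is there a proper recovery mechanism I am missing?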