Kafka connect to ES

Hello, I am pretty new to Kafka Connect, only about two weeks into using it, so bear with me.

Our environment is all in AWS, so both our Kafka brokers and ES run there.
We are currently trying to send data from SQL Server to ES.
We have gotten all the data into a producer and are now trying to stream it to ES.
I am currently running in distributed mode with an ES sink connector.

Once I run it, it seems to start pushing data into ES pretty quickly, but then it kind of gets stuck.
I am able to query the data that does come in, though.
This is where it usually gets stuck:

INFO WorkerSinkTask{id=elastic-search-smb-2-0} Committing offsets asynchronously using sequence number 5: {color.smd.abs-9=OffsetAndMetadata{offset=780831, leaderEpoch=null, metadata=''}, color.smd.abs-7=OffsetAndMetadata{offset=787143, leaderEpoch=null, metadata=''}, color.smd.abs-8=OffsetAndMetadata{offset=781276, leaderEpoch=null, metadata=''}, color.smd.abs-5=OffsetAndMetadata{offset=780728, leaderEpoch=null, metadata=''}, color.smd.abs-6=OffsetAndMetadata{offset=779078, leaderEpoch=null, metadata=''}, color.smd.abs-3=OffsetAndMetadata{offset=781514, leaderEpoch=null, metadata=''}, color.smd.abs-4=OffsetAndMetadata{offset=782010, leaderEpoch=null, metadata=''}, color.smd.abs-1=OffsetAndMetadata{offset=781538, leaderEpoch=null, metadata=''}, color.smd.abs-2=OffsetAndMetadata{offset=778608, leaderEpoch=null, metadata=''}, color.smd.abs-0=OffsetAndMetadata{offset=777293, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:346)

After a little while, it throws the error below:

sending LeaveGroup request to coordinator b-2.*****.amazonaws.com:9092 due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Not sure if it's too much data being pushed through or something wrong with my configuration. I'm also not sure which config file max.poll.interval.ms should be added to.

Any help would be much appreciated.

Hi,

In a nutshell, the time required to send each batch to ES is longer than max.poll.interval.ms, the maximum time allowed between subsequent poll() calls, so the consumer is considered failed and leaves the group.

As mentioned in the log message, you have to either increase max.poll.interval.ms or send smaller batches to ES. You can specify either of them in your Kafka Connect configuration. The property keys are, respectively:

consumer.max.poll.interval.ms
batch.size

More information here: https://docs.confluent.io/kafka-connect-elasticsearch/current/configuration_options.html#

Hope this helps!

Thank you. So this would go in the ES connector config when I create it, correct?
Also, I don't see consumer.max.poll.interval.ms in the doc.

Would it be connection.timeout.ms?

That setting is not in the ES Sink Connector, but in the common consumer Connector configuration (you can imagine it like a parent class). More information here:

https://docs.confluent.io/home/connect/userguide.html#producer-and-consumer-overrides

In any case, the key is the one I mentioned: consumer.max.poll.interval.ms

Sorry, I'm still kind of confused. When I run the worker I use
bin/connect-distributed etc/kafka/connect-distributed.properties

Does that mean I need to add consumer.max.poll.interval.ms to the connect-distributed.properties file?

Yep, that's correct :)
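
For reference, here is a minimal sketch of what that could look like in the worker file. The values are illustrative assumptions, not recommendations; tune them to how long a batch actually takes to reach ES. The second key, consumer.max.poll.records, is the worker-level form of the max.poll.records option named in the error message.

```properties
# etc/kafka/connect-distributed.properties (worker configuration)
# Keys prefixed with "consumer." are passed to the consumers that
# sink tasks on this worker use.

# Illustrative value (assumption): allow up to 10 minutes between poll() calls.
# The Kafka consumer default is 300000 (5 minutes).
consumer.max.poll.interval.ms=600000

# Optional, illustrative value (assumption): return fewer records per poll().
# The Kafka consumer default is 500.
consumer.max.poll.records=200
```

The worker has to be restarted to pick up changes to this file. If you would rather set this for a single connector, Kafka Connect also supports consumer.override.* keys in the connector config (for example consumer.override.max.poll.interval.ms), provided the worker sets connector.client.config.override.policy=All.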

Sorry two more questions.

For anything to do with the consumer, I need to put that prefix in front.
So for batch.size I need to use consumer.batch.size. Is that correct?

Also, I am running this with bin/connect-distributed -daemon. Is this the best way to run it in the background, or is running it as a service better? How do users usually do this in production? If you do run it as a daemon, what's the best way to stop the worker?

Thank you again you have been a really great help.

batch.size is not a consumer setting but a specific setting defined by the ES Sink Connector, so no consumer prefix is needed. More details here.
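
As a rough sketch, the connector-level settings could look like the following. The connector name and topic are taken from the log lines earlier in the thread; connection.url and tasks.max are placeholders (assumptions), so substitute your own values.

```properties
# ES sink connector configuration (not the worker file)
name=elastic-search-smb-2
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=color.smd.abs

# Placeholder (assumption): replace with your ES endpoint.
connection.url=http://localhost:9200

# Placeholder (assumption): number of sink tasks to run.
tasks.max=1

# Connector-specific setting, so no "consumer." prefix.
# The connector default is 2000 records per batch.
batch.size=100
```

In distributed mode the connector is created through the Connect REST API, so the same keys go inside the "config" object of the JSON you POST to /connectors rather than in a properties file.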

For production environments, it's recommended to run Kafka Connect in distributed mode. You can run it in the background or as a service; it's up to you. To stop the process, send a kill signal to the Connect process: kill <pid>.

Great, thank you. So batch.size is part of the ES connector config that you supply when you create the connector.

And consumer.max.poll.interval.ms goes in connect-distributed.properties.

Again, thank you for the quick responses. You have been very helpful, as I am new to Kafka.

So I was able to add those new parameters and I do not get the error anymore. It seems to work for the first 30 seconds and sends lots of data, but after that it seems to get stuck. I don't see any error, though. The only thing I see in the log is this:

INFO WorkerSinkTask{id=elastic-search-smb-4-0} Committing offsets asynchronously using sequence number 6: {color.smd.abs-9=OffsetAndMetadata{offset=764214, leaderEpoch=null, metadata=''}, color.smd.abs-7=OffsetAndMetadata{offset=772233, leaderEpoch=null, metadata=''}, color.smd.abs-8=OffsetAndMetadata{offset=764626, leaderEpoch=null, metadata=''}, color.smd.abs-5=OffsetAndMetadata{offset=764059, leaderEpoch=null, metadata=''}, color.smd.abs-6=OffsetAndMetadata{offset=764335, leaderEpoch=null, metadata=''}, color.smd.abs-3=OffsetAndMetadata{offset=765716, leaderEpoch=null, metadata=''}, color.smd.abs-4=OffsetAndMetadata{offset=765397, leaderEpoch=null, metadata=''}, color.smd.abs-1=OffsetAndMetadata{offset=765028, leaderEpoch=null, metadata=''}, color.smd.abs-2=OffsetAndMetadata{offset=763931, leaderEpoch=null, metadata=''}, color.smd.abs-0=OffsetAndMetadata{offset=763980, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:346)

Any idea what else can be done to try and get this data into ES? I set batch.size to 100, but it still doesn't seem to work. Could it be something in my ES config file?
Does tasks.max affect anything in the connector?