Hey
I have a system in which two Kafka Connect sinks work on the same Kafka topic.
An S3 sink and an Elasticsearch sink are both configured to read data from the same topic, and each is assigned a different consumer group.
To my knowledge, both should read the same data. But what we are observing is that far less data reaches the Elasticsearch sink than is persisted by the S3 sink.
A simple check showed that while S3 contains 100% of the data written to the topic, Elasticsearch has only about 10% of it.
What could be the possible issue? And how can I debug it?
The setup has:
Kafka 2.5.0
Confluent S3 sink connector: 5.5.1
Confluent Elasticsearch sink connector: 5.5.1
Below are the configurations used; the topics are the only thing I have changed, everything else is the same. With the config below we get 100% of the data for topic2 but only 0-10% of the data for topic1.
Are the Elasticsearch documents overwritten?
I am not sure what you mean by documents being overwritten. To state it plainly, I am using a rolling index strategy, and every day a new index is created for each topic.
Do your _id fields in Elastic show sequential offset values for the records?
Yes, the documents that get indexed have _id fields giving the offset and partition details.
Does your data span multiple days? If so, are you counting data from all the ES indices?
Yes, my data spans multiple days. And yes, I am counting data from all ES indices of a given topic.
To add, I have two topics in place, topic1 and topic2. For topic2 all the data is accounted for in Elasticsearch, but only a small fraction of the records can be found for topic1.
In parallel I have the Confluent S3 connector, which reads from the same topics but is configured with a different consumer group, and it accounts for all the data for both topics.
If you write Kafka records with the same key, then Elasticsearch would use that key to insert or update the same document. The S3 sink, on the other hand, will append every single record to files. In other words, the Elasticsearch connector acts similarly to a compacted Kafka topic.
This would be apparent if there are any gaps in the offsets shown in the _id field.
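As a quick way to check which behaviour you have, you can look at the sink's key.ignore setting. A minimal sketch, assuming the Confluent Elasticsearch sink and a placeholder connector name and Connect REST port:

```
# Connector name and Connect REST port are placeholders.
curl -s http://localhost:8083/connectors/elastic-sink/config
# "key.ignore": "true"  -> _id is topic+partition+offset, one document per record
# "key.ignore": "false" -> _id is the record key, so records sharing a key
#                          upsert the same document (compacted-topic-like behaviour)
```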
Ok … so my docs are being inserted with an _id that is a combination of offset and partition, something like this … topic2-20220530+4+463775. So to answer your question, no, we are not overwriting.
The _id I provided is for the topic that has 100% of its data in Elasticsearch, in this case topic2. For that topic all the data with offsets 1 through 463774 is present.
For the other topic, whose current offset is 2469189, I don't have all the offsets in Elasticsearch; data is missing for many offsets. If I check the status of that topic for the given consumer group, I can see the offsets being committed and increasing, but I don't see the corresponding data. I am unable to draw any conclusion from this.
What are the settings of the other topic? Is that topic compacted, for example? If so, it’ll potentially miss offsets when consumed.
Was the Elastic connector started at the same time of the S3 sink? If not, you could lose data due to either retention or compaction.
Also, Connect defaults to auto.offset.reset=latest, I believe, so if you’d started the connector without overriding this, it would only have read data written after the connector started.
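If you want to verify those two possibilities, something along these lines could help (broker address is a placeholder, and the override is only honoured if the worker's connector.client.config.override.policy permits client overrides):

```
# Check whether topic1 has a non-default cleanup.policy or retention set.
kafka-configs --bootstrap-server localhost:9092 --describe \
  --entity-type topics --entity-name topic1

# Offset reset override in the sink connector config:
#   "consumer.override.auto.offset.reset": "earliest"
```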
The settings of both topics are exactly the same.
Yes, the S3 sink and the Elasticsearch connector were started at the same time, so we can rule that out.
I tried providing consumer.override.auto.offset.reset: “earliest” in the config, but that didn't help. Is there any way to validate or understand whether the connector has read the data from the topic and processed it?
Maybe that can be a starting exercise.
If you’ve added the override after already starting the connector, that wouldn’t do anything. You’d have to set a new name, or use kafka-consumer-groups to reset the offsets.
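A rough sketch of the reset, assuming the default connect-&lt;connector name&gt; group naming and a placeholder connector name; the group must have no active members, so delete or stop the connector first:

```
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group connect-elastic-sink --topic topic1 \
  --reset-offsets --to-earliest --execute
```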
You can describe both connector consumer groups to see what offsets they have committed, but only the logs or querying the actual data will tell you what has been written successfully. For example, if you set up error tolerance with a dead letter queue, or set it to ignore errors, then the sink consumer offsets will still get committed, but the data will not exist in the sink.
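For example (group names, broker address, and the DLQ topic name below are placeholders):

```
# Compare committed offsets against the log end offsets for both sink groups.
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-elastic-sink
kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-s3-sink

# If the sink is configured to tolerate errors, e.g.
#   "errors.tolerance": "all",
#   "errors.deadletterqueue.topic.name": "dlq-elastic-sink",
# offsets still advance while failed records go to the DLQ (or are dropped),
# so also check that topic and the Connect worker logs.
```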
Changing the following settings helped me resolve the issue. I'm not sure of the reason behind that … but data has started to flow to Elasticsearch now.