Messages missing from the same topic read with different consumer groups

Hey
I have a system in which two Kafka Connect sinks work on the same Kafka topic.
The S3 sink and the Elasticsearch sink are both configured to read data from the same topic, and each has its own consumer group.
As far as I know, both should read the same data, but what we are observing is that far less data reaches the Elasticsearch sink than is persisted by the S3 sink.
A quick check shows that while S3 contains 100% of the data written to the topic, Elasticsearch has only about 10%.
What could be the issue, and how can I debug it?
The setup:

Kafka 2.5.0
Confluent S3 sink connector: 5.5.1
Confluent Elasticsearch sink connector: 5.5.1
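
As a first debugging step, the committed offsets and lag of the two sink consumer groups can be compared; a minimal sketch, assuming the default connect-<connector-name> group naming and placeholder broker address and connector names:

# Compare committed offsets (CURRENT-OFFSET) and LAG per partition for both sinks
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-s3-sink
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-elastic-sink

If both groups show similar committed offsets while Elasticsearch holds far fewer documents, the records are being consumed but not indexed.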

Appreciate any help or pointers.

Hi there! Can you kindly post the configurations you’re using for the sink connector?


Below is the configuration used; the topics are the only thing I have changed, everything else is the same. With this config we get 100% of the data for topic2 but only 0-10% of the data for topic1.

topics: "topic1,topic2"
key.ignore: "true"
schema.ignore: "true"
timezone: "UTC"
connection.url: "https://elastic_search_url:9200"
offset.flush.timeout.ms: "180000"
session.timeout.ms: "600000"
connection.username: elastic
elastic.security.protocol: SSL
elastic.https.ssl.keystore.type: JKS
elastic.https.ssl.truststore.type: JKS
type.name: "_doc"
value.converter.schemas.enable: "false"
key.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
behavior.on.malformed.documents: "warn"
transforms: "routeTS"
transforms.routeTS.type: "org.apache.kafka.connect.transforms.TimestampRouter"
transforms.routeTS.topic.format: "${topic}-${timestamp}"
transforms.routeTS.timestamp.format: "YYYYMMdd"
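
For context, the TimestampRouter above routes each record to a per-day topic name (e.g. topic1-20220530), so the Elasticsearch sink writes one index per topic per day. The resulting indices can be listed with something like the following sketch (URL and credentials are placeholders; add the cluster CA with --cacert if needed):

# List all daily indices created for topic1, with document counts per index
curl -s -u elastic:<password> 'https://elastic_search_url:9200/_cat/indices/topic1-*?v'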

Linking to the GitHub issue for future readers: Data Missing With Elasticsearch Kafka Connector · Issue #632 · confluentinc/kafka-connect-elasticsearch · GitHub

Overall, I find there is not enough information here. For example:

  • Are the Elastic documents overwritten?
  • Do your _id fields in Elastic show sequential offset values for the records?
  • Does your data span multiple days? If so, are you counting data from all the ES indices?

Please find my replies below:

  • Are the Elastic documents overwritten?
    I am not sure what you mean by documents being overwritten. To state it plainly, I am using a rolling-index strategy, and every day a new index is created for each topic.
  • Do your _id fields in Elastic show sequential offset values for the records?
    Yes, the documents that get indexed have _id fields containing the offset and partition details.
  • Does your data span multiple days? If so, are you counting data from all the ES indices?
    Yes, my data spans multiple days, and yes, I am counting data from all ES indices of a given topic (cross-checked roughly as sketched below).
    To add, I have two topics in place, topic1 and topic2. For topic2, all the data is accounted for in Elasticsearch, but for topic1 only a few records can be found.
    Meanwhile, I have the Confluent S3 connector running in parallel; it reads from the same topics but is configured with a different consumer group, and it accounts for all the data for both topics.
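
To cross-check those counts, the total document count across all rolling indices of a topic can be compared against the log end offsets reported for the sink's consumer group; a sketch with placeholder URL, credentials, and group name:

# Total documents across all daily indices of topic1
curl -s -u elastic:<password> 'https://elastic_search_url:9200/topic1-*/_count'

# LOG-END-OFFSET minus CURRENT-OFFSET shows how far the Elasticsearch sink is behind
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-elastic-sink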

If you write Kafka records with the same key, Elasticsearch will use that key to insert or update the same document. The S3 sink, on the other hand, appends every single record to files. In other words, the Elasticsearch connector behaves similarly to a compacted Kafka topic.

This would be apparent if there are gaps in the offsets embedded in the _id field.
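
A quick way to eyeball those _id values is to pull a small sample of document IDs; a sketch with placeholder URL and credentials:

# Return only the _id of a handful of documents; gaps in the trailing offset component hint at overwritten or skipped records
curl -s -u elastic:<password> 'https://elastic_search_url:9200/topic1-*/_search?size=20&_source=false&pretty'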

OK … so my docs are being inserted with an _id that is a combination of topic, partition, and offset, something like this … topic2-20220530+4+463775. So to answer your question, no, we are not overwriting.

What about offsets 1 through 463774? Are there any missing numbers?

The _id I provided is for the topic that has 100% of its data in Elasticsearch, in this case topic2. For that topic, all the data with offsets 1 through 463774 is present.
For the other topic, whose current offset is 2469189, I don't have all offsets in Elasticsearch; data is missing. When I check the status of that topic for the given consumer group, I can see the committed offset being updated and increasing, but I don't see any data. I am unable to draw any conclusion from this.
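
For reference, whether the connector's tasks are actually RUNNING, or have silently FAILED, can be checked through the Connect REST API; a minimal sketch, assuming the worker listens on localhost:8083 and a hypothetical connector name of elastic-sink:

# Reports connector and per-task state (RUNNING/FAILED/PAUSED) and the stack trace of any failed task
curl -s http://localhost:8083/connectors/elastic-sink/status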

What are the settings of the other topic? Is that topic compacted, for example? If so, it’ll potentially miss offsets when consumed.
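
A sketch of how to check that, assuming the broker is reachable on localhost:9092:

# Topic-level overrides such as cleanup.policy=compact appear in the Configs column
kafka-topics --bootstrap-server localhost:9092 --describe --topic topic1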

Was the Elastic connector started at the same time of the S3 sink? If not, you could lose data due to either retention or compaction.

Also, Connect defaults to auto.offset.reset=latest, I believe, so if you’d started the connector without overriding this, it would only have read data written after the connector started.

The settings of both topics are exactly the same.
Yes, the S3 sink and the Elasticsearch connector were started at the same time, so we can rule that out.
I tried setting consumer.override.auto.offset.reset: "earliest" in the config, but that didn't help. Is there any way to validate that the connector has read the data from the topic and processed it?
Maybe that can be a starting exercise.

If you added the override after the connector had already started, it wouldn't do anything. You'd have to give the connector a new name, or use kafka-consumer-groups to reset the offsets.
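
For example, something like the following could rewind the group to the beginning of the topic (a sketch; the connector must be stopped first so the group has no active members, and the group and topic names are placeholders):

kafka-consumer-groups --bootstrap-server localhost:9092 \
  --group connect-elastic-sink --topic topic1 \
  --reset-offsets --to-earliest --execute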

You can describe both connectors' consumer groups to see what offsets they have committed, but only the logs or querying the actual data will tell you what has been written successfully. For example, if you set up error tolerance with a dead letter queue, or configure errors to be ignored, the sink's consumer offsets will still be committed even though the data does not exist in the sink.
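
For reference, a dead letter queue is enabled on a sink with properties along these lines (a sketch; the DLQ topic name is a placeholder), which makes failed records inspectable instead of silently dropped or fatal:

errors.tolerance: "all"
errors.deadletterqueue.topic.name: "dlq-elastic-sink"
errors.deadletterqueue.topic.replication.factor: "3"
errors.deadletterqueue.context.headers.enable: "true"
errors.log.enable: "true"
errors.log.include.messages: "true"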

Changing the following settings helped me resolve the issue. I'm not sure of the reason behind that … but data has started to flow to Elasticsearch now.

key.converter: "org.apache.kafka.connect.storage.StringConverter"
key.converter.schemas.enable: "false"

Still wondering why, though. What is the reason behind that?

Glad to hear it.

Strings don't have schemas, only JSON events do, so you can remove the schemas.enable property.
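
One way to confirm what the keys actually look like on the wire, and hence which converter they need, is to print a few of them; a sketch, assuming the broker is reachable on localhost:9092:

# Print the first few keys and values; keys that are not valid JSON will fail the JsonConverter but work with the StringConverter
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic1 \
  --from-beginning --max-messages 5 \
  --property print.key=true --property key.separator=' | '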
