Messages missing when the same topic is read by two sinks with different consumer groups

Hey,
I have a system with two Kafka Connect sinks working on the same topic.
An S3 sink and an Elasticsearch sink are both configured to read from the same topic, and each has its own consumer group.
As far as I know, both should read the same data. But what we observe is that far less data reaches the Elasticsearch sink than is persisted by the S3 sink.
A quick check showed that while S3 contains 100% of the data produced to the topic, Elasticsearch has only 10%.
What could the issue be, and how can I debug it?
The setup has

Kafka 2.5.0
Confluent S3 connector version: 5.5.1
Confluent Elasticsearch connector version: 5.5.1

Appreciate any help or pointers.

Hi there! Can you kindly post the configurations you’re using for the sink connector?


Below is the configuration used; the topic list is the only thing I have changed, everything else is the same. With this config we get 100% of the data for topic2 but only 0-10% for topic1.

topics: "topic1,topic2"
key.ignore: "true"
schema.ignore: "true"
timezone: "UTC"
connection.url: "https://elastic_search_url:9200"
offset.flush.timeout.ms: "180000"
session.timeout.ms: "600000"
connection.username: elastic
elastic.security.protocol: SSL
elastic.https.ssl.keystore.type: JKS
elastic.https.ssl.truststore.type: JKS
type.name: "_doc"
value.converter.schemas.enable: "false"
key.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
behavior.on.malformed.documents: "warn"
transforms: "routeTS"
transforms.routeTS.type: "org.apache.kafka.connect.transforms.TimestampRouter"
transforms.routeTS.topic.format: "${topic}-${timestamp}"
transforms.routeTS.timestamp.format: "YYYYMMdd"
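For reference, the TimestampRouter SMT in this config rewrites each record's topic using the record timestamp, which is what produces one index per topic per day. A minimal Python sketch of that routing logic (the real SMT uses Java's SimpleDateFormat; `%Y%m%d` below only approximates the `YYYYMMdd` pattern, and `route_topic` is an illustrative name, not connector code):

```python
from datetime import datetime, timezone

def route_topic(topic: str, timestamp_ms: int,
                topic_format: str = "${topic}-${timestamp}",
                ts_format: str = "%Y%m%d") -> str:
    """Approximate TimestampRouter: substitute the record topic and a
    formatted record timestamp into the configured topic.format."""
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", ts.strftime(ts_format)))

# Records from the same topic on different days land in different indices.
print(route_topic("topic1", 1_600_000_000_000))  # topic1-20200913
print(route_topic("topic1", 1_600_100_000_000))  # topic1-20200914
```

This is why counting documents requires summing across all daily indices for a topic, as discussed below.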

Linking to the GitHub issue for future readers: Data Missing With Elasticsearch Kafka Connector · Issue #632 · confluentinc/kafka-connect-elasticsearch · GitHub

Overall, I find there is not enough information here. For example:

  • Are the Elasticsearch documents overwritten?
  • Do your _id fields in Elasticsearch show sequential offset values for the records?
  • Does your data span multiple days? If so, are you counting data from all the ES indices?

Please find my replies below.

  • Are the Elasticsearch documents overwritten?
    I am not sure what you mean by documents being overwritten. To state it plainly, I am using a rolling-index strategy, and every day a new index is created for each topic.
  • Do your _id fields in Elasticsearch show sequential offset values for the records?
    Yes, the indexed documents have _id fields carrying the offset and partition details.
  • Does your data span multiple days? If so, are you counting data from all the ES indices?
    Yes, my data spans multiple days, and yes, I am counting data from all ES indices of a given topic.
    To add, I have two topics in place: topic1 and topic2. All of topic2's data is accounted for in Elasticsearch, but only a few records can be found for topic1.
    In parallel, the Confluent S3 connector reads from the same topics with a different consumer group and accounts for all the data for both topics.
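Since the config above sets key.ignore to true, the Elasticsearch connector's default document _id is "topic+partition+offset". One concrete way to look for silently dropped records is to parse the _ids per partition and check for offset gaps. A hypothetical sketch (the function name and sample _ids are illustrative):

```python
from collections import defaultdict

def find_offset_gaps(ids):
    """Given _id strings of the form 'topic+partition+offset' (the ES
    connector's default when key.ignore=true), return the missing
    offsets per (topic, partition)."""
    offsets = defaultdict(set)
    for _id in ids:
        topic, partition, offset = _id.rsplit("+", 2)
        offsets[(topic, int(partition))].add(int(offset))
    gaps = {}
    for key, seen in offsets.items():
        full = set(range(min(seen), max(seen) + 1))
        missing = sorted(full - seen)
        if missing:
            gaps[key] = missing
    return gaps

# Offsets 2 and 3 never made it into the index:
print(find_offset_gaps(["topic1+0+0", "topic1+0+1", "topic1+0+4"]))
# {('topic1', 0): [2, 3]}
```

In practice you would feed this the _id values scrolled out of all the daily indices for the topic; gaps would point at records the connector dropped or skipped rather than overwrote.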

If you write Kafka records with the same key, Elasticsearch will use that key to insert or update the same document. The S3 sink, on the other hand, appends every single record to its files. In other words, the Elasticsearch connector behaves similarly to a compacted Kafka topic.

This would show up as gaps in the offsets encoded in the _id field.
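The upsert-vs-append difference described above can be sketched like this (a pure-Python analogy, not connector code):

```python
def elastic_like(records):
    """Elasticsearch with document ids derived from the record key:
    later records with the same key overwrite earlier ones (upsert)."""
    index = {}
    for key, value in records:
        index[key] = value
    return index

def s3_like(records):
    """S3 sink: every record is appended to object files, duplicates and all."""
    return list(records)

records = [("user-1", "v1"), ("user-2", "v1"), ("user-1", "v2")]
print(len(elastic_like(records)))  # 2 documents
print(len(s3_like(records)))       # 3 records
```

If many records share a key, the document count in Elasticsearch can be far below the record count in S3 even though both sinks consumed every record.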