Exactly once - S3 Sink - documentation questions

Hi

I have some questions regarding the “Exactly once” section of the S3 Sink Kafka Connect documentation.

I understood the following from the text:

  • The only condition for exactly-once is a deterministic partitioner

  • Default and field partitioners are always deterministic

  • The time-based partitioner is only deterministic if used with a deterministic timestamp extractor (timestamp taken from the Kafka record or from a record field, not wall-clock) and a deterministic rotation strategy (rotation based on the timestamp extracted by that deterministic extractor).
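For reference, the conditions above correspond to partitioner settings along these lines. The property names come from the S3 sink connector docs, but the values here are illustrative assumptions, not a tested config:

```properties
# Illustrative S3 sink partitioner settings (values are examples)
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
partition.duration.ms=3600000
# Deterministic time extractor: uses the Kafka record timestamp, not wall-clock
timestamp.extractor=Record
# Deterministic rotation: driven by extracted record timestamps
rotate.interval.ms=3600000
locale=en-US
timezone=UTC
```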

However, some details are not mentioned and some contradicted by the diagram:

  1. The diagram seems to contradict the text.
    According to the text, the field partitioner is always deterministic, which means it always ensures exactly-once.
    The diagram shows that the field partitioner doesn’t provide an exactly-once guarantee if scheduled rotation is used.
    IMHO the diagram is correct.
    Does it mean the text is inaccurate?
    Or maybe I misunderstood something?

  2. Can the flush.size configuration compromise the exactly-once guarantee in any way?

  3. If I want to partition the data by a timestamp extracted from a record field, then in order to maintain the exactly-once guarantee I must use a rotation strategy based on that field.
    Scenario:

    • rotate.interval.ms is 1 hour
    • first record of the file has timestamp field with value of 12:00
    • a record with timestamp field 13:01 is consumed from Kafka, causing the file to rotate.
    • record with timestamp field 12:55 is read from Kafka.
      What will happen to the record? Will it be discarded?
  4. The diagram doesn’t present exactly-once option for handling late data.
    Does it mean that exactly-once approach can’t handle late data at all?
    Are there any solutions?

  5. Assume the data is partitioned by field or by timestamp, and the scheduled rotation strategy can’t be used because the exactly-once guarantee is required (so the timestamp-based rotation strategy is used).

    • rotate.interval.ms is 1 hour
    • first record of the file has timestamp (from record or field) with value of 12:00
    • a few more records are consumed, with timestamps between 12:00 and 12:30.
    • no more records arrive for that field partition or for that time range
      Does this mean the data will never become available in S3 (because there is no rotation/flush trigger)?
  1. Scheduled rotation is based on the wall-clock time of the host; in case of a restart there is no way for it to be deterministic → no exactly-once.

  2. If you never change flush.size then it’s deterministic; if it changes during consumption of a topic then yes, exactly-once is lost.
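To illustrate the flush.size point, here is a toy sketch (my own illustration, not connector code) of why flushing on a fixed record count is deterministic across restarts, while changing it mid-stream is not:

```python
# Minimal sketch (not connector code): file boundaries produced by a
# fixed flush.size depend only on offsets, so a replay from a committed
# offset re-creates exactly the same files.
def file_boundaries(offsets, flush_size):
    """Group consecutive offsets into files of flush_size records each."""
    return [tuple(offsets[i:i + flush_size])
            for i in range(0, len(offsets), flush_size)]

offsets = list(range(100))               # offsets 0..99 of one topic partition
first = file_boundaries(offsets, 10)
replay = file_boundaries(offsets, 10)    # same flush.size after a restart
assert first == replay                   # identical files -> safe to re-upload

# Changing flush.size mid-stream yields different file boundaries:
assert file_boundaries(offsets, 10) != file_boundaries(offsets, 7)
```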

  3. From the docs →

The S3 connector allows one writer at a time for this parameter. When the S3 connector encounters data that belongs to a previously closed output partition, it closes the current partition and creates a new partition in S3. If there is no late data arriving, the S3 connector closes the current partition when it reads a record whose extracted timestamp is greater than the specified amount of time.
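A toy model (my own sketch, not the connector’s actual code) of the behaviour the quoted passage describes, using the 12:00 / 13:01 / 12:55 scenario from question 3: the late record is written to a new file, not discarded.

```python
# Toy model of timestamp-based rotation as described in the quoted docs:
# rotation is triggered by extracted record timestamps, and a late record
# belonging to a closed time range starts a new file rather than being dropped.
ROTATE_INTERVAL_MS = 60 * 60 * 1000  # rotate.interval.ms = 1 hour

def assign_files(timestamps_ms):
    """Return a list of files, each a list of record timestamps."""
    files, current, base = [], [], None
    for ts in timestamps_ms:
        if base is None:
            base = ts
        # Rotate when a record's timestamp exceeds the file's first
        # timestamp by the interval, OR when a late record belongs to
        # an already-closed time range.
        if ts - base >= ROTATE_INTERVAL_MS or ts < base:
            files.append(current)
            current, base = [], ts
        current.append(ts)
    if current:
        files.append(current)
    return files

h = 60 * 60 * 1000
# 12:00, 12:30, 13:01 (triggers rotation), then a late 12:55 record
ts = [12 * h, 12 * h + 30 * 60 * 1000, 13 * h + 60 * 1000, 12 * h + 55 * 60 * 1000]
files = assign_files(ts)
# The 13:01 record rotates the file; the late 12:55 record then opens
# yet another new file -- it is written, not dropped.
assert len(files) == 3
assert files[-1] == [12 * h + 55 * 60 * 1000]
```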


Thanks a lot, Raphael
Can you kindly point me to the documentation that you have mentioned?

https://docs.confluent.io/kafka-connect-s3-sink/current/configuration_options.html#:~:text=If%20there%20is%20no%20late,the%20specified%20amount%20of%20time.&text=The%20time%20interval%20in%20milliseconds%20to%20periodically%20invoke%20file%20commits.&text=It%20rolls%20up%20files%20based%20on%20the%20time%20interval%20specified.
