Restore default TimestampExtractor before sink

Halu frens,

I’m doing some windowed aggregations with Kafka Streams. To use event time as the timestamp for windowing, I have set a TimestampExtractor on the source:

streamBuilder.stream("myTopic", Consumed.with(keySerde, valueSerde)
                .withTimestampExtractor(myTimestampExtractor))

Now, after all the calculations and before the sink, I want to restore the default TimestampExtractor, i.e. the ingestion time called LogAppendTime. I want Kafka to store the output records with the ingestion timestamp, but I want the windowing to run on event time.
How can I do that?

Or is it possible to set the TimestampExtractor only for the windowing?

myStream.groupByKey()
                .windowedBy(TimeWindows.of(duration))
                .aggregate(
                        () -> null,
                        new MyAggregator(duration.toString()),
                        Materialized.with(keySerde, valueSerde));


There is no TimestampExtractor concept on the write path, only on the read path.

LogAppendTime is a broker/topic config. You can configure it broker-side for the output topic (topic-level config message.timestamp.type=LogAppendTime, or log.message.timestamp.type as the broker-wide default). In that case, the broker overwrites the timestamp provided by the producer (aka Kafka Streams) with its current wall-clock time when it receives the data.

If you want to change the timestamp within Kafka Streams, you could use transform() and call context.forward(..., To.all().withTimestamp(...)) before calling to().

Thank you Sir. Is there any solution other than setting a TimestampExtractor if I want to use the event time (from the record’s value, not from the record’s timestamp) in windowed aggregations?

I want something like this to run before the windowed aggregations:

public class HeadersTransformer implements Transformer<String, MyEvent, KeyValue<String, MyEvent>> {

...
    @Override
    public KeyValue<String, MyEvent> transform(String k, MyEvent v) {
        context.headers()
                .remove(KafkaHeaders.LAST_TIMESTAMP)
                .add(KafkaHeaders.LAST_TIMESTAMP, Long.toString(context.timestamp()).getBytes());

        context.setTimestamp(v.getEventTime()); // this is the syntax I miss

        return KeyValue.pair(k, v);
    }
...

What you wrote basically works, but you need to express it differently:

transform(...) {
  // there is no context.setTimestamp(...)
  // instead use
  context.forward(..., To.all().withTimestamp(...));

  // use a "dummy" return to not forward more records (cf. JavaDocs)
  return null; // `null` is valid and means no additional output
}

In Kafka Streams, each record has an associated timestamp that is used by every temporal DSL operation (e.g., windowed aggregations or stream-stream joins). By default, the record timestamp is set to the message timestamp, but you can set it to a different value using a custom TimestampExtractor. Similarly, you can set a new record timestamp at any point in your topology via context.forward(..., To.all().withTimestamp(...)) as shown above.
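
To illustrate why the choice of record timestamp matters for windowing, here is a small standalone sketch in plain Java (no Kafka dependency). It assumes tumbling windows aligned to the epoch, which is how TimeWindows behaves as far as I know. The class name, the method name, and the example timestamps are made up for illustration; this is not the Kafka Streams implementation.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowAlignmentSketch {

    // Start of the epoch-aligned tumbling window that contains the given
    // timestamp. Assumes a non-negative timestamp and a positive window size.
    public static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();

        // Hypothetical record: the event happened at 10:02:30,
        // but the broker only received it at 10:06:10.
        long eventTime = Instant.parse("2021-03-01T10:02:30Z").toEpochMilli();
        long ingestTime = Instant.parse("2021-03-01T10:06:10Z").toEpochMilli();

        // Windowing on event time puts the record into the 10:00 window ...
        System.out.println(Instant.ofEpochMilli(windowStart(eventTime, size)));
        // ... while windowing on ingestion time would put it into the 10:05 window.
        System.out.println(Instant.ofEpochMilli(windowStart(ingestTime, size)));
    }
}
```

So the same record can end up in a different window depending on which timestamp it carries when it reaches the aggregation. That is why the event time has to be set before windowedBy(), and any switch back to another timestamp should happen only afterwards.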
