Hi team, I ran into an issue when using JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) for a left join.
Should this method keep all the records in the left stream for 24 hours to find matching records in the right stream?
However, I noticed that many records were emitted only a few hours after they were consumed into the left stream.
My issue is very similar to this one: KAFKA-10847, "Avoid spurious left/outer join results in stream-stream join" (ASF JIRA).
Here is my code:
public KStream<String, KafkaJobEvent> kStream(
        @Value("${input-topic}") final String inputTopic,
        final StreamsBuilder builder) {
    // Read the input topic with the custom timestamp extractor, then re-key by job ID.
    final KStream<String, KafkaJobEvent> sourceStream = builder.stream(inputTopic,
            Consumed.with(Serdes.ByteArray(), kafkaJobEventSerde)
                .withTimestampExtractor(new DefaultTimestampExtractor(logReader)))
        .selectKey((k, v) -> v.getID()); // re-key the stream

    // Split into sub-streams by target; map keys are SUB_STREAM + branch name.
    final Map<String, KStream<String, KafkaJobEvent>> branches = sourceStream.split(Named.as(SUB_STREAM))
        .branch((key, value) -> value.getTarget().contains("A"), Branched.as("A"))
        .branch((key, value) -> value.getTarget().contains("B"), Branched.as("B"))
        .defaultBranch();
    final KStream<String, KafkaJobEvent> streamA = branches.get(SUB_STREAM + "A");
    final KStream<String, KafkaJobEvent> streamB = branches.get(SUB_STREAM + "B");

    final Duration windowDuration = Duration.ofHours(24);
    return streamA
        .leftJoin(
            streamB,
            new ValueJoiner<KafkaJobEvent, KafkaJobEvent, KafkaJobEventJoinedValue>() {
                @Override
                public KafkaJobEventJoinedValue apply(final KafkaJobEvent left, final KafkaJobEvent right) {
                    return KafkaJobEventJoinedValue.builder()
                        .left(left)
                        .right(right)
                        .build();
                }
            },
            JoinWindows.ofTimeDifferenceWithNoGrace(windowDuration),
            StreamJoined.with(
                Serdes.String(),      // Key serde
                kafkaJobEventSerde,   // Left value serde
                kafkaJobEventSerde    // Right value serde
            ))
        // Keep only left records that found no match in stream B.
        .filter((key, value) -> value.getRight() == null)
        .mapValues(KafkaJobEventJoinedValue::getLeft)
        .process(() -> new ContextualProcessor<String, KafkaJobEvent, String, KafkaJobEvent>() {
            @Override
            public void process(final Record<String, KafkaJobEvent> record) {
                try {
                    log.info("the timestamp:{} for record key:{}, record value:{}",
                        record.timestamp(), record.key(), record.value());
                    // do something ...
                } catch (final Exception e) {
                    log.error("Exception found while handling message for record value:{}", record.value(), e);
                }
            }
        });
}
}
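
To make the timing question concrete, here is a plain-Java sketch of how I understand the window arithmetic. It has no Kafka dependency. It assumes that an unmatched left record is emitted once stream time (the highest record timestamp the join has seen so far, not wall-clock time) passes the record's timestamp plus the window size plus the grace period, which is zero with ofTimeDifferenceWithNoGrace. The names StreamTimeSketch, observe and isWindowClosed are made up for illustration and are not Kafka Streams APIs.

```java
import java.time.Duration;

// Hypothetical model of the join window timing; not a Kafka Streams class.
public class StreamTimeSketch {
    private final long windowMs;
    private final long graceMs;
    // Assumption: stream time is the maximum record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;

    public StreamTimeSketch(final Duration window, final Duration grace) {
        this.windowMs = window.toMillis();
        this.graceMs = grace.toMillis();
    }

    // Advance stream time with the timestamp of a record from either stream.
    public void observe(final long recordTimestampMs) {
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
    }

    // Assumption: the window of a left record is closed once stream time
    // is strictly greater than its timestamp + window size + grace period.
    public boolean isWindowClosed(final long leftTimestampMs) {
        return streamTimeMs > leftTimestampMs + windowMs + graceMs;
    }

    public static void main(final String[] args) {
        final StreamTimeSketch sketch =
            new StreamTimeSketch(Duration.ofHours(24), Duration.ZERO);
        final long leftTimestampMs = 0L;

        sketch.observe(leftTimestampMs);
        sketch.observe(Duration.ofHours(3).toMillis());
        System.out.println(sketch.isWindowClosed(leftTimestampMs)); // false

        // A single record stamped 25 hours ahead moves stream time past the window.
        sketch.observe(Duration.ofHours(25).toMillis());
        System.out.println(sketch.isWindowClosed(leftTimestampMs)); // true
    }
}
```

If this model is right, any record whose extracted timestamp is far ahead of the others (for example, one produced by the custom timestamp extractor) could advance stream time and close the 24-hour window after only a few hours of wall-clock time.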