Question about Kafka stream left join

Hi team, I encountered issue when I used JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) for left join

Should this method keep all the records in the left stream for 24 hours to find matching records in the right stream?

I noticed many records were emitted only a few hours after they were consumed into the left stream:

My issue is very similar to this one: [KAFKA-10847] Avoid spurious left/outer join results in stream-stream join - ASF JIRA

Here is my code:

public KStream<String, KafkaJobEvent> kStream(
        @Value("${input-topic}") final String inputTopic,
        final StreamsBuilder builder) {

    final KStream<String, KafkaJobEvent> sourceStream = builder.stream(inputTopic,
                    Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())
                            .withTimestampExtractor(new DefaultTimestampExtractor(logReader)))
           .selectKey((k, v) -> v.getID); // re-key the stream 


    final Map<String, KStream<String, KafkaJobEvent>> branches = sourceStream.split(Named.as(SUB_STREAM))
            .branch((key, value) -> value.getTarget().contains("A"), Branched.as("A"))
            .branch((key, value) -> value.getTarget().contains("B"), Branched.as("B"))
            .defaultBranch();


    final KStream<String, KafkaJobEvent> streamA = branches.get(SUB_STREAM + "A");
    final KStream<String, KafkaJobEvent> streamB = branches.get(SUB_STREAM +"B");


    final Duration windowDuration = Duration.ofHours(24);

    return streamA
            .leftJoin(
                    streamB,
                    new ValueJoiner<KafkaJobEvent, KafkaJobEvent, KafkaJobEventJoinedValue>() {
                        @Override
                        public KafkaJobEventJoinedValue apply(final KafkaJobEvent left, final KafkaJobEvent right) {
                           
                           return KafkaJobEventJoinedValue.builder()
                                    .left(left)
                                    .right(right)
                                    .build();
                        }
                    },
                    JoinWindows.ofTimeDifferenceWithNoGrace(windowDuration),
                    StreamJoined.with(
                            Serdes.String(), // Key serde
                            kafkaJobEventSerde, // Left value serde
                            kafkaJobEventSerde  // Right value serde
                    ))
            .filter((key, value) -> value.getRight() == null)
            .mapValues(KafkaJobEventJoinedValue::getLeft)
            .process(() -> new ContextualProcessor<String, KafkaJobEvent, String, KafkaJobEvent>() {
                @Override
                public void process(final Record<String, KafkaJobEvent> record) {
                    try {
                        log.info("the timestamp:{} for record key:{}, record value:{}",
                                record.timestamp(), record.key(), record.value());
                       // do something ...
                    } catch (final Exception e) {
                        log.error("Exception found while handling message for record value:{}", record.value(), e);
                    }

                }
            });



}

}