Question about Kafka Streams left join

Hi team, I ran into an issue when using JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) for a left join.

Should this method keep all the records in the left stream for 24 hours to find matching records in the right stream?
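
To make that expectation concrete, here is a minimal sketch in plain Java (no Kafka classes) of how I understand the window arithmetic. The class and method names are hypothetical, and the emission rule is only my reading of how Kafka Streams decides that a window is closed, so please correct me if it is wrong. streamTime below means the highest record timestamp seen so far (as returned by the timestamp extractor), not wall-clock time.

```java
import java.time.Duration;

// Hypothetical helper class, not part of Kafka Streams.
final class JoinWindowSketch {

    // True if the right record's timestamp lies within +/- timeDifference
    // of the left record's timestamp (boundaries assumed inclusive).
    static boolean inWindow(final long leftTs, final long rightTs, final Duration timeDifference) {
        return Math.abs(leftTs - rightTs) <= timeDifference.toMillis();
    }

    // Timestamp at which the window for a left record ends, including grace.
    static long windowCloseTime(final long leftTs, final Duration timeDifference, final Duration grace) {
        return leftTs + timeDifference.toMillis() + grace.toMillis();
    }

    // Assumption: an unmatched left record (right == null) is only emitted
    // once stream time has advanced past the end of its window.
    static boolean canEmitUnmatched(final long leftTs, final long streamTime,
                                    final Duration timeDifference, final Duration grace) {
        return streamTime > windowCloseTime(leftTs, timeDifference, grace);
    }
}
```

With a 24-hour window and no grace period, I would expect canEmitUnmatched(t, streamTime, Duration.ofHours(24), Duration.ZERO) to stay false until stream time moves past t + 24h.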

I noticed that many records were emitted only a few hours after they were consumed into the left stream.

My issue looks very similar to the ASF JIRA ticket KAFKA-10847, "Avoid spurious left/outer join results in stream-stream join".

Here is my code:

public KStream<String, KafkaJobEvent> kStream(
        @Value("${input-topic}") final String inputTopic,
        final StreamsBuilder builder) {

    final KStream<String, KafkaJobEvent> sourceStream = builder.stream(inputTopic,
                    Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())
                            .withTimestampExtractor(new DefaultTimestampExtractor(logReader)))
           .selectKey((k, v) -> v.getID()); // re-key the stream

    final Map<String, KStream<String, KafkaJobEvent>> branches = sourceStream
            .split(Named.as(SUB_STREAM))
            .branch((key, value) -> value.getTarget().contains("A"), Branched.as("A"))
            .branch((key, value) -> value.getTarget().contains("B"), Branched.as("B"))
            .noDefaultBranch();

    final KStream<String, KafkaJobEvent> streamA = branches.get(SUB_STREAM + "A");
    final KStream<String, KafkaJobEvent> streamB = branches.get(SUB_STREAM + "B");

    final Duration windowDuration = Duration.ofHours(24);

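    return streamA
            .leftJoin(streamB,
                    new ValueJoiner<KafkaJobEvent, KafkaJobEvent, KafkaJobEventJoinedValue>() {
                        public KafkaJobEventJoinedValue apply(final KafkaJobEvent left, final KafkaJobEvent right) {
                            // builder field names assumed from getRight() in the filter
                            return KafkaJobEventJoinedValue.builder().left(left).right(right).build();
                        }
                    },
                    JoinWindows.ofTimeDifferenceWithNoGrace(windowDuration),
                    StreamJoined.with(
                            Serdes.String(), // Key serde
                            kafkaJobEventSerde, // Left value serde
                            kafkaJobEventSerde)) // Right value serde
            .filter((key, value) -> value.getRight() == null) // keep only unmatched left records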
            .process(() -> new ContextualProcessor<String, KafkaJobEvent, String, KafkaJobEvent>() {
                public void process(final Record<String, KafkaJobEvent> record) {
                    try {
                        log.info("the timestamp:{} for record key:{}, record value:{}",
                                record.timestamp(), record.key(), record.value());
                       // do something ...
                    } catch (final Exception e) {
                        log.error("Exception found while handling message for record value:{}", record.value(), e);