Problem with missing null propagation on inner KTable-KTable join

Hello,
We’ve encountered what may be a problem (if it actually is one in my scenario): a null is not propagated through a KTable-KTable inner join.
I wrote a test against a simplified topology that reproduces it.

Short description of the topology:

  1. It creates a KTable with numbers (table 1).
  2. It applies mapValues to table 1, returning the given number if it is even or null if it is odd (table 2).
  3. It joins the KTable with even numbers (table 2) to the KTable with all numbers (table 1).

When I delete a record from table 1 (by writing a null value to the input topic), the deletion is not propagated to the table produced by the join. I know this topology doesn’t make much sense, but I wonder whether the same behavior could cause problems in more realistic scenarios.
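To pin down what "should propagate" means here, the sketch below models the expected end state in plain Java, without Kafka. It assumes each KTable can be treated as a `Map`, that a null value acts as a tombstone, and that the inner join can be recomputed from scratch after replaying the same input sequence as the test. `ExpectedJoinModel` and its methods are hypothetical names, not part of Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical plain-Java model of the topology's expected end state (no Kafka dependency).
// Assumption: each KTable is a Map, and writing null for a key deletes it (tombstone).
final class ExpectedJoinModel {

    // Applies one input record to a table: a null value removes the key.
    static void upsert(Map<String, Integer> table, String key, Integer value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Table 2: keeps even numbers; odd numbers are mapped to null and therefore deleted.
    static Map<String, Integer> evenNumbers(Map<String, Integer> numbers) {
        Map<String, Integer> even = new HashMap<>();
        numbers.forEach((k, v) -> upsert(even, k, v % 2 == 0 ? v : null));
        return even;
    }

    // Inner join recomputed from scratch: a key appears only if both sides have a row.
    static <L, R, V> Map<String, V> innerJoin(Map<String, L> left, Map<String, R> right,
                                              BiFunction<L, R, V> joiner) {
        Map<String, V> result = new HashMap<>();
        left.forEach((k, l) -> {
            R r = right.get(k);
            if (r != null) {
                result.put(k, joiner.apply(l, r));
            }
        });
        return result;
    }

    // Replays the input sequence from the test and returns the expected join table.
    static Map<String, Integer> replay() {
        Map<String, Integer> numbers = new HashMap<>();
        upsert(numbers, "key1", 1);
        upsert(numbers, "key2", 2);
        upsert(numbers, "key3", 3);
        upsert(numbers, "key4", 4);
        upsert(numbers, "key20", 20);
        upsert(numbers, "key20", 21);
        upsert(numbers, "key2", null);
        // Same join direction and joiner as the topology: even.join(numbers, (a, b) -> a)
        return innerJoin(evenNumbers(numbers), numbers, (a, b) -> a);
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints {key4=4}
    }
}
```

Under this model only `key4` survives the join, which is what the final assertion in the test expects for `key2`.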

TopologyProvider
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TopologyProvider {

    public static final String NUMBERS_TOPIC = "numbers-topic";
    public static final String NUMBERS_STORE_NAME = "numbers";
    public static final String EVEN_NUMBERS_STORE_NAME = "even-numbers";
    public static final String EVEN_NUMBERS_AFTER_JOIN_STORE_NAME = "even-numbers-after-join";

    public Topology getTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Table 1: all numbers
        KTable<String, Integer> numbersTable = builder.stream(NUMBERS_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer()))
                .toTable(getMaterializationConfigForNumbers());
        // Table 2: even numbers; odd numbers are mapped to null
        KTable<String, Integer> evenNumbersTable = numbersTable
                .mapValues((key, value) -> getEvenNumberOrNull(value), getMaterializationConfigForEvenNumbers());
        // Inner join of table 2 with table 1, keeping the value from table 2
        evenNumbersTable.join(numbersTable, (a, b) -> a, getMaterializationConfigForEvenNumbersAfterJoin());
        return builder.build();
    }

    private Integer getEvenNumberOrNull(Integer value) {
        return value % 2 == 0 ? value : null;
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbersAfterJoin() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_AFTER_JOIN_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }
}
```
TopologyProviderTest
```java
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TopologyProviderTest {

    private TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, Integer> inputTopic;

    @BeforeEach
    public void setup() {
        topologyTestDriver = prepareDriver(new TopologyProvider().getTopology());
        inputTopic = topologyTestDriver.createInputTopic(TopologyProvider.NUMBERS_TOPIC, new StringSerializer(), new IntegerSerializer());
    }

    @AfterEach
    public void close() {
        topologyTestDriver.close();
    }

    private TopologyTestDriver prepareDriver(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        return new TopologyTestDriver(topology, props);
    }

    @Test
    public void test() {
        inputTopic.pipeInput(new TestRecord<>("key1", 1));
        inputTopic.pipeInput(new TestRecord<>("key2", 2));
        inputTopic.pipeInput(new TestRecord<>("key3", 3));
        inputTopic.pipeInput(new TestRecord<>("key4", 4));
        inputTopic.pipeInput(new TestRecord<>("key20", 20));

        inputTopic.pipeInput(new TestRecord<>("key20", 21));

        // This null (tombstone) should propagate to the KTable after the join
        inputTopic.pipeInput(new TestRecord<>("key2", null));

        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.NUMBERS_STORE_NAME));
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_STORE_NAME));

        // Passes: the record was correctly removed because it changed to an odd number
        Assertions.assertNull(getIntegerValueFromStore("key20", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));

        // FAIL: the store still contains the value 2, although both sides of the join hold null for the key "key2"
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));
    }

    private Integer getIntegerValueFromStore(String key, String storeName) {
        return topologyTestDriver.<String, Integer>getKeyValueStore(storeName).get(key);
    }
}
```

It occurs on version 3.4.0. I didn’t change any Streams configs beyond those visible in the code above. Thank you in advance for any answers.

When your mapValues() returns null, it basically forwards a “delete” (a tombstone), and thus effectively implements a filter.
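The filter equivalence can be illustrated with a simplified model, which is an assumption and not the Kafka Streams implementation: a KTable is treated as a `Map`, and a mapper result of null is written as a tombstone, so the key simply drops out of the derived table. In the real DSL, the explicit form would be `KTable#filter`, e.g. `numbersTable.filter((key, value) -> value % 2 == 0)`. `MapValuesAsFilter` and its methods are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model: a KTable is a Map; a null value for a key means "deleted" (tombstone).
final class MapValuesAsFilter {

    // mapValues whose mapper may return null: a null result is a tombstone,
    // so the key does not appear in the derived table.
    static Map<String, Integer> mapValues(Map<String, Integer> table, Function<Integer, Integer> mapper) {
        Map<String, Integer> out = new TreeMap<>();
        table.forEach((k, v) -> {
            Integer mapped = mapper.apply(v);
            if (mapped != null) {
                out.put(k, mapped);
            }
        });
        return out;
    }

    // filter keeps a row only if the predicate holds for its value.
    static Map<String, Integer> filter(Map<String, Integer> table, Predicate<Integer> predicate) {
        Map<String, Integer> out = new TreeMap<>();
        table.forEach((k, v) -> {
            if (predicate.test(v)) {
                out.put(k, v);
            }
        });
        return out;
    }

    // Sample input resembling the first records from the question.
    static Map<String, Integer> sample() {
        Map<String, Integer> numbers = new TreeMap<>();
        numbers.put("key1", 1);
        numbers.put("key2", 2);
        numbers.put("key3", 3);
        numbers.put("key4", 4);
        return numbers;
    }

    public static void main(String[] args) {
        Map<String, Integer> viaMapValues = mapValues(sample(), v -> v % 2 == 0 ? v : null);
        Map<String, Integer> viaFilter = filter(sample(), v -> v % 2 == 0);
        System.out.println(viaMapValues.equals(viaFilter)); // prints true
        System.out.println(viaFilter); // prints {key2=2, key4=4}
    }
}
```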

For dropped input rows, there is obviously no join result, because one input side has no row. Thus, when you later delete the original input record, it’s not necessary to propagate the delete through the full topology: it’s already known that there is no result, so there is nothing that needs to be deleted.
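The argument above can be sketched as an incremental join that emits a change only if the key had a join result before or has one now. This is a hypothetical model, not how Kafka Streams implements its KTable-KTable join: it assumes every update recomputes the result for its key from the current contents of both input tables, and it uses the joiner `(a, b) -> a` from the question, with the even-numbers table on the left and the numbers table on the right. `IncrementalInnerJoin` and `pipe` are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical incremental inner join: keeps both input tables and the current result,
// and emits a change only when a join row existed before or exists now.
final class IncrementalInnerJoin {
    private final Map<String, Integer> left = new HashMap<>();   // even numbers (table 2)
    private final Map<String, Integer> right = new HashMap<>();  // all numbers (table 1)
    private final Map<String, Integer> result = new HashMap<>(); // join result
    // Emitted changes as "key=value"; "key=null" is a tombstone.
    final List<String> emitted = new ArrayList<>();

    // Simulates one input record: update table 1, then table 2 (odd numbers become null).
    void pipe(String key, Integer value) {
        apply(right, key, value);
        Integer even = (value != null && value % 2 == 0) ? value : null;
        apply(left, key, even);
    }

    private void apply(Map<String, Integer> side, String key, Integer value) {
        if (value == null) {
            side.remove(key);
        } else {
            side.put(key, value);
        }
        Integer l = left.get(key);
        Integer r = right.get(key);
        Integer newResult = (l != null && r != null) ? l : null; // joiner (a, b) -> a
        Integer oldResult = result.get(key);
        if (oldResult == null && newResult == null) {
            return; // no join row before and none now: nothing to delete or update
        }
        if (newResult == null) {
            result.remove(key);
        } else {
            result.put(key, newResult);
        }
        emitted.add(key + "=" + newResult);
    }

    public static void main(String[] args) {
        IncrementalInnerJoin join = new IncrementalInnerJoin();
        join.pipe("key1", 1);
        join.pipe("key2", 2);
        join.pipe("key1", null); // key1 was filtered out: no join row existed, nothing emitted
        join.pipe("key2", null); // key2 had a join row: a tombstone is emitted
        System.out.println(join.emitted); // prints [key2=2, key2=null]
    }
}
```

In this model, deleting `key1` (filtered out, so never joined) emits nothing, while deleting `key2` (which had a join row) emits a tombstone.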

Does this make sense?