Problem with missing null propagation on inner KTable-KTable join

Hello,
We’ve encountered a problem (if it actually is a problem in our scenario) with missing null propagation on a KTable-KTable inner join.
I wrote a test on a simplified topology where this problem occurs.

Short description of the topology:

  1. It creates a KTable with numbers (table 1)
  2. It uses mapValues on table 1, returning the given number if it is even or null if it is odd (table 2)
  3. It joins the KTable with all numbers (from table 1) to the KTable with even numbers (from table 2).

When I remove a record from the table (by writing a null value to the input topic), the delete doesn’t propagate to the table after the join. I know this topology doesn’t make much sense on its own, but I wonder whether it could cause problems in more realistic scenarios.

TopologyProvider
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TopologyProvider {

    public static final String NUMBERS_TOPIC = "numbers-topic";
    public static final String NUMBERS_STORE_NAME = "numbers";
    public static final String EVEN_NUMBERS_STORE_NAME = "even-numbers";
    public static final String EVEN_NUMBERS_AFTER_JOIN_STORE_NAME = "even-numbers-after-join";

    public Topology getTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Integer> numbersTable = builder.stream(NUMBERS_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer()))
                .toTable(getMaterializationConfigForNumbers());
        KTable<String, Integer> evenNumbersTable = numbersTable
                .mapValues((key, value) -> getEvenNumberOrNull(value), getMaterializationConfigForEvenNumbers());
        evenNumbersTable.join(numbersTable, (a, b) -> a, getMaterializationConfigForEvenNumbersAfterJoin());
        return builder.build();
    }

    private Integer getEvenNumberOrNull(Integer value) {
        return value % 2 == 0 ? value : null;
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbersAfterJoin() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_AFTER_JOIN_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }
}
TopologyProviderTest
import java.util.Properties;

import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TopologyProviderTest {

    private TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, Integer> inputTopic;

    @BeforeEach
    public void setup() {
        topologyTestDriver = prepareDriver(new TopologyProvider().getTopology());
        inputTopic = topologyTestDriver.createInputTopic(TopologyProvider.NUMBERS_TOPIC, new StringSerializer(), new IntegerSerializer());
    }

    @AfterEach
    public void close() {
        topologyTestDriver.close();
    }

    private TopologyTestDriver prepareDriver(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        return new TopologyTestDriver(topology, props);
    }

    @Test
    public void test() {
        inputTopic.pipeInput(new TestRecord<>("key1", 1));
        inputTopic.pipeInput(new TestRecord<>("key2", 2));
        inputTopic.pipeInput(new TestRecord<>("key3", 3));
        inputTopic.pipeInput(new TestRecord<>("key4", 4));
        inputTopic.pipeInput(new TestRecord<>("key20", 20));

        inputTopic.pipeInput(new TestRecord<>("key20", 21));

        // This null should propagate to the KTable after the join
        inputTopic.pipeInput(new TestRecord<>("key2", null));


        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.NUMBERS_STORE_NAME));
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_STORE_NAME));

        // It correctly removed this record because it was changed to an odd number.
        Assertions.assertNull(getIntegerValueFromStore("key20", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));

        // FAIL: It still contains the value 2, even though both sides of the join contain null for the key "key2"
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));
    }

    private Integer getIntegerValueFromStore(String key, String storeName) {
        return (Integer) topologyTestDriver.getKeyValueStore(storeName).get(key);
    }
}

This occurs on version 3.4.0. I didn’t change any Streams configs beyond those visible in the code above. Thank you in advance for any answers.

When your mapValues() returns null, it basically forwards a “delete / tombstone”, and thus effectively implements a filter.
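
For illustration, here is a minimal sketch (reusing numbersTable from your code; imports omitted) of why these two tables hold the same rows:

// Sketch: on a KTable, returning null from mapValues() emits a tombstone for
// the key, so it behaves like a filter that keeps only even numbers.
// (The mapper is not invoked for tombstones, so value is never null here.)
KTable<String, Integer> viaMapValues = numbersTable
        .mapValues(value -> value % 2 == 0 ? value : null);
KTable<String, Integer> viaFilter = numbersTable
        .filter((key, value) -> value % 2 == 0);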

For dropped input rows, there is obviously no join result, because one input side has no row. Thus, when you later delete the original input, it’s not necessary to propagate the delete through the full topology, because it’s already known that there is no result, and thus there is nothing that needs to be deleted.

Does this make sense?

I am able to reproduce it. To show the problem, I added a left join after the original join; it just takes the number from the second input topic and adds it to the joined result. Below is a table with the real and expected output.

in1          in2          map         join        left-join   out (real)  out (expected)
(key2:2)     (key2:2)     (key2:2)    (key2:2)    (key2:2)    (key2:2)
(key2:null)  (key2:null)  (key2:2)    (key2:2)    (key2:2)    (key2:null)
(key2:10)    (key2:12)    (key2:12)   (key2:null)
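
The reproduction code itself isn’t shown here; roughly sketched (the second topic name, variable names, and the joiners are my shorthand, not the actual code), it extends the original topology like this:

// Rough sketch: join the two tables as before, then left-join the result
// with a table built from a second input topic ("numbers-topic-2" is a
// placeholder name), preferring the right-hand value when present.
KTable<String, Integer> in2 = builder.table("numbers-topic-2",
        Consumed.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> joined = evenNumbersTable.join(numbersTable, (a, b) -> a);
KTable<String, Integer> out = joined.leftJoin(in2, (j, b) -> b != null ? b : j);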

Oh, I got confused because your code does builder.stream(NUMBERS_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer())) instead of builder.table(...), and I missed the toTable() statement. I thought you were using KStream#mapValues() while you actually use KTable#mapValues(). Sorry; my bad.

Looking into the details, the problem seems to be the self-join, and how the Kafka Streams topology is wired up. If I get it right, you get something like this:

topic -> [numberStore] -> mapValues -> [evenStore] -> join( ... )
           ^       |                                       ^   |
           |       |                                       |   |
           |       +-----------------(B)-------------------+   |
           |                                                   |
           +-------------------(A)-----------------------------+

So what happens is that when you first insert a new key, you get

numberStore = (k,2)

the record flows through mapValues and is inserted into the second store

evenStore = (k,2)

and now the join is executed using (k,2) (from evenStore), doing a lookup (Step A) into numberStore, and joins. Note that there is another step: the original input record (k,2) is now also forwarded (Step B) as the “right input” to the join, does a lookup into evenStore, again finds a row for k, and re-emits the same join result a second time. – In your test you don’t see this, because you query the result store/KTable, but the result store should have been updated twice (if you add toStream().to(...) to the result table and change your test to read all records written into the result topic, you should see two records, not just one).
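
Something like this sketch would make both updates visible (the output topic name "join-output" is made up, and it assumes the join result is assigned to a variable; imports omitted):

// In the topology: pipe every update of the join result into an output topic.
KTable<String, Integer> joinedTable = evenNumbersTable.join(numbersTable,
        (a, b) -> a, getMaterializationConfigForEvenNumbersAfterJoin());
joinedTable.toStream()
        .to("join-output", Produced.with(Serdes.String(), Serdes.Integer()));

// In the test: read all records written to the output topic instead of
// querying the store; a single input insert should yield two records here.
TestOutputTopic<String, Integer> outputTopic = topologyTestDriver.createOutputTopic(
        "join-output", new StringDeserializer(), new IntegerDeserializer());
List<TestRecord<String, Integer>> allUpdates = outputTopic.readRecordsToList();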

Next you push the (k,null); this first deletes from numberStore, next deletes from evenStore, and finally we do a lookup into numberStore for the join (note that numberStore is already empty).

Thus, the join logic says: the left table does not contain a row for the key k; thus, we did not compute a result for k previously, and hence we do not need to delete any result record, because no result record can exist. – Similar to the first record, the join in reverse order also executes now, again not finding any row, and not emitting any “delete” for the result table.

The join is implemented this way to avoid sending unnecessary deletes downstream. If you have two input topics, topic-A and topic-B, that you join, and you first do an insert of (k,1) into topic-A, the table for topic-B is empty, and no join result is computed. If you next do a delete (k,null) on topic-A, we know (because no row for key k exists in B) that the result table does not contain a row for k either, and thus no delete is issued downstream.
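
As a standalone sketch of that two-topic case (topic names are examples; imports omitted):

// Two independent input topics joined into one table. The insert (k,1) into
// topic-A finds no row for k in topic-B, so no join result is emitted; the
// later delete (k,null) on topic-A thus needs no downstream delete either.
KTable<String, Integer> tableA = builder.table("topic-A",
        Consumed.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> tableB = builder.table("topic-B",
        Consumed.with(Serdes.String(), Serdes.Integer()));
tableA.join(tableB, (a, b) -> a + b);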

So it’s really an issue with the self-join, I believe… Not sure if there is a workaround off the top of my head. Maybe a self-join is just not something that works, and we would need a proper implementation for it.

You could, of course, duplicate the input topic and read from two topics, but that is rather expensive.
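
Such a workaround could look like this (sketch; "numbers-topic-copy" is a hypothetical duplicate of the input topic that would have to be populated externally; imports omitted):

// Sketch: read the same data as two independent tables from two topics, so
// the join is no longer a self-join.
KTable<String, Integer> allNumbers = builder.table(NUMBERS_TOPIC,
        Consumed.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> evenNumbers = builder.table("numbers-topic-copy",
        Consumed.with(Serdes.String(), Serdes.Integer()))
        .mapValues(value -> value % 2 == 0 ? value : null);
evenNumbers.join(allNumbers, (a, b) -> a);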

Hmm…
So, if I have one input topic, split it into two, process them separately, and join them before the output, it will not work?

Something like this

        + -> [some process 1] -> +
        |                        |
- IN -> +                      [join] -> OUT -
        |                        |
        + -->[some process 2] -->+ 

It’s weird, isn’t it?

This might work if you materialize both tables, but the “split” must happen before the materialization, to ensure that only one table is updated before the join is executed.

In your current topology, however, you first materialize the table and “split” the data flow afterward, which implies that both tables are updated before the join happens, introducing the issue.
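
For the original example, “split before materialization” could look something like this (untested sketch; store names are examples, and imports are omitted):

// Untested sketch: branch the KStream first, materialize each branch into
// its own table via toTable(), and only then join the two tables.
KStream<String, Integer> input = builder.stream(NUMBERS_TOPIC,
        Consumed.with(Serdes.String(), Serdes.Integer()));
KTable<String, Integer> allNumbers = input.toTable(
        Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("all-numbers-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer()));
KTable<String, Integer> evenNumbers = input
        .mapValues(value -> value != null && value % 2 == 0 ? value : null)
        .toTable(
                Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("even-numbers-store")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Integer()));
evenNumbers.join(allNumbers, (a, b) -> a);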

So, does it depend on the content of [some process 1/2]? Or maybe I didn’t understand what you mean by “the split must happen before”.

My sources are slightly different from the ones in the initial message (I use the pure “Spring” notation). For the last example they are:

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@RequiredArgsConstructor
@SuppressWarnings("static-method")
public class NumbersTopology {

    private final NewTopic numbersSrcTopic;

    // Input
    @Bean
    KTable<String, Integer> numbers(StreamsBuilder epsStreamBuilder) {
        return epsStreamBuilder.table(numbersSrcTopic.name(), cons("src-numbers"));
    }

    // First processor
    @Bean
    KTable<String, Integer> processOne(KTable<String, Integer> numbers) {
        return numbers.mapValues((key, value) -> value);
    }

    // Second processor
    @Bean
    KTable<String, Integer> processTwo(KTable<String, Integer> numbers) {
        return numbers.mapValues((key, value) -> getEvenNumberOrNull(value));
    }

    // Join output first and second processors
    @Bean
    KTable<String, Integer> join(KTable<String, Integer> processOne, KTable<String, Integer> processTwo) {
        return processOne.join(processTwo, (a, b) -> a, mat("join-numbers"));
    }

    @SuppressWarnings("boxing")
    private Integer getEvenNumberOrNull(Integer value) {
        if (value == null || 0 != value % 2) {
            return null;
        }
        return value;
    }

    @SuppressWarnings("resource")
    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> mat(String name) {
        return Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(name)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Integer());
    }

    @SuppressWarnings("resource")
    private Consumed<String, Integer> cons(String name) {
        return Consumed.with(Serdes.String(), Serdes.Integer()).withName(name);
    }

}

and the sink

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class NumberService {

    private final NewTopic numbersOutTopic;

    // Sinks the join result to output topic
    @Autowired
    public void process(KTable<String, Integer> join) {
        join.toStream(Named.as("proc-stream-out-numbers"))
            .to(numbersOutTopic.name(), Produced.as("proc_to_out_numbers"));
    }

}

So, does it depend on the content of [some process 1/2]? Or maybe I didn’t understand what you mean by “the split must happen before”.

When you write a DSL program, it is internally translated into a data flow program, i.e., a graph of processor nodes connected via edges. If you do a self-join, this data flow program needs to “branch/split” at some point, and the shape of the data flow program, and the point at which this “branch/split” happens, depend a little bit on how you write the DSL program.

Not sure if Spring limits you in any way in how the “wiring” is done…

For plain Kafka Streams:

KTable t1 = builder.table(...);
KTable mappedTable = t1.mapValues(...);

mappedTable.join(t1);

This would become (simplified… a join is actually 5 processors…):

topic -> t1 (with store) -> mapValues() -> mappedTable (with Store) -> join()
                 ^                                                       |
                 +-------------------------------------------------------+

Different example:

KTable t1 = builder.table(...);
KTable mappedTable = t1.mapValues(...);

t1.join(mappedTable);

This would become

topic -> t1 (with store) -> join() 
          |                  |
      mapValues()            |
          |                  v
          +-> mappedTable (with Store) 

So you can see, these are two quite different data flow graphs…

A good way to inspect how the wiring is done is to use Topology#describe(). There is even a tool that visualizes the ASCII output: https://zz85.github.io/kafka-streams-viz/
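
For example, for the topology from the original post:

// Print the internal wiring of the topology; the output can be pasted into
// the visualizer linked above.
Topology topology = new TopologyProvider().getTopology();
System.out.println(topology.describe());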

HTH.