Hello,
We’ve run into what looks like a problem (though I’m not sure whether it really is one in my scenario): a null value is not propagated through a KTable-KTable inner join.
I wrote a test on a simplified topology where this problem occurs.
Short description of the topology:
- It creates a KTable with numbers (table 1)
- It uses mapValues on table 1, returning the given number if it is even or null if it is odd (table 2)
- It inner-joins the KTable with even numbers (table 2) with the KTable with all numbers (table 1), keeping the value from table 2.
When I remove a record from table 1 (by writing a null value to the input topic), the deletion doesn’t propagate to the table produced by the join. I know this topology doesn’t make much sense on its own, but I wonder whether the same behavior could cause problems in more realistic scenarios.
TopologyProvider
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class TopologyProvider {
    public static final String NUMBERS_TOPIC = "numbers-topic";
    public static final String NUMBERS_STORE_NAME = "numbers";
    public static final String EVEN_NUMBERS_STORE_NAME = "even-numbers";
    public static final String EVEN_NUMBERS_AFTER_JOIN_STORE_NAME = "even-numbers-after-join";

    public Topology getTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Table 1: all numbers, read from the input topic.
        KTable<String, Integer> numbersTable = builder.stream(NUMBERS_TOPIC, Consumed.with(Serdes.String(), Serdes.Integer()))
                .toTable(getMaterializationConfigForNumbers());
        // Table 2: even numbers only; odd numbers are mapped to null.
        KTable<String, Integer> evenNumbersTable = numbersTable
                .mapValues((key, value) -> getEvenNumberOrNull(value), getMaterializationConfigForEvenNumbers());
        // Inner join of table 2 with table 1, keeping the value from table 2.
        evenNumbersTable.join(numbersTable, (a, b) -> a, getMaterializationConfigForEvenNumbersAfterJoin());
        return builder.build();
    }

    private Integer getEvenNumberOrNull(Integer value) {
        return value % 2 == 0 ? value : null;
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbers() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }

    private Materialized<String, Integer, KeyValueStore<Bytes, byte[]>> getMaterializationConfigForEvenNumbersAfterJoin() {
        return Materialized
                .<String, Integer, KeyValueStore<Bytes, byte[]>>as(EVEN_NUMBERS_AFTER_JOIN_STORE_NAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Integer());
    }
}
TopologyProviderTest
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TopologyProviderTest {
    private TopologyTestDriver topologyTestDriver;
    private TestInputTopic<String, Integer> inputTopic;

    @BeforeEach
    public void setup() {
        topologyTestDriver = prepareDriver(new TopologyProvider().getTopology());
        inputTopic = topologyTestDriver.createInputTopic(TopologyProvider.NUMBERS_TOPIC, new StringSerializer(), new IntegerSerializer());
    }

    @AfterEach
    public void close() {
        topologyTestDriver.close();
    }

    private TopologyTestDriver prepareDriver(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test_id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        return new TopologyTestDriver(topology, props);
    }

    @Test
    public void test() {
        inputTopic.pipeInput(new TestRecord<>("key1", 1));
        inputTopic.pipeInput(new TestRecord<>("key2", 2));
        inputTopic.pipeInput(new TestRecord<>("key3", 3));
        inputTopic.pipeInput(new TestRecord<>("key4", 4));
        inputTopic.pipeInput(new TestRecord<>("key20", 20));
        inputTopic.pipeInput(new TestRecord<>("key20", 21));
        // This null should propagate to the KTable after the join
        inputTopic.pipeInput(new TestRecord<>("key2", null));

        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.NUMBERS_STORE_NAME));
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_STORE_NAME));
        // The join correctly removed this record because its value was changed to an odd number.
        Assertions.assertNull(getIntegerValueFromStore("key20", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));
        // FAIL: the join result still contains value 2 for "key2", although both sides of the join now hold null for that key
        Assertions.assertNull(getIntegerValueFromStore("key2", TopologyProvider.EVEN_NUMBERS_AFTER_JOIN_STORE_NAME));
    }

    private Integer getIntegerValueFromStore(String key, String storeName) {
        return (Integer) topologyTestDriver.getKeyValueStore(storeName).get(key);
    }
}
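To make the expectation explicit, here is a minimal plain-Java sketch (no Kafka dependencies) of the state I would expect the join result to hold after the inputs from the test above. The class and helper names (ExpectedJoinState, apply, evenOnly, innerJoinKeepLeft, expectedJoinResult) are made up for illustration, and plain maps stand in for the state stores. It only models the final result I expect, not how Kafka Streams computes the join internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class: plain maps stand in for the KTable state stores.
class ExpectedJoinState {

    // Applies one input record: a null value is treated as a delete for that key.
    static void apply(Map<String, Integer> table, String key, Integer value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Table 2: keeps only the even values of table 1.
    static Map<String, Integer> evenOnly(Map<String, Integer> numbers) {
        Map<String, Integer> even = new LinkedHashMap<>();
        numbers.forEach((key, value) -> {
            if (value % 2 == 0) {
                even.put(key, value);
            }
        });
        return even;
    }

    // Inner join keeping the left value: a key survives only if both sides contain it.
    static Map<String, Integer> innerJoinKeepLeft(Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, Integer> joined = new LinkedHashMap<>();
        left.forEach((key, value) -> {
            if (right.containsKey(key)) {
                joined.put(key, value);
            }
        });
        return joined;
    }

    // Replays the same inputs as the test and returns the join result I expect.
    static Map<String, Integer> expectedJoinResult() {
        Map<String, Integer> numbers = new LinkedHashMap<>();
        apply(numbers, "key1", 1);
        apply(numbers, "key2", 2);
        apply(numbers, "key3", 3);
        apply(numbers, "key4", 4);
        apply(numbers, "key20", 20);
        apply(numbers, "key20", 21);
        apply(numbers, "key2", null);
        return innerJoinKeepLeft(evenOnly(numbers), numbers);
    }

    public static void main(String[] args) {
        System.out.println(expectedJoinResult());
    }
}
```

With these inputs, only key4 remains in the expected join result; key2 and key20 are both absent. In the real topology key20 is removed as expected, but key2 still holds the value 2.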
The failure occurs on version 3.4.0. I haven’t changed any Streams configs beyond those visible in the code above. Thank you in advance for any answers.