Hey guys, I am trying to process tombstones from a stream into my KTable.
I have a KTable<Key, Value> that is grouped by key and then aggregated into a KTable<Key, List<Value>>.
In the aggregation step I would like to remove the associated key when a tombstone is received.
For example, this is my test with TopologyTestDriver:
input.pipeInput(1, value1);
input.pipeInput(2, value2);
input.pipeInput(3, value3);
input.pipeInput(1, null);
And here comes the interesting part, my topology logic:
final KTable<Key, List<Value>> valuesById = valuesStream
    .groupByKey()
    .aggregate(
        ArrayList::new,
        (k, v, agg) -> {
            // Intended tombstone handling: drop the entries for this key
            // and return without adding the null value to the list.
            if (v == null) {
                agg.removeIf(el -> el.id() == k);
                return agg;
            }
            agg.add(v);
            return agg;
        },
        Materialized.<Key, List<Value>, KeyValueStore<Bytes, byte[]>>as("values")
            .withKeySerde(Serdes.Integer())
            .withValueSerde(new JsonSerde<>(new TypeReference<List<Value>>() {}))
    );
So this is how I would like to handle tombstones manually and apply them during aggregation and materialization.
But what happens?
Kafka Streams has its own default KStreamAggregateProcessor, which logs:
WARN org.apache.kafka.streams.kstream.internals.KStreamAggregate – Skipping record due to null key or value.
And this is actually a problem, because in my scenario I should have 2 values at the end: the tombstone should trigger deletion of the first record with key "1".
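To make the expected result concrete, here is a minimal plain-Java simulation of what I want the table to look like after the four test records. It does not use Kafka Streams at all: TombstoneSketch, apply and replay are made-up names for illustration, and String stands in for my Value type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TombstoneSketch {

    // Applies one record to the simulated table.
    // A null value is treated as a tombstone and removes the whole key.
    static void apply(Map<Integer, List<String>> table, Integer key, String value) {
        if (value == null) {
            table.remove(key);
            return;
        }
        table.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Replays the same four records as the test above.
    static Map<Integer, List<String>> replay() {
        Map<Integer, List<String>> table = new TreeMap<>();
        apply(table, 1, "value1");
        apply(table, 2, "value2");
        apply(table, 3, "value3");
        apply(table, 1, null); // tombstone for key 1
        return table;
    }

    public static void main(String[] args) {
        System.out.println(replay().keySet()); // prints [2, 3]
    }
}
```

So after the tombstone only keys 2 and 3 should remain, which is the behaviour I am not getting from the topology.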
Has anyone dealt with that?
I tried to declare my own Aggregator, but it does not process any data when I debug it…