Hey guys, I am trying to process tombstones into my KTable from a stream.
I have a KStream<Key, Value>, which is grouped by key and then aggregated into a KTable<Key, List<Value>>.
In the aggregation step I would like to remove the entry for the associated key when a tombstone is received.
For example, this is my test with TestTopologyDriver:
input.pipeInput(1, value1);
input.pipeInput(2, value2);
input.pipeInput(3, value3);
input.pipeInput(1, null);
and here comes the interesting part…
my topology logic:
final KTable<Key, List<Value>> valuesById = valuesStream
    .groupByKey()
    .aggregate(
        ArrayList::new,
        (key, value, agg) -> {
            if (value == null) {
                // tombstone: drop the matching element instead of adding
                agg.removeIf(el -> el.id() == key);
                return agg;
            }
            agg.add(value);
            return agg;
        },
        Materialized.<Key, List<Value>, KeyValueStore<Bytes, byte[]>>as("values")
            .withKeySerde(Serdes.Integer())
            .withValueSerde(new JsonSerde<>(new TypeReference<List<Value>>() {}))
    );
So this is how I would like to handle tombstones manually during aggregation and materialization.
But what happens?
Kafka Streams has its own default KStreamAggregate processor which logs:
WARN org.apache.kafka.streams.kstream.internals.KStreamAggregate – Skipping record due to null key or value.
And this is a problem, because in my scenario I should have 2 values at the end: the tombstone should trigger deletion of the first record with key "1".
Has anyone dealt with this?
I tried to declare my own Aggregator, but it's not processing any data when I try to debug…
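One workaround I'm considering, since Kafka Streams drops null-valued records before they ever reach an aggregator: map tombstones to a sentinel value upstream (e.g. `stream.mapValues(v -> v == null ? TOMBSTONE : v)`) and handle the sentinel inside the aggregate lambda. Below is a minimal plain-Java sketch of just the aggregator logic with a hypothetical `TOMBSTONE` sentinel (no Kafka dependencies, names are mine, not from any library):

```java
import java.util.ArrayList;
import java.util.List;

public class TombstoneAggregate {
    // Hypothetical sentinel replacing null upstream, so the record is not
    // skipped by the aggregation's built-in null-value check.
    static final String TOMBSTONE = "__tombstone__";

    // Plain-Java stand-in for the Aggregator<Integer, String, List<String>> lambda.
    static List<String> aggregate(Integer key, String value, List<String> agg) {
        if (TOMBSTONE.equals(value)) {
            agg.clear(); // everything in this list belongs to 'key', so drop it all
            return agg;
        }
        agg.add(value);
        return agg;
    }

    public static void main(String[] args) {
        List<String> agg = new ArrayList<>();
        aggregate(1, "a", agg);
        aggregate(1, "b", agg);
        aggregate(1, TOMBSTONE, agg);
        System.out.println(agg.size()); // prints 0
    }
}
```

The trade-off is that the sentinel must be representable in the value serde and must never collide with a real value.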
