KTable aggregation with tombstones

Hey guys, I am trying to process tombstones from a stream into my KTable.
I am using a KTable<Key, Value>, which is grouped by key and then aggregated into a KTable<Key, List<Value>>.
In the aggregation step I would like to remove the associated key if a tombstone is received.
For example, this is my test with TopologyTestDriver:

input.pipeInput(1, value1);
input.pipeInput(2, value2);
input.pipeInput(3, value3);
input.pipeInput(1, null);

And here comes the interesting part, my topology logic:

final KTable<Key, List<Value>> valuesById = valuesStream
        .groupByKey()
        .aggregate(
                ArrayList::new,
                (key, value, agg) -> {
                    if (value == null) {
                        // tombstone: drop the entry that belongs to this key
                        agg.removeIf(el -> el.id()==key);
                    } else {
                        agg.add(value);
                    }
                    return agg;
                },
                Materialized.<Key, List<Value>, KeyValueStore<Bytes, byte[]>>as("values")
                        .withKeySerde(Serdes.Integer())
                        .withValueSerde(new JsonSerde<>(new TypeReference<List<Value>>() {}))
        );

This is how I would like to handle tombstones manually, so that they take effect during aggregation and materialization.

But what happens?
Kafka Streams has its own default KStreamAggregateProcessor, which says:

WARN org.apache.kafka.streams.kstream.internals.KStreamAggregate – Skipping record due to null key or value.

And this is a problem, because in my scenario I should have 2 values at the end: the tombstone should trigger the deletion of the first record with key “1”.
Has anyone dealt with that?
I tried to declare my own Aggregator, but it's not processing any data when I debug…

The real idea behind this question is that I want to do real-time processing and use the KTable for lookup and enrichment.
So currently, if a tombstone comes, I shouldn’t send the key which “has to be deleted” at the end of the topology.
Even when I pass my own aggregate class, it still goes through KStreamAggregate from the Kafka library, which skips the tombstone.
Has anyone faced a similar issue?

Well, your input is a KStream, and thus <key,null> is not a tombstone… Only KTables have tombstones.

You would need to use your own/custom surrogate “tombstone” value:

valuesStream.mapValues(v -> v != null ? v : MyTombstone)
     .groupByKey(...)

And test for “MyTombstone” inside your aggregation function.
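
A minimal sketch of that check, written in plain Java without the Kafka dependency so the logic can be run on its own. The names `SurrogateTombstoneSketch`, `Value`, `TOMBSTONE`, `toSurrogate` and `aggregate` are hypothetical, and clearing the whole list on a surrogate is only an assumption about what "delete" should mean here; your business logic may need something else.

```java
import java.util.ArrayList;
import java.util.List;

final class SurrogateTombstoneSketch {

    // Hypothetical value type; the real Value class is not shown in this thread.
    static final class Value {
        final int id;
        final String payload;

        Value(int id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Hypothetical sentinel that stands in for a null value.
    // Assumption: identity comparison is enough for this in-memory sketch; in a real
    // topology a marker field that survives serialization would be safer.
    static final Value TOMBSTONE = new Value(-1, "<deleted>");

    // Same mapping as mapValues(v -> v != null ? v : MyTombstone).
    static Value toSurrogate(Value v) {
        return v != null ? v : TOMBSTONE;
    }

    // Same parameter order as the (key, value, aggregate) lambda passed to aggregate().
    static List<Value> aggregate(Integer key, Value value, List<Value> agg) {
        if (value == TOMBSTONE) {
            // Assumption: a surrogate tombstone clears everything collected for this key.
            agg.clear();
        } else {
            agg.add(value);
        }
        return agg;
    }
}
```

Because the surrogate is never null, the record is no longer skipped before it reaches the aggregation function.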

I see, but this approach doesn’t seem clean to me, as I need to create unnecessary objects in my Java app.
Instead, I already implemented this with a custom Processor and a Transformer (deprecated):

  • Processor will aggregate and store the values in two local stores
  • Transformer will reach the stores and build the enriched object

Well, this is how it looks:

public Topology topology() {
    registerStores();
    final KStream<Long, Set<Value>> aggregatedValues = oddKStream
            .process(new CustomProcessorSupplier(streamsBuilder), "groupedValuesByProperty", "valuesById");

    secondStream
            .filter(nonNullValue)
            .transform(CustomValueTransformer::new, "groupedValuesByProperty")
            .filter(nonNullValue);

    return streamsBuilder.build();
}

Here is what the processor actually does, as code:

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        var value = record.value();
        var key = record.key().getId();
        // Current group for this value's relation; empty for a tombstone or a new group.
        List<Value> groupedValuesFromStore = Optional.ofNullable(value)
                .map(o -> aggregatedStore.get(o.getRelationId()))
                .orElse(new ArrayList<>());
        long relationId;
        if (value == null) {
            // Tombstone: find the group via the stored value, then remove it from both stores.
            relationId = valuesStore.get(key).getRelationId();
            valuesStore.delete(key);
            groupedValuesFromStore = aggregatedStore.get(relationId);
            groupedValuesFromStore.removeIf(el -> el.getId() == key);
        } else {
            relationId = value.getRelationId();
            valuesStore.put(key, value);
            groupedValuesFromStore.add(value);
        }
        // Write the updated group back to the aggregated store.
        aggregatedStore.put(relationId, groupedValuesFromStore);
        stopWatch.stop();
        logTimer(stopWatch, "processKStreamTimerFunction");

Well, this is my implementation:
I group the values by their relation property and store them in a second store as a List, with key → relationId.
This is my tombstone handling and it works. I am not quite sure how this scales, but from what I see, I started 5 different instances against 10 partitions and they process the same data.
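
To make the two-store idea easier to try out in isolation, here is a rough sketch in plain Java where two HashMaps stand in for the state stores. `TwoStoreSketch` and its `Value` class are hypothetical names, not the masked production code. Two details are assumptions that go beyond the snippet above: a tombstone for an unknown id is ignored, and an update for an existing id replaces the old entry in the group instead of being appended again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TwoStoreSketch {

    // Hypothetical value type with the two properties used in this thread.
    static final class Value {
        final long id;
        final long relationId;

        Value(long id, long relationId) {
            this.id = id;
            this.relationId = relationId;
        }
    }

    // Stand-in for the "valuesById" store: id -> latest value.
    final Map<Long, Value> valuesStore = new HashMap<>();
    // Stand-in for the "groupedValuesByProperty" store: relationId -> values in that group.
    final Map<Long, List<Value>> aggregatedStore = new HashMap<>();

    // Processes one record; a null value is treated as a delete for that id.
    // Returns the updated group, or null if the tombstone refers to an unknown id.
    List<Value> process(long key, Value value) {
        long relationId;
        if (value == null) {
            Value previous = valuesStore.remove(key);
            if (previous == null) {
                return null; // assumption: nothing to delete, so the record is ignored
            }
            relationId = previous.relationId;
        } else {
            relationId = value.relationId;
            valuesStore.put(key, value);
        }
        List<Value> group = new ArrayList<>(aggregatedStore.getOrDefault(relationId, List.of()));
        group.removeIf(el -> el.id == key); // drop the deleted (or outdated) entry for this id
        if (value != null) {
            group.add(value);
        }
        aggregatedStore.put(relationId, group);
        return group;
    }
}
```

Feeding it three values with the same relationId and then a null for the first id leaves the other two values in the group, which matches what I see in my test.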

P.S. My code is masked for business security reasons, but I think you will get the idea.

Cheers, Martin.

Well, this will be “unclean” in any case, because you give input events different semantics. In the end, a KStream models individual events, and if there are two events with the same key, they are still two independent events.

In your original example, you aggregate events with the same key into a list. Let’s assume you have the following input:

<k,1>, <k,2>, <k,3>

and get a result list of

<k,[1,2,3]>

Next, <k,null> comes in and you say it’s a delete. But a delete for which input record? You got three inputs, and <k,null> does not indicate which one it should delete. Your code is agg.removeIf(el -> el.id()==key); which “randomly” picks the first one… (Your second example is much more precise about deleting stuff, and makes sense.)

In the end, <k,null> is semantically not a delete in a KStream, because it’s unclear which of the previous events with the same key it should delete… (For a KTable there is no such issue: <k,1>, <k,2>, <k,3> are successive updates to the same key k, so <k,2> replaces <k,1> and <k,3> replaces <k,2>, and the tombstone clearly deletes <k,3>; both <k,1> and <k,2> were already replaced and don’t exist any more.)
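
To illustrate the difference, here is a small plain-Java sketch (no Kafka involved; `StreamVsTableSketch`, `rec`, `asTable` and `eventsFor` are made-up names for this example). It reads the same list of records once with table semantics, where each record upserts or deletes its key, and once with stream semantics, where every record is an independent event.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StreamVsTableSketch {

    // Builds a <key,value> record; SimpleEntry is used because it allows a null value.
    static Map.Entry<String, Integer> rec(String key, Integer value) {
        return new SimpleEntry<>(key, value);
    }

    // Table semantics: each record replaces the previous value for its key,
    // and a null value deletes the key.
    static Map<String, Integer> asTable(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> table = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            if (r.getValue() == null) {
                table.remove(r.getKey());
            } else {
                table.put(r.getKey(), r.getValue());
            }
        }
        return table;
    }

    // Stream semantics: every non-null record is its own event, so all of them count.
    static long eventsFor(String key, List<Map.Entry<String, Integer>> records) {
        return records.stream()
                .filter(r -> r.getKey().equals(key) && r.getValue() != null)
                .count();
    }
}
```

With <k,1>, <k,2>, <k,3> as input, the table view holds a single entry for k, so a following <k,null> has exactly one thing to delete. The stream view still has three separate events for k and nothing that says which one the null refers to.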

It’s a longer history why we treat <k,null> as “corrupted” for a KStream aggregation… I agree that there are cases for which it’s annoying, but end-to-end it’s still the best solution right now, even if it forces you to put in a custom “delete” if you want to process such a message.

Of course, you can always use a custom Processor, too. That’s why we support it 🙂 And given your complex logic to find the right value to delete, it is clear why the DSL aggregate cannot provide this functionality to begin with.

Bottom line: your approach of using a custom Processor seems to be the correct one to me. The DSL is just not set up for your use case, and the built-in aggregation is not what your business logic does. In the end, your custom Processor does not implement an aggregation but something different (even if it is similar to an aggregation).

Thanks for the answer, @mjsax!
Yes, I agree with all the things you mentioned.
Just to clarify:
First I store the stream values in a <k,v> store, “valuesStore”.
So whatever comes in as null for a given key is deleted from this store.
Then I “aggregate” these values by their “relationId” property, which is the same for all of them and leads to
“<k, [1,2,3]>”. In the test I see that when I send the tombstone, <k,1> is deleted by my custom logic and the result is “<k, [2,3]>”, which is exactly what I want.
I have deployed this example in the development environment and I am currently monitoring it.
The only thing that I am concerned about is that .transform() is deprecated, and I have logic that uses the “aggregated” key-value store later on to build the output.

Well, you can just switch to the new KStream#process(), which replaces transform(). It provides the same functionality, just with a better (type-safe) API, and unifies the zoo of transformXXX() methods.

Sounds amazing, will review it tomorrow, Matthias!
Thanks.

Hey @mjsax, I just tried to replace the deprecated .transform() with process(), as you suggested.
It looks like this:

@Override
public void process(Record<CompositeKey, Value> record) {
    // some logic here...
    context.forward(record.withKey(record.value().getId()).withValue(new TransformedObject(value, rawOdds)));
}

Interestingly, the application now keeps dying with OOM errors. I checked the heap dump and it looks fine, at ~50 MB.
Do you have any idea why process() is way heavier than transform()?

No idea. Both should be basically the same under the hood. 🤔