Handle different retention periods for a state-store depending on the message payload

Hi,

I have the following use case and want to know whether there is a better, best-practice solution, because my current solution “smells”.

Use-Case:
→ I have a topic where I get updates of some business object streamed, where the key is the unique key of that business object.
→ When a new update comes in I need to compare it against the currently known state to check whether something specific happened, and if yes I need to emit a record into another topic.
→ After I have emitted that record I need to remember that object for at least 7 days, because this can happen again within that time range.
→ Objects where this didn’t happen can be deleted after 30 days.
→ Objects with some specific payload must not be deleted at all.
The incoming event stream is about 30 GB per day (so we don’t want to keep everything forever ^^), and the estimate is that >90% of the objects only need to be remembered for ~7 days.
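
Just to make the retention rules above concrete, this is roughly the decision we need per object (purely illustrative; isUnderLegalHold and hadSpecialEvent are made-up accessors on our business object):

enum Retention { SEVEN_DAYS, THIRTY_DAYS, KEEP_FOREVER }

static Retention retentionFor(BusinessObject obj) {
	if (obj.isUnderLegalHold()) {     // the "specific payload" case: never delete
		return Retention.KEEP_FOREVER;
	}
	if (obj.hadSpecialEvent()) {      // we emitted a record for it: keep for at least 7 days
		return Retention.SEVEN_DAYS;
	}
	return Retention.THIRTY_DAYS;     // everything else can go after 30 days
}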

We ended up with the following approach:

  1. We aggregate the incoming messages and persist the information we need to compare against in the aggregate itself, which is stored in a normal KeyValue store
KStream<String, BusinessObject> aggrStream =
		streamsBuilder.<String, BusinessObject>stream(inputTopic)
		.filter((k, v) -> v != null) // Filter out tombstones, as the aggregate would otherwise be re-initialized for a null value
		.groupByKey()                // aggregate() is only available on a grouped stream
		.aggregate(() -> new BusinessObject(),   // Initializer
				myAggregator,
				Materialized.<String, BusinessObject, KeyValueStore<Bytes, byte[]>>as(myStore))
		.toStream();
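
For reference, myAggregator is nothing fancy; roughly along these lines (the merge logic is illustrative only, and updateFrom is a made-up method on our business object):

Aggregator<String, BusinessObject, BusinessObject> myAggregator =
		(key, newValue, aggregate) -> {
			// keep the state we later need to compare new updates against
			aggregate.updateFrom(newValue);
			return aggregate;
		};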


After that we process that stream with .flatMapValues()

aggrStream.flatMapValues(myMapper)
		.to(businesObjectEvents);

where we detect whether we need to emit a new business event or not (hence “flatMapValues” instead of “mapValues”, as this can also result in no output record at all)
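
A minimal sketch of what myMapper could look like (the detection logic and the BusinessEvent type are placeholders):

// Returns either a single business event or nothing at all, which is why
// flatMapValues is used instead of mapValues.
ValueMapper<BusinessObject, Iterable<BusinessEvent>> myMapper =
		aggregate -> aggregate.somethingSpecificHappened()      // hypothetical check
				? Collections.singletonList(new BusinessEvent(aggregate))
				: Collections.emptyList();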

With that the business functionality is finished, BUT we have a state store that grows indefinitely because it never deletes anything.
Windowed stores somehow do not work for us due to the different time windows we need.

So we ended up with the approach discussed at: Kafka Stream Usage Patterns - Apache Kafka - Apache Software Foundation

So we added

streamsBuilder.addStateStore(purgeTimeStateStore);

aggrStream
		.transform(mypurgetransformer, purgeTimeStateStore.STATE_STORE_NAME)
		.to(inputTopic);
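
purgeTimeStateStore itself is just a plain key/value store mapping the object key to its purge timestamp, built roughly like this (the store name is illustrative):

StoreBuilder<KeyValueStore<String, Long>> purgeTimeStateStore =
		Stores.keyValueStoreBuilder(
				Stores.persistentKeyValueStore("purge-time-store"),
				Serdes.String(),
				Serdes.Long());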
  • This state store holds the key of an object and a long value with the epoch timestamp at which it can be deleted.
  • On every update coming in from aggrStream this timestamp is recalculated to now + 7 days, now + 30 days, or never (null).
  • In the transformer we use the “schedule” method as described in the link above to iterate over that state store every n hours and check whether something needs to be deleted (see the sketch after this list).
  • If something needs to be deleted, we emit a “special” record to the incoming topic (not a tombstone, as aggregate cannot handle it the way we would need).
  • The aggregator understands this special record and emits a tombstone, so the record is deleted from the changelog.
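
A rough sketch of that purge transformer, assuming the store sketched above and a hypothetical “purge marker” on the business object (the interval and all names are illustrative):

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class PurgeTransformer implements Transformer<String, BusinessObject, KeyValue<String, BusinessObject>> {

	private KeyValueStore<String, Long> purgeStore;

	@Override
	public void init(ProcessorContext context) {
		purgeStore = (KeyValueStore<String, Long>) context.getStateStore("purge-time-store");
		// Every 6 hours (wall-clock), scan the store for entries whose purge time has passed.
		context.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
			try (KeyValueIterator<String, Long> it = purgeStore.all()) {
				while (it.hasNext()) {
					KeyValue<String, Long> entry = it.next();
					if (entry.value != null && entry.value <= timestamp) {
						// Emit the "special" purge record back to the input topic;
						// the aggregator turns it into a tombstone.
						context.forward(entry.key, BusinessObject.purgeMarker()); // hypothetical marker
						purgeStore.delete(entry.key);
					}
				}
			}
		});
	}

	@Override
	public KeyValue<String, BusinessObject> transform(String key, BusinessObject value) {
		// Recalculate and store the purge timestamp for this object
		// (null = never delete, which removes any existing entry from the store).
		purgeStore.put(key, calculatePurgeTimestamp(value));
		return null; // nothing is forwarded on the regular record path
	}

	private Long calculatePurgeTimestamp(BusinessObject obj) {
		// Illustrative: the three retention cases from the use case above.
		if (obj.isUnderLegalHold()) {
			return null; // never purge
		}
		long days = obj.hadSpecialEvent() ? 7 : 30;
		return System.currentTimeMillis() + Duration.ofDays(days).toMillis();
	}

	@Override
	public void close() { }
}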

Basically “it works” but it doesn’t feel right …

Is there any easier approach to handle this? (Without implementing my own state-store)

One way I thought about was using three stores, one for each retention period, and moving the objects between the stores as needed, i.e. routing each object to the correct store and emitting tombstones to the other stores. But that also seems error-prone and not very straightforward.

My team does a lot of what you describe. We have many use cases with specific state retention requirements. We try to use one single state store, and each object within that store holds the transformed state data for the use case plus metadata such as retention type, flags and timestamps that support things like deciding when to remove it. All of that is used to identify candidates for purging during a punctuator callback, and it also supports the main use case of joining, lookups and enrichment.
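
A rough sketch of what such a wrapped state value could look like (the field names and the RetentionType enum are purely illustrative):

// Hypothetical value object held in the single state store: the transformed state
// plus the retention metadata the punctuator uses to find purge candidates.
class StoredBusinessState {
	BusinessObject state;            // transformed state used for joins, lookups and enrichment
	RetentionType retentionType;     // e.g. SEVEN_DAYS, THIRTY_DAYS, KEEP_FOREVER
	long lastUpdateTimestamp;        // epoch millis of the last update
	Long purgeAfterTimestamp;        // null = never purge
	boolean businessEventEmitted;    // flag: have we already emitted the downstream event?
}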