Delete records exceeding retention time

I want records that are older than the retention time to be deleted from the state store of an application. Currently I have a scheduled function that iterates through the whole state store and deletes all records older than 7 days. The problem seems to be that the iteration takes too long (too many records, probably) and times out.

This is the warning I get; the stack trace points exactly to the iteration block:

org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic **** for task 1_0 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer with transactionalId **** and ProducerIdAndEpoch(producerId=893136, epoch=14) attempted to produce with an old epoch
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.

Also, I use exactly-once processing, if that is relevant.

A workaround could be to delete records older than 1 day, so the iteration might not take as long.

Is there a way to do this deletion of old records without timing out, while still keeping them for 7 days?

older than the retention time

What retention time are you talking about? KeyValue stores don’t have a retention time (nor a TTL). WindowedStores do have a retention time, though. (Note that a WindowedStore is really just a key-value store, too, but it stores <(key, timestamp), value> pairs.) I’m not sure whether a WindowedStore would help, as its API is different from a KeyValue store’s.
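To illustrate the <(key, timestamp), value> layout (this is a toy model, not the real store format — the class, method, and key-encoding scheme below are all made up for illustration): if the timestamp comes first in the sort order, all expired records form a contiguous prefix of the keyspace, so retention can drop a whole range at once instead of scanning every record. This is roughly the idea behind how windowed stores can enforce retention cheaply, in contrast to a manual full scan of a KeyValue store.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.NavigableMap;
import java.util.TreeMap;

class WindowedModel {
    // Toy composite key: zero-padded epoch millis first, then the record key,
    // so entries sort by timestamp before key.
    static String compositeKey(Instant ts, String key) {
        return String.format("%020d|%s", ts.toEpochMilli(), key);
    }

    // Because expired entries are a contiguous prefix, retention is a single
    // headMap(...).clear() on the range below the cutoff -- no full iteration.
    static void expire(NavigableMap<String, String> store,
                       Instant now, Duration retention) {
        String cutoff = compositeKey(now.minus(retention), "");
        store.headMap(cutoff, false).clear();
    }
}
```

With a plain KeyValue store keyed only by the record key, no such contiguous range exists, which is why the original approach has to visit every record.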

If you are using punctuation, you should measure how much time you have spent and exit the punctuation early enough. For the particular timeout you hit, it seems to be the producer’s, so you could also increase it somewhat if you want to.
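A time-budgeted sweep could look like the sketch below. This is a minimal illustration of the "exit early" idea only: a plain `Map` stands in for the actual state store, and the `sweep`, `budget`, and `retention` names are invented for the example. In a real punctuator you would iterate the store with `store.all()` and close the iterator before returning; the next scheduled punctuation simply picks up the remaining old records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;

class TtlSweep {
    // Deletes entries older than `retention`, but stops once the wall-clock
    // `budget` is used up, so one punctuation never runs long enough to
    // trip the producer/transaction timeout. Returns the number deleted.
    static int sweep(Map<String, Instant> store, Instant now,
                     Duration retention, Duration budget) {
        long deadline = System.nanoTime() + budget.toNanos();
        Instant cutoff = now.minus(retention);
        int deleted = 0;
        Iterator<Map.Entry<String, Instant>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            if (System.nanoTime() >= deadline) {
                break; // budget exhausted: exit early, finish next punctuation
            }
            Map.Entry<String, Instant> e = it.next();
            if (e.getValue().isBefore(cutoff)) {
                it.remove();
                deleted++;
            }
        }
        return deleted;
    }
}
```

The trade-off is that deletion becomes incremental: records past retention may linger until a later punctuation reaches them, which is usually acceptable for a cleanup job.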

Sorry, “retention time” is a self-made variable. These records have a dateTime; if the dateTime is older than 7 days, we delete the record.
