I want records that are older than the retention time to be deleted from an application's state store. Currently I have a scheduled punctuation that iterates over the whole state store and deletes every record older than 7 days. The problem seems to be that this iteration takes too long (probably because there are too many records) and times out.
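For context, the cleanup looks roughly like the sketch below (the store name, the punctuation interval, and the use of a timestamped store are placeholders for illustration; my real processor differs in the details):

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class CleanupProcessor implements Processor<String, String, String, String> {

    private static final Duration RETENTION = Duration.ofDays(7);

    private TimestampedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        store = context.getStateStore("record-store"); // placeholder store name

        // Wall-clock punctuation that scans the whole store and deletes expired entries
        context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final long cutoff = timestamp - RETENTION.toMillis();
            try (KeyValueIterator<String, ValueAndTimestamp<String>> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, ValueAndTimestamp<String>> entry = iter.next();
                    if (entry.value.timestamp() < cutoff) {
                        store.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) {
        // normal processing and store updates omitted
    }
}
```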
This is the warning I get related to this; the stack trace points exactly to the iteration block:
```
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic **** for task 1_0 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer with transactionalId **** and ProducerIdAndEpoch(producerId=893136, epoch=14) attempted to produce with an old epoch
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
```
Also, I use the exactly-once processing guarantee, if that is relevant.
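Concretely, that is the `processing.guarantee` setting, along the lines of the snippet below (whether it is `EXACTLY_ONCE` or `EXACTLY_ONCE_V2` depends on the Kafka version in use; the wrapping class is just for illustration):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public final class EosConfig {
    public static Properties streamsProperties() {
        final Properties props = new Properties();
        // ... other streams configs (application.id, bootstrap.servers, ...) ...
        // exactly-once processing; EXACTLY_ONCE_V2 on Kafka 2.6+ clients
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}
```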
A possible workaround would be to delete records older than 1 day instead, so the store stays smaller and the iteration might not take as long.
Is there a way to delete these old records without timing out, while still keeping them for 7 days?