I have a processor with a state store holding approximately 1 million records. We now want to store the data in another format, so we need to migrate the records from the old store to the new one.
Are there any best practices for migrating data between state stores?
import java.time.Duration
import java.time.Instant
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.ContextualProcessor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore

class GuardProcessor(
    private val deprecatedStoreName: String,
    private val storeName: String,
    private val scheduleIntervalMillis: Long,
    private val storeRetentionMillis: Long
) : ContextualProcessor<String, MutableList<MyAvroObject>, String, MutableList<MyAvroObject>>() {

    private lateinit var deprecatedStore: KeyValueStore<String, MutableList<MyAvroObject>>
    private lateinit var store: KeyValueStore<String, MutableList<Record<String, MyAvroObject>>>

    override fun init(context: ProcessorContext<String, MutableList<MyAvroObject>>) {
        super.init(context)
        deprecatedStore = context().getStateStore(deprecatedStoreName)
        store = context().getStateStore(storeName)
        migrateStore()
        context().schedule(Duration.ofMillis(scheduleIntervalMillis), PunctuationType.WALL_CLOCK_TIME) {
            schedule()
        }
    }

    private fun migrateStore() {
        if (!store.all().use { it.hasNext() }) {
            // [store] is empty and has therefore not been initialized with entries from [deprecatedStore]
            App.log.info("Will migrate ${deprecatedStore.approximateNumEntries()} records from $deprecatedStoreName to $storeName")
            deprecatedStore.all().use { iterator ->
                iterator.forEachRemaining { keyValue ->
                    val records = keyValue.value
                        .map { myAvroObject -> Record(myAvroObject.name, myAvroObject, Instant.now().toEpochMilli()) }
                        .toMutableList()
                    store.put(keyValue.key, records)
                    context().commit()
                    deprecatedStore.delete(keyValue.key)
                }
            }
            App.log.info("The $storeName store now contains ${store.approximateNumEntries()} records")
        }
    }
}
I have tried the approach above without any luck: the producer transaction times out during the migration.
Would it be an idea for Kafka to implement a migration method that takes two state stores and a function that converts the data format?
I think you’re taking a good approach to the migration; the problem is the amount of state you have in the store. Off the top of my head, I’d suggest migrating only a portion of the store with each punctuation call: keep track of the elapsed time and the first key you have not yet migrated, then at the next punctuation call extract a range from the store starting at that key.
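To illustrate the idea, here is a minimal, self-contained sketch of batched migration driven by repeated punctuation calls. A TreeMap stands in for a sorted KeyValueStore (in Kafka Streams you would use store.range(...) instead of tailMap), uppercase() stands in for the real format conversion, and BATCH_SIZE / migrateBatch are made-up names for illustration only:

```kotlin
import java.util.TreeMap

// How many entries one punctuation call is allowed to migrate.
const val BATCH_SIZE = 3

// Migrates at most BATCH_SIZE entries, starting after lastMigratedKey.
// Returns the last key handled in this batch, or null when nothing was
// left to migrate (i.e. the migration is complete).
fun migrateBatch(
    oldStore: TreeMap<String, String>,
    newStore: TreeMap<String, String>,
    lastMigratedKey: String?
): String? {
    val remaining = if (lastMigratedKey == null) oldStore
                    else oldStore.tailMap(lastMigratedKey, false)
    // Materialize the batch first so we can safely mutate the map afterwards.
    val batch = remaining.entries.take(BATCH_SIZE).map { it.key to it.value }
    for ((key, value) in batch) {
        newStore[key] = value.uppercase() // convert to the new format here
        oldStore.remove(key)
    }
    return batch.lastOrNull()?.first
}

fun main() {
    val oldData = TreeMap(mapOf("a" to "x", "b" to "y", "c" to "z", "d" to "w", "e" to "v"))
    val newData = TreeMap<String, String>()
    var cursor: String? = null
    var calls = 0
    // Each loop iteration corresponds to one punctuation call.
    do {
        cursor = migrateBatch(oldData, newData, cursor)
        calls++
    } while (cursor != null)
    println("migrated ${newData.size} records over $calls punctuation calls, ${oldData.size} left")
    // prints: migrated 5 records over 3 punctuation calls, 0 left
}
```

In the real processor, the cursor (and elapsed time) would be kept as a field of the Punctuator, and each committed batch stays small enough that the transaction cannot time out.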
Hmmm, I was hoping it would be possible to do the whole process in one punctuation run to ensure that the whole state was migrated before the application is running in “normal operation” and processing records. But I guess not?
So to sum up: I have to migrate the state stores in several punctuation runs and ensure that my application is not processing any records between the punctuation calls due to state inconsistency? So during the migration process, the application’s output topic will experience downtime?
I am convinced that a Kafka native migrateStateStore() method would be extremely useful for Kafka Streams application developers. Is it possible to submit a KIP?
Not sure to what extent we can extract a general pattern and API. I have also not seen any questions about this in the past, so I am not sure it is common enough to justify adding built-in support. It’s always worth filing a JIRA to see if others comment that they would also want this.
Hmmm, I was hoping it would be possible to do the whole process in one punctuation run to ensure that the whole state was migrated before the application is running in “normal operation” and processing records.
Why is this important?
What I would recommend (we did a similar thing when migrating from plain kv-stores to timestamped-kv-stores) is a “lazy / on-demand” migration. I.e., you don’t use a punctuation but migrate record by record inside process(), each time you access a row.
On init() you can still check whether the old store has data, and disable the “dual access” if the old store is empty. Kafka Streams itself does the same.
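A minimal, self-contained sketch of that lazy, on-demand pattern, assuming a few things not in the thread: MutableMaps stand in for the two KeyValueStores, uppercase() for the real format conversion, and LazyMigratingStore is a made-up name — in a real processor this logic would live directly in process():

```kotlin
// Wraps dual access to the old and new stores: every read first consults
// the new store, falls back to the old one, and migrates the row on the spot.
class LazyMigratingStore(
    private val oldStore: MutableMap<String, String>,
    private val newStore: MutableMap<String, String>
) {
    // Mirrors the init()-time check: if the old store is already empty,
    // dual access is disabled from the start.
    private var migrationDone = oldStore.isEmpty()

    fun get(key: String): String? {
        newStore[key]?.let { return it }    // fast path: row already migrated
        if (migrationDone) return null      // old store is known to be empty
        val oldValue = oldStore.remove(key) ?: return null
        val converted = oldValue.uppercase() // convert to the new format here
        newStore[key] = converted            // migrate this one row on demand
        if (oldStore.isEmpty()) migrationDone = true
        return converted
    }
}
```

The appeal of this design is that the migration cost is spread across normal processing, so there is no long transaction and no downtime window: each record pays for its own migration the first time it is touched.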