Migrate processor state stores

I have a processor with a state store containing approximately 1 million records. We now wish to store the data in another format, so we need to migrate the records from the old store to the new store.

Are there any best practices to migrate data between state stores?

I have tried to migrate with the following method:

class GuardProcessor(
    private val deprecatedStoreName: String,
    private val storeName: String,
    private val scheduleIntervalMillis: Long,
    private val storeRetentionMillis: Long
) : ContextualProcessor<String, MutableList<MyAvroObject>, String, MutableList<MyAvroObject>>() {

    private lateinit var deprecatedStore: KeyValueStore<String, MutableList<MyAvroObject>>
    private lateinit var store: KeyValueStore<String, MutableList<Record<String, MyAvroObject>>>

    override fun init(context: ProcessorContext<String, MutableList<MyAvroObject>>) {
        super.init(context)  // required so that context() returns the initialized context
        deprecatedStore = context().getStateStore(deprecatedStoreName)
        store = context().getStateStore(storeName)

        context().schedule(Duration.ofMillis(scheduleIntervalMillis), PunctuationType.WALL_CLOCK_TIME) {
            migrateStore()
        }
    }

    private fun migrateStore() {
        if (!store.all().use { it.hasNext() }) {
            // [store] is empty and has therefore not been initialized with entries from [deprecatedStore]
            App.log.info("Will migrate ${deprecatedStore.approximateNumEntries()} records from $deprecatedStoreName to $storeName")
            deprecatedStore.all().use { iterator ->
                iterator.forEachRemaining { keyValue ->
                    val records = keyValue.value
                        .map { myAvroObject -> Record(myAvroObject.name, myAvroObject, Instant.now().toEpochMilli()) }
                        .toMutableList()
                    store.put(keyValue.key, records)
                }
            }
            App.log.info("The $storeName store now contains ${store.approximateNumEntries()} records")
        }
    }

    // process() and remaining members omitted
}

This has not worked: the producer transaction times out during the migration.

Would it be an idea for Kafka to implement a migration method that takes two state stores and a function that converts the data format?

Hi @hermanjakobsen

I think you’re taking a good approach to the migration; the problem is the amount of state you have in the store. Off the top of my head, I’d suggest only doing a portion of the store with each punctuation call. Maybe keep track of the elapsed time and the last key you migrated, then at the next punctuation call extract a range from the store starting from the key where you left off in the previous punctuation.
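A minimal, self-contained sketch of that batching idea (names like `BatchedMigrator`, `lastMigratedKey`, and `batchSize` are made up, and a `TreeMap` stands in for the sorted `KeyValueStore`). In a real processor, the resume step could use `deprecatedStore.range(lastMigratedKey, null)`; open-ended range bounds were, as far as I recall, added in Kafka 3.1 (KIP-763), so on older versions you would skip forward with `all()` instead:

```kotlin
import java.util.TreeMap

// Migrate at most `batchSize` entries per punctuation call and remember the
// last key migrated, so the next call resumes where the previous one stopped.
class BatchedMigrator(
    private val oldStore: TreeMap<String, String>,
    private val newStore: TreeMap<String, String>,
    private val batchSize: Int
) {
    private var lastMigratedKey: String? = null
    var done = oldStore.isEmpty()
        private set

    // Call this from the punctuator.
    fun migrateBatch() {
        if (done) return
        // Resume strictly after the last migrated key (range() stand-in).
        val remaining = lastMigratedKey?.let { oldStore.tailMap(it, false) } ?: oldStore
        var migrated = 0
        for ((key, value) in remaining) {
            if (migrated == batchSize) return
            newStore[key] = value.uppercase()  // placeholder for the format conversion
            lastMigratedKey = key
            migrated++
        }
        done = true  // the iteration was exhausted: everything is migrated
    }
}

fun main() {
    val old = TreeMap((1..25).associate { "key%02d".format(it) to "v$it" })
    val new = TreeMap<String, String>()
    val migrator = BatchedMigrator(old, new, batchSize = 10)
    var punctuations = 0
    while (!migrator.done) {
        migrator.migrateBatch()
        punctuations++
    }
    println("$punctuations punctuations, ${new.size} records migrated")
    // prints "3 punctuations, 25 records migrated"
}
```

Keeping each batch small bounds the work done inside a single punctuation, which should avoid the transaction timeout described above.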

Hmmm, I was hoping it would be possible to do the whole process in one punctuation run to ensure that the whole state was migrated before the application is running in “normal operation” and processing records. But I guess not?

So to sum up: I have to migrate the state stores in several punctuation runs and ensure that my application is not processing any records between the punctuation calls due to state inconsistency? So during the migration process, the application’s output topic will experience downtime?

I am convinced that a Kafka native migrateStateStore() method would be extremely useful for Kafka Streams application developers. Is it possible to submit a KIP?

@mjsax, do you have any thoughts on adding state store migration capabilities to Kafka Streams?

Not sure to what extent we can extract a general pattern and API. I have also not seen any questions about this in the past, so I'm not sure it is common enough to justify adding built-in support. It's always worth filing a JIRA to see if others comment that they would also want this.

Hmmm, I was hoping it would be possible to do the whole process in one punctuation run to ensure that the whole state was migrated before the application is running in “normal operation” and processing records.

Why is this important?

What I would recommend (we did a similar thing when migrating from plain kv-stores to timestamped kv-stores) is to do a “lazy / on-demand” migration. I.e., you don’t use a punctuation but migrate record by record inside process() each time you access a row.

I.e., for a get() you first look into the new store, and if not found, do a second get() into the old store. For a delete() you do it for both stores. For a put(), you do a put() into the new store and a delete() into the old store. (We did the same for two Column Families inside a single RocksDB. Cf kafka/RocksDBTimestampedStore.java at 16fc8e1cfff6f0ac29209704a079b0ddcbd0625e · apache/kafka · GitHub)

On init() you can still check if the old store has data, and disable the “dual access” if the old store is empty. Kafka Streams also does the same.
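A self-contained sketch of that dual-store pattern (plain `HashMap`s stand in for the two `KeyValueStore`s; `DualStore` and the `convert` lambda are hypothetical names):

```kotlin
// Dual-access wrapper: reads fall back to the old store, writes go to the
// new store and purge the corresponding row from the old one.
class DualStore<V : Any>(
    private val oldStore: HashMap<String, V>,
    private val newStore: HashMap<String, V>,
    private val convert: (V) -> V  // old-format -> new-format conversion
) {
    // As suggested above: if the old store is already empty at init time,
    // dual access can be disabled entirely.
    private val dualAccess = oldStore.isNotEmpty()

    fun get(key: String): V? {
        newStore[key]?.let { return it }        // 1) look in the new store first
        if (!dualAccess) return null
        return oldStore[key]?.let(convert)      // 2) fall back to the old store
    }

    fun put(key: String, value: V) {
        newStore[key] = value                   // write goes to the new store ...
        if (dualAccess) oldStore.remove(key)    // ... and purges the old row
    }

    fun delete(key: String) {                   // delete must hit both stores
        newStore.remove(key)
        if (dualAccess) oldStore.remove(key)
    }
}

fun main() {
    val old = hashMapOf("a" to "old-a", "b" to "old-b")
    val new = HashMap<String, String>()
    val store = DualStore(old, new) { it.removePrefix("old-") }

    println(store.get("a"))   // falls back to the old store and converts: prints "a"
    store.put("a", "a2")      // the row is migrated on write
    println(old.keys)         // "a" is gone from the old store
    println(store.get("a"))   // now served from the new store: prints "a2"
}
```

The store drains itself as rows are touched, so the application keeps processing records during the migration instead of blocking inside one long punctuation.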

The “lazy / on-demand” migration logic is what I was looking for! Thank you for your help 🙂

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.