Object value not updating in the State Store

I am trying to update an event that is already stored in the state store. However, for some reason, the update doesn’t take effect. The state store still has the old value even though the value has been updated. Is there any impact from using a new instance of the KeyValue object, even though the key and the object instance didn’t change?

Thanks

To update anything in a state store, you need to call store.put(...). What did you try?

Thanks. Yes, that is exactly what I used. I did use the version that requires the timestamp.

kvstore.put(key, value, context.timestamp())

I am using a WindowStore. The aim is to update the value in the store as related events are received.

Note that for a WindowStore, the effective key is a combination of the passed-in key and the passed-in timestamp. Since you use context.timestamp(), I assume that the timestamp for the second put() is different. Thus, instead of updating <(k,ts1),v1>, you put a second record <(k,ts2),v2> into the state store.
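To illustrate the composite-key behavior described above, here is a minimal sketch in plain Java. It is not the Kafka Streams API: WindowKeySketch and its methods are hypothetical stand-ins that model a windowed store as a map from key to (timestamp to value), assuming duplicates are not retained.

```java
import java.util.TreeMap;

// Simplified in-memory stand-in for a windowed store (assumption: NOT the Kafka Streams API).
public class WindowKeySketch {
    // key -> (timestamp -> value); the effective row key is the pair (key, timestamp).
    private final TreeMap<String, TreeMap<Long, String>> rows = new TreeMap<>();

    // Writes the row addressed by (key, timestamp).
    public void put(String key, String value, long timestamp) {
        rows.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Number of rows stored for one key across all timestamps.
    public int rowCount(String key) {
        TreeMap<Long, String> perKey = rows.get(key);
        return perKey == null ? 0 : perKey.size();
    }

    // Value stored at exactly (key, timestamp), or null if absent.
    public String valueAt(String key, long timestamp) {
        TreeMap<Long, String> perKey = rows.get(key);
        return perKey == null ? null : perKey.get(timestamp);
    }

    public static void main(String[] args) {
        WindowKeySketch store = new WindowKeySketch();
        store.put("k", "v1", 100L);
        store.put("k", "v2", 200L);          // different timestamp: adds a second row
        System.out.println(store.rowCount("k"));       // prints 2
        store.put("k", "v1-updated", 100L);  // same timestamp: overwrites the first row
        System.out.println(store.valueAt("k", 100L));  // prints v1-updated
    }
}
```

In this model, an "update" only replaces the old value when the second put() uses the same timestamp as the first one.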

Thanks. This is helpful. I tried to use the API method that doesn’t require a timestamp, kvstore.put(key, value), but that one is deprecated.

I am not seeing multiple output events being created by the transform processor. Only the first one is kept. Should I change it to a flatTransform to produce all the output events?

Yes, it’s deprecated because it also used context.timestamp() to set the timestamp, which results in issues like this. Thus, I am wondering: if you don’t want to set a timestamp, why are you using a windowed store and not a key-value store?

Great question. The reason we are not using a KeyValueStore is that events are time-bounded, so the timestamp is required. I think the solution would probably be to transform every event and then do a group by (pick the latest event).
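The "pick the latest event" idea can be sketched in plain Java as follows. This is only an illustration of the reduction logic, not a Kafka Streams topology: LatestPerKeySketch, Event, and latestPerKey are hypothetical names, and the events are assumed to carry their own timestamp.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only (assumption: plain Java, NOT a Kafka Streams groupBy/reduce).
public class LatestPerKeySketch {
    // Hypothetical event shape: a key, an event timestamp, and a payload.
    public static final class Event {
        public final String key;
        public final long timestamp;
        public final String value;

        public Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Groups events by key and keeps the one with the largest timestamp.
    // On equal timestamps, the event seen later in the list wins.
    public static Map<String, Event> latestPerKey(List<Event> events) {
        Map<String, Event> latest = new HashMap<>();
        for (Event e : events) {
            latest.merge(e.key, e, (old, cur) -> cur.timestamp >= old.timestamp ? cur : old);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("k", 100L, "v1"),
                new Event("k", 200L, "v2"),
                new Event("other", 150L, "x1"));
        Map<String, Event> latest = latestPerKey(events);
        System.out.println(latest.get("k").value);     // prints v2
        System.out.println(latest.get("other").value); // prints x1
    }
}
```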

I am trying to update an event that is already stored in the state store. However, for some reason, the update doesn’t take effect.

Going back to the original question: how do you actually read the row? After the second put, you should have two rows, so you could just use the one with the larger timestamp. Or you could first read the old value via store.fetch() and delete it by calling put() with a null value before you put() the new value with the new timestamp.
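A minimal sketch of the fetch, delete, then put sequence described above, again in plain Java rather than against the real WindowStore interface. ReplaceRowSketch, replace(), and the lookback parameter are hypothetical; the sketch assumes that putting a null value removes the row at that exact (key, timestamp).

```java
import java.util.TreeMap;

// Simplified in-memory stand-in for a windowed store (assumption: NOT the Kafka Streams API).
public class ReplaceRowSketch {
    // key -> (timestamp -> value)
    private final TreeMap<String, TreeMap<Long, String>> rows = new TreeMap<>();

    // A null value deletes the row at (key, timestamp); otherwise the row is written.
    public void put(String key, String value, long timestamp) {
        TreeMap<Long, String> perKey = rows.computeIfAbsent(key, k -> new TreeMap<>());
        if (value == null) {
            perKey.remove(timestamp);
        } else {
            perKey.put(timestamp, value);
        }
    }

    // Returns a copy of the rows for the key with timestamps in [from, to], ordered by timestamp.
    public TreeMap<Long, String> fetch(String key, long from, long to) {
        TreeMap<Long, String> perKey = rows.get(key);
        if (perKey == null) {
            return new TreeMap<>();
        }
        return new TreeMap<>(perKey.subMap(from, true, to, true));
    }

    // Deletes every older row for the key within the lookback range, then writes the new row.
    // lookback is assumed to be non-negative.
    public void replace(String key, String newValue, long newTimestamp, long lookback) {
        TreeMap<Long, String> old = fetch(key, newTimestamp - lookback, newTimestamp);
        for (Long ts : old.keySet()) {
            put(key, null, ts); // delete the old row via a null value
        }
        put(key, newValue, newTimestamp);
    }

    public static void main(String[] args) {
        ReplaceRowSketch store = new ReplaceRowSketch();
        store.put("k", "v1", 100L);
        store.replace("k", "v2", 200L, 1000L);
        System.out.println(store.fetch("k", 0L, 1000L)); // prints {200=v2}
    }
}
```

If deleting is not wanted, the other option from the post maps to taking the last entry of the fetched range, since the rows come back ordered by timestamp.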

I am still trying to understand what the exact issue is.

Thanks. Deleting via put with a null value is a great idea, since the WindowStore interface doesn’t expose a delete method similar to the one on KeyValueStore. I will give it a try.

The goal is to merge the input events. One or more inputs need to be transformed into output events. As events arrive, they are merged with past events. The transformation is the reason we keep previously computed values in the state store. I am using the Event Deduplication example (EventDeduplicationLambdaIntegrationTest) from the official Kafka Streams Examples. I use the fetch method to look up previous events from the store, and put to replace the value, but because the timestamp is required, a new entry ends up being added to the store.
