I’ve got a consumer whose topology is defined using the Processor API. I want to read a new topic into a global store so that I can query that store from an existing processor. The topic is produced by another Kafka Streams application and contains data aggregated using a sliding window, so its key is of type Windowed<String>.
Following the Javadoc of Topology#addGlobalStore, I created a processor to update this global store. It implements Processor<Windowed<String>, ThingsPerUser, Void, Void>, and I gave it a reference to the global store as WindowStore<String, ThingsPerUser> store. It updates the store this way:
@Override
public void process(Record<Windowed<String>, ThingsPerUser> thingsPerUserRecord) {
    store.put(
        thingsPerUserRecord.key().key(),
        thingsPerUserRecord.value(),
        thingsPerUserRecord.key().window().start());
}
Now, I’m confused by this sentence in the Javadoc of WindowStore<K,V>:
Note, that the stores’ physical key type is Windowed<K>.
Does this mean I should use a WindowStore<String, ThingsPerUser> as above instead of WindowStore<Windowed<String>, ThingsPerUser>?
The global store is created and added to the topology with:
Topology topology = new Topology();
var windowSize = Duration.ofHours(24L);
StoreBuilder<WindowStore<Windowed<String>, ThingsPerUser>> globalStoreBuilder =
    Stores.windowStoreBuilder(
            Stores.persistentWindowStore(
                "things-per-user-store", windowSize.multipliedBy(2), windowSize, false),
            WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize.toMillis()),
            thingsPerUserSerde)
        .withLoggingDisabled();
ProcessorSupplier<Windowed<String>, ThingsPerUser, Void, Void> processorSupplier =
    () -> new MyUpdaterProcessor("things-per-user-store");
topology.addGlobalStore(
    globalStoreBuilder,
    "global-store-source",
    WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize.toMillis()).deserializer(),
    thingsPerUserSerde.deserializer(),
    "things-per-user-topic",
    "global-store-updater",
    processorSupplier);
This compiles fine, but I get an exception when I try to use this global store in another processor. When calling WindowStore<String, ThingsPerUser> store = context.getStateStore("things-per-user-store"); in the processor’s init method, I get:
java.lang.ClassCastException: class org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to class org.apache.kafka.streams.state.WindowStore (org.apache.kafka.streams.state.internals.MeteredKeyValueStore and org.apache.kafka.streams.state.WindowStore are in unnamed module of loader 'app')
Could you please help me figure out what’s wrong here?
I wanted to use a WindowStore in this other processor in order to read from it with store.backwardFetch(key, now.minus(24L, ChronoUnit.HOURS), now). Is this the correct way to do it?
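To make clear what behaviour I expect on the read side, here is a toy model in plain Java. It is not Kafka Streams code: ToyWindowStore, PhysicalKey and their methods are names I made up for illustration, and I assume the real store works differently internally. It only shows the idea I have in mind: write with a plain String key plus the window start, then read the last 24 hours for one key, newest first.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model only, NOT the Kafka Streams implementation.
class ToyWindowStore<V> {

    // Hypothetical "physical" key: the logical key combined with the window start.
    static final class PhysicalKey implements Comparable<PhysicalKey> {
        final String key;
        final long windowStart;

        PhysicalKey(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public int compareTo(PhysicalKey other) {
            // Order by logical key first, then by window start.
            int byKey = key.compareTo(other.key);
            return byKey != 0 ? byKey : Long.compare(windowStart, other.windowStart);
        }
    }

    private final TreeMap<PhysicalKey, V> entries = new TreeMap<>();

    // The caller passes the logical key and the window start separately.
    void put(String key, V value, long windowStart) {
        entries.put(new PhysicalKey(key, windowStart), value);
    }

    // Values for `key` whose window start lies in [from, to], newest first.
    List<V> backwardFetch(String key, Instant from, Instant to) {
        return new ArrayList<>(entries
            .subMap(new PhysicalKey(key, from.toEpochMilli()), true,
                    new PhysicalKey(key, to.toEpochMilli()), true)
            .descendingMap()
            .values());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-02T00:00:00Z");
        ToyWindowStore<Integer> store = new ToyWindowStore<>();
        store.put("user-1", 3, now.minus(30L, ChronoUnit.HOURS).toEpochMilli()); // outside the range
        store.put("user-1", 5, now.minus(12L, ChronoUnit.HOURS).toEpochMilli());
        store.put("user-1", 7, now.minus(1L, ChronoUnit.HOURS).toEpochMilli());
        store.put("user-2", 9, now.minus(2L, ChronoUnit.HOURS).toEpochMilli());  // other key

        // Same time range as in my question: the last 24 hours.
        System.out.println(
            store.backwardFetch("user-1", now.minus(24L, ChronoUnit.HOURS), now)); // prints [7, 5]
    }
}
```

In this sketch the caller never builds the composite key itself, which is how I currently read the Javadoc note about the physical key type.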