Reading windowed topic into a global store

I’ve got a consumer whose topology is defined using the Processor API. I want to read a new topic into a global store, to query that store from an existing processor. The topic is produced by another Kafka Streams application and contains data aggregated using a sliding window, so its key is of type Windowed<String>.

Following the Javadoc of Topology#addGlobalStore, I created a processor to update this global store that implements Processor<Windowed<String>, ThingsPerUser, Void, Void> and gave it a reference to the global store as WindowStore<String, ThingsPerUser> store. It updates the store this way:

@Override
public void process(Record<Windowed<String>, ThingsPerUser> thingsPerUserRecord) {
    store.put(
        thingsPerUserRecord.key().key(),
        thingsPerUserRecord.value(),
        thingsPerUserRecord.key().window().start());
}

Now, I’m confused by this sentence in the Javadoc of WindowStore<K,V>:

Note, that the stores’ physical key type is Windowed<K>.

Does this mean I should use a WindowStore<String, ThingsPerUser> as above instead of WindowStore<Windowed<String>, ThingsPerUser>?

The global store is created and added to the topology with:

    Topology topology = new Topology();
    var windowSize = Duration.ofHours(24L);
    StoreBuilder<WindowStore<Windowed<String>, ThingsPerUser>> globalStoreBuilder =
        Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                    "things-per-user-store", windowSize.multipliedBy(2), windowSize, false),
                WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize.toMillis()),
                thingsPerUserSerde)
            .withLoggingDisabled();

    ProcessorSupplier<Windowed<String>, ThingsPerUser, Void, Void> processorSupplier =
        () -> new MyUpdaterProcessor("things-per-user-store");
    topology.addGlobalStore(
        globalStoreBuilder,
        "global-store-source",
        WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize.toMillis()).deserializer(),
        thingsPerUserSerde.deserializer(),
        "things-per-user-topic",
        "global-store-updater",
        processorSupplier);

This compiles fine, but I get an exception when I try to use this global store in another processor. When calling WindowStore<String, ThingsPerUser> store = context.getStateStore("things-per-user-store"); in the processor’s init method, I get:

java.lang.ClassCastException: class org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to class org.apache.kafka.streams.state.WindowStore (org.apache.kafka.streams.state.internals.MeteredKeyValueStore and org.apache.kafka.streams.state.WindowStore are in unnamed module of loader 'app')

Could you please help me figure out what’s wrong here?

I wanted to use a WindowStore in this other processor in order to read from it with store.backwardFetch(key, now.minus(24L, ChronoUnit.HOURS), now). Is this the correct way to do it?

What version of Kafka Streams are you using?

Can you share the full stack trace of the class cast exception?

This is using Kafka Streams version 3.9.1 with the following dependencies:

  • org.apache.kafka:kafka-clients:jar:3.9.1
  • org.apache.kafka:kafka-streams-test-utils:jar:3.9.1
  • org.apache.kafka:kafka-streams:jar:3.9.1

There isn’t much more in the stack trace, unfortunately:

java.lang.ClassCastException: class org.apache.kafka.streams.state.internals.MeteredKeyValueStore cannot be cast to class org.apache.kafka.streams.state.WindowStore (org.apache.kafka.streams.state.internals.MeteredKeyValueStore and org.apache.kafka.streams.state.WindowStore are in unnamed module of loader 'app')

	at package.removed.EventProcessor.init(EventProcessor.java:78)
	at package.removed.EventProcessorTest.initialiseProcessor(EventProcessorTest.java:100)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)

Oh wait, I’ve got a @BeforeEach method calling EventProcessor#init on a different instance of this class with the wrong store type in the context, and that’s what is causing this weird exception. Please ignore this error.

However, my other questions still stand:

Glad you figured it out.

Does this mean I should use a WindowStore<String, ThingsPerUser> as above instead of WindowStore<Windowed<String>, ThingsPerUser>?

Yes, using WindowStore<String, ThingsPerUser> is the right thing to do, as you only need to specify the underlying key type.
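
To illustrate why only the plain key type is specified, here is a toy model in plain Java. It is not Kafka Streams code and does not claim to show how the library is implemented; ToyWindowStore and PhysicalKey are hypothetical names made up for this sketch. The idea is that callers pass the plain key plus a window start timestamp, and the store combines the two internally, which is roughly what the Javadoc note about the physical key type seems to refer to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only, NOT Kafka Streams code: callers work with the plain key,
// while the store internally pairs the key with the window start timestamp.
final class ToyWindowStore<V> {

    // Hypothetical "physical" key: plain key plus window start,
    // ordered by key first and by window start second.
    static final class PhysicalKey implements Comparable<PhysicalKey> {
        final String key;
        final long windowStart;

        PhysicalKey(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }

        @Override
        public int compareTo(PhysicalKey other) {
            int byKey = key.compareTo(other.key);
            return byKey != 0 ? byKey : Long.compare(windowStart, other.windowStart);
        }
    }

    private final TreeMap<PhysicalKey, V> entries = new TreeMap<>();

    // Same shape as put(key, value, windowStartTimestamp): the caller passes the plain key.
    void put(String key, V value, long windowStart) {
        entries.put(new PhysicalKey(key, windowStart), value);
    }

    // Values for `key` whose window start lies in [from, to], newest window first.
    // Assumes from <= to.
    List<V> backwardFetch(String key, long from, long to) {
        Map<PhysicalKey, V> range = entries
            .subMap(new PhysicalKey(key, from), true, new PhysicalKey(key, to), true)
            .descendingMap();
        return new ArrayList<>(range.values());
    }
}
```

In this model the caller never constructs a PhysicalKey, in the same way that the processor above calls store.put with thingsPerUserRecord.key().key() and the window start rather than with the Windowed<String> itself.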

Thanks for clearing that up, @mjsax!

For some reason, I thought the topic’s key type had to match the store’s key type when calling Topology#addGlobalStore, but I now have

StoreBuilder<WindowStore<String, ThingsPerUser>> globalStoreBuilder;
ProcessorSupplier<Windowed<String>, ThingsPerUser, Void, Void> processorSupplier;

and no type conflict.
