Hello,
I’m running a Kafka Streams application on OpenShift with a state store for interactive queries.
Everything works fine when running one instance of the application.
When we spawn another instance (i.e., scale the pods from 1 to 2), we start observing strange behaviour:
When we query the REST endpoint with an id, we get no output, as if the value is not available in the state store.
But when we query the same id again, we get the result.
Upon checking the pod logs:
pod-2 (the new instance) shows “id not available” (case #1 above)
pod-1 is always able to fetch the results
It looks like the newly spawned pod (pod-2) either cannot query the state store, or can query it but does not get the results.
Does Kafka Streams (KTable) support multi-instance deployment with a state store?
If not, what is the solution to run multiple instances with a state store?
Here’s the code that creates the state store:
// Create the state store from the input topic
final StreamsBuilder builder = new StreamsBuilder();
builder.table(inputTopicName,
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryable_state_store"));

// Querying the store with an {id}
private KafkaStreams storeStream; // the running KafkaStreams instance
ReadOnlyKeyValueStore<String, String> store = storeStream.store(
        StoreQueryParameters.fromNameAndType("queryable_state_store", QueryableStoreTypes.keyValueStore()));
String service = store.get(id);
Another thing I observed:
even though I’m using topology.optimization = all, I still see that the changelog topic is getting created. But the documentation on KTable says:
“… An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting topology.optimization to all in the StreamsConfig.”
Could anyone help me to understand these two issues?
Thanks
State stores in Kafka Streams are always sharded/partitioned based on the number of input topic partitions. If you have a single instance, it hosts all shards; in a multi-instance deployment, however, different instances host different shards. Using “interactive queries” you can only query the local shards; there is no built-in support to query remote shards on other instances.
Kafka Streams only provides the corresponding metadata via KafkaStreams#queryMetadataForKey(), KafkaStreams#allMetadataForStore(), or KafkaStreams#allMetadata(), which allows you to figure out whether a key is on the local instance or a remote instance. For the remote-instance case, you need to implement your own RPC to reroute the query to the corresponding instance.
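A rough sketch of that lookup on a recent Kafka Streams version (the store name is from your snippet; thisHostInfo and forwardToRemoteInstance are hypothetical helpers you would provide, and each instance must set application.server to its own host:port for the metadata to be useful):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public String lookup(final KafkaStreams storeStream, final HostInfo thisHostInfo, final String id) {
    // Find out which instance hosts the shard (partition) this key maps to.
    final KeyQueryMetadata metadata = storeStream.queryMetadataForKey(
            "queryable_state_store", id, Serdes.String().serializer());

    if (thisHostInfo.equals(metadata.activeHost())) {
        // The key's shard is hosted locally, so query the local store.
        final ReadOnlyKeyValueStore<String, String> store = storeStream.store(
                StoreQueryParameters.fromNameAndType(
                        "queryable_state_store", QueryableStoreTypes.keyValueStore()));
        return store.get(id);
    }
    // The key's shard lives on another instance, so reroute the query yourself
    // (hypothetical helper, sketched further down in this thread).
    return forwardToRemoteInstance(metadata.activeHost(), id);
}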
Thanks for the reply @mjsax
Maybe I was not clear in explaining my scenario.
I’m running the second instance (i.e., launching the same code again) on the same machine where the first instance is running.
When I query the REST endpoint:
1. The second instance does not return any data.
2. Only the first instance returns the data.
About topology optimization: you need to pass the config into StreamsBuilder#build(Properties) to enable the optimization.
I’m running the second instance (i.e., launching the same code again) on the same machine where the first instance is running.
Not sure if I can follow. You mentioned that you added a new pod? In any case, KafkaStreams instances are totally isolated from each other, even if they run in the same JVM, so you need to query the correct KafkaStreams client instance.
The second instance does not return any data.
Only the first instance returns the data.
If I understand this correctly, you issue the same query (i.e., for the same key) to both instances and only one instance returns the data? That is expected, as only one instance hosts the corresponding shard. If you want to be able to issue a query against any instance, you need to add code such that one instance transparently forwards the query to the instance that holds the data.
Given your code snippet, you don’t pass the configs into StreamsBuilder#build() but only into KafkaStreams, which is not sufficient.
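For completeness, a minimal sketch of such a forwarding helper, assuming Java 11’s HttpClient and a /state/{id} REST path exposed by every instance (both are assumptions about your own REST layer, not something Kafka Streams provides):

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import org.apache.kafka.streams.state.HostInfo;

// Hypothetical helper: re-issue the same GET against the instance that hosts the shard.
private String forwardToRemoteInstance(final HostInfo activeHost, final String id)
        throws IOException, InterruptedException {
    final HttpClient client = HttpClient.newHttpClient();
    final HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/state/" + id))
            .GET()
            .build();
    return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
}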
final Topology streamTopology = builder.build(); // <---- not sufficient
final Topology streamTopology = builder.build(props); // correct
Optimizations are applied while the Topology is built, and if the props are not passed into build(), optimization is assumed to be disabled. KafkaStreams only executes whatever Topology it gets; it does not rewrite it. Thus, StreamsBuilder is responsible for applying the optimizations while building the Topology in the first place.
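Putting it together, a minimal sketch (application id, bootstrap servers, and topic name are placeholders; on Kafka Streams versions before 2.7 the constant is StreamsConfig.TOPOLOGY_OPTIMIZATION instead of TOPOLOGY_OPTIMIZATION_CONFIG):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "state-store-app");        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); // i.e., "all"

final StreamsBuilder builder = new StreamsBuilder();
builder.table("input-topic",                                               // placeholder topic
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryable_state_store"));

final Topology streamTopology = builder.build(props); // optimization is applied here
final KafkaStreams streams = new KafkaStreams(streamTopology, props);
streams.start();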
Instead of writing code, is there any other way to implement this? Maybe a global state store?
If you use a global state store, each instance will read all partitions and will have a full copy of your state. Of course, this comes with a corresponding increase in client-side storage costs. Also note that if you use a GlobalKTable in a join, it has different semantics (your program might not use a join, though, so it might not matter).
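A minimal sketch of that alternative, reusing the store name from your snippet (the topic name is a placeholder, and streams/id are as in the earlier snippets); since every instance materializes the full table, any instance can serve any key without an RPC layer:

final StreamsBuilder builder = new StreamsBuilder();
builder.globalTable("input-topic", // placeholder topic
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryable_state_store"));

// Querying is unchanged, but every key is now available locally on each instance:
final ReadOnlyKeyValueStore<String, String> store = streams.store(
        StoreQueryParameters.fromNameAndType(
                "queryable_state_store", QueryableStoreTypes.keyValueStore()));
final String value = store.get(id);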
Thank you very much for the suggestion. It’s working now; I do not see the changelog topic anymore.
Yes, I’m aware of the storage costs that come with it.
Just an observation: considering use cases where we can simply scale out another instance (same app.id) of the application, it would be better if the Kafka Streams library supported a built-in auto-discovery/routing mechanism, instead of requiring the client to implement an RPC layer to reach the running instances in state-store scenarios.
I guess Kafka Streams could include something, but the philosophy so far has been to keep it simple and to stay unopinionated where possible. Some people might want to use REST, others gRPC, others something else… But who knows. If the demand is large enough, we might add it. (If not in Apache Kafka, maybe in Confluent.)