Multi-instance Kafka Streams using a state store

Hello,
I’m running a Kafka Streams application on OpenShift with a state store for interactive queries.
Everything works fine when running one instance of the application.
When we spawn another instance (i.e., scale from 1 pod to 2), we start observing strange behaviour.

  1. When we query the REST endpoint with an id, we get no output, as if the value were not available in the state store.

  2. But when we query the same id again, we get the result.

  3. According to the pod logs:
    pod-2 (the new instance) reports “id not available” (#1 above)
    pod-1 is always able to fetch the results

  4. It looks like the newly spawned pod (pod-2) either cannot query the state store, or can query it but gets no results.

  5. Does Kafka Streams (KTable) support multi-instance deployment with a state store?
    If not, what’s the solution for running multiple instances with a state store?


Here’s the code that creates the state store:

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(inputTopicName, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryable_state_store"));

        // Querying the store with an {id}
        private KafkaStreams StoreStream;
        ReadOnlyKeyValueStore<String, String> store = StoreStream.store(StoreQueryParameters.fromNameAndType("queryable_state_store", QueryableStoreTypes.keyValueStore()));
        String service = store.get(id);

I’m using :
Confluent v5.5.2
Kafka streams v2.5.0

My Streams Config :
INFO org.apache.kafka.common.config.AbstractConfig [main] StreamsConfig values:
application.id = kafka.streams.state-store-testing
application.server =
bootstrap.servers = [br1:39092, br2:39092, br3:39092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 30000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class com.kafka.SendToDLQ
default.key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
default.production.exception.handler = class com.kafka.SendToDLQ
default.timestamp.extractor = class org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
max.task.idle.ms = 0
metadata.max.age.ms = 300000
metric.reporters =
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 1
partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = at_least_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 3
request.timeout.ms = 40000
retries = 0
retry.backoff.ms = 100
rocksdb.config.setter = class com.kafka.CustomRocksDBConfig
security.protocol = SASL_SSL
send.buffer.bytes = 131072
state.cleanup.delay.ms = 600000
state.dir = /statestore
topology.optimization = all
upgrade.from = null
windowstore.changelog.additional.retention.ms = 86400000


Another thing I observed:
even though I’m using topology.optimization = all, I still see that a changelog topic is getting created. But the documentation on KTable says:
" …
An internal changelog topic is created by default. Because the source topic can be used for recovery, you can avoid creating the changelog topic by setting the "topology.optimization" to "all" in the StreamsConfig."

Could anyone help me to understand these two issues?
Thanks

State stores in Kafka Streams are always sharded/partitioned based on the number of input topic partitions. If you have a single instance, it will host all shards. In a multi-instance deployment, however, different instances will host different shards. Using “interactive queries” you can only query the local shards; there is no built-in support for querying remote shards on other instances.
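
To make the sharding concrete, here is a toy model in plain Java (no Kafka dependency). It is only a sketch: the class ShardedStoreModel, the hashCode-based partitioner, and the round-robin assignment are stand-ins I made up for illustration, not what Kafka Streams actually runs. It shows why a key written once is visible to a local query on exactly one of two instances, which would match "pod-2 says id not available" if the REST calls alternate between pods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: every partition (shard) of the store is owned by exactly one instance. */
final class ShardedStoreModel {
    private final int numPartitions;
    private final int numInstances;
    // localStores.get(i) holds only the keys whose partition is assigned to instance i.
    private final List<Map<String, String>> localStores = new ArrayList<>();

    ShardedStoreModel(int numPartitions, int numInstances) {
        this.numPartitions = numPartitions;
        this.numInstances = numInstances;
        for (int i = 0; i < numInstances; i++) {
            localStores.add(new HashMap<>());
        }
    }

    /** Hypothetical stand-in for the partitioner (Kafka's default hashes the serialized key with murmur2). */
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Hypothetical stand-in for task assignment: partitions spread round-robin over instances. */
    int ownerOf(int partition) {
        return partition % numInstances;
    }

    /** A record lands only in the local store of the instance that owns its partition. */
    void put(String key, String value) {
        localStores.get(ownerOf(partitionFor(key))).put(key, value);
    }

    /** What a local interactive query on one instance sees; null if the shard lives elsewhere. */
    String localGet(int instance, String key) {
        return localStores.get(instance).get(key);
    }

    public static void main(String[] args) {
        ShardedStoreModel model = new ShardedStoreModel(4, 2);
        model.put("id-1", "service-a");
        for (int i = 0; i < 2; i++) {
            System.out.println("instance " + i + " -> " + model.localGet(i, "id-1"));
        }
    }
}
```

With a single instance the same model returns the value for every key, which is why the problem only shows up after scaling out.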

Kafka Streams only provides the corresponding metadata via KafkaStreams#queryMetadataForKey(), KafkaStreams#allMetadataForStore(), or KafkaStreams#allMetadata(), which allows you to figure out whether a key is on the local instance or a remote instance. For the remote-instance case, you need to implement your own RPC to reroute the query to the corresponding instance.

Check out the docs for more details: Kafka Streams Interactive Queries | Confluent Documentation

We also have an example that implements a routing layer: GitHub - confluentinc/kafka-streams-examples: Demo applications and code examples for Apache Kafka's Streams API.
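
As a rough sketch of what such a routing layer does, the plain-Java class below answers from the local store when this instance owns the key and forwards the request otherwise. Everything here is hypothetical: QueryRouter and its three function parameters are placeholders, not Kafka Streams API. In a real application, ownerLookup would presumably wrap KafkaStreams#queryMetadataForKey() and read the active host from the returned metadata, remoteCall would be your own REST or gRPC client, and each instance would need application.server set to its own host:port (it is empty in the config posted above) so that the metadata can point at it.

```java
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

/** Sketch of a routing layer: answer locally if this instance owns the key, else forward. */
final class QueryRouter {
    private final String selfHost;                                // this instance's host:port
    private final Function<String, String> ownerLookup;           // key -> host:port of the owning instance
    private final Function<String, String> localStore;            // key -> value from the local shard(s)
    private final BiFunction<String, String, String> remoteCall;  // (host, key) -> value fetched over RPC

    QueryRouter(String selfHost,
                Function<String, String> ownerLookup,
                Function<String, String> localStore,
                BiFunction<String, String, String> remoteCall) {
        this.selfHost = selfHost;
        this.ownerLookup = ownerLookup;
        this.localStore = localStore;
        this.remoteCall = remoteCall;
    }

    /** Serve the key locally when this instance hosts its shard; otherwise forward to the owner. */
    String get(String key) {
        String owner = ownerLookup.apply(key);
        if (selfHost.equals(owner)) {
            return localStore.apply(key);
        }
        return remoteCall.apply(owner, key);
    }

    public static void main(String[] args) {
        // Made-up ownership table and local data for the instance "pod-2:8080".
        Map<String, String> owners = Map.of("id-1", "pod-1:8080", "id-2", "pod-2:8080");
        Map<String, String> pod2Local = Map.of("id-2", "service-b");
        QueryRouter pod2 = new QueryRouter(
                "pod-2:8080",
                owners::get,
                pod2Local::get,
                (host, key) -> "forwarded to " + host + " for " + key);
        System.out.println(pod2.get("id-2")); // prints "service-b"
        System.out.println(pod2.get("id-1")); // prints "forwarded to pod-1:8080 for id-1"
    }
}
```

Keeping the lookup and the transport behind plain functions leaves the choice of RPC mechanism open, and lets the routing decision be tested without a running cluster.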

Thanks for the reply @mjsax
Maybe I wasn’t clear in explaining my scenario.
I’m running the second instance (like launching the same code again) on the same machine where the first instance is running.
When I query the REST endpoint:

  1. The second instance does not return any data.
  2. Only the first instance returns the data.

About topology optimization: you need to pass the config to StreamsBuilder#build(Properties) to enable the optimization.

I’m running the second instance (like launching the same code again) on the same machine where the first instance is running.

I’m not sure I follow. You mentioned that you added a new pod? In any case, KafkaStreams instances are totally isolated from each other, even if they run in the same JVM. So you need to use the correct KafkaStreams client instance.

  1. The second instance does not return any data.
  2. Only the first instance returns the data.

If I understand this correctly, you issue the same query (i.e., for the same key) to both instances and only one instance returns the data? That is expected, as only one instance will host the corresponding shard. If you want to be able to issue a query against any instance, you need to add code so that one instance transparently forwards the query to the instance that holds the data.

@mjsax
Regarding the topology optimization, I’m already passing the properties (my original post also includes the streams properties captured from the logs).

@Bean("StreamProperties")
public Properties StreamProperties() {
    Properties config = new Properties();
    config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
    return config;
}

@Bean(destroyMethod="close")
public KafkaStreams stateSStream(@Qualifier("StreamProperties") Properties properties) {

    final StreamsBuilder builder = new StreamsBuilder();
    builder.table(inputTopicName, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("queryable_state_store"));
    final Topology streamTopology = builder.build();
    KafkaStreams streams = new KafkaStreams(streamTopology, properties);
    streams.start();
    return streams;
}

Even though I’m using topology.optimization = all and my input topic is “compact”, I still see the changelog topic getting created.

============

Regarding the issue with the state store:

yes, when I issue the same query with the same key, sometimes I get the result and sometimes not.

Instead of writing code, is there any other way to implement this? Maybe a global state store?

Thanks again for your reply.

Given your code snippet, you don’t pass the configs into StreamsBuilder#build() but only into KafkaStreams, which is not sufficient.

final Topology streamTopology = builder.build(); // <---- not sufficient
final Topology streamTopology = builder.build(props); // correct

Optimizations are applied while the Topology is built, and if the props are not passed into build(), we assume that optimization is disabled. KafkaStreams only executes whatever Topology it gets; it does not rewrite it. Thus, StreamsBuilder is responsible for applying optimizations while building the Topology in the first place.

Instead of writing code, is there any other way to implement this? Maybe a global state store?

If you use a global state store, each instance will read all partitions and will have a full copy of your state. Of course, this comes with a corresponding increase in client-side storage costs. Also note that if you use a global KTable in a join, it will have different semantics (maybe your program does not use a join, though, so it might not matter).

I’m so sorry that I didn’t reply earlier.

Thank you very much for the suggestion. It’s working now; I no longer see the changelog topic.

Yes, I’m aware of the storage costs that come with it.

Just an observation: for use cases where we simply scale out another instance (same application.id) of the application, it would be better if the Kafka Streams library had a built-in auto-discovery mechanism for state-store scenarios, instead of requiring the client to implement RPC to discover the running instances.

Anyway, thanks for your time and knowledge :)

I guess Kafka Streams could include something, but the philosophy so far has been to keep it simple and stay unopinionated where possible. Some people might want REST, others gRPC, others something else… But who knows. If the demand is large enough, we might add it. (If not in Apache Kafka, maybe in Confluent.)
