GlobalKTable changelog topic retention vs. Kafka topic retention

Hi,

If we create a GlobalKTable from a topic that has a retention of 7 days, what will be the retention of the KTable changelog topic?

I see an option to set a different retention value for the changelog topic during KTable creation:

Map<String, String> topicChangelogConfig = new HashMap<>();
topicChangelogConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
topicChangelogConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(GLOBAL_TABLE_RETENTION_PERIOD));

// Create Customer Details GlobalKTable
kStreamBuilder.globalTable(
        CustomerDataTopic,
        Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(FLIGHT_DETAILS_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(customerDataSerde)
                .withLoggingEnabled(topicChangelogConfig)
);

Multiple questions:

  1. Can we have more retention on the KTable changelog topic than on the inbound topic?
    E.g., the topic has 7 days of retention and the KTable changelog topic has 60 days. Is that even valid?

  2. Also, I don’t see a changelog topic created on the broker to hold the state store for this GlobalKTable. What happens in case of failure? Does it read from the source topic again?

  3. I read in the documentation that GlobalKTable is only suitable for static data or data that does not change frequently. Will it be slow to read from the store if the data changes frequently?

Thanks in advance.

For a GlobalKTable, there is no additional changelog topic; its input topic is its changelog. Thus, the input topic should be configured with log compaction, not with retention.

(Global) KTables do not support TTL, and thus there is no concept of data retention.
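Since the input topic doubles as the changelog, compaction has to be configured on the source topic itself. A minimal sketch with the `Admin` client (the topic name and bootstrap address are placeholders):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class CompactSourceTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "customer-data"); // placeholder
            AlterConfigOp setCompact = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
                    AlterConfigOp.OpType.SET);
            // Incrementally change only cleanup.policy; other topic configs stay untouched.
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
        }
    }
}
```

The same change can of course be made with the `kafka-configs` CLI; the point is that compaction belongs on the source topic, not on a (non-existent) changelog topic.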

Can we have more retention on the KTable changelog topic than on the inbound topic?

No, because there is no additional changelog topic.

Also, I don’t see a changelog topic created on the broker to hold the state store for this GlobalKTable. What happens in case of failure? Does it read from the source topic again?

Yes, on restore, the input topic will be used.

I read in the documentation that GlobalKTable is only suitable for static data or data that does not change frequently. Will it be slow to read from the store if the data changes frequently?

It has nothing to do with “speed”. It’s about semantics. A GlobalKTable is not “time synchronized” with other topics, so if the data changes often you effectively get non-deterministic behavior, and if you re-process the input data, your result might change on every re-run.

Thank you @mjsax !

I am finding it difficult to design our topology.

We have two topics (A and B), both receiving events from the same app at high volume. I am trying to build a Streams application that enriches events in topic A with data from topic B based on a key match, and it is a one-to-many mapping between an event in A and events in B.

The problem is that when the app generates events to topic B, it also generates events to topic A, but there is no explicit link between them. We need to match an event in topic A with the corresponding event from topic B. Both events carry the date when the event was generated, and we want to use that date for matching.

Approach 1:
I created a GlobalKTable store on topic B to hold all the events, and started a KStream on topic A with a transformer. Inside the transformer, I generate the keys needed to pull events from the GlobalKTable store of topic B, using interactive queries.

It is not working as expected: when I do store.get() using the query API, it returns old records from the store.

Approach 2:

If I move to a KTable instead of a GlobalKTable, it will be problematic, because each instance will only process partial data, and an interactive query on the store will only return data processed by that instance.

How do I store the data coming from topic B (which changes a lot) in a consistent way, and have the ability to query it?

Thank you for helping me.

Inside the transformer, I generate the keys needed to pull events from the GlobalKTable store of topic B, using interactive queries.

This does not sound right… the IQ API should not be used from inside a Transformer.

Both events carry the date when the event was generated, and we want to use that date for matching.

Sounds like you need to re-partition the data based on the date; this allows you to co-partition both input topics and do a “local” computation.
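A minimal sketch of that idea, assuming String payloads, placeholder topic names, and a hypothetical `extractTimestamp()` that reads the generation date out of the event (here the payload is assumed to start with the epoch-millis timestamp):

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

public class DateJoinSketch {
    // Pure helper: normalize an event's epoch-millis timestamp to a UTC day key.
    static String dateKey(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC).toLocalDate().toString();
    }

    // Placeholder for however the generation date is encoded in the payload.
    static long extractTimestamp(String event) {
        return Long.parseLong(event.split("\\|")[0]);
    }

    public static void buildTopology(StreamsBuilder builder) {
        KStream<String, String> a = builder.stream("topic-a"); // placeholder names
        KStream<String, String> b = builder.stream("topic-b");

        // Re-key both streams by the generation date; Kafka Streams inserts
        // repartition topics automatically before the join, so both sides
        // end up co-partitioned by the date key.
        KStream<String, String> aByDate = a.selectKey((k, v) -> dateKey(extractTimestamp(v)));
        KStream<String, String> bByDate = b.selectKey((k, v) -> dateKey(extractTimestamp(v)));

        aByDate.join(
                bByDate,
                (eventA, eventB) -> eventA + "+" + eventB, // enrichment logic is a placeholder
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));
    }
}
```

A windowed stream-stream join naturally handles the one-to-many case: each A event is paired with every B event that shares the date key within the window.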

It does not sound like your input is actually a changelog topic, but rather an event topic? I.e., the data might not be keyed and there is no notion of deletes. For this case, you might want to use a Processor/Transformer with an attached state store and manage the state manually (including deleting data you don’t need any longer).
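A sketch of that manual-state approach, using the Processor API: the store name, the key scheme (timestamp-prefixed keys), and the retention bound are all assumptions for illustration.

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers topic-B events in a local store and purges them manually,
// instead of relying on a retention config that state stores don't have.
public class BufferAndPurgeProcessor implements Processor<String, String, String, String> {
    private static final long RETENTION_MS = Duration.ofDays(60).toMillis(); // assumption
    private KeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        store = context.getStateStore("event-buffer"); // store registered via addStateStore()
        // Every minute of stream time, delete entries older than the retention bound.
        context.schedule(Duration.ofMinutes(1), PunctuationType.STREAM_TIME, streamTime -> {
            try (KeyValueIterator<String, String> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<String, String> entry = it.next();
                    // Keys are assumed to look like "<epochMillis>|<businessKey>".
                    long ts = Long.parseLong(entry.key.substring(0, entry.key.indexOf('|')));
                    if (streamTime - ts > RETENTION_MS) {
                        store.delete(entry.key);
                    }
                }
            }
        });
    }

    @Override
    public void process(Record<String, String> record) {
        // Prefix the key with the record timestamp so old entries can be found and purged.
        store.put(record.timestamp() + "|" + record.key(), record.value());
    }
}
```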

I am creating a stream on topic B, and then using the process() method with a ProcessorSupplier, and it uses the Interactive Query API to read from the topic A global table.

As you suggested, I will try attaching the GlobalKTable state store instead of using the IQ API. What is the best use case for the IQ API?

You are correct, both topic A and topic B are regular topics, not changelog topics; both are getting events at a high rate, and I need to enrich event A with data from event B.

Can I still use a GlobalKTable for this, given the data changes at a high rate?

Thank you for your responses!

What is the best use case for the IQ API?

The idea of IQ is to query state from outside the processing topology.
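For example, from a REST handler or any other thread outside the topology, assuming a running KafkaStreams instance and a store named "customer-store" (a placeholder):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class ExternalQuery {
    // Called from outside the topology, e.g. a REST handler.
    static String lookup(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType("customer-store", QueryableStoreTypes.keyValueStore()));
        return store.get(key); // read-only view; returns null if the key is absent
    }
}
```

Inside the topology, by contrast, you attach the store to the processor and read it via the ProcessorContext, which keeps reads consistent with processing.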

You are correct, both topic A and topic B are regular topics, not changelog topics; both are getting events at a high rate, and I need to enrich event A with data from event B.

Can I still use a GlobalKTable for this, given the data changes at a high rate?

From a “high rate” point of view, yes. But if there are no updates, only new events (i.e., new keys get inserted), wouldn’t the state store just grow unbounded?

Thanks @mjsax!
Yes, we also have some updates with the same key coming from the source topic into the GlobalKTable. If the application instance restarts, my guess is it will reload whatever is in the source topic; is that correct? We want to give the source topic a retention of 2 months, so the GlobalKTable will load 2 months of data.

What will happen if we don’t restart? Does it grow continuously, or stick to the source topic’s retention period?

If the application instance restarts, my guess is it will reload whatever is in the source topic; is that correct?

Yes, assuming you don’t have persistent state. If you deploy, e.g., with Kubernetes StatefulSets and re-attach the disk to the same pod, no re-loading is required.
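Whether state survives a restart comes down to whether `state.dir` points at storage that outlives the process. A sketch, with placeholder values (the application id, bootstrap servers, and mount path are assumptions):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class PersistentStateConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "customer-enricher"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Point state.dir at a persistent volume so the local RocksDB state
        // survives pod restarts and no full re-load from the topic is needed.
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams"); // placeholder mount path
        return props;
    }
}
```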

We want to give the source topic a retention of 2 months, so the GlobalKTable will load 2 months of data.

Sure, but only if you lose the local state (cf. the first answer). In general, the assumption is that local state is not lost across restarts, and thus the global table would never delete anything.

What will happen if we don’t restart? Does it grow continuously, or stick to the source topic’s retention period?

It grows forever. There is no concept of a retention time for a global store; it only deletes rows when it processes an input tombstone record.
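A tombstone is simply a record with a null value for the key you want removed, produced to the source topic. A minimal sketch (topic, key, and bootstrap address are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendTombstone {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: the global table drops the row for this
            // key, and log compaction eventually removes it from the topic as well.
            producer.send(new ProducerRecord<>("topic-b", "key-to-delete", null)); // placeholders
        }
    }
}
```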

Thank you @mjsax! One more question.

What we are seeing is that in Processor.process(), when trying to read from the GlobalKTable, it does not always return the latest record. Is it possible to add a delay in the process() method, like Thread.sleep()?

We are creating a new processor instance for each event; I mean to say, the supplier is giving a new processor object for each event.

Not sure if I can follow?

I mean to say, the supplier is giving a new processor object for each event.

A KafkaStreams program is long-running. It is started, processes all messages from the input topic, and when it reaches the end of the topic it continues to run, waiting until new messages arrive.

There should not be a “new processor object for each event”. What do you mean by this? Are you not running KafkaStreams continuously, but starting it after a single message was written to the input topic and stopping it again afterward?

@mjsax: I was thinking it would start a new thread for each event; if the same thread processes all the events continuously, then my question is invalid.

If I add Thread.sleep() in the Processor.process() method, does it apply to all the events it is processing?

If I add Thread.sleep() in the Processor.process() method, does it apply to all the events it is processing?

The sleep would be executed for every input event, slowing down the whole processing. I am not sure why you would want to do this, though; it does not sound like the right thing to do, as it will block the corresponding thread.

What are you trying to achieve at a higher level? Maybe there is a better solution than using sleep().