Global KTable changelog topic retention vs Kafka topic retention

Hi,

If we create a global KTable from a topic that has a retention of 7 days, what will be the retention of the KTable changelog topic?

I see an option to set a different retention value for the changelog topic during KTable creation:

//code
Map<String, String> topicChangelogConfig = new HashMap<>();
topicChangelogConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete");
topicChangelogConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(GLOBAL_TABLE_RETENTION_PERIOD));

// Create Customer Details Global KTable
kStreamBuilder.globalTable(
        CustomerDataTopic,
        Materialized.<String, Customer, KeyValueStore<Bytes, byte[]>>as(FLIGHT_DETAILS_STORE)
                .withKeySerde(Serdes.String())
                .withValueSerde(customerDataSerde)
                .withLoggingEnabled(topicChangelogConfig)
);

Multiple questions:

  1. Can we have a longer retention on the KTable changelog topic than on the inbound topic?
    For example, the topic has a 7-day retention and the KTable changelog topic has 60 days. Is that even valid?

  2. Also, I don’t see a changelog topic created on the broker to hold the state store for this global KTable. What happens in case of failure? Does it read from the source topic again?

  3. I read in the documentation that a global KTable is only suitable for static data or data that does not change frequently. Will it be slow to read from the store if the data is changing frequently?

Thanks in advance.

For a global KTable, there is no additional changelog topic; its input topic is its changelog. Thus, the input topic should be configured with log compaction, not with retention.
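
For illustration, here is a minimal sketch of switching the input topic to log compaction via the Kafka AdminClient (the bootstrap address and topic name are placeholders):

//code
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

try (Admin admin = Admin.create(props)) {
    // The global KTable's input topic doubles as its changelog,
    // so configure it for compaction instead of time-based retention.
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "customer-data"); // placeholder
    AlterConfigOp setCompact = new AlterConfigOp(
            new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
            AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
}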

(Global) KTables do not support TTL, and thus there is no concept of data retention.

Can we have a longer retention on the KTable changelog topic than on the inbound topic?

No, because there is no additional changelog topic.

Also, I don’t see a changelog topic created on the broker to hold the state store for this global KTable. What happens in case of failure? Does it read from the source topic again?

Yes, on restore, the input topic will be used.

I read in the documentation that a global KTable is only suitable for static data or data that does not change frequently. Will it be slow to read from the store if the data is changing frequently?

It has nothing to do with “speed”; it’s more about semantics. A global KTable is not “time synchronized” with other topics, and thus, if the data changes often, you basically get non-deterministic behavior, and if you re-process the input data, your result might change on every re-run.

Thank you @mjsax !

I am finding it difficult to design our topology.

We have two topics (A and B), both getting events from the same app at high volume. I am trying to create a Streams application that adds data from topic B into topic A based on some key match, and it is a one-to-many mapping between an event in A and events in B.

The problem is that when the app generates an event to topic B, it also generates events to topic A, but there is no way to link them. We need to match each event in topic A with the corresponding event from topic B. Both events carry the date when the event was generated, and we want to use that date to match them.

Approach 1:
I created a global KTable store on topic B to hold all the events, and started a KStream on topic A with a transformer. Inside the transformer, I generate the keys needed to pull the event from the global KTable store of topic B, using interactive queries.

It is not working as expected: when I do store.get() via the query API, it returns old records from the store.

Approach 2:

If I move to a KTable instead of a global KTable, it will be problematic, because each instance will only process partial data, and an interactive query on the store will only return the data processed by that instance.

How do I store the data coming from topic B (which is changing a lot) in a consistent way and have the ability to query it?

Thank you for helping me.

Inside the transformer, I generate the keys needed to pull the event from the global KTable store of topic B, using interactive queries.

This does not sound right… the IQ API should not be used from inside a Transformer.

Both events carry the date when the event was generated, and we want to use that date to match them.

Sounds like you need to re-partition the data based on the date – this allows you to co-partition both input topics and do a “local” computation.
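
As a rough sketch of that idea (string values and default string serdes are assumed, and extractDate() is a hypothetical helper that parses the generation date out of the payload):

//code
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

StreamsBuilder builder = new StreamsBuilder();

// Re-key both streams on the event date so they become co-partitioned.
KStream<String, String> byDateA = builder.<String, String>stream("topic-A")
        .selectKey((key, value) -> extractDate(value)) // hypothetical helper
        .repartition(Repartitioned.as("a-by-date"));
KStream<String, String> byDateB = builder.<String, String>stream("topic-B")
        .selectKey((key, value) -> extractDate(value))
        .repartition(Repartitioned.as("b-by-date"));

// With both sides keyed and partitioned the same way, a local windowed
// join works; the 1-minute window is an arbitrary placeholder.
KStream<String, String> joined = byDateA.join(
        byDateB,
        (a, b) -> a + "|" + b,
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)));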

It does not sound like your input is actually a changelog topic; it’s an event topic, ie, the data might not be keyed and there is no notion of deletes. For this case, you might want to use a Processor/Transformer with an attached state store and manage the state manually (including deleting data you don’t need any longer).
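
A minimal sketch of that pattern, assuming string serdes; the store name, the punctuation interval, and the expiry check are hypothetical placeholders:

//code
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();

// Attach a persistent store to the topology.
builder.addStateStore(Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("events-b-store"), // placeholder name
        Serdes.String(), Serdes.String()));

builder.<String, String>stream("topic-B")
        .process(() -> new Processor<String, String, String, String>() {
            private KeyValueStore<String, String> store;

            @Override
            public void init(ProcessorContext<String, String> context) {
                store = context.getStateStore("events-b-store");
                // Periodically delete entries that are no longer needed (manual "TTL").
                context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, ts -> {
                    try (KeyValueIterator<String, String> it = store.all()) {
                        while (it.hasNext()) {
                            String key = it.next().key;
                            if (isExpired(key, ts)) { // hypothetical expiry check
                                store.delete(key);
                            }
                        }
                    }
                });
            }

            @Override
            public void process(Record<String, String> record) {
                store.put(record.key(), record.value()); // manage state manually
            }

            private boolean isExpired(String key, long now) {
                return false; // placeholder logic
            }
        }, "events-b-store");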

I am creating a stream on topic B, and then using the process() method with a ProcessorSupplier, and it is using the interactive query API to read from the topic A global table.

As you suggested, I will try attaching the global KTable state store instead of using the IQ API. What is the best use case for the IQ API?

You are correct, both topic A and topic B are regular topics, not changelog topics; both are getting events at a high rate, and I need to transform event A with data from event B.

Can I still use a global KTable for this, as the data is changing at a high rate?

Thank you for your responses!

What is the best use case for the IQ API?

The idea of IQ is to query state from outside.
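
For example, a sketch of querying a store from outside the topology once the application is running ("streams" is the running KafkaStreams instance, and the store name is a placeholder):

//code
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Typically called from a REST handler or similar, not from inside the topology.
ReadOnlyKeyValueStore<String, String> view = streams.store(
        StoreQueryParameters.fromNameAndType(
                "events-b-store", QueryableStoreTypes.keyValueStore()));
String value = view.get("some-key");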

You are correct, both topic A and topic B are regular topics, not changelog topics; both are getting events at a high rate, and I need to transform event A with data from event B.

Can I still use a global KTable for this, as the data is changing at a high rate?

From a “high rate” POV, yes, but it seems that if there are no updates but only new events (ie, new keys get inserted), the state store would just grow unbounded?