Kafka Streams application - new topology deployment

What are the strategies for deploying a Kafka Streams application with a new topology but the same input topics, so that no records are missed and no duplicates are sent downstream? Bumping the application id causes a new consumer group to be created, whose offsets will be set according to the auto.offset.reset config parameter. Assuming a requirement for exactly-once processing - what are the approaches to resume processing from the offsets committed by the previous version of the application?

2 Likes

Hi @yagiel!

Currently, the official strategy is to use a new application.id or to reset the application and reprocess historical data. That is needed because the new topology might be incompatible with the old topology, and errors with the state stores and the repartition topics may arise. In addition, the semantics of the processing may change with the new topology (but that is probably the reason to change the topology in the first place).

However, there are certainly cases where the new topology is compatible with the old one and the topology can be changed without the need to change the application.id or reset the application.

So it depends on the changes to the topology.
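
For illustration, here is a minimal sketch of the configuration side of the first strategy (the application name and broker address are made up, and exactly_once_v2 assumes a recent Kafka version). Because the new application.id creates a fresh consumer group with no committed offsets, auto.offset.reset decides where the new deployment starts reading:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class NewApplicationIdConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // made-up address
        // A new application.id means a new consumer group and a fresh set of internal topics.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-enricher-v2"); // made-up name
        // No committed offsets exist for the new group, so this decides the starting point:
        // "earliest" reprocesses the input topics from the beginning, "latest" skips old records.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Exactly-once processing, as required in the original question.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
        return props;
    }
}
```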

We have discussed adding topology evolution, but we haven’t put it on our short-term roadmap yet.

Best,
Bruno

2 Likes

Thanks @Bruno for your reply. It’s absolutely correct: when the topology changes in a breaking way, internal topics and state stores get new names, so it is required to bump the application id or perform an application reset.
What I am referring to may be a somewhat niche problem, but what I have observed is that sometimes it is necessary to start processing from where the former application version stopped (in terms of the source topics).
For now, I guess one solution could be to set offsets manually for the consumer group created for the new application id, but that is quite a cumbersome task. Another approach may be to create a completely separate stack, with different output topics as well, and then switch consumers at some point - but that introduces a chain of coordinated deployments, which should be avoided.
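
For reference, here is a rough sketch of the manual offset-copy approach using the AdminClient (the group names and source topics are made up; both the old and the new deployment must be stopped while the offsets are copied, and only source-topic offsets should be copied, never the internal repartition/changelog topics):

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CopySourceOffsets {
    public static void main(String[] args) throws Exception {
        // Kafka Streams uses the application.id as the consumer group id (names are hypothetical).
        String oldGroup = "my-app-v1";
        String newGroup = "my-app-v2";
        // Source topics of the topology; internal topic offsets must NOT be copied.
        Set<String> sourceTopics = Set.of("orders", "customers");

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        try (Admin admin = Admin.create(props)) {
            // Read the offsets committed by the previous application version.
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(oldGroup)
                     .partitionsToOffsetAndMetadata()
                     .get();

            // Keep only the source-topic offsets.
            Map<TopicPartition, OffsetAndMetadata> sourceOffsets = committed.entrySet().stream()
                .filter(e -> sourceTopics.contains(e.getKey().topic()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

            // The new application must be stopped (group empty) for this call to succeed.
            admin.alterConsumerGroupOffsets(newGroup, sourceOffsets).all().get();
        }
    }
}
```

Even with the source offsets lined up, any state the new topology needs would still be rebuilt from scratch under the new application id, which may matter for aggregations.
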
Is there any KIP with an ongoing discussion about topology evolution, or is it too early a stage to create one?

Hi @yagiel,

The naming issue can be somewhat alleviated by explicitly naming all your operators. That obviously needs to be done when the topology is created. But this does not guarantee compatibility in general.
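
For example, explicit naming in the DSL looks roughly like this (the topic, operator, and store names are made up):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class NamedTopologyExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
               // Naming stateless operators keeps the generated names of downstream
               // operators stable when new operators are added later.
               .filter((key, value) -> value != null, Named.as("filter-valid-orders"))
               // Naming the grouping and the store pins the repartition and changelog topic names.
               .groupByKey(Grouped.as("orders-by-key"))
               .count(Materialized.as("order-counts-store"))
               .toStream(Named.as("order-counts-stream"))
               .to("order-counts", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```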

I personally do not think that it is such a niche problem, because a topology needs to be upgraded sooner or later. The simpler such an upgrade is, the better.

No, there hasn’t been a KIP, yet. Just some ideas that I discussed with some folks.

Best,
Bruno

2 Likes

Hi @Bruno,

I wondered if you could elaborate on why naming all your operators isn’t enough?

I also wondered where the best guidance on this topic is, preferably covering how to tell if your topology changes are compatible or not?

I ask because I agree the issue isn’t niche, but using a new application ID just to be safe isn’t very satisfying especially when you have lots of data that then needs to be reprocessed.

Hi @colijack,

Explicit naming helps when you add a new operator to your topology. Otherwise, the names of your existing operators would change with automatic naming due to the changed order of the operators.
However, if you change a topology, you might also change the serialization format of the data. For example, you might change the serialization format of an aggregate. Or you might add or remove a repartition step, which changes the set of tasks that Streams distributes across the Streams instances and may affect rolling upgrades. There are probably other changes to consider as well.
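
To make the serialization point concrete, here is a sketch (with made-up topics and store name) of a change that keeps every name stable but still breaks the data already in the changelog:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregateFormatChange {

    // Old topology: the aggregate (and its changelog topic) stores a Long count.
    static void oldTopology(StreamsBuilder builder) {
        grouped(builder).count(Materialized.as("customer-stats-store"));
    }

    // New topology: the same store name now holds a String summary instead of a Long.
    // Records already in the changelog were written with the Long serde, so reading them
    // back with the new value serde fails even though all the names are unchanged.
    static void newTopology(StreamsBuilder builder) {
        grouped(builder).aggregate(
            () -> "",
            (customer, order, summary) -> summary + "|" + order,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("customer-stats-store")
                        .withValueSerde(Serdes.String()));
    }

    private static KGroupedStream<String, String> grouped(StreamsBuilder builder) {
        return builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
                      .groupByKey(Grouped.as("orders-by-customer"));
    }
}
```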

There are some ideas for a tool that analyses the old and the new topology and gives you a kind of compatibility report. But these are still just ideas.

Good to know that it is not niche for you! That shows us that users need this feature. I also think that we can do better with topology upgrades.

Best,
Bruno

1 Like

Hi @Bruno,

Thanks again for replying.

In terms of it being a feature we’d need: I’d actually say clarity around deploying changes, and tooling to make it easier (migrating state data, explicit warnings about incompatibility), would be quite high up our teams’ priority list.

In some areas the guidance makes it seem like naming is enough (such as [1] and [2]), so it’s easy to get caught out. However, just changing the application ID has its own issues, because starting a streaming app from scratch is no small undertaking when you have lots of data to re-ingest.

If there’s anything you need from us in terms of examples for the sort of features you are considering then we’re definitely happy to provide them.

Thanks,

Colin

[1]
https://docs.confluent.io/platform/current/streams/developer-guide/dsl-topology-naming.html
Now, even though you’ve added processors before your state store, the store name and its changelog topic names don’t change. This makes your topology more robust and resilient to changes made by adding or removing processors.

[2]

As of 2.1, you can set up your application for safe alterations without resetting by naming stateful operators. The most stable practice would be to name every operator that supports naming. Then, you could alter the topology arbitrarily (including adding and removing suppressions), and Streams would still be able to keep track of all its resources.

1 Like

At my organization, we are developing applications with strict requirements for availability and zero processing downtime. Therefore, some sort of topology evolution/upgrade would be greatly appreciated! I think such a feature would help Kafka better integrate with DevOps principles and agile/iterative development.

1 Like

Some topology changes are “incompatible”, requiring a new application.id or a reset. Per discussion with Confluent Support, a topology change is incompatible if there is any difference in the stateful portions of a topology description (“state store name and their changelog topic names, as well as repartition topics”). It would be nice if there were an automated way to detect whether a given deployment’s topology is incompatible. That would be useful for CI/CD – if we know a given deployment’s topology is compatible, it’s safe to deploy without the need for a reset or new application id.

One idea would be for Kafka Streams to produce a more specific topology description, one which omits non-stateful processors (a change in the name of a stateless operator such as a filter, for example, is irrelevant for compatibility), i.e. describeStatefulTopology(). That could be used as input for comparison with the previous deployment’s description.
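
As an illustration of that idea (describeStatefulTopology() is not an existing API; the sketch below just derives a "stateful fingerprint" from Topology#describe() and assumes the usual <application.id>-...-changelog / -repartition naming of internal topics):

```java
import java.util.Set;
import java.util.TreeSet;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class StatefulTopologyFingerprint {

    // Collects only the compatibility-relevant parts of a topology description:
    // state store names plus the internal topic names that can be derived from the description.
    public static Set<String> fingerprint(Topology topology, String applicationId) {
        Set<String> stateful = new TreeSet<>();
        TopologyDescription description = topology.describe();
        for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
            for (TopologyDescription.Node node : subtopology.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    for (String store : ((TopologyDescription.Processor) node).stores()) {
                        stateful.add("store: " + store);
                        stateful.add("changelog: " + applicationId + "-" + store + "-changelog");
                    }
                }
                if (node instanceof TopologyDescription.Sink) {
                    String topic = ((TopologyDescription.Sink) node).topic();
                    // Internal repartition topics show up as sink topics with a "-repartition" suffix.
                    if (topic != null && topic.endsWith("-repartition")) {
                        stateful.add("repartition: " + applicationId + "-" + topic);
                    }
                }
            }
        }
        return stateful;
    }
}
```

Comparing the fingerprints of the old and the new topology (with the same application.id) would flag renamed stores or repartition topics; it would not catch serde/format changes, which is the caveat mentioned earlier in the thread.
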

1 Like

When a given topology is incompatible with the previous one, it would be nice if Kafka Streams could display a useful error on startup. Currently, for example, if we have a stateful stream processor which relies on named state stores A and B, and we then revise the topology by having it add an additional state store C, that results in an infinite loop during state store restoration. The infinite loop is ambiguous at first: the StoreChangelogReader emits log entries such as “...-changelog-0 cannot be found; will retry in the next time.”, suggesting it is just a question of time. Only as the app remains stuck do we eventually infer that no amount of waiting will help.

For the specific case where we want to add state store C to a stateful stream processor currently using named state stores A and B, and we want to re-use A and B’s stores rather than rebuild them (for example, in a case where the values assigned to entries in A and B involve non-deterministic results, such as assigning a UUID for a given Avro key), that capability is currently missing:
https://issues.apache.org/jira/browse/KAFKA-13627
and the KIP is inactive:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
(The state of the KIP is listed as “Under discussion”, but the discussion halted as of Feb 2022.)

Has anyone found any workarounds in such a case?
I was hoping it might be possible to use a combination of copying changelog topics and generating consumer offsets.

Before

changelog-A, offsets: X
changelog-B, offsets: Y

After

changelog-A’, offsets: X
changelog-B’, offsets: Y
changelog-C, offsets: Z

That is, by copying changelog topics A and B into A’ and B’, and by creating consumer offsets (offset 0 for each partition) for the new changelog topic C, could that be sufficient to avoid the “changelog cannot be found; will retry in the next time” loop that the StoreChangelogReader otherwise falls into?
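
For reference, the copy step of this idea could look roughly like the following plain consumer/producer loop (the topic names are made up, only one partition is shown, and this says nothing about whether the approach as a whole actually satisfies the StoreChangelogReader):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class CopyChangelogTopic {
    public static void main(String[] args) {
        String sourceTopic = "my-app-store-A-changelog";    // hypothetical name
        String targetTopic = "my-app-v2-store-A-changelog"; // must already exist with the same partition count

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {

            // Copy partition 0 only, for brevity; a real copy would iterate all partitions.
            TopicPartition partition = new TopicPartition(sourceTopic, 0);
            consumer.assign(List.of(partition));
            consumer.seekToBeginning(List.of(partition));
            long end = consumer.endOffsets(List.of(partition)).get(partition);

            while (consumer.position(partition) < end) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    // Preserve partition, timestamp, key, value, and headers byte-for-byte.
                    producer.send(new ProducerRecord<>(targetTopic, record.partition(),
                                                       record.timestamp(), record.key(),
                                                       record.value(), record.headers()));
                }
            }
            producer.flush();
        }
    }
}
```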

Has anyone tried something similar to this?