Not possible to change configuration for changelog topics

I am using persistent key-value state stores in my Kafka Streams application. I am running three replicas of the same application in OpenShift. As known, the state stores are backed by changelog topics.

My application processes records within time intervals, and to store the records we use a unique key per time interval:

key = records.key() + ";" + timeIntervalStart + timeIntervalEnd

Since there exists an ever-growing number of intervals, we will have an ever-growing number of keys. Note that I delete expired records with store.delete(key) at frequent intervals to keep the state store clean. However, records still pile up in the changelog, resulting in long restoration times for my application. As of now, the changelog topics contain tens of millions of records.
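For illustration, the key scheme and expiry check described above could look roughly like this (a minimal sketch; the names `intervalKey` and `isExpired` are hypothetical, not from my actual code):

```kotlin
import java.time.Instant

// Hypothetical helper: build the per-interval key described above.
// Note: start and end are concatenated directly, exactly as in the post.
fun intervalKey(recordKey: String, intervalStart: Long, intervalEnd: Long): String =
    recordKey + ";" + intervalStart + intervalEnd

// Hypothetical helper: an interval is expired once its end lies in the past.
// Keys for which this returns true are removed with store.delete(key).
fun isExpired(intervalEnd: Long, now: Instant): Boolean =
    intervalEnd < now.toEpochMilli()
```

In practice the cleanup runs periodically (e.g. from a punctuator) and calls store.delete(key) for every expired key, which tombstones the record in the changelog but, as described above, does not stop the topic itself from growing.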

Using a WindowStore is not an option, as the business logic of the application has some quirks that make it rather tedious to use a WindowStore.

Obviously, the default config of the key-value store changelog topics is not sufficient, as I need to remove records from the topic more frequently.

I have therefore tried to set the config with the withLoggingEnabled method:

val builder = StreamsBuilder()

val megabyte = 1024L * 1024L
val segmentBytes = 100 * megabyte // 100 MB segments
val compactDeleteCleanupPolicy = "compact,delete"
val compactRetention = Duration.ofHours(1)
val retention = Duration.ofDays(1)

builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        topicSerdeConfig.inputValueSerde
    ).withLoggingEnabled(
        mapOf(
            TopicConfig.CLEANUP_POLICY_CONFIG to compactDeleteCleanupPolicy,
            TopicConfig.RETENTION_MS_CONFIG to retention.toMillis().toString(),
            TopicConfig.DELETE_RETENTION_MS_CONFIG to compactRetention.toMillis().toString(),
            TopicConfig.SEGMENT_BYTES_CONFIG to segmentBytes.toString()
        )
    )
)
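As a sanity check, the durations and sizes in that builder serialize to the following strings (plain Kotlin, using the literal topic config names instead of the TopicConfig constants so the snippet stands alone):

```kotlin
import java.time.Duration

val megabyte = 1024L * 1024L

// The same values as in the store builder above, expressed as the strings
// that end up in the changelog topic config map.
val changelogConfig = mapOf(
    "cleanup.policy" to "compact,delete",
    "retention.ms" to Duration.ofDays(1).toMillis().toString(),         // "86400000"
    "delete.retention.ms" to Duration.ofHours(1).toMillis().toString(), // "3600000"
    "segment.bytes" to (100 * megabyte).toString()                      // "104857600"
)
```

These are the values I would expect to see on the created changelog topic when everything works.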

...

The store is then later used in a ContextualProcessor.

However, the result of withLoggingEnabled is rather arbitrary. Sometimes it works, but mostly the changelog topic ends up with rather strange default configurations.

Even weirder, applying the same state store configuration with withLoggingEnabled gave different results in my test and production environments. In my test environment the correct configuration was set (e.g. cleanup.policy="compact,delete"), but in my production environment I got the weird default configuration.

After some testing, it looks like whether the changelog configuration is applied depends on the Kafka Streams object's properties. If one of the properties is wrongly set (i.e. has the wrong type), the changelog configuration is not applied. Note that there are no error messages when the Kafka Streams properties are wrongly set, which makes debugging a tedious job. My Kafka Streams setup is now:

fun setupConfig(appConfig: ConfigVariables) = Properties().apply {
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
    put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.applicationId)
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java.name)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java.name)
    put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler::class.java)
}

which worked in test, but not in production. Also, when I include other properties, such as

put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3)

I am getting the default changelog config in both test and production.
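Given the silent-failure behaviour above, a workaround I would expect to be safe (hedged, since I have not confirmed the root cause) is to pass every config value as a String before handing the Properties to KafkaStreams. A sketch, using the literal config names rather than the StreamsConfig constants so it stands alone; the function names are hypothetical:

```kotlin
import java.util.Properties

// Hypothetical defensive setup: every value is a String, so no value can
// silently fail a type check downstream.
fun setupConfigStrings(bootstrapServers: String, applicationId: String): Properties =
    Properties().apply {
        put("bootstrap.servers", bootstrapServers)
        put("application.id", applicationId)
        put("num.standby.replicas", "3") // String, not Int
    }

// Guard: fail fast if any non-String value slips into the Properties.
fun assertAllStrings(props: Properties) =
    require(props.entries.all { it.value is String }) { "non-String config value found" }
```

Calling assertAllStrings(props) right before constructing KafkaStreams would at least turn the silent misconfiguration into a loud one.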

Has anyone else experienced this rather arbitrary behaviour from withLoggingEnabled?

To further debug, I have been looking at the logs produced by ChangelogTopics.java (apache/kafka at commit f79c2a6e04129832eacec60bbc180173ecc37549 on GitHub), and the log message looks correct:

stream-thread [my_app-8cfe42b4-8c27-4775-99e5-b8dd008c3f17-StreamThread-1-consumer] Created state changelog topics [UnwindowedChangelogTopicConfig(name=my_app-my-store-changelog, topicConfigs={cleanup.policy=compact,delete, retention.ms=172800000, segment.bytes=104857600, delete.retention.ms=3600000}, enforceNumberOfPartitions=false)] from the parsed topology.

However, the created topic still ends up with the default config.

I have now been testing with a third-party k8s_kafka cluster, and with that cluster everything works as expected. So I guess there is something wrong with my cluster.

I found the error, and it is somewhat related to this Medium article: Kafka Streams: Iterative Development and Blue-Green Deployment | by Brett Jordan | Airwallex Engineering.

If I also changed the application.id when creating the state store, the changelog topics were configured properly.

If I only changed the state store name (effectively creating a new state store) without changing the application.id, the store's changelog would get the weird default configuration.
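For context: Kafka Streams derives the changelog topic name from the application id and store name, which matches the topic name my_app-my-store-changelog in the log message above. As far as I understand, the supplied configs are only applied when Kafka Streams actually creates the topic; an already-existing topic's config is left untouched, which would explain why a fresh application.id produced a correctly configured topic:

```kotlin
// Naming convention Kafka Streams uses for state store changelog topics:
// <application.id>-<store name>-changelog
fun changelogTopicName(applicationId: String, storeName: String): String =
    "$applicationId-$storeName-changelog"
```

This alone does not explain the store-name-only case I observed, since a new store name also yields a new topic name, so there may be more going on.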

Thanks, @hermanjakobsen, for sharing your experience - I’ll look into this, and it seems we should update the docs to explain this scenario.

-Bill