I am using persistent key-value state stores in my Kafka Streams application, running three replicas of the same application in OpenShift. The state stores are backed by changelog topics.
My application processes records within a time interval and stores them under a unique key per time interval:
key = records.key()+ ";" + timeIntervalStart + timeIntervalEnd
Since the number of intervals grows without bound, so does the number of keys. Note that I delete expired records with store.delete(key) at frequent intervals to keep the state store clean. However, records still pile up in the changelog, resulting in long restoration times for my application. As of now, the changelog topics contain tens of millions of records.
Using a WindowStore is not an option, as the business logic of the application has some quirks that make it rather tedious to use one.
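For concreteness, the store interaction looks roughly like the sketch below. The 15-minute interval, the 10-minute cleanup cadence, and the String value type are placeholders (the real values are Avro), and I insert an extra ";" between interval start and end here so the punctuator can parse the end timestamp back out:
import org.apache.kafka.streams.processor.PunctuationType
import org.apache.kafka.streams.processor.api.ContextualProcessor
import org.apache.kafka.streams.processor.api.ProcessorContext
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.KeyValueStore
import java.time.Duration

class IntervalProcessor : ContextualProcessor<String, String, String, String>() {
    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext<String, String>) {
        super.init(context)
        store = context.getStateStore("my-store")
        // Frequent cleanup: delete every entry whose interval has already ended
        context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME) { now ->
            store.all().use { iter ->
                for (entry in iter) {
                    val intervalEnd = entry.key.substringAfterLast(";").toLong()
                    if (intervalEnd < now) store.delete(entry.key)
                }
            }
        }
    }

    override fun process(record: Record<String, String>) {
        // One entry per (record key, time interval)
        val intervalStart = record.timestamp() / INTERVAL_MS * INTERVAL_MS
        val intervalEnd = intervalStart + INTERVAL_MS
        store.put("${record.key()};$intervalStart;$intervalEnd", record.value())
    }

    companion object { const val INTERVAL_MS = 15 * 60 * 1000L }
}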
Obviously, the default config of the key-value store changelog topics is not sufficient, as I need to remove records from the topic more frequently. I have therefore tried to set the config with the withLoggingEnabled method:
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.state.Stores
import java.time.Duration

val builder = StreamsBuilder()
val megabyte = 1024L * 1024L
val segmentMB = 100 * megabyte
val compactDeleteCleanupPolicy = "compact,delete"
val compactRetention = Duration.ofHours(1)
val retention = Duration.ofDays(1)

builder.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("my-store"),
        Serdes.String(),
        topicSerdeConfig.inputValueSerde
    ).withLoggingEnabled(
        // Config overrides for the store's changelog topic
        mapOf(
            TopicConfig.CLEANUP_POLICY_CONFIG to compactDeleteCleanupPolicy,
            TopicConfig.RETENTION_MS_CONFIG to retention.toMillis().toString(),
            TopicConfig.DELETE_RETENTION_MS_CONFIG to compactRetention.toMillis().toString(),
            TopicConfig.SEGMENT_BYTES_CONFIG to segmentMB.toString()
        )
    )
)
...
The store is then used from a ContextualProcessor, as in the sketch above.
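The wiring between the store and the processor is roughly as follows; the input topic name and the String serdes are placeholders, and it assumes the builder and processor from the snippets above:
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.processor.api.ProcessorSupplier

// Attach the store to the processor by name so it can be looked up in init()
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .process(ProcessorSupplier<String, String, String, String> { IntervalProcessor() }, "my-store")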
However, the result of withLoggingEnabled is rather arbitrary. Sometimes it works, but mostly it results in rather strange default configurations for the changelog topic. Even stranger, applying the same state store configuration with withLoggingEnabled gave different results in my test and production environments. In test, the correct configuration was set (e.g. cleanup.policy="compact,delete"), but in production I got the strange default configuration.
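To rule out misreading the broker tooling, I inspect the applied topic configuration with the Admin client. The changelog topic follows Kafka Streams' <application.id>-<store name>-changelog naming convention; the application id and bootstrap server below are placeholders:
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.config.ConfigResource

fun main() {
    // Changelog topic name: <application.id>-<store name>-changelog
    val topic = "my-application-id-my-store-changelog"
    val props = mapOf<String, Any>(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092")
    Admin.create(props).use { admin ->
        val resource = ConfigResource(ConfigResource.Type.TOPIC, topic)
        val config = admin.describeConfigs(listOf(resource)).all().get().getValue(resource)
        for (name in listOf("cleanup.policy", "retention.ms", "delete.retention.ms", "segment.bytes")) {
            val entry = config.get(name)
            println("$name = ${entry.value()} (source: ${entry.source()})")
        }
    }
}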
After some testing, it looks like whether the changelog configuration is applied depends on the properties passed to the KafkaStreams object. If one of the properties is set incorrectly (i.e. with the wrong type), the changelog configuration is not applied. Note that there are no error messages when the Kafka Streams properties are set incorrectly, which makes debugging tedious. My Kafka Streams setup is now:
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
import java.util.Properties

fun setupConfig(appConfig: ConfigVariables) = Properties().apply {
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.bootstrapServers)
    put(StreamsConfig.APPLICATION_ID_CONFIG, appConfig.applicationId)
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java.name)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde::class.java.name)
    // Note: this passes a Class object, unlike the String class names above
    put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler::class.java)
}
which worked in test but not in production. Also, when I include other properties, such as
put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3)
I am getting the default changelog config in both test and production.
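For reference, the only variant I have not fully ruled out is passing every value as a String. This is just a sketch (the helper name is mine), and I have not verified whether it changes how the withLoggingEnabled configs are applied:
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
import java.util.Properties

// Every value passed as a String, in case the value types are the culprit
// (sketch; unverified whether this changes the changelog config behaviour)
fun stringTypedOverrides() = Properties().apply {
    put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "3")
    put(
        StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
        LogAndContinueExceptionHandler::class.java.name
    )
}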
Has anyone else experienced this rather arbitrary behaviour from withLoggingEnabled?