Cleanup Policy of compact AND delete

Hello,

I had a question about having a cleanup policy of compact AND delete.
I also asked in the Slack channel (see link) but didn't get a clear answer.

Here is my example topic configuration

 kafka-topics.sh --describe --bootstrap-server $KAFKA_STAGING --topic my-topic
Topic:my-topic       PartitionCount:10       ReplicationFactor:2     Configs:min.insync.replicas=2,cleanup.policy=compact,delete,retention.ms=2592000000,message.format.version=2.6-IV0,max.message.bytes=6291456,unclean.leader.election.enable=true,segment.ms=1200000

With this configuration, I had messages in my topic that were several months old.
This leads me to believe that topics that are “compacted and deleted” only delete whole segments that are neither being compacted nor written to at all (so it is more of a bulk cleanup than deletion of individual messages).

Does someone with more experience know the use cases that led to the current compact-and-delete behavior? I somehow fail to come up with good examples of when I should use compaction and deletion together, given how it currently behaves.

Partitions are compacted by the log cleaner thread, while the delete retention settings are evaluated by the log manager thread. As you suspected, the log manager thread deletes a partition log segment only when the newest message in that segment is older than the retention.ms setting.

One use case I have seen is where a compacted topic has a steady stream of messages with unique keys continuously added. Using the compact policy by itself, the topic size will grow continuously. Using compact,delete with retention.bytes, the log manager thread will prune the topic so that the combined size of all log segments for each topic partition stays within retention.bytes, thus limiting the amount of data retained for the compacted topic.
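If it helps, here is a rough sketch of applying that with kafka-configs.sh (the 1 GiB value is only an example, and note that retention.bytes applies per partition rather than per topic):

  # sketch: enable compact,delete and cap each partition at ~1 GiB (example value)
  kafka-configs.sh --bootstrap-server $KAFKA_STAGING --alter \
    --entity-type topics --entity-name my-topic \
    --add-config 'cleanup.policy=[compact,delete],retention.bytes=1073741824'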

I don’t get the use case though. If I have a steady stream of unique keys, why would I ever want to use compaction then?

I think the commonly expected behaviour is that messages will be first deleted and THEN compacted and not the other way round.

The steady stream of unique keys includes multiple messages for each key. At any given time, the consumer that is processing the messages only needs the most recent message for any given key so compact retention is desired. These messages do not need to be retained forever though. A simple way of limiting the accumulation of topic data over time is to use compact,delete with retention.bytes. It isn’t a common use case but does come up occasionally.

Regarding how a topic is compacted by the log cleaner thread, it does this for each “dirty” segment, first copying each message that is to be retained to a new log segment, then pointing to that new log segment, and finally deleting the “dirty” segment. If that new segment is small, the log cleaner thread will sometimes combine it with another new segment that is built from another “dirty” segment.

Okay, I noticed that you brought up retention.bytes rather than the retention.ms that was part of the Slack discussion. I see how this might be useful then. Do you see a use case for using [compact, delete] along with retention.ms?

The “problem” is that compaction takes place before any records are deleted. This again leads to new segments where often at least one record in the segment is recent, because compaction copied it in there. Eventually you end up with many old messages in such a segment, but the segment still cannot be deleted because it contains a message that is newer than the configured retention time.

One use case is, if I recall correctly, aging out old windows when doing windowed aggregations with ksqlDB and Kafka Streams.

How many log segments are being retained for each partition and what are their sizes? If you reduce the partition log segment size using segment.bytes, this will limit the number of records being retained due to a single recent message being present in a given log segment. Your specific use case will determine what segment.bytes value would be most appropriate.
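As an illustration only (the 100 MB value is arbitrary; smaller segments roll more often and create more files, so don't go too low):

  # example: shrink the per-partition segment size so one recent record pins fewer old ones
  kafka-configs.sh --bootstrap-server $KAFKA_STAGING --alter \
    --entity-type topics --entity-name my-topic \
    --add-config segment.bytes=104857600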

The behavior in relation to the segments sounds interesting!
For my primary use case (where I just wanted old, not-updated keys to vanish) I had a segment time of 20 minutes (it is a very active topic with lots of messages).
I don’t know how many segments there are right now and how setting size in addition to time would affect that.

The cluster is hosted on Amazon (MSK). Is there some Kafka Admin Interface I can use to check the segments? I don’t think I have access to the underlying machine.

My apologies for the delay in responding.

Kafka topic data is written to data directories on each broker in the cluster. The data directory locations are specified by the log.dirs broker config setting.

A separate directory is created for each topic partition under a data directory. The directory name is [topic name]-[partition number]. The log segments for the partition are located under its respective directory.
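To illustrate (the /var/lib/kafka/data path is only an example; the real location depends on log.dirs), a partition directory typically looks something like this:

  ls /var/lib/kafka/data/my-topic-0/
  00000000000000000000.log
  00000000000000000000.index
  00000000000000000000.timeindex
  00000000000012345678.log
  ...

Each segment is named after the offset of its first record, and the .index/.timeindex files are the offset and timestamp indexes for that segment. With file access, kafka-dump-log.sh --files <segment>.log --print-data-log will show the records (and their timestamps) inside a given segment.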

You probably have to reach out to the MSK support team in this case. MSK likely has its own ins and outs, do’s and don’ts here.
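One thing that might partially help without broker access: the stock CLI ships kafka-log-dirs.sh, which works over the bootstrap connection and reports the on-disk size per partition as JSON (it does not list individual segments, though):

  # rough remote check: per-partition log sizes, not individual segments
  kafka-log-dirs.sh --describe --bootstrap-server $KAFKA_STAGING --topic-list my-topic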

I was hoping that Kafka would expose this via some API but if not I’ll just file it under “too deep under the hood” to be worth the effort :sweat_smile:

I have a similar use case, but like the original question in this topic, I’m using retention.ms instead of retention.bytes. I can understand that some old messages could live a bit longer, but I’m seeing messages that are months old, which I wouldn’t expect with a retention of 24 hours. The config for the topic in question is the following:

cleanup.policy=delete,compact
max.compaction.lag.ms=300000
min.compaction.lag.ms=2000
min.insync.replicas=2
retention.ms=86400000
segment.ms=86400000

am I missing something? Is there any combination of configs/events that could explain the situation I’m experiencing?

This is explained by how the log cleaner operates. During compaction, it does remove individual records if a more recent record exists for a given key. But it does not delete individual records based upon the retention.ms setting. It only deletes entire segments when all records in the segment are older than retention.ms. The old records in question still exist because the segment that contains them also contains records that have not yet expired based upon retention.ms.

Wouldn’t segment.ms come into play, rolling the segment and causing it to be deleted eventually? Or does it work in a different way/order?
Just a thought, but could changing the topic’s configuration later on (to enable compaction) somehow leave old messages from before the change behind? I’m just trying to better understand the order in which things are applied and find an explanation for what I’m seeing.