[Question][Doubt] Compacted Topic

Hello!

I’m trying to configure and see how compact topics work, but all my attempts have been unsuccessful.

1st I did with data that came from Kafka-connect, then I rekey one of the new topics (so I can have a key), but I never got only the last state of the same key!

Today I’ve been playing with an example that I found on the internet.
I’m going to share one of the examples that I found to see if someone can help me find what is wrong with the configuration.

I’ve been testing Kafka within docker containers, using the latest version on each image (images from confluent).

In here I guess that I have all the required configurations (cleanup.policy, delete.retention, min.cleanable.dirty.ratio) to make compact topics work

kafka-topics --create --zookeeper zookeeper:2181 --topic latest-product-price --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100"  --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"

Then I’ve started my producer with

kafka-console-producer --broker-list localhost:9092 --topic latest-product-price --property parse.key=true --property key.separator=:

I’ve inserted this in the producer

>1:Tiago
>2:Silvia
>3:Ruizinho
>3:Rui
>1:Tiago
>2:Silvia
>3:Ruizinho
>3:Rui

Then I started my consumer with

kafka-console-consumer --bootstrap-server localhost:9092 --topic latest-product-price --property  print.key=true --property key.separator=: --from-beginning

and I got the this

1:Tiago
2:Silvia
3:Ruizinho
3:Rui
(...)

For key 3 I should have Rui instead of Ruizinho.

What am I doing wrong here?

Thank you in advance!


edit1
Meanwhile, I’ve been testing it and I notice that now the consumer shows (using the same command that I’ve showed before)

3:Rui
1:um
2:dois
3:tres

Is this an issue only with the representation of the data?
I’ve tested it with KsqlDB, but I also got duplicates for the lat value that I’ve updated.
Why is it showing the values Rui and tres for the key 3?.

The value for a key won’t immediately replace what’s there. A compacted topic just guarantees to always have at least the latest value for every key.

I believe there’s a log compaction process that runs that does the actual compaction. Check out Kafka: The Definitive Guide which I’m sure will cover this in detail.

Yes, I know that will not update it immediately.
Yesterday I waited about 30 minutes, with similar configurations that I have posted earlier.

I believe there’s a log compaction process that runs that does the actual compaction
Yes, it has.

I’m starting to believe that I’m facing a representation issue rather than with the compacted topic itself.

Thanks.

Check out kafkacat for inspecting the topic contents in detail - you can check partition, offset, etc to understand exactly what’s happening.

1 Like

I just looked it up - Kafka: The Definitive Guide covers compaction in detail in Chapter 5 (Kafka Internals) under the Physical Storage section:

  • Compaction
  • How Compaction Works
  • Deleted Events
  • When Are Topics Compacted?

Yes, yesterday I gave a look at it.
I’ll read a little more about it because after all of my research about this subject I’m starting to believe that must be a silly thing that is giving me the wrong results or the wrong perception of the results.

I’ll also try kafkacat as you suggested.

Thank you very much @rmoff

1 Like

Just to share the output from kafkacat


(it stills shows two values for key 1)

The metadata that I get is

Metadata for latest-product-price (from broker 1: kafka:29092/1):
 1 brokers:
  broker 1 at kafka:29092 (controller)
 1 topics:
  topic "latest-product-price" with 1 partitions:
    partition 0, leader 1, replicas: 1, isrs: 1

and with kafka-configs describing my topic shows this

Dynamic configs for topic latest-product-price are:
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  min.cleanable.dirty.ratio=0.01 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.cleanable.dirty.ratio=0.01, DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  delete.retention.ms=100 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:delete.retention.ms=100, DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=100 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.ms=100}

Conclusion: I’ll read chapter 5 with more attention because I still don’t get why it updated all the other values but the very last update that I do on the topic. The new value is saved and the old one remains as well.
I’ll post updates if I get an advance on this.