How does a KTable take the latest event and ignore the others?

My topology just joins two tables. For simplicity, the tables are built over single-partition topics.

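BiConsumer<KTable<String, String>, KTable<String, String>> t1t2() {
        return (t1, t2) ->
            t1.join(t2, String::concat)
                    .toStream()
                    .peek((key, value) -> System.out.println(key + " : " + value))
                    .peek((key, value) -> {
                        // sleep 10 seconds; this blocks the stream thread
                        try { Thread.sleep(10_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                    });
}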

Topic t1 has 2 events:

a:_
b:_

I started the application and put the following sequence of events into topic t2:

a:a
a:b
b:a
a:c
b:b

I see the following output from my application:

a : _a
a : _c
b : _b

Events a:b and b:a are ignored by the topology (there is no a:_b or b:_a output). They also do not appear in t2-STATE-STORE-0000000003-changelog.
That is fine; it is what I expected.
My question is:

  • How does this work? How does Kafka Streams know that event a:b can be ignored?
  • Which configuration parameters are involved in that process?

Sounds related to caching. You can disable caching globally by setting the config cache.max.bytes.buffering = 0, or on a per-table basis by passing in a Materialized config object with withCachingDisabled().
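
As a rough illustration of the global option, here is a minimal sketch that builds the properties with plain string keys so that it compiles without the Kafka libraries. The application id and bootstrap server are hypothetical placeholders; only cache.max.bytes.buffering comes from the discussion above. The per-table variant is shown only as a comment because it needs the kafka-streams dependency, and the store name in it is made up.

```java
import java.util.Properties;

// Sketch only: properties one might hand to a Kafka Streams application.
class CachingConfigSketch {

    static Properties streamsProps() {
        Properties props = new Properties();
        props.setProperty("application.id", "t1t2-join-demo");    // hypothetical name
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical address
        // 0 bytes of record cache: every update is forwarded downstream individually.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    // Per-table alternative (requires kafka-streams on the classpath; "t2-store" is a made-up name):
    //   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("t2-store").withCachingDisabled()

    public static void main(String[] args) {
        streamsProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With caching disabled, the intermediate updates a:b and b:a would be expected to reach the join instead of being overwritten in the cache.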

Cf. Kafka Streams Memory Management (Confluent Documentation)

Thanks, it works as described.

But I read a doc about the config and I found:

The semantics of caching is that data is flushed to the state store and forwarded to the next downstream processor node whenever the earliest of commit.interval.ms or cache.max.bytes.buffering (cache pressure) hits.
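
To make the quoted semantics concrete, here is a toy model of a per-key write-back cache. It is not the Kafka Streams implementation; the class and method names are invented for illustration, and the flush points are an assumption (one flush after the first t2 record, one after the remaining four).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a record cache; NOT the actual Kafka Streams code.
class RecordCacheSketch {
    private final Map<String, String> dirty = new LinkedHashMap<>();

    // A newer value for a key overwrites the older value that was not flushed yet.
    void put(String key, String value) {
        dirty.remove(key);     // re-insert so flush order follows the last update
        dirty.put(key, value);
    }

    // Forward only the latest value per key, joined with the t1 value "_".
    List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            out.add(e.getKey() + " : " + "_".concat(e.getValue()));
        }
        dirty.clear();
        return out;
    }

    // Replays the t2 sequence from the question under the assumed flush points.
    static List<String> simulate() {
        RecordCacheSketch cache = new RecordCacheSketch();
        List<String> output = new ArrayList<>();
        cache.put("a", "a");
        output.addAll(cache.flush()); // assumed: first record is flushed on its own
        cache.put("a", "b");
        cache.put("b", "a");
        cache.put("a", "c");
        cache.put("b", "b");
        output.addAll(cache.flush()); // assumed: the other four are flushed together
        return output;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

Under those assumptions the sketch prints a : _a, a : _c, b : _b, which matches the observed output: a:b and b:a were overwritten in the cache before any flush could forward them.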

I have the default cache.max.bytes.buffering (i.e., large) and commit.interval.ms=100.
In my case I enter events into the topic manually, so the gap between events is more than 100 ms, yet I see in t2-STATE-STORE-0000000003-changelog that caching is still fully in effect.

So in my case commit.interval.ms does not seem to matter for moving data from topic t2 to the state store t2-STATE-STORE.

I guess your call to “sleep” could be the issue? When you sleep, you block the thread and no commit can happen (the commit is executed on the same thread).

Note that commit.interval.ms is basically a “lower bound”, not a guarantee of when a commit happens.

It’s also related to the max.poll.records config: we don’t check whether we need to commit after every processed record (for higher efficiency).
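
The "lower bound" point can be sketched with a simulated clock. This is a deliberately simplified single-threaded loop, not the real stream thread: it assumes the commit check runs only after each chunk of records has been processed, and all names and numbers are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified single-threaded loop with a simulated clock; not the real StreamThread.
class CommitLoopSketch {

    // Returns the simulated times (ms) at which commits actually happen.
    static List<Long> commitTimes(long commitIntervalMs, long perRecordMs, int[] chunkSizes) {
        List<Long> commits = new ArrayList<>();
        long now = 0;
        long lastCommit = 0;
        for (int chunkSize : chunkSizes) {
            // Processing blocks the thread; a sleep() in the topology just advances the clock.
            now += (long) chunkSize * perRecordMs;
            // Assumption: the commit check happens only here, between chunks.
            if (now - lastCommit >= commitIntervalMs) {
                commits.add(now);
                lastCommit = now;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        // commit.interval.ms = 100, 10 s of blocking per record, chunks of 1 and 2 records
        System.out.println(commitTimes(100, 10_000, new int[] {1, 2}));
    }
}
```

In this model the commits land at 10000 ms and 30000 ms even though the interval is 100 ms: the interval only says when a commit becomes due, and the thread can act on it only once it is free again.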