Fresh aggregate for each join like BlockingQueue#drainTo

I want to join a stream and a table. If there are many events matching a table element, I would like to process them together (like BlockingQueue.drainTo).

My topology is:

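eventStream.groupByKey()
        .aggregate(
                StringBuilder::new,
                (key, event, aggregate) -> aggregate.append("|").append(event))
        .toStream()
        .join(table, (eventBag, tElem) -> "table element " + tElem + " received events " + eventBag.toString())
        .peek((key, value) -> System.out.println(key + " : " + value))
        // emulate slow downstream processing: block for 10 seconds per record
        .foreach((key, value) -> { try { Thread.sleep(10_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } });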

I send these events to eventStream:

  • s1:run
  • s1:stop
  • s1:kill

I expected this output:

s1 : table element received events run
s1 : table element received events stop|kill

But the actual output is:

s1 : table element received events run
s1 : table element received events run|stop|kill

I see that KSTREAM-AGGREGATE-STATE-STORE only accumulates events.
How can I reset the accumulation after the join?

You can’t. The aggregation is independent of the join, and it just accumulates all records for the same key across all time.

In your example, I am not sure why you want to process run as “one group” and “stop,kill” as the second group. What is your condition for processing events together?

You could maybe use a windowed aggregation upstream (using “emit final” semantics) to group input records based on time, so that they are processed together downstream.
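As a rough illustration of what time-based grouping does, here is a plain-Java sketch with no Kafka Streams dependency. The class TumblingWindowSketch, the Event type, and the 1000 ms window size are assumptions made up for this example; a real topology would use a windowed aggregation rather than this loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of tumbling-window grouping; not Kafka Streams code. */
final class TumblingWindowSketch {

    /** A value with an event timestamp in milliseconds. */
    static final class Event {
        final long timestampMs;
        final String value;

        Event(long timestampMs, String value) {
            this.timestampMs = timestampMs;
            this.value = value;
        }
    }

    /**
     * Groups time-ordered events into tumbling windows. A window is emitted only
     * once an event belonging to a later window arrives. The last window is flushed
     * at end of input for demonstration only; a real stream would keep waiting.
     */
    static List<List<String>> group(List<Event> events, long windowSizeMs) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentWindowStart = -1;
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % windowSizeMs);
            if (windowStart != currentWindowStart && !current.isEmpty()) {
                emitted.add(current);          // previous window is closed
                current = new ArrayList<>();   // start a fresh group
            }
            currentWindowStart = windowStart;
            current.add(e.value);
        }
        if (!current.isEmpty()) {
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event(100, "run"), new Event(1100, "stop"), new Event(1200, "kill"));
        System.out.println(group(events, 1000)); // [[run], [stop, kill]]
    }
}
```

Note that the grouping here depends only on timestamps, not on how busy the downstream processing is, which is the main difference from a drainTo-style batch.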

You could also do a custom aggregation upstream using transform(). This way, you have full control over which records you want to process together and which “groups” of records you send downstream into the join (including a reset of the aggregation).

In my case I’m afraid that topology processing will be slower than incoming event pressure. That is emulated by sleep.

Why do I want to process 2 groups independently? Because if the first event is sell 1, the second event is sell 2, and the third event is sell 3, I want to process (sell 1) then (sell 2 | sell 3), not (sell 1) then (sell 1 | sell 2 | sell 3). The correct effective processing is sell 1 then sell 5, not sell 1 then sell 6.
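The arithmetic can be checked with a small plain-Java sketch. The class SellBatches and its two methods are hypothetical names for this illustration only; they are not part of Kafka Streams. One method sums each batch from zero (drain and reset), the other keeps a single running total (an aggregate that is never reset).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not Kafka Streams code. */
final class SellBatches {

    /** Sums each batch separately, starting from zero for every batch. */
    static List<Integer> sumPerBatch(List<List<Integer>> batches) {
        List<Integer> totals = new ArrayList<>();
        for (List<Integer> batch : batches) {
            int sum = 0;
            for (int qty : batch) {
                sum += qty;
            }
            totals.add(sum);
        }
        return totals;
    }

    /** Keeps one running total across all batches, like an aggregate that is never reset. */
    static List<Integer> runningTotals(List<List<Integer>> batches) {
        List<Integer> totals = new ArrayList<>();
        int running = 0;
        for (List<Integer> batch : batches) {
            for (int qty : batch) {
                running += qty;
            }
            totals.add(running);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = List.of(List.of(1), List.of(2, 3));
        System.out.println(sumPerBatch(batches));   // [1, 5]
        System.out.println(runningTotals(batches)); // [1, 6]
    }
}
```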

I think a “window” does not fit my case because I want to process incoming events immediately, without delay, and “group by key” is only an optimization. My “true” join is stream-table, without a window.
I will explore transform().

In my case I’m afraid that topology processing will be slower than incoming event pressure. That is emulated by sleep.

Why do you think so? What is the expected throughput? Kafka Streams scales very well, and can process hundreds of thousands of messages per second…

I want to process (sell 1) then (sell 2 | sell 3)

Your example is still rather high level. Why process sell-1 in the first group, and sell-2+sell-3 in the second group, instead of sell-1+sell-2 in the first group, and sell-3 in the second group, or even process in three groups? What are the actual criteria for splitting a group?

I think a “window” does not fit my case because I want to process incoming events immediately,

Records are processed immediately. Also, if you say you want to process sell2+sell3 in one group, you actually don’t process sell2 right away, but you wait for sell3 to arrive, too…

Using transform() might still be the best way forward. Let us know how it goes.

In my case I’m afraid that topology processing will be slower than incoming event pressure. That is emulated by sleep.

Why do you think so? What is the expected throughput? Kafka Streams scales very well, and can process hundreds of thousands of messages per second…

I believe that Kafka Streams is efficient, but I’m afraid that my own code, which is part of the topology, will be slow. Not very slow, but slow enough to lag behind the sell stream.

I want to process (sell 1) then (sell 2 | sell 3)

Your example is still rather high level. Why process sell-1 in the first group, and sell-2+sell-3 in the second group, instead of sell-1+sell-2 in the first group, and sell-3 in the second group, or even process in three groups? What are the actual criteria for splitting a group?

In my head I have an analogy to Java’s BlockingQueue. You may call poll() and process events one by one, or you may call drainTo() and get all waiting events at once.

I know Kafka Streams has caching and can poll a few events from a topic in advance and merge them into a single event for a KTable.

Going back to BlockingQueue: if I use drainTo() and the incoming events are [sell1, sell2, sell3, sell4, sell5], then depending on timing I can receive [sell1] [sell2, sell3] [sell4, sell5]. As Kafka polls the topic in advance, this could be theoretically possible.

But I think this is not done, and I must accept that my topology will process [sell1] [sell2] [sell3] [sell4] [sell5].
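For reference, the BlockingQueue behaviour I have in mind can be reproduced in plain Java. This is only a single-threaded sketch: the class DrainToDemo and the helper drainBatch are names made up for the example, and the batch boundaries here come from the order of the calls, whereas with a real producer thread they would depend on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo of BlockingQueue#drainTo batching. */
final class DrainToDemo {

    /** Takes everything currently waiting in the queue as one batch, without blocking. */
    static List<String> drainBatch(BlockingQueue<String> queue) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch); // moves all available elements into the batch
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        queue.add("sell1");
        System.out.println(drainBatch(queue)); // [sell1]

        // Two events arrive while the consumer is still busy with the first batch.
        queue.add("sell2");
        queue.add("sell3");
        System.out.println(drainBatch(queue)); // [sell2, sell3]

        queue.add("sell4");
        queue.add("sell5");
        System.out.println(drainBatch(queue)); // [sell4, sell5]
    }
}
```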

I think a “window” does not fit my case because I want to process incoming events immediately,

Records are processed immediately. Also, if you say you want to process sell2+sell3 in one group, you actually don’t process sell2 right away, but you wait for sell3 to arrive, too…

I don’t want to wait for sell3. But if Kafka Streams can look ahead on the partition and see that sell3 is ready, I could process both sells at once.

Using transform() might still be the best way forward. Let us know how it goes.

I tried, but it does not look promising. Even if it is possible, it requires reimplementing a cache, which Kafka already has.

I see. Kafka Streams abstracts away the underlying polling and “batches” of data that are fetched, but provides a record-by-record processing API.

Hence, if you want to process data together, you need to build it manually by buffering up records in a state store.
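A minimal sketch of that buffering idea in plain Java, with a HashMap standing in for the state store. KeyedBuffer, append, and drain are hypothetical names, not Kafka Streams API; in a real topology the buffer would live in a state store used by a custom processor, and the question of when to call drain would still have to be answered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-key buffer; the map is a stand-in for a state store. */
final class KeyedBuffer {
    private final Map<String, List<String>> store = new HashMap<>();

    /** Buffers one more value for the given key. */
    void append(String key, String value) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Returns all buffered values for the key and resets the buffer, so the next batch starts empty. */
    List<String> drain(String key) {
        List<String> batch = store.remove(key);
        return batch == null ? new ArrayList<>() : batch;
    }

    public static void main(String[] args) {
        KeyedBuffer buffer = new KeyedBuffer();
        buffer.append("s1", "run");
        System.out.println(buffer.drain("s1")); // [run]
        buffer.append("s1", "stop");
        buffer.append("s1", "kill");
        System.out.println(buffer.drain("s1")); // [stop, kill]
    }
}
```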

What is still unclear to me is why you would want/need to process multiple records together, and why you cannot just process each record individually.

I’m exploring what Kafka Streams can do, and one of my questions is about “drainTo()”.

Basically, I expect very frequent updates to a topic and I am exploring options to reduce lag. I have no concrete use case.

Hence, if you want to process data together, you need to build it manually by buffering up records in a state store.

I am just starting my adventure with Kafka Streams, so I will use standard patterns at the beginning. I just wanted to know if there is a standard pattern for “drainTo”, and it looks like there isn’t.

Thank you for your help.

That’s right, there is no “drainTo”. Such a feature was never requested, and given that Kafka Streams is distributed, it’s also unclear to me what exactly it would mean.

  • It seems it would need to be scoped to a single partition?
  • Given how KS is built, just adding a KStream#drainTo() sounds questionable, because the upstream program could be arbitrarily complex (not sure how/if we could limit it to the “source node” case, i.e., only allow it after StreamsBuilder#stream()#drainTo())
  • If there is lag, would you want to “drainTo” endOffset? (What if endOffset moves in-between?)
  • Would you want to base it on consumer.poll()? Also hard to do and depends on many consumer configs (like max.poll.records and fetch.max.bytes)

In the end, KS is somewhat opinionated (for a good reason) but still tries to give you as many building blocks as possible to customize your logic. It’s often hard (or even impossible) to just add a new feature, and often it’s also not required, because the pattern you want to apply might not be ideal for stream processing to begin with.

Let me answer:

  1. It seems it would need to be scoped to a single partition?
    Yes
  2. Given how KS is built, just adding a KStream#drainTo() sounds questionable, because the upstream program could be arbitrarily complex

Consider this topology (keys and values are Strings):

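eventStream
  .mapValues(s -> s.toUpperCase())
  .toTable()
  .join(table, (eventItem, tableItem) -> tableItem.concat(eventItem))
  .toStream()
  .peek((key, value) -> System.out.println(key + " : " + value))
  // emulate slow downstream processing: block for 10 seconds per record
  .foreach((key, value) -> { try { Thread.sleep(10_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } });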

The KTable has one entry: k:.
3 events go to the KStream: k:a, k:b, k:c
Output:

k:.A
k:.C

The upstream is complex (.mapValues(s -> s.toUpperCase())), but Kafka Streams can manage it and replace k:B, k:C with k:C. So it should be possible to replace k:b, k:c with the list [k:b, k:c], or at least this does not depend on upstream complexity.

  3. If there is lag, would you want to “drainTo” endOffset? (What if endOffset moves in-between?)

I think it would be good enough to “drainTo” the k:c event, like in the previous example.

  4. Would you want to base it on consumer.poll()?

Too technical for me. But I think that if Kafka Streams can omit an event when building a KTable, then it can also keep that event and append it to the “drainTo()” list.

but Kafka Streams can manage it and replace k:B, k:C with k:C

That’s semantically not sound. A KStream transformation cannot just “drop” k:B on the floor. A KStream is semantically independent events, not updates.

So it should be possible to replace k:b, k:c with the list [k:b, k:c], or at least this does not depend on upstream complexity.

But what are the criteria for putting k:b and k:c into a list? How should KS know? What is the algorithm for it?

but Kafka Streams can manage it and replace k:B, k:C with k:C

That’s semantically not sound. A KStream transformation cannot just “drop” k:B on the floor. A KStream is semantically independent events, not updates.

Please remember that the original objection was: “upstream program could be arbitrarily complex”. I gave this example to show that, regardless of upstream complexity, KS can optimize the join:

  • if k:b, k:c come “faster than the topology sleep”, only one join is processed: k:. - k:C
  • if k:b, k:c come “slower than the topology sleep”, two joins are processed: k:. - k:B and k:. - k:C
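The two cases above can be imitated with a toy model in plain Java. LatestValueCache is a hypothetical class made up for this example, and it only mimics the observable effect (a newer value for a key overwrites an older one that has not been forwarded yet); it makes no claim about how the Kafka Streams cache is actually implemented or when it flushes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a record cache in front of a table; not Kafka Streams code. */
final class LatestValueCache {
    private final Map<String, String> pending = new LinkedHashMap<>();

    /** A newer value for a key overwrites the older one that has not been flushed yet. */
    void put(String key, String value) {
        pending.put(key, value);
    }

    /** Emits the latest value per key as "key:value" and empties the cache. */
    List<String> flush() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            out.add(entry.getKey() + ":" + entry.getValue());
        }
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        // "Faster than the topology sleep": B and C arrive before the next flush.
        LatestValueCache fast = new LatestValueCache();
        fast.put("k", "B");
        fast.put("k", "C");
        System.out.println(fast.flush()); // [k:C]

        // "Slower than the topology sleep": a flush happens between B and C.
        LatestValueCache slow = new LatestValueCache();
        slow.put("k", "B");
        System.out.println(slow.flush()); // [k:B]
        slow.put("k", "C");
        System.out.println(slow.flush()); // [k:C]
    }
}
```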

So it should be possible to replace k:b, k:c with the list [k:b, k:c], or at least this does not depend on upstream complexity.

But what are the criteria for putting k:b and k:c into a list? How should KS know? What is the algorithm for it?

That is clear:

eventStream.groupByKey()
  .aggregate(() -> new ArrayList<String>(), (key, value, aggregate) -> { aggregate.add(value); return aggregate; })

But when the aggregate list is passed downstream, a new aggregation list should be started.

But when the aggregate list is passed downstream, a new aggregation list should be started.

But when should the list be sent downstream? When it has 2 elements in it? Or 3 elements? How is the decision made when to start a new list?

Let me recall the second sample topology:

eventStream
  .mapValues(s -> s.toUpperCase())
  .toTable()
  .join(table, (eventItem, tableItem) -> tableItem.concat(eventItem))

eventStream has events k:a, k:b, k:c
join has k:A, k:C

Answer to the question of when to send the list downstream: at some moment KS decides to send k:C downstream, and at that same moment the list should be sent downstream.

How is the decision made when to start a new list?
A new list should be started when the old list is sent downstream.