Batch processing in kstreams

Hi, I want to process a batch of x records using a tumbling window. Is it possible to process these records without having to perform any aggregations on them. I want to iterate of the rows, store them in a list and then perform some filtering and aggregations on this list.

Hi @danoomistmatiste

I’m trying to better understand your use case. Are you thinking about something like Grouping in ksqlDB which would result in records that contain arrays of values you could “store in a list” and iterate over in another downstream application, built with a standard programming language or Kafka Streams? Does this documentation on the COLLECT_LIST function speak to your use case at all?

https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/aggregate-functions/#collect_list

No, I am just trying to mimic spark microbatch and sparksql operations. I have a group of say 10 records. I want to apply a filter say select only records with status = “PAID” and the remaining records group them by userid.

I am able to process a batch of rows however, I am having trouble deduping the kstream. Example, kstream has the following rows

key1, value1
key1, value2
key2, value1
key1, value2
key3, value1
key2, value1

and I want

key1, value1
key1, value2
key2, value1
key3, value1

How can I achieve this without using a transformer and state store. Just using kstream functions like grouby, reduce etc.

It seems I am answering my own questions. Which is good in a way I guess. So for deduping the only option available is to implement a keystore. Why can’t you provide a method on the stream like distinct() or even groupBy() should do it. Why does it have to be so difficult.

There is a KIP for it: KIP-655: Windowed Distinct Operation for Kafka Streams API - Apache Kafka - Apache Software Foundation

Figured it out. I implemented a key, value state store to do the deduplication. Works like a charm.

Any idea when this KIP will be made available? It will be of immense help. Currently it seems a lot of work for doing some simple deduplication.

1 Like

There is actually an example for de-duplication on GitHub: kafka-streams-examples/EventDeduplicationLambdaIntegrationTest.java at a2962a8e9d01477390c60ec6ba5cd1ee9ae8d880 · confluentinc/kafka-streams-examples · GitHub

About the KIP. Unclear when if might ship. The KIP progress depends on many factors. You can join the dev mailing list if you are interested: Apache Kafka

2 Likes

Yes, the examples did help understand the concept. Do you have plans to implement programmatic execution of sql like spark sql. Where I can run a sql on a kstream or ktable programatically. Though you have these various options like ksql, kstreams etc. I still feel that there are some shortcomings and even simple operations like deduping , aggregation etc which can be done so very easily in spark sql is not so straightforward and easy to implement in Kafka

1 Like

As you pointed our, there is ksqlDB and we will keep investing heavily to make it more expressive and to address current limitations. Personally, I think that ksqlDB already offers a programmatic way to execute SQL statements over STREAMs and TABLEs. Doing aggregations and joins is actually straightforward with both Kafka Streams and ksqlDB. You should check it out. If we will ever add a “de-duplication” operator to ksqlDB seems to be an open question though (I guess if there is enough user demand we might. On the other hand, you can do it already today: cf. Tombstone message in Table when filtering duplicate events.

For Kafka Streams, there is actually a KIP to add a “distinct” operation to the DSL as pointed out above. There is also a KIP to add more built-in aggregation functions: KIP-747 Add support for basic aggregation APIs - Apache Kafka - Apache Software Foundation

As you can see, there is are many things in-flight…

For ksqlDB, you can also follow KLIPs if you are interesting in future development: ksql/design-proposals at master · confluentinc/ksql · GitHub