Kafka Streams combinations of message

I have messages in topic (compact topic):

{id: 1, groupId: 1}
{id: 2, groupId: 1}
{id: 3, groupId: 1}
{id: 4, groupId: 2}

I want grouped messages by groupId and then get all possible combinations in each group. For example for groupId=1 combinations:
id:1-id:2, id:1-id:3, id:2-id:3.

How can I do this?

Maybe self-join streams?

Hi @svyat ,

Off the top of my head, first I’d use a KStream.map to swap the key and value.
For example:

Kstream.map((key, value) -> KeyValue.pair(value, key))

Then use KStream.groupByKey() followed by .aggregate(…)
like so:

groupByKey().aggregate(ArrayList::new, (key, value, list) -> list.add(value))

Or something similar to build up the combinations.
So the whole thing would look similar to:

inputStream.map((key, value) -> KeyValue.pair(value, key)).groupByKey().aggregate(ArrayList::new, (key, value, list) -> list.add(value))

HTH,
Bill

Thanks for your answer!

I would like to thank you for your books.
They were fascinating to read! Please keep writing. :pray:

I don’t quite understand the point about swap key and value.
Initially I get a message with an empty key or with a key by id field.
If I swap key and value, then as a key I have a string representation of message object, and as a key a string value of message ID field:

key: {MessageClass{id=1, groupId: 1}
value: 1

Then I won’t be able to get the groups. Maybe I got something wrong?

If I choose groupId as a key, and then in the aggregate method I add values to a special class, within which the combinatorial logic will be implemented. Will such a method work?

Hi @svyat,

I get what you’re saying here; I didn’t realize before the key was a composite that contained the groupId, but I’d still stick with my original advice with a small twist. When you call the map function, I would extract the groupId for the key and make a new value object containing the original value and the id from the old key.

I guess that’s what you’re saying here:

If I choose groupId as a key, and then in the aggregate method I add values to a special class, within which the combinatorial logic will be implemented.

So yes I believe that will work.

-Bill

1 Like

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.