Kafka Streams newbie question

Hi,
I am new to Kafka Streams and to streaming in general.
I have a requirement to build a summary table from an event stream, but I couldn’t figure out how to design it. Can someone help me design this in Java?

The event messages look like this:

MyMessage {
    message_id,
    message_group_id,
    status   -> can be PENDING, SUCCESSFUL, UNSUCCESSFUL
}

I can have events flowing like this:

message_id   group_id   status
1            1          PENDING
2            1          PENDING
3            2          PENDING
1            1          SUCCESS

I want to arrive at a table like this:

group_id   pending_count   success_count
1          1               1
2          1               0

I'm not 100% sure I understand what you are trying to do. I believe you want to count the number of “pending” and “success” messages per group?

If yes, you could do something like this:

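import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KTable;

// Your messages must have a combined key ID = {m_id, g_id};
// the value contains the status.

// To get update semantics (i.e. the status changes from "pending" to "success"),
// read the topic as a table.
KTable<ID, String> table = builder.table("topic");

// Re-key by the group_id and apply the aggregation.
// (Serdes for the new key and for Counts are omitted here.)
KTable<gId, Counts> result = table
  .groupBy((k, v) -> KeyValue.pair(k.getGroupId(), v)) // extract the group_id from the combined ID key
  .aggregate(
    () -> new Counts(),
    (k, v, c) -> c.add(v),      // adder: called with a record's new status
    (k, v, c) -> c.subtract(v)  // subtractor: called with a record's previous status
  );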

The aggregation needs to return a “pair” object, Counts, that contains two fields: pending_count and success_count. Your Initializer function creates a new Counts object with both counts at zero. When a record is added or updated, you increment or decrement the corresponding count, depending on the status:

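// Counts.java

public class Counts {
  int pending = 0;
  int success = 0;

  // Adder: called with the new status of a record.
  public Counts add(String status) {
    if (status.equals("PENDING")) ++pending;
    else if (status.equals("SUCCESS")) ++success; // other statuses are not counted
    return this; // the aggregator must return the updated aggregate
  }

  // Subtractor: called with the old status of a record that was updated.
  public Counts subtract(String status) {
    if (status.equals("PENDING")) --pending;
    else if (status.equals("SUCCESS")) --success;
    return this;
  }
}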
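
To see why the subtract step matters, here is a small plain-Java sketch (no Kafka involved) that replays the four example events from the question. It only imitates the table semantics: when a message_id gets a new status, its old status is subtracted from the group's counts and the new status is added. The class name GroupCountsSimulation and the method onEvent are made up for this illustration, and it assumes a message never moves to a different group.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class GroupCountsSimulation {

    /** Per-group pair of counters, in the spirit of the Counts class above. */
    static final class Counts {
        int pending = 0;
        int success = 0;

        void add(String status) {
            if (status.equals("PENDING")) ++pending;
            else if (status.equals("SUCCESS")) ++success;
        }

        void subtract(String status) {
            if (status.equals("PENDING")) --pending;
            else if (status.equals("SUCCESS")) --success;
        }

        @Override
        public String toString() {
            return "pending=" + pending + ", success=" + success;
        }
    }

    // Latest status per message_id (stands in for the source table).
    private final Map<Integer, String> latestStatus = new HashMap<>();
    // Aggregate per group_id (stands in for the result table).
    private final Map<Integer, Counts> countsByGroup = new TreeMap<>();

    /** Applies one event as a table update: subtract the old value, then add the new one. */
    void onEvent(int messageId, int groupId, String status) {
        Counts counts = countsByGroup.computeIfAbsent(groupId, g -> new Counts());
        String previous = latestStatus.put(messageId, status);
        if (previous != null) {
            counts.subtract(previous); // undo the contribution of the old status
        }
        counts.add(status);            // apply the contribution of the new status
    }

    Map<Integer, Counts> result() {
        return countsByGroup;
    }

    public static void main(String[] args) {
        GroupCountsSimulation sim = new GroupCountsSimulation();
        sim.onEvent(1, 1, "PENDING");
        sim.onEvent(2, 1, "PENDING");
        sim.onEvent(3, 2, "PENDING");
        sim.onEvent(1, 1, "SUCCESS"); // message 1 moves from PENDING to SUCCESS
        sim.result().forEach((group, counts) ->
            System.out.println("group " + group + ": " + counts));
    }
}
```

After the last event, group 1 ends up with one pending and one successful message, and group 2 with one pending message, which matches the table you asked for. Without the subtract step, group 1 would still show two pending messages.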

Thanks for the information.
I implemented it according to your suggestions.
