Hi,
I am new to Kafka Streams and stream processing.
I need to build a summary table from an event stream, but I couldn't figure out how to design it. Can someone help me design this in Java?
The event messages look like this:
MyMessage {
message_id,
message_group_id,
status -> can be PENDING, SUCCESSFUL, or UNSUCCESSFUL
}
I'm not 100% sure I understand what you are trying to do. I believe you want to count the number of “pending” and “success” messages per group?
If yes, you could do something like this:
// Your message must have a combined key ID = {m_id, g_id};
// the value contains the status.
// To get update semantics (i.e., the status changes from "pending" to "success"),
// read the topic as a table:
KTable<ID, Status> table = builder.table("topic");
// group by the group_id and apply the aggregation
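KTable<gId, Counts> result = table
    // re-key by group_id; KTable#groupBy must return a KeyValue pair, not just the new key
    .groupBy((k, v) -> KeyValue.pair(k.getGroupId(), v))
    .aggregate(
        () -> new Counts(),                        // initializer: both counts start at zero
        (k, v, c) -> { c.add(v); return c; },      // adder: called with the new value
        (k, v, c) -> { c.subtract(v); return c; }  // subtractor: called with the old value being replaced
    );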
The aggregation needs to return a “pair” object, Counts, that contains two fields: a pending count and a success count. Your Initializer function creates a new Counts object with both counts set to zero. When a record for a message is updated, the subtractor is called with the old status and the adder with the new status, so you add to or subtract from the corresponding count, depending on the status:
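// Counts.java
public class Counts {
    int pending = 0;
    int success = 0;

    // The status is taken as a String here; adapt this if your Status type is an enum.
    public void add(String status) {
        if (status.equals("PENDING")) ++pending;
        else if (status.equals("SUCCESSFUL")) ++success;  // UNSUCCESSFUL is not counted
    }

    public void subtract(String status) {
        if (status.equals("PENDING")) --pending;
        else if (status.equals("SUCCESSFUL")) --success;
    }
}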
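To see why both an adder and a subtractor are needed, here is a minimal plain-Java sketch that imitates the table aggregation without any Kafka dependency. It is only an illustration of the semantics: the names Status, GroupSummarySimulation, update, and countsFor are made up for this example and are not part of the Kafka Streams API, and the two maps merely stand in for the input table and the result table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum mirroring the status values from the question.
enum Status { PENDING, SUCCESSFUL, UNSUCCESSFUL }

// Per-group counters, same idea as the Counts class above.
class Counts {
    int pending = 0;
    int success = 0;

    void add(Status s) {
        if (s == Status.PENDING) ++pending;
        else if (s == Status.SUCCESSFUL) ++success;
    }

    void subtract(Status s) {
        if (s == Status.PENDING) --pending;
        else if (s == Status.SUCCESSFUL) --success;
    }
}

// Stand-in for the topology: this is an assumption-laden simulation, not Kafka Streams itself.
class GroupSummarySimulation {
    // Latest status per "groupId/messageId" (plays the role of the input table).
    private final Map<String, Status> latest = new HashMap<>();
    // Running counts per group (plays the role of the result table).
    private final Map<String, Counts> counts = new HashMap<>();

    void update(String groupId, String messageId, Status newStatus) {
        Counts c = counts.computeIfAbsent(groupId, g -> new Counts());
        Status old = latest.put(groupId + "/" + messageId, newStatus);
        if (old != null) {
            c.subtract(old);   // retract the previous status of this message
        }
        c.add(newStatus);      // then apply the new status
    }

    Counts countsFor(String groupId) {
        return counts.getOrDefault(groupId, new Counts());
    }

    public static void main(String[] args) {
        GroupSummarySimulation sim = new GroupSummarySimulation();
        sim.update("g1", "m1", Status.PENDING);
        sim.update("g1", "m2", Status.PENDING);
        sim.update("g1", "m1", Status.SUCCESSFUL); // m1 moves from pending to success
        Counts c = sim.countsFor("g1");
        System.out.println("g1: pending=" + c.pending + ", success=" + c.success);
        // prints "g1: pending=1, success=1"
    }
}
```

Without the subtract step, m1 would still be counted as pending after it succeeds, which is exactly the problem that reading the topic as a table (rather than a stream) is meant to solve.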