Hi Team,
I have protobuf messages of type AccountHolder that I aggregate, and I want to store the aggregated result in another protobuf message type, ProcessedStream. Below is my code. I get the error “Incompatible equality constraint: ProcessedStream and AccountHolder” at aggregate(). I am new to Kafka Streams, so any help is appreciated. Thanks in advance!
KStream<String, AccountHolderOuterClass.AccountHolder> stream = builder.stream("streams-test-app-input", Consumed.with(Serdes.String(), accountHolderSerde));
// Initial aggregate value; generated protobuf string setters reject null, so the name starts empty
AccountHolderOuterClass.AccountHolder initialValues = AccountHolderOuterClass.AccountHolder.newBuilder()
    .setAmount(0).setName("").setTime(Instant.ofEpochMilli(0L).toString())
    .build();
KTable<String, ProcessedStreamOuterClass.ProcessedStream> bankbalance = stream
    .groupByKey(Grouped.with(Serdes.String(), accountHolderSerde))
    .aggregate(() -> initialValues, // the compile error is reported on this call
        (key, transaction, balance) -> balanceCalc(transaction, balance),
        Named.as("balance-agg"), Materialized.with(Serdes.String(), accountHolderSerde));
private static ProcessedStreamOuterClass.ProcessedStream balanceCalc(AccountHolderOuterClass.AccountHolder transaction, AccountHolderOuterClass.AccountHolder balance) {
    // Keep the later of the two timestamps
    long balTime = Instant.parse(balance.getTime()).toEpochMilli();
    long transTime = Instant.parse(transaction.getTime()).toEpochMilli();
    Instant max = Instant.ofEpochMilli(Math.max(balTime, transTime));
    int count = 1;
    count++;
    ProcessedStreamOuterClass.ProcessedStream totalBalance = ProcessedStreamOuterClass.ProcessedStream.newBuilder()
        .setBalance(balance.getAmount() + transaction.getAmount())
        .setCount(Integer.toString(count))
        .setTime(max.toString())
        .build();
    // Return the aggregated result
    return totalBalance;
}
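For context on the error message: KGroupedStream#aggregate has a single result type parameter VR that is shared by the Initializer<VR>, the third parameter and return type of the Aggregator<K, V, VR>, the Materialized serde, and the resulting KTable<K, VR>. In the code above the initializer and Materialized use AccountHolder while balanceCalc returns ProcessedStream, so VR cannot be both. The sketch below models that constraint in plain Java without Kafka, so it can be run on its own. Everything in it is a stand-in and an assumption: the AccountHolder and ProcessedStream classes are simplified hypothetical versions of the generated protobuf classes (count is an int here, amount a long), and aggregate(...) is a hypothetical helper that only mirrors the type shape of the Kafka Streams method.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;

public class AggregateSketch {

    // Hypothetical stand-in for the generated AccountHolder message.
    static final class AccountHolder {
        final String name; final long amount; final String time;
        AccountHolder(String name, long amount, String time) {
            this.name = name; this.amount = amount; this.time = time;
        }
    }

    // Hypothetical stand-in for the generated ProcessedStream message.
    static final class ProcessedStream {
        final long balance; final int count; final String time;
        ProcessedStream(long balance, int count, String time) {
            this.balance = balance; this.count = count; this.time = time;
        }
    }

    // Same type shape as Kafka Streams' Aggregator<K, V, VR>.
    interface Aggregator<K, V, VR> {
        VR apply(K key, V value, VR aggregate);
    }

    // Hypothetical fold: the initializer and the aggregator must agree on VR.
    static <K, V, VR> VR aggregate(K key, List<V> values,
                                   Supplier<VR> initializer,
                                   Aggregator<K, V, VR> aggregator) {
        VR result = initializer.get();
        for (V value : values) {
            result = aggregator.apply(key, value, result);
        }
        return result;
    }

    // The running balance is a ProcessedStream, not an AccountHolder.
    static ProcessedStream balanceCalc(AccountHolder transaction, ProcessedStream balance) {
        long balTime = Instant.parse(balance.time).toEpochMilli();
        long transTime = Instant.parse(transaction.time).toEpochMilli();
        Instant max = Instant.ofEpochMilli(Math.max(balTime, transTime));
        return new ProcessedStream(balance.balance + transaction.amount,
                                   balance.count + 1,
                                   max.toString());
    }

    static ProcessedStream demo() {
        List<AccountHolder> transactions = List.of(
            new AccountHolder("alice", 100, "2024-01-01T00:00:00Z"),
            new AccountHolder("alice", 200, "2024-01-02T00:00:00Z"));
        // The initializer produces the result type, so VR = ProcessedStream throughout.
        return aggregate("alice", transactions,
            () -> new ProcessedStream(0, 0, Instant.ofEpochMilli(0L).toString()),
            (key, transaction, balance) -> balanceCalc(transaction, balance));
    }

    public static void main(String[] args) {
        ProcessedStream result = demo();
        System.out.println(result.balance + " " + result.count + " " + result.time);
    }
}
```

Carried over to the Kafka Streams code, this would presumably mean building the initial value as a ProcessedStream, changing the second parameter of balanceCalc to ProcessedStream, and passing a serde for ProcessedStream (not shown in the post, so its name would be an assumption) to Materialized.with instead of accountHolderSerde.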