Facing issues when aggregating streams with protobuf messages

Hi Team,

I am having protobuf messages of type AccountHolder and doing the aggreagation of it, want to store the aggregrated result into another protobuf message type ProcessedStream. Below is my code, I am getting error like “Incompatible equality constraint: ProcessedStream and AccountHolder” at aggregate(). I am new to kafkaf stream so please kindly help! Thanks in advance!!

KStream<String, AccountHolderOuterClass.AccountHolder> stream = builder.stream("streams-test-app-input", Consumed.with(Serdes.String(),accountHolderSerde));
        AccountHolderOuterClass.AccountHolder initialValues = AccountHolderOuterClass.AccountHolder.newBuilder()
                .setAmount(0).setName(null).setTime(Instant.ofEpochMilli(0L).toString())
                .build();

        KTable<String, ProcessedStreamOuterClass.ProcessedStream> bankbalance = stream.groupByKey(Grouped.with(Serdes.String(),accountHolderSerde))
                .aggregate(() -> initialValues,(key, transaction, balance) -> balanceCalc(transaction,  balance), Named.as("balance-agg"),Materialized.with(Serdes.String(),accountHolderSerde));

private static ProcessedStreamOuterClass.ProcessedStream balanceCalc(AccountHolderOuterClass.AccountHolder transaction, AccountHolderOuterClass.AccountHolder balance){

        Long bal_time = Instant.parse(balance.getTime()).toEpochMilli();
        Long trans_time = Instant.parse(transaction.getTime()).toEpochMilli();
        Instant max = Instant.ofEpochMilli(Math.max(bal_time,trans_time));

        int count = 1;
        count++;
        ProcessedStreamOuterClass.ProcessedStream totalBalance = ProcessedStreamOuterClass.ProcessedStream.newBuilder()
                .setBalance(balance.getAmount()+transaction.getAmount())
                .setCount(Integer.toString(count))
                .setTime(max.toString())
                .build();

        return ProcessedStreamOuterClass.ProcessedStream.newBuilder().build();

    }

Seems the Materialized.with(Serdes.String(),accountHolderSerde) you pass into aggregate() is the issue? The result is ProcessedStream and thus the value serde you pass in must be a processedStreamSerde.

Note that Materialized is types as <K,VR> (for “value result”) not <K,V>.

@mjsax My bad, thanks for the reply though. I fixed it after changing the serde to ProcessedStream it worked.

1 Like