This is originally from a Slack thread. Copied here to make it available permanently.
You can join the community Slack here.
SGiuliani
Hi guys, I am witnessing something weird. I am running an aggregate method over a stream. Every now and then I stop my Java Kafka Streams app, manually clear all the messages in the topic, and restart the Kafka Streams app. The “aggregator” keeps the old value.
streamsBuilder.stream(TopicA, Consumed.with(Serdes.String(), ASerdes))
    .groupByKey()
    .aggregate(AAggr::new, (key, value, aggregate) -> aggregate.process(value), Materialized.with(Serdes.String(), AAggrSerdes))
    .toStream()
    .to(TopicAAggr, Produced.with(Serdes.String(), AAggrSerdes));
Inside the process method:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Setter
@Getter
public class AAggr {
    private String id;
    private double sum = 0.0;
    private A lastChange;

    // Folds one incoming record into the running aggregate.
    public AAggr process(A a) {
        this.id = a.getId();
        this.lastChange = a;
        System.out.println("this.sum: " + this.sum + " - a.sum: " + a.sum);
        this.sum += a.sum;
        return this;
    }
}
this.sum never seems to reset, even when the Kafka Streams app gets closed and the messages in the input topics are deleted. Are the results written/cached somewhere inside the cluster?
Matthias J Sax @mjsax
Seems like you want to “reset” the application: Kafka Streams Application Reset Tool | Confluent Documentation
Myroslav @VMyroslav
I guess you are facing this issue because Kafka Streams, under the hood, logs your state updates to an internal topic for better reliability, and then refills the state from this internal topic.
You can disable this feature with withLoggingDisabled if you do not need it, but then you will lose your data in case of a crash or application restart.
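Editor's sketch: the behaviour described above can be pictured with a toy model in plain Java. This is not the Kafka Streams API; `ChangelogDemo`, `LoggedStore`, and the `changelog` map are hypothetical names standing in for the state store and its internal topic. It assumes a fresh instance starts with no local state, and only illustrates why clearing the input topic does not clear the aggregate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogDemo {

    /** Toy stand-in for a state store with an optional changelog (hypothetical, not Kafka API). */
    static class LoggedStore {
        private final Map<String, Double> local = new HashMap<>(); // local state of this run
        private final Map<String, Double> changelog;               // outlives the run, like an internal topic
        private final boolean loggingEnabled;

        LoggedStore(Map<String, Double> changelog, boolean loggingEnabled) {
            this.changelog = changelog;
            this.loggingEnabled = loggingEnabled;
            // On startup, restore local state from the changelog if logging is on.
            if (loggingEnabled) {
                local.putAll(changelog);
            }
        }

        /** Adds a value to the running sum for the key and logs the new sum. */
        double add(String key, double value) {
            double sum = local.getOrDefault(key, 0.0) + value;
            local.put(key, sum);
            if (loggingEnabled) {
                changelog.put(key, sum);
            }
            return sum;
        }

        double get(String key) {
            return local.getOrDefault(key, 0.0);
        }
    }

    public static void main(String[] args) {
        Map<String, Double> changelog = new HashMap<>();
        List<Double> inputTopic = new ArrayList<>();
        inputTopic.add(3.0);
        inputTopic.add(4.0);

        // First run: aggregate everything currently in the input topic.
        LoggedStore firstRun = new LoggedStore(changelog, true);
        for (double v : inputTopic) {
            firstRun.add("k", v);
        }

        // Clearing the input topic does not touch the changelog.
        inputTopic.clear();

        // Second run: the old sum comes back from the changelog.
        LoggedStore secondRun = new LoggedStore(changelog, true);
        System.out.println("restored sum: " + secondRun.get("k"));

        // With logging disabled, a fresh instance has nothing to restore from.
        Map<String, Double> unused = new HashMap<>();
        LoggedStore noLog = new LoggedStore(unused, false);
        noLog.add("k", 3.0);
        LoggedStore noLogRestart = new LoggedStore(unused, false);
        System.out.println("sum without changelog: " + noLogRestart.get("k"));
    }
}
```

In this model, resetting the application corresponds to emptying the `changelog` map as well as the input, which is why deleting only the input messages leaves the old sum in place.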
SGiuliani
@mjsax it worked, thanks
SGiuliani
@VMyroslav thanks for this programmatic solution