Hi!
I'm having some trouble understanding Spring Cloud Stream with the Kafka Streams binder.
Given this function from the reference documentation:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(5000))
            .count(Materialized.as("WordCounts-branch"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                    new Date(key.window().start()), new Date(key.window().end()))))
            .branch(isEnglish, isFrench, isSpanish);
}
It returns multiple KStreams that go to separate Kafka bindings (topics). Is there a way to branch with multiple return types, for instance something other than WordCount above?
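For context, my understanding is that each element of the returned array gets its own output binding, numbered in the order of the predicates, so the function above would be wired up roughly like this. The destination (topic) names are placeholders I made up for illustration, not values from my actual setup:

```properties
# Input binding for the process() function (topic name is a placeholder)
spring.cloud.stream.bindings.process-in-0.destination=words
# One output binding per branch, in predicate order: english, french, spanish
spring.cloud.stream.bindings.process-out-0.destination=english-counts
spring.cloud.stream.bindings.process-out-1.destination=french-counts
spring.cloud.stream.bindings.process-out-2.destination=spanish-counts
```

All three outputs carry WordCount values here, which is the part I would like to vary per branch.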
I am able to accomplish this with StreamBridge and plain (non-Kafka-Streams) Spring Cloud Stream:
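@Bean
public Consumer<Type> processMessage() {
    return message -> {
        // some code
        // to topic A (type1Value is an instance of Type1)
        streamBridge.send("topicA", type1Value);
        // to topic B (type2Value is an instance of Type2)
        streamBridge.send("topicB", type2Value);
    };
}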