Spring Cloud Kafka Streams

Hi!

I’m having some trouble understanding the Spring Cloud Stream Kafka Streams binder.

Given this function from the reference documentation:

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> input
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            .groupBy((key, value) -> value)
            .windowedBy(TimeWindows.of(5000))
            .count(Materialized.as("WordCounts-branch"))
            .toStream()
            .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                    new Date(key.window().start()), new Date(key.window().end()))))
            .branch(isEnglish, isFrench, isSpanish);
}

It returns multiple KStreams, each of which goes to a separate Kafka binding (topic). Is there a way to branch with multiple return types, for instance so that one branch carries something other than WordCount?

I am able to accomplish this with StreamBridge and the generic Spring Cloud Stream functions:

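@Bean
public Consumer<Type> processMessage() {
    return message -> {
        // some code that derives type1Value and type2Value (placeholders) from message

        // to topic A
        streamBridge.send("topicA", type1Value);
        // to topic B
        streamBridge.send("topicB", type2Value);
    };
}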

While I am a Spring user and a Kafka Streams user, I am not a Kafka Streams/Spring Cloud user. What I see here is a standard Kafka Streams question just wrapped within Spring Cloud notation.

How about moving your branch logic into your .map() function and returning a different value type based on the rule? Then, instead of branching, use a topic name extractor that sends to “topicA” if the type is Type1, to “topicB” if the type is Type2, and so on.

I am not familiar with how this would fit into Spring Cloud’s functional wrapper of a KStream (since .to() is a terminal operation and returns void), but this is how I would achieve what you are looking for with plain Kafka Streams.

// lambda function that is a TopicNameExtractor; it receives the key, the value and the record context

.to((k, v, recordContext) -> {
    if (v instanceof TypeA) {
        return "topicA";
    } else if (v instanceof TypeB) {
        return "topicB";
    } else {
        return "other-topic";
    }
})
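
To make the "map to a different type, then route by type" idea easier to try out, here is a minimal plain-Java sketch with no Kafka dependency. Everything in it is hypothetical and only for illustration: TypeA, TypeB, toOutput (standing in for the body of .map()) and topicFor (standing in for the body of the topic name extractor) are not part of Kafka Streams or Spring Cloud Stream.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of routing by value type; all names here are hypothetical.
class TopicRouting {

    // Hypothetical output value types; in a real topology these would be your own classes.
    static final class TypeA {
        final String word;
        TypeA(String word) { this.word = word; }
    }

    static final class TypeB {
        final int length;
        TypeB(int length) { this.length = length; }
    }

    // Stands in for the body of .map(): the rule decides which value type is produced.
    static Object toOutput(String word) {
        if (word.equals("english")) {
            return new TypeA(word);
        } else if (word.equals("french")) {
            return new TypeB(word.length());
        }
        return word; // anything else passes through unchanged
    }

    // Stands in for the body of the topic name extractor: the value type picks the topic.
    static String topicFor(Object value) {
        if (value instanceof TypeA) {
            return "topicA";
        } else if (value instanceof TypeB) {
            return "topicB";
        }
        return "other-topic";
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("english", "french", "spanish");
        for (String word : words) {
            // Print which topic each mapped value would be routed to.
            System.out.println(word + " -> " + topicFor(toOutput(word)));
        }
    }
}
```

In a real topology the body of topicFor would go inside the lambda passed to .to(). Keep in mind that a single .to() call uses one value serde, so that serde would need to be able to serialize every type you route through it.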