I am trying to implement a stream processor with N inputs and N outputs, that is, with multiple input topics and multiple output topics.
For example:
Input_topic_1 ----> Output_topic_1 and Output_topic_2
Input_topic_2 -----> Output_topic_2 and Output_topic_3
Input_topic_3 -----> Output_topic_1 and Output_topic_3
A single input stream works with this method signature:
@Bean
public Function<KStream<String, String>, KStream<String, JsonNode>[]> process()
A BiFunction also works for two inputs:
@Bean
public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, JsonNode>[]> process()
However, my use case is to process more than two inputs. Java only provides Function and BiFunction for taking input parameters, so I have tried using an array of KStreams:
@Bean
public Function<KStream<String, String>[], KStream<String, JsonNode>[]> process()
and a List of KStreams:
@Bean
public Function<List<KStream<String, String>>, KStream<String, JsonNode>[]> process()
but the application is not able to recognize the process method in either case.
app.yaml:
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              num.stream.threads: 2
            zkNodes: localhost:2181
            brokers: localhost:9092
          default:
            consumer:
              applicationId: app1
      function:
        definition: process
      bindings:
        process-in-0:
          destination: in-1
        process-in-1:
          destination: in-2
        process-in-2:
          destination: in-3
        process-out-0:
          destination: out-1
        process-out-1:
          destination: out-2
        process-out-2:
          destination: out-3
I am sure something is wrong with the method signature. What is the correct approach to achieve N inputs and N outputs? Thanks in advance!
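One direction that may be worth checking, offered as a sketch rather than a confirmed fix: java.util.function has no three-argument interface, but more than two inputs can be modeled as nested single-argument Functions (currying), which is the shape the Kafka Streams binder documentation describes for more than two inputs. The snippet below only illustrates that shape in plain Java. List<String> is a stand-in for KStream<String, String> so that it runs without Kafka, and the class name CurriedFanOut and the helper concat are hypothetical. The routing follows the example mapping above (input 1 to outputs 1 and 2, input 2 to outputs 2 and 3, input 3 to outputs 1 and 3).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class CurriedFanOut {

    // Three inputs expressed as nested single-argument functions (currying).
    // List<String> is a stand-in for KStream<String, String> (assumption for illustration).
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static Function<List<String>,
            Function<List<String>,
                    Function<List<String>, List<String>[]>>> process() {
        return in1 -> in2 -> in3 -> {
            List<String> out1 = concat(in1, in3); // output 1 receives inputs 1 and 3
            List<String> out2 = concat(in1, in2); // output 2 receives inputs 1 and 2
            List<String> out3 = concat(in2, in3); // output 3 receives inputs 2 and 3
            // Generic array creation needs a raw array and an unchecked conversion.
            return new List[] {out1, out2, out3};
        };
    }

    // Hypothetical helper: returns a new list with the elements of a followed by those of b.
    public static List<String> concat(List<String> a, List<String> b) {
        List<String> merged = new ArrayList<>(a);
        merged.addAll(b);
        return merged;
    }

    public static void main(String[] args) {
        // Each apply() supplies one input, in the order in-0, in-1, in-2.
        List<String>[] outputs = process()
                .apply(List.of("a"))
                .apply(List.of("b"))
                .apply(List.of("c"));
        for (int i = 0; i < outputs.length; i++) {
            System.out.println("out-" + (i + 1) + " = " + outputs[i]);
        }
    }
}
```

Translated to the binder, the bean signature would presumably become Function<KStream<String, String>, Function<KStream<String, String>, Function<KStream<String, String>, KStream<String, JsonNode>[]>>>, with KStream.merge taking the place of concat. Whether a given binder version accepts the curried form together with an array return type is an assumption that should be verified against its documentation.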