Mixing Processor API with Stream DSL

Hi.
How can I use Streams DSL operations inside a Processor API based Topology?
Take this sample topology from the Kafka Streams 101 course:

    final Topology topology = new Topology();
    topology.addSource(
        "source-node",
        Serdes.String().deserializer(),
        electronicSerde.deserializer(),
        inputTopic);
    topology.addProcessor(
        "aggregate-price",
        new TotalPriceOrderProcessorSupplier(storeName),
        "source-node");
    // --> DSL OP HERE <--
    topology.addSink(
        "sink-node",
        outputTopic,
        stringSerde.serializer(),
        doubleSerde.serializer(),
        "aggregate-price");

Is it possible to insert a stateless DSL operation like peek or filter before sending to the sink topic?

Hi rafaeltuelho,

You can mix the Processor API into the DSL with process or one of the transformXXX variants, but not the other way around. There’s no mechanism for mixing DSL operations into a topology built with the Processor API.
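For reference, the supported direction (Processor API inside the DSL) might look like the sketch below. It assumes Kafka Streams 3.3 or later, where KStream#process accepts the newer processor.api.ProcessorSupplier and returns a KStream. PassThroughProcessor and the class name are hypothetical stand-ins, not code from the course.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class DslWithProcessorSketch {

    // Hypothetical stand-in for custom Processor API logic:
    // it forwards every record downstream unchanged.
    static class PassThroughProcessor implements Processor<String, Double, String, Double> {
        private ProcessorContext<String, Double> context;

        @Override
        public void init(ProcessorContext<String, Double> context) {
            this.context = context;
        }

        @Override
        public void process(Record<String, Double> record) {
            context.forward(record);
        }
    }

    static Topology build(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Double> input =
            builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()));

        // Processor API step embedded in the DSL (assumes Kafka Streams 3.3+).
        KStream<String, Double> processed = input.process(PassThroughProcessor::new);

        // Ordinary stateless DSL operations can follow the processor.
        processed
            .filter((key, value) -> value != null && value > 0.0)
            .peek((key, value) -> System.out.printf("key[%s] value[%s]%n", key, value))
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Double()));

        return builder.build();
    }
}
```

If the DSL operators matter more than keeping the hand-built Topology, starting from StreamsBuilder like this is one option to consider.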

To add peek or filter functionality, you need to use the addProcessor method.
Here’s an example of adding a simulated peek operation that prints to the console. Since this processor never forwards records, the sink should stay attached to aggregate-price:

    topology.addProcessor(
        "printing",
        () -> (Processor<String, Double, String, Double>) record ->
            System.out.printf("key[%s] value[%s] %n", record.key(), record.value()),
        "aggregate-price");
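A filter can be simulated the same way, with one difference: the processor has to forward the records it keeps, and the sink has to hang off the filter node rather than aggregate-price. The following is a minimal sketch under those assumptions. FilterProcessor, the wire helper, and the "filter-positive" node name are hypothetical, and the positive-value predicate is only an illustration.

```java
import java.util.function.Predicate;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical helper: forwards a record only when the predicate accepts it.
public class FilterProcessor<K, V> implements Processor<K, V, K, V> {

    private final Predicate<Record<K, V>> predicate;
    private ProcessorContext<K, V> context;

    public FilterProcessor(Predicate<Record<K, V>> predicate) {
        this.predicate = predicate;
    }

    @Override
    public void init(ProcessorContext<K, V> context) {
        // Keep the context so process() can forward records downstream.
        this.context = context;
    }

    @Override
    public void process(Record<K, V> record) {
        if (predicate.test(record)) {
            context.forward(record);
        }
        // Rejected records are dropped simply by not forwarding them.
    }

    // Attaches the filter below parentName and moves the sink below the filter.
    static void wire(Topology topology, String parentName, String outputTopic) {
        topology.addProcessor(
            "filter-positive",
            () -> new FilterProcessor<String, Double>(
                r -> r.value() != null && r.value() > 0.0),
            parentName);
        topology.addSink(
            "sink-node",
            outputTopic,
            Serdes.String().serializer(),
            Serdes.Double().serializer(),
            "filter-positive");
    }
}
```

In the topology from the question, this would be called as FilterProcessor.wire(topology, "aggregate-price", outputTopic) in place of the original addSink call.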

HTH,
Bill

Got it!

Thanks @bbejeck
