I am seeing some odd behavior (it may well be expected). When I create a KStream and call transform() on it, the app stays up (until I press Ctrl+C), keeps listening for messages, and calls the transform() method of my Transformer.
However, when I take the output from the stream and call the Transformer in the next step, which also sends output to a topic, the close() method is called as soon as it reads the first message and the app exits (as if I had pressed Ctrl+C). Why is this the case?
Can you provide the code for your Transformer and any logs? From what you describe, it sounds like some error is forcing Kafka Streams to shut down, which is why the close() method on the Transformer eventually executes.
However, I need to publish the output of the transformer to another topic, so I have created another step and am executing the transformer in this second step (the transformer in the first step is commented out and not executed). This is when it behaves in this strange way. BTW, this two-step execution works fine in my JSON version. It is the Avro version that is causing this issue.
The reduce() will trigger a repartitioning of your data. Thus, I assume you need to specify the correct serdes to be able to write/read into/from the created repartition topic.
Again, there must be some error… Did you inspect the logs? Did you register an uncaught exception handler?
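Without a handler, a stream thread that dies on an exception can look like a clean exit. Here is a minimal plain-Java sketch of the idea, with no Kafka dependency; the class and method names are hypothetical and only model a worker thread failing the way a stream thread would.

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-Java model of an uncaught exception handler (hypothetical names, not Kafka API).
class UncaughtHandlerSketch {

    // Runs the task on its own thread and returns the exception that killed it, or null.
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        Thread worker = new Thread(task, "stream-thread-sketch");
        // The handler runs on the dying thread, so the failure is logged instead of lost.
        worker.setUncaughtExceptionHandler((thread, error) -> {
            captured.set(error);
            System.err.println("Thread " + thread.getName() + " died: " + error);
        });
        worker.start();
        try {
            worker.join(); // wait for the worker (and its handler) to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return captured.get();
    }

    public static void main(String[] args) {
        Throwable cause = runAndCapture(() -> {
            Object value = "a plain String value";
            Integer wrong = (Integer) value; // throws ClassCastException at runtime
        });
        System.out.println("Captured: " + cause);
    }
}
```

In Kafka Streams the handler would be registered on the KafkaStreams instance with setUncaughtExceptionHandler before calling start(); the handler type it accepts depends on the client version.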
Yes, it does seem to be failing with a ClassCastException. It is trying to apply SpecificAvroSerde to a String. The key and value of the output are both String. So if the grouped KStream has String for both key and value, why is it trying to apply SpecificAvroSerde?
org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic orders-approver-processing-avro-KSTREAM-REDUCE-STATE-STORE-0000000006-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL,#to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).
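The message indicates that the repartition topic created for the reduce step was written with the configured default value serde (SpecificAvroSerde), because no serde was supplied for that step. The sketch below is a simplified plain-Java model of that failure, not Kafka code: Order, OrderSerializer, StringSerializer and send are hypothetical stand-ins. It shows why a serializer chosen from configuration fails only at runtime, and why passing the matching serializer explicitly avoids it.

```java
import java.nio.charset.StandardCharsets;

// Simplified model of a default-serde mismatch (hypothetical names, no Kafka dependency).
class SerdeMismatchSketch {

    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a generated Avro record class.
    static final class Order {
        final String id;
        Order(String id) { this.id = id; }
    }

    // Plays the role of the configured default value serializer.
    static final class OrderSerializer implements Serializer<Order> {
        @Override
        public byte[] serialize(Order data) {
            return data.id.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Plays the role of a serializer passed explicitly for this step.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // The sink only sees an erased serializer, so a type mismatch surfaces at runtime.
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, Serializer<?> serializer) {
        return ((Serializer<Object>) serializer).serialize(value);
    }

    public static void main(String[] args) {
        try {
            send("approved", new OrderSerializer()); // default serializer, String value
        } catch (ClassCastException e) {
            System.out.println("Default serializer failed: " + e);
        }
        byte[] bytes = send("approved", new StringSerializer()); // explicit override
        System.out.println("Explicit serializer wrote " + bytes.length + " bytes");
    }
}
```

In the Streams DSL, the analogous fix is usually to pass String serdes to the grouping step, for example Grouped.with(Serdes.String(), Serdes.String()), and to the reduce step's store via Materialized.with(Serdes.String(), Serdes.String()). Where exactly these go is an assumption here, since the topology code is not shown in the thread.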