When is close method of Transformer called

I am experiencing some weird behavior (maybe it is not weird but expected). When I create a KStream and call a transform in it, the app stays on (until I do CTRL C) and keeps listening for messages and calls the transform of my Transformer.

However when I take the output from the stream and call the Transformer in the next step which also sends output to a topic, the close() method is called as soon as it reads the first message and app exits (as if I have done a CTRL C). Why is this the case.

Can you provide the code for your Transformer and any logs? From what you describe it sounds like there’s some error forcing Kafka Streams to shut down hence eventually the close method on Transformer executes.

1 Like

So the thing is, this transformer code runs fine if I run it as part of the main streams workflow like this,

KGroupedStream<String, String> messages = builder
					.<String, PoapproverSchema>stream("orders-approver-topic-avro",
							Consumed.with(Serdes.String(), valueSpecificAvroSerde))
					.filter((key, value) -> value.getStatus().toString().equalsIgnoreCase("PENDING_APPROVAL"))
					.filter((key, value) -> value.getActive() == true)
					.peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
					.map((key, poas) -> new KeyValue<>(poas.getInvoiceId() + ":" + poas.getGuid().toString(),
							poas.getInvoiceId() + ":" + poas.getGuid().toString()))
					.transform(() -> new PoapproverTransformer(poapproverStore, differentialThreshold), poapproverStore)
					.peek((key, value) -> System.out.println("key=" + key + ", value=" + value)).groupByKey();

However, I need to publish the output of transformer to another topic so I have created another step and am executing the transformer in this second step (transformer in first step commented out and not executed) and this is when it behaves in this strange way. BTW this 2 step execution works fine in my JSON version. It is this Avro version that is causing this issue

messages.reduce((aggValue, newValue) -> newValue, Materialized. *with* (Serdes. *String* (),Serdes. *String* ()))
.toStream().transform(() -> **new** PoapproverTransformer(poapproverStore, differentialThreshold), poapproverStore)
.to("orders-output-topic");

The reduce() will trigger a repartitioning of your data. Thus, I assume you need to specify the correct serdes to be able to write/read into/from the created repartition topic.

Again, there must be some error… Did you inspect the logs? Did you register an uncaught exception handler?

Yes, it does seem it is failing with a class cast exception. It is trying to apply SpecificAvroSerde to a String. The output of key and value is String. So if the grouped kstream has string for key and value why is it trying to apply SpecificAvroSerde?

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic orders-approver-processing-avro-KSTREAM-REDUCE-STATE-STORE-0000000006-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).

Believe I managed to fix it. I changed the type of value serializer to String. Thank you for all the tips.

2 Likes

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.