Json Schema Serde not publishing to topic

Hello. I have a kafka streams app and i want to use json schema serde so that i can later map the json fields to table columns in my db.

When i produce with a string value serde the records are inserted into the kafka topic correctly. But when i use the json schema serde nothing is produced. Any tips?

private static void tempProcessor() {
        // Configuration properties for Kafka Streams
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature_processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaJsonSchemaSerde.class.getName());
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        // 2) Build the Streams topology
        StreamsBuilder builder = new StreamsBuilder();
        ObjectMapper objectMapper = new ObjectMapper();
        DateTimeFormatter tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

        // 3) Set up the JSON‑Schema Serde for output only
        KafkaJsonSchemaSerde<ProcessedTemp> jsonSchemaSerde = new KafkaJsonSchemaSerde<>();
        jsonSchemaSerde.configure(
                Map.of(
                        AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
                        AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS,       true
                ),
                /* isKey= */ false
        );

        // Read from the "temp" topic
        KStream<String, String> inputStream = builder.stream("temp", Consumed.with(Serdes.String(), Serdes.String()));
        System.out.println("⏳ Listening to topic: temp");

        inputStream.peek((key, value) -> System.out.println("Received message: " + value));

        KStream<String, ProcessedTemp> processedStream = inputStream
                .map((key, value) -> {
                    try {
                        JsonNode root    = objectMapper.readTree(value);
                        JsonNode payload = root.get("payload");

                        ProcessedTemp out = new ProcessedTemp();
                        out.MESSAGE_ID              = payload.path("message_id").asText();
                        out.PRODUCER                = key;
                        out.PRODUCED_TIMESTAMP      = payload.path("timestamp").asText();
                        out.KAFKA_TIMESTAMP         = payload.path("kafka_timestamp").asText();
                        out.PROCESSED_TIMESTAMP     = ZonedDateTime.now(ZoneOffset.UTC)
                                .format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS"));
                        out.TEMPERATURE_FAHRENHEIT  = payload.path("temperature").asDouble();
                        out.UPDATED_TEMPERATURE_CELSIUS =
                                (out.TEMPERATURE_FAHRENHEIT - 32) * 5.0 / 9.0;

                        return KeyValue.pair(key, out);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return KeyValue.pair(key, null);
                    }
                })
                .filter((k, v) -> v != null)
                .peek((key, out) -> {
                    try {
                        System.out.println("About to publish: key=" + key + " value=" + objectMapper.writeValueAsString(out));
                    } catch (JsonProcessingException e) {
                        throw new RuntimeException(e);
                    }
                });

        processedStream.to(
                "processed_KafkaStreams_temp",
                Produced.with(Serdes.String(), jsonSchemaSerde)
        );

        // Build the topology and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        System.out.println("Kafka Streams application started.");
        // Shutdown hook to stop Kafka Streams on application exit
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

Hard to say. Did you inspect the logs for any ERRORs or WARNings?

When i first start the kafka streams app the schema is correctly inserted in the internal topic _schemas like so:

{
	"subject": "processed_KafkaStreams_temp-value",
	"version": 1,
	"id": 3,
	"schema": "{\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"title\":\"Processed Temp\",\"type\":\"object\",\"additionalProperties\":false,\"properties\":{\"PRODUCER\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"MESSAGE_ID\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"TEMPERATURE_FAHRENHEIT\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"number\"}]},\"UPDATED_TEMPERATURE_CELSIUS\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"number\"}]},\"PRODUCED_TIMESTAMP\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"KAFKA_TIMESTAMP\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]},\"PROCESSED_TIMESTAMP\":{\"oneOf\":[{\"type\":\"null\",\"title\":\"Not included\"},{\"type\":\"string\"}]}}}",
	"deleted": false,
	"schemaType": "JSON"
}

When my producer sends a message i get these logs from my app:

Received message: {"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"message_id"},{"type":"double","optional":true,"field":"temperature"},{"type":"string","optional":true,"field":"timestamp"},{"type":"string","optional":true,"field":"kafka_timestamp"}],"optional":false,"name":"Temp"},"payload":{"message_id":"25ca4ada-7097-4d28-96b6-4da1afb79a65","temperature":86.23775696338181,"timestamp":"2025-05-11T09:10:17.581029500","kafka_timestamp":"2025-05-11T09:10:17.624"}}
▶️ About to publish: key=mqtt/temp/1 value={"PRODUCER":"mqtt/temp/1","MESSAGE_ID":"25ca4ada-7097-4d28-96b6-4da1afb79a65","TEMPERATURE_FAHRENHEIT":86.23775696338181,"UPDATED_TEMPERATURE_CELSIUS":30.132087201878782,"PRODUCED_TIMESTAMP":"2025-05-11T09:10:17.581029500","KAFKA_TIMESTAMP":"2025-05-11T09:10:17.624","PROCESSED_TIMESTAMP":"2025-05-11T09:10:17.746472100"}

It just gets stuck there and never publishes to the topic even if new messages are produced and are supposed to enter the stream for processing.
Could it be something with my ProcessedTemp class?

package org.example;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

// Only non-null fields
@JsonInclude(JsonInclude.Include.NON_NULL)
public class ProcessedTemp {

    @JsonProperty("PRODUCER")
    public String PRODUCER;

    @JsonProperty("MESSAGE_ID")
    public String MESSAGE_ID;

    @JsonProperty("TEMPERATURE_FAHRENHEIT")
    public Double TEMPERATURE_FAHRENHEIT;

    @JsonProperty("UPDATED_TEMPERATURE_CELSIUS")
    public Double UPDATED_TEMPERATURE_CELSIUS;

    @JsonProperty("PRODUCED_TIMESTAMP")
    public String PRODUCED_TIMESTAMP;

    @JsonProperty("KAFKA_TIMESTAMP")
    public String KAFKA_TIMESTAMP;

    @JsonProperty("PROCESSED_TIMESTAMP")
    public String PROCESSED_TIMESTAMP;

    public ProcessedTemp() {}
}

Still no idea. If there is no error message, it’s very hard to say. If the producer is really stuck sending the message, it would retry over and over again… The logs should tell you (you might need DEBUG level, for both org.apache.kafka.streams.* and org.apache.kafka.clients.producer.*.