Hello. I have a Kafka Streams app and I want to use the JSON Schema serde so that I can later map the JSON fields to table columns in my database.
When I produce with a String value serde, the records are written to the Kafka topic correctly. But when I use the JSON Schema serde, nothing is produced. Any tips?
/**
 * Builds and starts a Kafka Streams topology that reads raw JSON strings from the
 * "temp" topic, extracts the temperature payload, converts Fahrenheit to Celsius,
 * and writes {@code ProcessedTemp} records to "processed_KafkaStreams_temp" using
 * the Confluent JSON-Schema serde (so downstream consumers can map fields to columns).
 *
 * <p>Note: a failure inside the value serializer (e.g. the JSON-Schema serializer
 * being unable to register or derive a schema) kills the stream thread, and without
 * an uncaught-exception handler that death is silent — the app appears to run but
 * produces nothing. The handler below makes such failures visible.
 */
private static void tempProcessor() {
    // 1) Configuration properties for Kafka Streams.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature_processor");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaJsonSchemaSerde.class.getName());
    props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

    // 2) Build the Streams topology.
    StreamsBuilder builder = new StreamsBuilder();
    ObjectMapper objectMapper = new ObjectMapper();
    // One formatter instance reused for every record; DateTimeFormatter is thread-safe.
    DateTimeFormatter tsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

    // 3) JSON-Schema serde for the OUTPUT value only; the input topic stays String.
    KafkaJsonSchemaSerde<ProcessedTemp> jsonSchemaSerde = new KafkaJsonSchemaSerde<>();
    jsonSchemaSerde.configure(
        Map.of(
            AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081",
            AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, true
        ),
        /* isKey= */ false
    );

    // 4) Read from the "temp" topic with explicit String serdes.
    KStream<String, String> inputStream =
        builder.stream("temp", Consumed.with(Serdes.String(), Serdes.String()));
    System.out.println("⏳ Listening to topic: temp");

    KStream<String, ProcessedTemp> processedStream = inputStream
        // peek is chained into the pipeline (the original discarded its return value,
        // leaving the logging as a dangling side branch of the topology).
        .peek((key, value) -> System.out.println("Received message: " + value))
        // mapValues(withKey) instead of map: the key is unchanged, so no repartition
        // is implied; the key is still readable for the PRODUCER field.
        .mapValues((key, value) -> {
            try {
                JsonNode root = objectMapper.readTree(value);
                JsonNode payload = root.get("payload");
                if (payload == null) {
                    // Not in the expected {"payload": {...}} envelope — drop with a reason
                    // instead of NPE-ing into the generic catch below.
                    System.err.println("Skipping record without 'payload' field: " + value);
                    return null;
                }
                ProcessedTemp out = new ProcessedTemp();
                out.MESSAGE_ID = payload.path("message_id").asText();
                out.PRODUCER = key;
                out.PRODUCED_TIMESTAMP = payload.path("timestamp").asText();
                out.KAFKA_TIMESTAMP = payload.path("kafka_timestamp").asText();
                out.PROCESSED_TIMESTAMP = ZonedDateTime.now(ZoneOffset.UTC).format(tsFmt);
                out.TEMPERATURE_FAHRENHEIT = payload.path("temperature").asDouble();
                out.UPDATED_TEMPERATURE_CELSIUS =
                    (out.TEMPERATURE_FAHRENHEIT - 32) * 5.0 / 9.0;
                return out;
            } catch (Exception e) {
                e.printStackTrace();
                return null; // dropped by the filter below
            }
        })
        .filter((k, v) -> v != null)
        .peek((key, out) -> {
            try {
                System.out.println("About to publish: key=" + key
                    + " value=" + objectMapper.writeValueAsString(out));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });

    // 5) Write with the JSON-Schema value serde.
    processedStream.to(
        "processed_KafkaStreams_temp",
        Produced.with(Serdes.String(), jsonSchemaSerde)
    );

    // 6) Build the topology and start the Kafka Streams application.
    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    // Surface serialization/production failures. Without this, a serializer exception
    // (the most common cause of "nothing is produced" when switching to the
    // JSON-Schema serde) kills the stream thread with no visible error.
    streams.setUncaughtExceptionHandler(e -> {
        System.err.println("Stream thread died: " + e);
        e.printStackTrace();
        // Fully-qualified to avoid requiring a new import in this file.
        return org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler
            .StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    });

    streams.start();
    System.out.println("Kafka Streams application started.");

    // Shutdown hook to stop Kafka Streams cleanly on application exit.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}