Extract kafka record key and add it to record value

Hello.
I have the following kafka record key : mqtt/temp/5
and the kafka record value :

{
	"message_id": "19ce932c-cc1f-4409-97f7-0e45a9a29c02",
	"temperature": 86.82796014582067,
	"timestamp": "2025-01-24T17:35:50.756009600"
}

Can I use KSQLDB to extract that record key and add it to the record value so that the processed stream has

{
        "producer_id":"mqtt/temp/5",
	"message_id": "19ce932c-cc1f-4409-97f7-0e45a9a29c02",
	"temperature": 86.82796014582067,
	"timestamp": "2025-01-24T17:35:50.756009600"
}

??

I currently have this implementation. What should I modify if possible?

CREATE STREAM temp_stream (
    message_id STRING,
    temperature DOUBLE,
    timestamp VARCHAR
) WITH (
    KAFKA_TOPIC = 'temp',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
);
CREATE STREAM processed_ksqldb_temp WITH (
    KAFKA_TOPIC = 'processed_ksqldb_temp',
    VALUE_FORMAT = 'JSON_SR',
    PARTITIONS = 1
) AS 
SELECT 
    message_id,
    ((temperature -32)*5/9) AS updated_temperature_Celsius,
    timestamp,
    TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd''T''HH:mm:ss.SSS') AS processed_timestamp
FROM temp_stream;

You’d have to mark message_id as KEY (see documentation here) in the wrapping stream definition, and then you can pull it into the value using AS_VALUE.

1 Like