Hello.
I have the following kafka record key : mqtt/temp/5
and the kafka record value :
{
"message_id": "19ce932c-cc1f-4409-97f7-0e45a9a29c02",
"temperature": 86.82796014582067,
"timestamp": "2025-01-24T17:35:50.756009600"
}
Can I use KSQLDB to extract that record key and add it to the record value so that the processed stream has
{
"producer_id":"mqtt/temp/5",
"message_id": "19ce932c-cc1f-4409-97f7-0e45a9a29c02",
"temperature": 86.82796014582067,
"timestamp": "2025-01-24T17:35:50.756009600"
}
??
I currently have this implementation. What should I modify if possible?
CREATE STREAM temp_stream (
message_id STRING,
temperature DOUBLE,
timestamp VARCHAR
) WITH (
KAFKA_TOPIC = 'temp',
VALUE_FORMAT = 'JSON',
PARTITIONS = 1
);
CREATE STREAM processed_ksqldb_temp WITH (
KAFKA_TOPIC = 'processed_ksqldb_temp',
VALUE_FORMAT = 'JSON_SR',
PARTITIONS = 1
) AS
SELECT
message_id,
((temperature -32)*5/9) AS updated_temperature_Celsius,
timestamp,
TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd''T''HH:mm:ss.SSS') AS processed_timestamp
FROM temp_stream;