(out of interest, what is the device that’s emitting this data? It sounds interesting!)
The first thing to do is define a STREAM on top of the topic, declaring some or all of the fields in the messages (JSON fields you don't declare are simply ignored):
CREATE STREAM meteo_raw (type VARCHAR KEY, unita INT, nome_unita VARCHAR, id_sensore INT, nome VARCHAR, id_station INT, timestamp BIGINT, data_ora VARCHAR, valore DOUBLE)
WITH (KAFKA_TOPIC='meteo', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
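To illustrate what this declaration does, here is a minimal Python sketch (not ksqlDB code) of how one Kafka record maps onto the stream's columns. With KEY_FORMAT='KAFKA' the raw key string becomes the TYPE column, and the JSON value fills the other columns. The sample record is hypothetical, reconstructed from the query output further down, and the JSON field names are assumed to match the column names. A declared field that is missing from a message comes back as NULL, and undeclared fields are dropped, which is why declaring only part of the schema works.

```python
import json

# Hypothetical record, reconstructed from the query output in this answer.
# KEY_FORMAT='KAFKA': the key is the raw UTF-8 string that becomes the TYPE column.
RAW_KEY = "Temperatura Aria".encode("utf-8")
# VALUE_FORMAT='JSON': the value is a JSON object (field names assumed to match the columns).
RAW_VALUE = json.dumps({
    "unita": 1,
    "nome_unita": "Unità 1",
    "id_sensore": 2,
    "nome": "Temperatura Aria",
    "id_station": 2052,
    "timestamp": 1627423328,
    "data_ora": "2021-07-28 00:02:08",
    "valore": 22.65,
}).encode("utf-8")

# Declared value columns and the Python type standing in for each SQL type
# (INT/BIGINT -> int, VARCHAR -> str, DOUBLE -> float).
VALUE_COLUMNS = {
    "unita": int,
    "nome_unita": str,
    "id_sensore": int,
    "nome": str,
    "id_station": int,
    "timestamp": int,
    "data_ora": str,
    "valore": float,
}


def to_row(raw_key: bytes, raw_value: bytes) -> dict:
    """Map one Kafka record onto the declared columns of meteo_raw."""
    value = json.loads(raw_value)
    row = {"type": raw_key.decode("utf-8")}
    for column, py_type in VALUE_COLUMNS.items():
        field = value.get(column)
        # A declared field missing from the message becomes None (NULL).
        row[column] = None if field is None else py_type(field)
    # JSON fields that are not declared are never copied, i.e. they are ignored.
    return row


print(to_row(RAW_KEY, RAW_VALUE))
```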
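Now you can query the data in the topic. Setting 'auto.offset.reset' to 'earliest' makes the query read the topic from the beginning rather than only picking up new messages: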
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> SELECT * FROM meteo_raw EMIT CHANGES;
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|TYPE              |UNITA             |NOME_UNITA        |ID_SENSORE        |NOME              |ID_STATION        |TIMESTAMP         |DATA_ORA           |VALORE            |
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|Temperatura Aria  |1                 |Unità 1           |2                 |Temperatura Aria  |2052              |1627423328        |2021-07-28 00:02:08|22.65             |
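The timestamp column looks like epoch seconds rather than milliseconds, and data_ora appears to be the same instant expressed in Italian summer time. A quick Python check, assuming data_ora uses a fixed UTC+2 offset (CEST):

```python
from datetime import datetime, timedelta, timezone

# Values copied from the row above.
TIMESTAMP = 1627423328            # `timestamp` column, assumed to be epoch seconds
DATA_ORA = "2021-07-28 00:02:08"  # `data_ora` column

# Assumption: data_ora is Italian summer time, a fixed UTC+2 offset (CEST).
CEST = timezone(timedelta(hours=2))

as_utc = datetime.fromtimestamp(TIMESTAMP, tz=timezone.utc)
as_local = as_utc.astimezone(CEST)

print(as_utc.isoformat())                                  # 2021-07-27T22:02:08+00:00
print(as_local.strftime("%Y-%m-%d %H:%M:%S") == DATA_ORA)  # True
```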
From here, you can filter it:
SELECT * FROM meteo_raw
WHERE type='Temperatura Aria'
AND valore > 35 EMIT CHANGES;
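The WHERE clause is evaluated on each row independently, with no state, so its behaviour can be sketched as a plain Python predicate. The rows below are hypothetical. As in SQL, a NULL valore makes the comparison not true, so such rows are dropped rather than causing an error.

```python
# Hypothetical rows; only the columns used by the WHERE clause are shown.
rows = [
    {"type": "Temperatura Aria", "valore": 22.65},
    {"type": "Temperatura Aria", "valore": 36.4},
    {"type": "Temperatura Aria", "valore": None},  # NULL reading
    {"type": "Hypothetical other sensor", "valore": 40.0},
]


def is_high_temperature(row: dict) -> bool:
    """Mirror of: type='Temperatura Aria' AND valore > 35 (NULL never matches)."""
    valore = row.get("valore")
    return row.get("type") == "Temperatura Aria" and valore is not None and valore > 35


high_temperature = [row for row in rows if is_high_temperature(row)]
print(high_temperature)  # [{'type': 'Temperatura Aria', 'valore': 36.4}]
```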
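You can also write the matching rows to a new stream. This starts a persistent query that continuously populates a new Kafka topic (named HIGH_TEMPERATURE by default):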
CREATE STREAM high_temperature AS
SELECT * FROM meteo_raw
WHERE type='Temperatura Aria'
AND valore > 35 EMIT CHANGES;
You can build an aggregate too, for example the maximum and average reading per sensor type and unit over 30-minute tumbling windows:
CREATE TABLE readings_by_time WITH (KEY_FORMAT='JSON') AS
SELECT type, nome_unita, max(valore) as max_value, avg(valore) as avg_value
FROM meteo_raw WINDOW TUMBLING (SIZE 30 MINUTES)
GROUP BY type, nome_unita;
The KEY_FORMAT='JSON' is needed because grouping by two columns (type and nome_unita) produces a multi-column key, which the KAFKA key format used by the source stream cannot represent.
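Conceptually, WINDOW TUMBLING (SIZE 30 MINUTES) puts each record into exactly one non-overlapping, epoch-aligned 30-minute bucket, and the table keeps a max and an average per (type, nome_unita, window). The Python sketch below models that logic only; it is an illustration, not how ksqlDB stores its state. The ts_ms values stand for each record's event time, which in ksqlDB is the Kafka record timestamp (ROWTIME) unless the stream declares a TIMESTAMP column. The readings are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE_MS = 30 * 60 * 1000  # WINDOW TUMBLING (SIZE 30 MINUTES)


def window_bounds(ts_ms):
    """Epoch-aligned tumbling window [start, end) containing ts_ms."""
    start = ts_ms - ts_ms % WINDOW_SIZE_MS
    return start, start + WINDOW_SIZE_MS


def readings_by_time(records):
    """records: (ts_ms, type, nome_unita, valore) tuples with non-null valore.

    Returns {(type, nome_unita, window_start, window_end): (max_value, avg_value)}.
    """
    groups = defaultdict(list)
    for ts_ms, type_, nome_unita, valore in records:
        groups[(type_, nome_unita, *window_bounds(ts_ms))].append(valore)
    return {key: (max(vals), sum(vals) / len(vals)) for key, vals in groups.items()}


# Hypothetical readings: two in the same half hour, one in the next.
readings = [
    (1627635660000, "Temperatura Aria", "Unità 1", 22.65),
    (1627636500000, "Temperatura Aria", "Unità 1", 23.05),
    (1627637460000, "Temperatura Aria", "Unità 1", 24.0),
]
for key, (max_value, avg_value) in sorted(readings_by_time(readings).items()):
    print(key, max_value, round(avg_value, 2))
```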
ksql> SELECT * FROM readings_by_time WHERE TYPE='Temperatura Aria' AND nome_unita='Unità 1';
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|TYPE                         |NOME_UNITA                   |WINDOWSTART                  |WINDOWEND                    |MAX_VALUE                    |AVG_VALUE                    |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|Temperatura Aria             |Unità 1                      |1627635600000                |1627637400000                |22.65                        |22.65                        |
Query terminated
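WINDOWSTART and WINDOWEND are epoch milliseconds. Converting them shows the row covers 2021-07-30 09:00 to 09:30 UTC. If this 22.65 value is the same reading shown earlier, that window is more than two days after the reading's own timestamp field (2021-07-27 22:02:08 UTC), which suggests the window was assigned from the Kafka record timestamp rather than from the sensor's timestamp field. A small Python check:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(ms):
    """Convert epoch milliseconds (WINDOWSTART/WINDOWEND) to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


WINDOWSTART = 1627635600000
WINDOWEND = 1627637400000
READING_MS = 1627423328 * 1000  # `timestamp` of the earlier sample row, seconds -> ms

print(ms_to_utc(WINDOWSTART), ms_to_utc(WINDOWEND))  # 2021-07-30 09:00:00+00:00 2021-07-30 09:30:00+00:00
print(WINDOWSTART <= READING_MS < WINDOWEND)          # False
```

If you want the windows to follow the sensor's own clock instead, keep in mind that ksqlDB's TIMESTAMP property interprets a BIGINT column as milliseconds since the epoch, whereas this field appears to hold seconds.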