Help with a stream with ksqlDB

Hi,

I’m new to ksqlDB and would appreciate your help.
I have a data producer (I query the API of a meteo station) and I send the data to a Kafka topic (meteo).
This is a JSON sample of the data:

[{"topic":"meteo","partition":0,"offset":607283,"timestamp":1627487102065,"timestampType":"CREATE_TIME","headers":[],"key":"Temperatura Aria","value":{"unita":"1","nome_unita":"Unità 1","id_sensore":"2","nome":"Temperatura Aria","id_station":"2052","timestamp":"1627423328","data_ora":"2021-07-28 00:02:08","valore":"22.650"},"__confluent_index":0}]

I want to create a Kafka stream using ksqlDB to find anomalies, for example in temperature.
I created a table in ksqlDB:

CREATE TABLE meteo_stream (id VARCHAR PRIMARY KEY) WITH
    (kafka_topic='meteo', value_format='AVRO');

When I then try to create a stream with:

CREATE STREAM temperatura_stream (Temperatura Aria bigint, valore varchar) WITH
  (kafka_topic='meteo', value_format='JSON');

I receive the error:

line 1:52: extraneous input 'bigint' expecting {',', ')'}

Is there something wrong with how I’m sending messages to the meteo Kafka topic?
Is it possible to create a stream analysis in Python using ksqlDB?
Thanks for your help.

The error is coming because Temperatura Aria is not a valid field name: the space in it breaks the parser.
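
As an illustration only, here’s a sketch of what a parseable version of your statement might look like, with the space simply removed from the name (ksqlDB also supports backtick-quoted identifiers if you need to keep characters like spaces); the column types are carried over from your statement as-is:

-- Sketch only: field names must be valid identifiers, so the space is removed here.
-- In the JSON payload shown, valore arrives as a string, hence VARCHAR.
CREATE STREAM temperatura_stream (temperaturaAria BIGINT, valore VARCHAR)
  WITH (kafka_topic='meteo', value_format='JSON');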

What are you trying to do with the data? Your CREATE STREAM statement isn’t running a query.

Hi,

thank you @rmoff for your help. I suspected the names were not valid. I’ve modified them; for example, Temperatura Aria became temperaturaAria (it means air temperature).
I need a query that aggregates some of the data (e.g. temperature) and detects changes or anomalies, such as temperature > 35 degrees.
Is this possible with a ksqlDB query, or does it need a more powerful tool?
Thanks.

(out of interest, what is the device that’s emitting this data? It sounds interesting!)

The first thing to do is define a STREAM on top of the topic, declaring all or some of the schema:

CREATE STREAM meteo_raw (type VARCHAR KEY, unita INT, nome_unita VARCHAR, id_sensore INT, nome VARCHAR, id_station INT, timestamp BIGINT, data_ora VARCHAR, valore DOUBLE)
WITH (KAFKA_TOPIC='meteo', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

Now you can query the data in the topic:

ksql> SET 'auto.offset.reset' = 'earliest';

ksql> SELECT * FROM meteo_raw EMIT CHANGES;
+------------------+-------+------------+------------+------------------+------------+------------+---------------------+--------+
|TYPE              |UNITA  |NOME_UNITA  |ID_SENSORE  |NOME              |ID_STATION  |TIMESTAMP   |DATA_ORA             |VALORE  |
+------------------+-------+------------+------------+------------------+------------+------------+---------------------+--------+
|Temperatura Aria  |1      |Unità 1     |2           |Temperatura Aria  |2052        |1627423328  |2021-07-28 00:02:08  |22.65   |

From here, you can filter it:

SELECT * FROM meteo_raw 
WHERE type='Temperatura Aria' 
  AND valore > 35 EMIT CHANGES;

and optionally route this to another stream:

CREATE STREAM high_temperature AS 
SELECT * FROM meteo_raw 
WHERE type='Temperatura Aria' 
  AND valore > 35 EMIT CHANGES;

You can build an aggregate too, such as:

CREATE TABLE readings_by_time WITH (KEY_FORMAT='JSON') AS 
  SELECT type, nome_unita, max(valore) as max_value, avg(valore) as avg_value
  FROM meteo_raw WINDOW TUMBLING (SIZE 30 MINUTES)
  GROUP BY type, nome_unita;

The KEY_FORMAT='JSON' is necessary because grouping by two columns makes the key a composite value, which the default KAFKA key format (a single primitive) cannot represent.
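
If you’re curious what that composite key looks like on the wire, you can inspect the table’s changelog topic. A sketch, assuming the default sink topic name (the table name, uppercased):

-- Sketch: PRINT shows the serialized key and value of each record in the topic.
PRINT 'READINGS_BY_TIME' FROM BEGINNING LIMIT 1;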

ksql> SELECT * FROM readings_by_time WHERE TYPE='Temperatura Aria' AND nome_unita='Unità 1';
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|TYPE                         |NOME_UNITA                   |WINDOWSTART                  |WINDOWEND                    |MAX_VALUE                    |AVG_VALUE                    |
+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+
|Temperatura Aria             |Unità 1                      |1627635600000                |1627637400000                |22.65                        |22.65                        |
Query terminated

Hi,

@rmoff the data come from a meteo station installed in a vineyard. I query an API every 15 minutes, receive the data, and send it to the meteo Kafka topic and a MongoDB sink.
I’m running Confluent Platform locally and using the web UI for ksqlDB.
I tried your commands:

CREATE STREAM meteo_raw (type VARCHAR KEY, unita INT, nome_unita VARCHAR, id_sensore INT, nome VARCHAR, id_station INT, timestamp BIGINT, data_ora VARCHAR, valore DOUBLE)
WITH (KAFKA_TOPIC='meteo', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

it works;

SET 'auto.offset.reset' = 'earliest';

it works… I suppose it just produces no output;

SELECT * FROM meteo_raw EMIT CHANGES;

this query doesn’t generate any output like yours.

I don’t think it’s an absence of messages, because new messages arrive on the Kafka topic every 15 minutes.
Any suggestions?

If you’re using the web interface, you need to set auto.offset.reset under Add query properties instead.
