Hi,
I’m new to ksqlDB and would appreciate your help.
I have a data producer that queries the API of a weather station and sends the data to a Kafka topic (meteo).
This is a sample record from the topic, in JSON:
[{"topic":"meteo","partition":0,"offset":607283,"timestamp":1627487102065,"timestampType":"CREATE_TIME","headers":[],"key":"Temperatura Aria","value":{"unita":"1","nome_unita":"Unità 1","id_sensore":"2","nome":"Temperatura Aria","id_station":"2052","timestamp":"1627423328","data_ora":"2021-07-28 00:02:08","valore":"22.650"},"__confluent_index":0}]
I want to create a stream in ksqlDB to detect anomalies, for example in the temperature readings.
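To make clear what I mean by an anomaly, here is a minimal Python sketch of the kind of check I would like to run on the stream. It is only an illustration: the record is a trimmed copy of the sample above, the function name and the bounds MIN_TEMP_C and MAX_TEMP_C are hypothetical, and I am assuming that valore is in degrees Celsius.

```python
import json

# Trimmed copy of the sample record from the meteo topic.
RECORD = (
    '{"key":"Temperatura Aria","value":{"id_sensore":"2",'
    '"nome":"Temperatura Aria","id_station":"2052",'
    '"timestamp":"1627423328","data_ora":"2021-07-28 00:02:08",'
    '"valore":"22.650"}}'
)

# Hypothetical plausibility bounds (assumed to be degrees Celsius).
MIN_TEMP_C = -20.0
MAX_TEMP_C = 45.0


def is_temperature_anomaly(record: dict) -> bool:
    """Return True if an air temperature reading falls outside the bounds."""
    value = record["value"]
    # Only air temperature records are checked; other sensors are ignored.
    if value["nome"] != "Temperatura Aria":
        return False
    # "valore" arrives as a string, so it has to be converted first.
    reading = float(value["valore"])
    return not (MIN_TEMP_C <= reading <= MAX_TEMP_C)


record = json.loads(RECORD)
print(is_temperature_anomaly(record))  # False: 22.650 is within the bounds
```

A fixed threshold is the simplest rule I could think of; what I am really after is the equivalent of this check running continuously on the topic.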
I created a table in ksqlDB:
CREATE TABLE meteo_stream (id VARCHAR PRIMARY KEY) WITH
(kafka_topic='meteo', value_format='AVRO');
When I then try to create a stream on the same topic with:
CREATE STREAM temperatura_stream (Temperatura Aria bigint, valore varchar) WITH
(kafka_topic='meteo', value_format='JSON');
I receive the error:
line 1:52: extraneous input 'bigint' expecting {',', ')'}
Is there something wrong with how I send messages to the meteo Kafka topic?
Is it possible to do stream analysis in Python using ksqlDB?
Thanks for your help.