I am trying to create a table using the Python ksql client. My query is:
CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT
METEO_RAW.TYPE TYPE,
METEO_RAW.NOME_UNITA NOME_UNITA,
AVG(METEO_RAW.VALORE) AVG_VALUE
FROM METEO_RAW METEO_RAW
WINDOW TUMBLING ( SIZE 5 MINUTES )
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA
where METEO_RAW.NOME_UNITA = 'Unita 1' AND
METEO_RAW.ID_SENSORE = 2
EMIT CHANGES;
where METEO_RAW is a stream created from a Kafka topic. In the ksqlDB UI it works fine.
So I tried to submit the query from the client, following the sample command:
client.create_stream_as(table_name="AVG5MINTEMP",
select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(NOME_UNITA.VALORE) AVG_VALUE"],
src_table="METEO_RAW",
kafka_topic="AVG5MINTEMP",
value_format = 'JSON',
conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE \
WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")
but I receive the error:
DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:199: mismatched input 'WINDOW' expecting {'(', 'EMIT', 'CHANGES', 'FINAL', 'NOT', 'ESCAPE', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'KEY', 'SINK', 'SOURCE', 'PRIMARY', 'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', '+', '-', STRING, INTEGER_VALUE, DECIMAL_VALUE, FLOATING_POINT_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(NOME_UNITA.VALORE) AVG_VALUE FROM METEO_RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}
I then tried create_stream_as in this manner:
client.create_stream_as(table_name="AVG5MINTEMP",
select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(METEO_RAW.VALORE) AVG_VALUE"],
src_table="METEO_RAW METEO RAW",
kafka_topic="AVG5MINTEMP",
value_format = 'JSON',
conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")
but I receive the error:
DEBUG:root:KSQL generated: CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;
DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:198: mismatched input 'RAW' expecting {';', 'EMIT', 'WHERE', 'WINDOW', 'GROUP', 'HAVING', 'LIMIT', 'PARTITION'}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}
What is wrong?
Any help is appreciated.