Create a table from a stream with the Python client

I'm trying to create a table with the ksql Python client. My query is:

CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT
  METEO_RAW.TYPE TYPE,
  METEO_RAW.NOME_UNITA NOME_UNITA,
  AVG(METEO_RAW.VALORE) AVG_VALUE
FROM METEO_RAW METEO_RAW
WINDOW TUMBLING ( SIZE 5 MINUTES )
WHERE METEO_RAW.NOME_UNITA = 'Unita 1' AND
METEO_RAW.ID_SENSORE = 2
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA
EMIT CHANGES;

Here METEO_RAW is a stream backed by a Kafka topic. The query works fine in the ksqlDB UI,
so I tried submitting it with the client's sample command:

client.create_stream_as(table_name="AVG5MINTEMP",
                        select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(NOME_UNITA.VALORE) AVG_VALUE"],
                        src_table="METEO_RAW",
                        kafka_topic="AVG5MINTEMP",
                        value_format = 'JSON',
                        conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE \
                        WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")

but I receive the error:

DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:199: mismatched input 'WINDOW' expecting {'(', 'EMIT', 'CHANGES', 'FINAL', 'NOT', 'ESCAPE', 'NULL', 'TRUE', 'FALSE', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'CASE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'TYPES', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'KEY', 'SINK', 'SOURCE', 'PRIMARY', 'REPLACE', 'ASSERT', 'ADD', 'ALTER', 'IF', '+', '-', STRING, INTEGER_VALUE, DECIMAL_VALUE, FLOATING_POINT_VALUE, IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER, VARIABLE}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(NOME_UNITA.VALORE) AVG_VALUE FROM METEO_RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE WHERE NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}
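Judging from the statementText in that error, create_stream_as seems to assemble the statement with a hard-coded CREATE stream and a literal where placed directly in front of whatever is passed as conditions. The function below is a hypothetical reconstruction based only on that DEBUG output (it is not the ksql-python source); it shows why WINDOW ends up right after where, which the parser rejects.

```python
def reconstruct_create_stream_as(table_name, select_columns, src_table,
                                 kafka_topic, value_format, conditions):
    """Hypothetical reconstruction of the statement text seen in the DEBUG log.

    Not the library's code: it only mirrors the shape of the generated SQL,
    including the hard-coded 'stream' keyword and the 'where' before conditions.
    """
    columns = ", ".join(select_columns)
    return (
        f"CREATE stream {table_name} WITH (kafka_topic='{kafka_topic}', "
        f"value_format='{value_format}') AS SELECT {columns} "
        f"FROM {src_table} where {conditions}"
    )


sql = reconstruct_create_stream_as(
    table_name="AVG5MINTEMP",
    select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA",
                    "AVG(METEO_RAW.VALORE) AVG_VALUE"],
    src_table="METEO_RAW",
    kafka_topic="AVG5MINTEMP",
    value_format="JSON",
    conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) GROUP BY TYPE",
)
# 'where' lands directly in front of WINDOW, which ksqlDB cannot parse.
print(sql)
```

So no value of conditions can produce a valid windowed aggregation: anything passed there always follows where.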

Then I tried create_stream_as this way:

client.create_stream_as(table_name="AVG5MINTEMP",
                        select_columns=["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA", "AVG(METEO_RAW.VALORE) AVG_VALUE"],
                        src_table="METEO_RAW METEO RAW",
                        kafka_topic="AVG5MINTEMP",
                        value_format = 'JSON',
                        conditions="WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES; ")

but I receive the error:

DEBUG:root:KSQL generated: CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;
DEBUG:root:content: {'@type': 'statement_error', 'error_code': 40001, 'message': "line 1:198: mismatched input 'RAW' expecting {';', 'EMIT', 'WHERE', 'WINDOW', 'GROUP', 'HAVING', 'LIMIT', 'PARTITION'}", 'statementText': "CREATE stream AVG5MINTEMP WITH (kafka_topic='AVG5MINTEMP', value_format='JSON') AS SELECT METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, AVG(METEO_RAW.VALORE) AVG_VALUE FROM METEO_RAW METEO RAW where WINDOW TUMBLING ( SIZE 5 MINUTES ) METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE=2 EMIT CHANGES;", 'entities': []}
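Two things show up in this second error: the stray space in "METEO_RAW METEO RAW" makes the parser read METEO as an alias and then choke on RAW, and WINDOW still comes after where. In ksqlDB's CREATE TABLE AS SELECT syntax the optional clauses after FROM go in the order WINDOW, WHERE, GROUP BY, HAVING, then EMIT CHANGES. The helper below is a hypothetical sketch (build_ctas is not part of any library) that assembles the statement in that order.

```python
def build_ctas(name, with_props, columns, source, window=None, where=None,
               group_by=None, having=None):
    """Hypothetical helper: build a ksqlDB CREATE TABLE AS SELECT statement,
    emitting optional clauses in the order WINDOW, WHERE, GROUP BY, HAVING."""
    props = ", ".join(f"{key}={value}" for key, value in with_props.items())
    parts = [f"CREATE TABLE {name} WITH ({props}) AS SELECT",
             ", ".join(columns),
             f"FROM {source}"]
    if window:
        parts.append(f"WINDOW {window}")
    if where:
        parts.append(f"WHERE {where}")
    if group_by:
        parts.append("GROUP BY " + ", ".join(group_by))
    if having:
        parts.append(f"HAVING {having}")
    parts.append("EMIT CHANGES;")
    return " ".join(parts)


statement = build_ctas(
    "AVG5MINTEMP",
    {"KAFKA_TOPIC": "'AVG5MINTEMP'", "KEY_FORMAT": "'JSON'",
     "PARTITIONS": 1, "REPLICAS": 1},
    ["METEO_RAW.TYPE TYPE", "METEO_RAW.NOME_UNITA NOME_UNITA",
     "AVG(METEO_RAW.VALORE) AVG_VALUE"],
    "METEO_RAW",
    window="TUMBLING (SIZE 5 MINUTES)",
    where="METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE = 2",
    group_by=["METEO_RAW.TYPE", "METEO_RAW.NOME_UNITA"],
)
print(statement)
```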

What is wrong?
Any help is appreciated.

This looks like something specific to the Python client library. Is it this one: bryanyang0528/ksql-python on GitHub ("A python wrapper for the KSQL REST API")? If so, you might be best off raising an issue on that repo.

Yes @rmoff, it is the client you mentioned. I'll open an issue on GitHub, because it seems there is no way to make this type of query with it.
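Until the wrapper supports this, one option may be to bypass create_stream_as and send the complete statement text yourself. This sketch assumes ksql-python's generic KSQLAPI.ksql() method (its README uses it for statements such as show tables) passes the string through unchanged; the server URL is a placeholder, and the import sits inside the function so the statement can be checked without the library installed.

```python
# The full statement, written the way ksqlDB expects it
# (WINDOW, then WHERE, then GROUP BY).
STATEMENT = """
CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', KEY_FORMAT='JSON',
    PARTITIONS=1, REPLICAS=1) AS SELECT
  METEO_RAW.TYPE TYPE,
  METEO_RAW.NOME_UNITA NOME_UNITA,
  AVG(METEO_RAW.VALORE) AVG_VALUE
FROM METEO_RAW METEO_RAW
WINDOW TUMBLING (SIZE 5 MINUTES)
WHERE METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE = 2
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA
EMIT CHANGES;
"""


def submit(server_url="http://localhost:8088"):
    """Send STATEMENT as-is through ksql-python's generic ksql() call.

    server_url is a placeholder; the import is deferred so this module
    loads even where ksql-python is not installed.
    """
    from ksql import KSQLAPI

    client = KSQLAPI(server_url)
    return client.ksql(STATEMENT)
```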

In the meantime I tried making the HTTP call directly with the Python requests library:

import requests

url = 'http://192.XX.XX.XX:8088/ksql'

headers = {'accept': 'application/vnd.ksql.v1+json', 'content-type': 'application/json' }

payload = '{"query": "CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC="AVG5MINTEMP", KEY_FORMAT="JSON", PARTITIONS=1, REPLICAS=1) AS SELECT \
METEO_RAW.TYPE TYPE, \
METEO_RAW.NOME_UNITA NOME_UNITA, \
AVG(METEO_RAW.VALORE) AVG_VALUE \
FROM METEO_RAW METEO_RAW \
WINDOW TUMBLING ( SIZE 5 MINUTES ) \
WHERE METEO_RAW.NOME_UNITA = "Unita 1" AND \
METEO_RAW.ID_SENSORE = 2 \
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA \
EMIT CHANGES;", "streamsProperties": {}}'

r = requests.post(url, headers=headers, data=payload)

but I receive a 400 error:


DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): 192.XX.XX.XX:8088
DEBUG:urllib3.connectionpool:http://192.XX.XX.XX:8088 "POST /ksql HTTP/1.1" 400 656

What's the full value of r.text? The two DEBUG messages you've quoted don't show the actual error. If I had to guess, it'll be the double quotes (") in your SQL, which also close the JSON string, so you need to fix the escaping of those characters.
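One way to sidestep hand-escaping entirely is to let json.dumps build the request body, so any quote or newline in the SQL is escaped for you. Per the ksqlDB REST API docs, the /ksql endpoint reads the statement from a "ksql" field (not "query"). The server address is a placeholder, and requests is imported inside the function only so the payload part runs on its own.

```python
import json

STATEMENT = (
    "CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC='AVG5MINTEMP', "
    "KEY_FORMAT='JSON', PARTITIONS=1, REPLICAS=1) AS SELECT "
    "METEO_RAW.TYPE TYPE, METEO_RAW.NOME_UNITA NOME_UNITA, "
    "AVG(METEO_RAW.VALORE) AVG_VALUE "
    "FROM METEO_RAW METEO_RAW "
    "WINDOW TUMBLING (SIZE 5 MINUTES) "
    "WHERE METEO_RAW.NOME_UNITA = 'Unita 1' AND METEO_RAW.ID_SENSORE = 2 "
    "GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA "
    "EMIT CHANGES;"
)

# json.dumps escapes quotes and control characters, so the body is valid JSON.
payload = json.dumps({"ksql": STATEMENT, "streamsProperties": {}})

HEADERS = {
    "accept": "application/vnd.ksql.v1+json",
    "content-type": "application/json",
}


def submit(base_url="http://localhost:8088"):
    """POST the statement to the /ksql endpoint (base_url is a placeholder)."""
    import requests

    r = requests.post(f"{base_url}/ksql", headers=HEADERS, data=payload)
    # Print the body as well as the status code: the body carries the error.
    print(r.status_code, r.text)
    return r
```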

@rmoff, as you suggested, I tried escaping the quotes:

url = 'http://192.20.102.18:8088/query'
headers = {'accept': 'application/vnd.ksqlapi.delimited.v1', 'content-type': 'application/json' }
payload = '{"query": "CREATE TABLE AVG5MINTEMP WITH (KAFKA_TOPIC=\'AVG5MINTEMP\', KEY_FORMAT=\'JSON\', PARTITIONS=1, REPLICAS=1) AS SELECT \
METEO_RAW.TYPE TYPE, \
METEO_RAW.NOME_UNITA NOME_UNITA, \
AVG(METEO_RAW.VALORE) AVG_VALUE \
FROM METEO_RAW METEO_RAW \
WINDOW TUMBLING ( SIZE 5 MINUTES ) \
WHERE METEO_RAW.NOME_UNITA = \'Unita 1\' AND \
METEO_RAW.ID_SENSORE = 2 \
GROUP BY METEO_RAW.TYPE, METEO_RAW.NOME_UNITA \
EMIT CHANGES;", "streamsProperties": {}}'

r = requests.post(url, headers=headers, data=payload)

and this is the debug output:

DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): 192.20.102.18:8088
DEBUG:urllib3.connectionpool:http://192.20.102.18:8088 "POST /query HTTP/1.1" 406 0

If I print r.text, it doesn't give any information, because it is empty:

print(r.text)
''
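A 406 status means Not Acceptable: the server could not produce a response in the media type requested by the Accept header, which is why the body is empty. The endpoint may matter too: in the ksqlDB REST API, /query is meant for SELECT (and PRINT) statements, while other statements such as CREATE TABLE ... AS SELECT go to /ksql. The two helpers below are hypothetical sketches: one picks the endpoint from the statement's first keyword, the other reports an empty response by its standard HTTP reason phrase.

```python
from http import HTTPStatus


def choose_endpoint(statement):
    """Hypothetical helper: SELECT/PRINT go to /query, other statements to /ksql."""
    words = statement.split()
    first_word = words[0].upper() if words else ""
    return "/query" if first_word in ("SELECT", "PRINT") else "/ksql"


def describe_response(status_code, text):
    """Return the response body if present, else the HTTP reason phrase."""
    if text:
        return text
    return f"{status_code} {HTTPStatus(status_code).phrase} (empty body)"


print(choose_endpoint("CREATE TABLE AVG5MINTEMP WITH (...) AS SELECT ..."))
print(describe_response(406, ""))
```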

Any other ideas?
Thanks.
