I have a stream that exposes all the data in my topic, defined as follows:
CREATE STREAM all_content (
my_key_col STRUCT < `schema` STRUCT< `type` VARCHAR, `optional` BOOLEAN>, `payload` VARCHAR > KEY,
`schema` VARCHAR,
`payload` VARCHAR
) WITH (KAFKA_TOPIC = 'cb_bench_products-get_purge', FORMAT = 'JSON');
I then create a new stream, backed by a new topic, that keeps only part of the data:
CREATE STREAM HISTORY_CONTENT
WITH (KAFKA_TOPIC='history_content', PARTITIONS=10, REPLICAS=1, FORMAT='JSON')
AS select * from ALL_CONTENT
WHERE my_key_col->`payload` LIKE 'history%';
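To make the key layout concrete, here is a minimal Python sketch of what the DDL and the WHERE clause do to a single record key. It assumes the raw key is a JSON object in the Kafka Connect JsonConverter envelope ({"schema": ..., "payload": ...}), which is what the my_key_col STRUCT suggests. The RAW_KEYS values, including the non-history one, are made-up examples, and key_payload and is_history are hypothetical helpers.

```python
import json

# Made-up raw key values, ASSUMED to use the Kafka Connect JSON envelope
# ({"schema": ..., "payload": ...}) implied by the my_key_col STRUCT.
RAW_KEYS = [
    '{"schema": {"type": "string", "optional": false}, "payload": "history::05000228023411_RO_RO11219082::1"}',
    '{"schema": {"type": "string", "optional": false}, "payload": "other::example"}',
    '{"schema": {"type": "string", "optional": false}, "payload": "history::04532000053245_FR_RO11219082::76"}',
]


def key_payload(raw_key: str) -> str:
    """Model of my_key_col->`payload`: unwrap the envelope and return the string."""
    return json.loads(raw_key)["payload"]


def is_history(raw_key: str) -> bool:
    """Model of LIKE 'history%': a plain prefix match on the key payload."""
    return key_payload(raw_key).startswith("history")


# Model of HISTORY_CONTENT: only keys whose payload starts with "history" survive.
history_keys = [key_payload(k) for k in RAW_KEYS if is_history(k)]
print(history_keys)
```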
Then, when I run:
select my_key_col->`payload` from HISTORY_CONTENT;
It returns:
+------------------------------------------+-------+---------------------+
|payload_1                                 |schema |payload              |
+------------------------------------------+-------+---------------------+
|history::05000228023411_RO_RO11219082::1  |null   |some_data_in_base_64 |
|history::05000228023411_RO_RO11219082::3  |null   |some_data_in_base_64 |
|history::05000228023411_RO_RO11219082::8  |null   |some_data_in_base_64 |
|history::04532000053245_FR_RO11219082::76 |null   |some_data_in_base_64 |
|history::04532000053245_FR_RO11219082::45 |null   |some_data_in_base_64 |
|history::04532000053245_FR_RO11219082::3  |null   |some_data_in_base_64 |
|history::09999999999911_UA_RO11219082::5  |null   |some_data_in_base_64 |
|history::09999999999911_UA_RO11219082::1  |null   |some_data_in_base_64 |
|history::09999999999911_UA_RO11219082::8  |null   |some_data_in_base_64 |
+------------------------------------------+-------+---------------------+
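Each key payload has three parts separated by "::": the "history" prefix, a product identifier, and a trailing counter. The sketch below splits one key the way the queries further down do with SPLIT(..., '::'). It assumes ksqlDB's 1-based array indexing, so SPLIT(...)[2] and [3] correspond to Python's 0-based parts[1] and parts[2]; parse_history_key is a hypothetical helper, and int() stands in for CAST(... AS BIGINT).

```python
def parse_history_key(key: str) -> tuple[str, str, int]:
    """Split 'history::<product>::<n>' into (prefix, product, n).

    parts[1] and parts[2] (0-based) correspond to SPLIT(key, '::')[2] and [3]
    in ksqlDB, ASSUMING ksqlDB's 1-based array indexing.
    """
    parts = key.split("::")
    if len(parts) != 3:
        raise ValueError(f"unexpected key format: {key!r}")
    prefix, product, seq = parts
    return prefix, product, int(seq)  # int() mirrors CAST(... AS BIGINT)


print(parse_history_key("history::04532000053245_FR_RO11219082::76"))
```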
I would like to create another stream that keeps, for each product (the middle part of “payload_1”, e.g. 05000228023411_RO_RO11219082), only the record with the minimum trailing number, i.e.:
+------------------------------------------+-------+---------------------+
|payload_1                                 |schema |payload              |
+------------------------------------------+-------+---------------------+
|history::05000228023411_RO_RO11219082::1  |null   |some_data_in_base_64 |
|history::04532000053245_FR_RO11219082::3  |null   |some_data_in_base_64 |
|history::09999999999911_UA_RO11219082::1  |null   |some_data_in_base_64 |
+------------------------------------------+-------+---------------------+
And another stream that keeps the record with the maximum trailing number:
+------------------------------------------+-------+---------------------+
|payload_1                                 |schema |payload              |
+------------------------------------------+-------+---------------------+
|history::05000228023411_RO_RO11219082::8  |null   |some_data_in_base_64 |
|history::04532000053245_FR_RO11219082::76 |null   |some_data_in_base_64 |
|history::09999999999911_UA_RO11219082::8  |null   |some_data_in_base_64 |
+------------------------------------------+-------+---------------------+
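The desired result can be stated precisely in plain Python: group the keys by their middle segment and keep, per group, the whole key whose trailing number is smallest (or largest), compared as integers. Integer comparison matters in general because, as strings, "8" sorts after "76". This is only a batch model of the expected output over the nine sample keys above, not ksqlDB code; extreme_per_product is a hypothetical helper.

```python
from collections import defaultdict

# The history keys from the HISTORY_CONTENT output above.
KEYS = [
    "history::05000228023411_RO_RO11219082::1",
    "history::05000228023411_RO_RO11219082::3",
    "history::05000228023411_RO_RO11219082::8",
    "history::04532000053245_FR_RO11219082::76",
    "history::04532000053245_FR_RO11219082::45",
    "history::04532000053245_FR_RO11219082::3",
    "history::09999999999911_UA_RO11219082::5",
    "history::09999999999911_UA_RO11219082::1",
    "history::09999999999911_UA_RO11219082::8",
]


def extreme_per_product(keys, pick):
    """For each product id (middle '::' segment), return the full key whose
    trailing counter is selected by `pick` (the builtin min or max)."""
    groups = defaultdict(list)
    for key in keys:
        _, product, seq = key.split("::")
        groups[product].append((int(seq), key))
    # pick() on (int, key) tuples compares the integer first, i.e. numerically.
    return {product: pick(rows)[1] for product, rows in groups.items()}


history_min = extreme_per_product(KEYS, min)
history_max = extreme_per_product(KEYS, max)
print(history_min)
print(history_max)
```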
I tried this but without success:
CREATE TABLE HISTORY_MIN WITH (KAFKA_TOPIC='history_min', PARTITIONS=10, REPLICAS=1, FORMAT='JSON') AS select SPLIT(my_key_col->`payload` , '::')[2], MIN(CAST(SPLIT(my_key_col->`payload` , '::')[3] AS BIGINT)) from HISTORY_CONTENT GROUP BY SPLIT(my_key_col->`payload` , '::')[2];
CREATE TABLE HISTORY_MAX WITH (KAFKA_TOPIC='history_max', PARTITIONS=10, REPLICAS=1, FORMAT='JSON') AS select SPLIT(my_key_col->`payload` , '::')[2], MAX(CAST(SPLIT(my_key_col->`payload` , '::')[3] AS BIGINT)) from HISTORY_CONTENT GROUP BY SPLIT(my_key_col->`payload` , '::')[2];
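For comparison, the two CREATE TABLE statements above project only the grouping expression and the aggregate, so each row of HISTORY_MIN or HISTORY_MAX would hold a product id and a number, not the full record (the key plus the schema and payload value columns) shown in the desired output. The plain-Python model below, a hypothetical aggregate_as_written helper run once as a batch over the sample keys, shows that shape; a real ksqlDB table would instead keep updating as new records arrive.

```python
from collections import defaultdict

# The history keys from the HISTORY_CONTENT output above.
KEYS = [
    "history::05000228023411_RO_RO11219082::1",
    "history::05000228023411_RO_RO11219082::3",
    "history::05000228023411_RO_RO11219082::8",
    "history::04532000053245_FR_RO11219082::76",
    "history::04532000053245_FR_RO11219082::45",
    "history::04532000053245_FR_RO11219082::3",
    "history::09999999999911_UA_RO11219082::5",
    "history::09999999999911_UA_RO11219082::1",
    "history::09999999999911_UA_RO11219082::8",
]


def aggregate_as_written(keys, agg):
    """Model of: SELECT SPLIT(k, '::')[2], AGG(CAST(SPLIT(k, '::')[3] AS BIGINT))
    ... GROUP BY SPLIT(k, '::')[2].

    Each output entry keeps only the group key and the aggregated number;
    the original key string and the value columns are dropped.
    """
    groups = defaultdict(list)
    for key in keys:
        _, product, seq = key.split("::")
        groups[product].append(int(seq))
    return {product: agg(values) for product, values in groups.items()}


print(aggregate_as_written(KEYS, min))
print(aggregate_as_written(KEYS, max))
```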