How to get min and max of split value?

I have a stream that centralizes the entire data of my topic in this form:

CREATE STREAM all_content (
  my_key_col STRUCT < `schema` STRUCT< `type` VARCHAR, `optional` BOOLEAN>, `payload` VARCHAR > KEY,
  `schema` VARCHAR,
  `payload` VARCHAR
) WITH (KAFKA_TOPIC = 'cb_bench_products-get_purge', FORMAT = 'JSON'); 

I create a new stream which will create a new topic which will take part of the data:

CREATE STREAM HISTORY_CONTENT 
  WITH (KAFKA_TOPIC='history_content', PARTITIONS=10, REPLICAS=1, FORMAT='JSON') 
  AS select * from ALL_CONTENT 
  WHERE my_key_col->`payload` LIKE 'history%';

Then when I do:

select my_key_col->`payload` from HISTORY_CONTENT;

It returns me:

+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|payload_1                                   |schema                                      |payload                                     | 
+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|history::05000228023411_RO_RO11219082::1    |null                                        |some_data_in_base_64                        | 
|history::05000228023411_RO_RO11219082::3    |null                                        |some_data_in_base_64                        |  
|history::05000228023411_RO_RO11219082::8    |null                                        |some_data_in_base_64                        | 
|history::04532000053245_FR_RO11219082::76   |null                                        |some_data_in_base_64                        | 
|history::04532000053245_FR_RO11219082::45   |null                                        |some_data_in_base_64                        | 
|history::04532000053245_FR_RO11219082::3    |null                                        |some_data_in_base_64                        | 
|history::09999999999911_UA_RO11219082::5    |null                                        |some_data_in_base_64                        | 
|history::09999999999911_UA_RO11219082::1    |null                                        |some_data_in_base_64                        | 
|history::09999999999911_UA_RO11219082::8    |null                                        |some_data_in_base_64                        |

I would like to create another stream that takes me the minimum value for a given “payload_1”, i.e.:

+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|payload_1                                   |schema                                      |payload                                     | 
+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|history::05000228023411_RO_RO11219082::1    |null                                        |some_data_in_base_64                        | 
|history::04532000053245_FR_RO11219082::3    |null                                        |some_data_in_base_64                        | 
|history::09999999999911_UA_RO11219082::1    |null                                        |some_data_in_base_64                        | 

And a stream with the maximum value:

+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|payload_1                                   |schema                                      |payload                                     | 
+--------------------------------------------+--------------------------------------------+--------------------------------------------+
|history::05000228023411_RO_RO11219082::8    |null                                        |some_data_in_base_64                        | 
|history::04532000053245_FR_RO11219082::76   |null                                        |some_data_in_base_64                        |  
|history::09999999999911_UA_RO11219082::8    |null                                        |some_data_in_base_64                        | 

I tried this but without success:

CREATE TABLE HISTORY_MIN WITH (KAFKA_TOPIC='history_min', PARTITIONS=10, REPLICAS=1, FORMAT='JSON') AS select SPLIT(my_key_col->`payload` , '::')[2], MIN(CAST(SPLIT(my_key_col->`payload` , '::')[3] AS BIGINT)) from HISTORY_CONTENT GROUP BY SPLIT(my_key_col->`payload` , '::')[2];

CREATE TABLE HISTORY_MAX WITH (KAFKA_TOPIC='history_max', PARTITIONS=10, REPLICAS=1, FORMAT='JSON') AS select SPLIT(my_key_col->`payload` , '::')[2], MAX(CAST(SPLIT(my_key_col->`payload` , '::')[3] AS BIGINT)) from HISTORY_CONTENT GROUP BY SPLIT(my_key_col->`payload` , '::')[2];

This topic was automatically closed after 30 days. New replies are no longer allowed.