KSQL - Add new field to existing struct

I have a stream that contains a struct field and a string field:

ksql> select * from t_accnt emit changes;
+----------------------------------------------+----------------------------------------------+
|ACCOUNT                                       |OPERATION                                     |
+----------------------------------------------+----------------------------------------------+
|{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40}   |null                                          |
|{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40}   |update                                        |

I want to produce the output below in KSQL, that is, add a new field to the existing struct (or create a new struct that includes it):

{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40,OPERATION=update}

Here’s one way to do it, although it’s not particularly elegant:

-- Set up test stream
CREATE STREAM TMP (ACCOUNT STRUCT<ID INT, A1 INT, A2 INT>,
                   OPERATION VARCHAR) 
  WITH (KAFKA_TOPIC='test', FORMAT='JSON', PARTITIONS=1);

INSERT INTO TMP VALUES (STRUCT(ID:=1,A1:=2,A2:=3),'UPDATE');
INSERT INTO TMP VALUES (STRUCT(ID:=2,A1:=22,A2:=23),'INSERT');

-- Query data

ksql> SELECT * FROM TMP EMIT CHANGES LIMIT 2;
+----------------------+----------+
|ACCOUNT               |OPERATION |
+----------------------+----------+
|{ID=1, A1=2, A2=3}    |UPDATE    |
|{ID=2, A1=22, A2=23}  |INSERT    |
Limit Reached
Query terminated

-- Add `OPERATION` into `ACCOUNT` struct
ksql> SELECT STRUCT("ID":=ACCOUNT->ID,
                    "A1":=ACCOUNT->A1,
                    "A2":=ACCOUNT->A2,
                    "OPERATION":=OPERATION) AS ACCOUNT 
         FROM TMP 
       EMIT CHANGES LIMIT 2;
+---------------------------------------+
|ACCOUNT                                |
+---------------------------------------+
|{ID=1, A1=2, A2=3, OPERATION=UPDATE}   |
|{ID=2, A1=22, A2=23, OPERATION=INSERT} |
Limit Reached
Query terminated
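
If the reshaped struct needs to be written to a topic rather than just printed, the same projection can be wrapped in a persistent query. This is a minimal sketch that reuses the TMP test stream from above; the stream name ACCOUNT_WITH_OP is hypothetical, and the offset setting is only there so that the rows already inserted are picked up.

```sql
-- Read the source topic from the beginning so existing rows are included
SET 'auto.offset.reset' = 'earliest';

-- ACCOUNT_WITH_OP is a hypothetical name chosen for this sketch
CREATE STREAM ACCOUNT_WITH_OP AS
  SELECT STRUCT("ID":=ACCOUNT->ID,
                "A1":=ACCOUNT->A1,
                "A2":=ACCOUNT->A2,
                "OPERATION":=OPERATION) AS ACCOUNT
    FROM TMP
    EMIT CHANGES;

-- Inspect the resulting schema
DESCRIBE ACCOUNT_WITH_OP;
```

Every field still has to be listed explicitly, so the statement must be updated whenever the source struct gains a field.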

Hi Robin,

Thank you very much for this. I had actually already tried building a new struct from a limited set of the source struct's fields, but in my real case the source struct has 50+ fields and its schema may change in the future.

Is there any way to expand the source struct, something like ACCNT->*, which would make it easy to form the new struct?

Is there any other alternative or suggestion for this, apart from a UDF?

You can follow this issue: "Add capability to alter a struct in query" (Issue #6743 in confluentinc/ksql on GitHub).

Sure, many thanks Robin!
