A stream contains struct and a string field
ksql> select * from t_accnt emit changes;
+----------------------------------------------+----------------------------------------------+
|ACCOUNT |OPERATION |
+----------------------------------------------+----------------------------------------------+
|{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40} |null |
|{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40} |update |
I want to achieve the below output in KSQL, basically adding a new field to existing structure (or create new structure)
{ACCNT_ID=1234, A1=10, A2=20, A3=30, A4=40,OPERATION=update}
rmoff
25 February 2021 13:32
2
Here’s one way to do it, although it’s not particularly elegant:
-- Set up test stream
CREATE STREAM TMP (ACCOUNT STRUCT<ID INT, A1 INT, A2 INT>,
OPERATION VARCHAR)
WITH (KAFKA_TOPIC='test', FORMAT='JSON', PARTITIONS=1);
INSERT INTO TMP VALUES (STRUCT(ID:=1,A1:=2,A2:=3),'UPDATE');
INSERT INTO TMP VALUES (STRUCT(ID:=2,A1:=22,A2:=23),'INSERT');
-- Query data
ksql> SELECT * FROM TMP EMIT CHANGES LIMIT 2;
+----------------------+----------+
|ACCOUNT |OPERATION |
+----------------------+----------+
|{ID=1, A1=2, A2=3} |UPDATE |
|{ID=2, A1=22, A2=23} |INSERT |
Limit Reached
Query terminated
-- Add `OPERATION` into `ACCOUNT` struct
ksql> SELECT STRUCT("ID":=ACCOUNT->ID,
"A1":=ACCOUNT->A1,
"A2":=ACCOUNT->A2,
"OPERATION":=OPERATION) AS ACCOUNT
FROM TMP
EMIT CHANGES LIMIT 2;
+---------------------------------------+
|ACCOUNT |
+---------------------------------------+
|{ID=1, A1=2, A2=3, OPERATION=UPDATE} |
|{ID=2, A1=22, A2=23, OPERATION=INSERT} |
Limit Reached
Query terminated
Hi Robin,
Thank you very much for this , Actually I already tried this forming new struct with limited source struct fields but in my real case i have 50+ fields in source struct and may schema change in future.
Is there any way to explode source struct like ACCNT->* which helps easy to from new struct
Any other alternative /suggestion apart from UDF for this.
rmoff
25 February 2021 15:57
4
Sure Many Thanks Robin !!
system
Closed
4 March 2021 16:09
6
This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.