ksqlDB Table - Append new values to existing column as output of query

This is an alternative take on the question posted below.

Given the following stream and inputs:

CREATE STREAM VEHICLE_LOCATION (
    VIN VARCHAR,
    LOCATION_NAME VARCHAR
) WITH (
    KAFKA_TOPIC = 'vehicle-location',
    FORMAT = 'JSON',
    PARTITIONS = 3
);

-- Insert a few values into the stream
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');

How can I write a query whose result table looks something like the one below (with the last row reflecting the current state)? This is along the lines of:

  • Read a value from the stream
  • Join it with the table and append the new stream value to the same table’s column (not sure if ksqlDB supports this)

+------------------------------+-------------------------------------------------------------+
|VIN                           |LOCATION_NAME                                                |
+------------------------------+-------------------------------------------------------------+
|2G1WL54T4R9165225             |DALLAS                                                       |
|2G1WL54T4R9165225             |DALLAS, DALLAS                                               |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS                                       |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON                              |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON, HOUSTON                     |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON, HOUSTON, HOUSTON            |
+------------------------------+-------------------------------------------------------------+

Much appreciated

Your query is basically an aggregation. You can use the COLLECT_LIST aggregate function:

CREATE TABLE result AS
  SELECT vin, COLLECT_LIST(location_name)
  FROM vehicle_location
  GROUP BY vin;
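
Note that COLLECT_LIST returns an ARRAY rather than the comma-separated string shown in the question. If the string form is needed, a minimal sketch could look like the following. It assumes ARRAY_JOIN is available in your ksqlDB version; the table name vehicle_location_history and the alias location_names are only illustrative.

```sql
-- Read the topic from the beginning so the rows inserted earlier are included.
SET 'auto.offset.reset' = 'earliest';

-- Hypothetical table name and column alias, chosen for illustration only.
CREATE TABLE vehicle_location_history AS
  SELECT vin,
         ARRAY_JOIN(COLLECT_LIST(location_name), ', ') AS location_names
  FROM vehicle_location
  GROUP BY vin
  EMIT CHANGES;

-- Push query: prints an updated row for a VIN each time its list changes.
SELECT * FROM vehicle_location_history EMIT CHANGES;
```

The table itself holds a single row per VIN containing the latest list. The row-by-row progression shown in the question is what the push query emits as updates arrive, and some intermediate updates may be merged by ksqlDB's record caching.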

Cf. the "ksqlDB Aggregate Functions" page in the ksqlDB documentation.

Note that a list aggregation produces larger and larger rows over time, which could become an issue because Kafka (both the broker and the producer) enforces a configurable limit on record size.
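
If unbounded growth is a concern, one option is to aggregate per time window. This is only a sketch, and it changes the semantics, because the list starts over in each window. The one-hour window size and the table name vehicle_location_hourly are arbitrary assumptions.

```sql
-- Hypothetical table name; the list is collected per VIN and per one-hour window,
-- so each row stops growing once its window has passed.
CREATE TABLE vehicle_location_hourly AS
  SELECT vin,
         COLLECT_LIST(location_name) AS location_names
  FROM vehicle_location
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY vin
  EMIT CHANGES;
```

Alternatively, the size limits themselves can be raised, via message.max.bytes on the broker (or max.message.bytes on the topic) and max.request.size on the producer.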


Thank you very much for your time and the solution.
