ksqlDB Table - Append new values to existing column as output of query

This is an alternative take on the below posted question

Given the following stream and inputs:

CREATE STREAM VEHICLE_LOCATION (
    VIN VARCHAR,
    LOCATION_NAME VARCHAR
) WITH (
    KAFKA_TOPIC = 'vehicle-location',
    FORMAT = 'JSON',
    PARTITIONS = 3
);

-- Insert a few values into the stream
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');

How can I write a query whose result table looks something like the one below (with the last row reflecting the current state)? This is along the lines of:

  • Read a value from the stream
  • Join it with the table and append the new stream value to the same table's column (not sure if ksqlDB supports this)

+------------------------------+-------------------------------------------------------------+
|VIN                           |LOCATION_NAME                                                |
+------------------------------+-------------------------------------------------------------+
|2G1WL54T4R9165225             |DALLAS                                                       |
|2G1WL54T4R9165225             |DALLAS, DALLAS                                               |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS                                       |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON                              |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON, HOUSTON                     |
|2G1WL54T4R9165225             |DALLAS, DALLAS, DALLAS, HOUSTON, HOUSTON, HOUSTON            |
+------------------------------+-------------------------------------------------------------+

Much appreciated

Your query is basically an aggregation. You can use the collect_list aggregation function:

CREATE TABLE result AS
  SELECT vin, COLLECT_LIST(location_name)
  FROM vehicle_location
  GROUP BY vin;

Cf. ksqlDB Aggregate Functions - ksqlDB Documentation
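COLLECT_LIST returns an ARRAY rather than the comma-separated string shown in the question. If the string form is what you need, a minimal sketch could wrap the aggregate in ARRAY_JOIN. The table name vehicle_location_history and the alias location_names are illustrative assumptions, and the statements assume a ksqlDB version that ships ARRAY_JOIN:

```sql
-- Read the records that were inserted before the query was created.
SET 'auto.offset.reset' = 'earliest';

-- Sketch: same aggregation as above, rendered as one comma-separated string.
-- Table name and column alias are hypothetical, chosen for illustration.
CREATE TABLE vehicle_location_history AS
  SELECT vin,
         ARRAY_JOIN(COLLECT_LIST(location_name), ', ') AS location_names
  FROM vehicle_location
  GROUP BY vin
  EMIT CHANGES;

-- Push query: emits an updated row as the aggregate for a VIN changes.
SELECT vin, location_names
FROM vehicle_location_history
EMIT CHANGES;
```

The push query streams updates to the table, which is the closest match to the row-by-row output in the question. Depending on the buffering settings, some intermediate rows may be collapsed, so you might not see every step.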

Note that a list aggregation produces ever-larger rows over time, which could become an issue because Kafka (both the broker and the producer) enforces a configurable limit on record size.
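One way to keep the rows bounded, if a full history is not required, is to aggregate per time window instead of over the whole stream. The following is only a sketch: the one-hour window and the table name vehicle_location_hourly are arbitrary assumptions, and it changes the semantics, since each window starts with an empty list:

```sql
-- Sketch: one list per VIN per one-hour tumbling window, so no single row
-- grows without bound. Window size and table name are hypothetical.
CREATE TABLE vehicle_location_hourly AS
  SELECT vin,
         COLLECT_LIST(location_name) AS location_names
  FROM vehicle_location
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY vin
  EMIT CHANGES;
```

The ksqlDB documentation also describes a ksql.functions.collect_list.limit setting that caps the number of entries COLLECT_LIST keeps; check the documentation for your version before relying on it.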

Thank you very much for your time and the solution.