I am creating a table with:
CREATE TABLE k_events WITH (KAFKA_TOPIC='events', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') AS SELECT id, AS_VALUE(id) AS message_id, COLLECT_LIST(output) AS output FROM k_merged WINDOW TUMBLING (SIZE 5 SECOND) GROUP BY id EMIT CHANGES;
The k_merged stream is created as:
CREATE STREAM k_merged (id STRING KEY, meta STRUCT<...>, output STRUCT<...>) WITH (KAFKA_TOPIC='merged', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
Messages are inserted into the k_merged stream with:
INSERT INTO k_merged SELECT id, meta, STRUCT(...) AS output FROM k_data;
The k_data stream is created over an existing topic, data, whose key is a message ID serialized with the String serde. The message ID is actually a UUID.
When I select rows from k_events, junk characters are appended to the id column. I can confirm the same when I run kafkacat against the underlying events topic. I can also confirm that the merged and data topics have UUID keys with no junk data.
Interestingly, when I select AS_VALUE(id), the junk is not seen.
Nothing is jumping out at me. If you can provide repro steps, I’ll take a closer look. For example, does this happen with a minimal repro where you create a stream, populate it with an INSERT statement, and then run a CREATE TABLE AS SELECT that groups by id? If not, at what point between that and your series of queries do the junk characters show up?
Removed KEY_FORMAT='KAFKA' everywhere. This probably has no effect, because the key format defaults to ksql.persistence.default.format.key, which itself defaults to KAFKA.
Ensured that the id column is always STRING and not VARCHAR(STRING) anywhere.
The backing topics for the CREATE STREAM statements do not have the junk characters; the backing topic of the CREATE TABLE AS SELECT does. I specifically mention the topics here because the ksqlDB queries themselves do not show the junk.
I can repro this and have shared it internally at Confluent to dig into it. It looks like a bug to me.
Here is the repro:
CREATE STREAM base (id STRING KEY, val STRING) WITH (KAFKA_TOPIC='base', PARTITIONS=1, KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
CREATE TABLE derived WITH (KAFKA_TOPIC='derived', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') AS SELECT id, COUNT(*) FROM base WINDOW TUMBLING (SIZE 5 SECOND) GROUP BY id EMIT CHANGES;
INSERT INTO base(id, val) VALUES ('hello', 'dave');
INSERT INTO base(id, val) VALUES ('hello', 'chainhead');
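For comparison, the ksqlDB side of the check is a push query against the table, where (per the report above) the id shows up clean:
SELECT * FROM derived EMIT CHANGES;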
kcat on base and derived topics:
$ kcat -b localhost:9092 -t base -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello,{"VAL":"dave"}
hello,{"VAL":"chainhead"}
% Reached end of topic base [0] at offset 2
$ kcat -b localhost:9092 -t derived -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello�
n,{"KSQL_COL_0":1}
hello�
nZ�,{"KSQL_COL_0":1}
% Reached end of topic derived [0] at offset 2
@chainhead it turns out that this is expected behavior, not junk characters. For a windowed aggregation, the key is made up of the original key plus the window itself. I’m not finding a place in the ksqlDB docs that mentions this, but the Kafka Streams Windowed doc applies to this scenario:
If a KStream gets grouped and aggregated using a window-aggregation the resulting KTable is a so-called “windowed KTable” with a combined key type that encodes the corresponding aggregation window and the original record key.
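A way to see this from ksqlDB itself (a sketch against the derived table from the repro above, using the column names generated there): windowed tables expose the window bounds as the WINDOWSTART and WINDOWEND pseudo-columns, which is the same information that gets serialized after the raw key bytes in the topic.
SELECT id, WINDOWSTART, WINDOWEND, KSQL_COL_0 FROM derived EMIT CHANGES;
If I’m reading the kcat output right, the trailing bytes after hello are that encoded window (the window start timestamp for a tumbling window), so they are expected serialization rather than corruption.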