I am creating a table with CREATE TABLE k_events WITH (KAFKA_TOPIC='events', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') as SELECT id, AS_VALUE(id) as message_id, COLLECT_LIST(output) as output from k_merged WINDOW TUMBLING (SIZE 5 SECOND) GROUP BY id emit changes;
The k_merged stream is created as CREATE STREAM k_merged (id string key, meta STRUCT<...>, output STRUCT<...>) WITH (KAFKA_TOPIC='merged', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
Messages are inserted into the k_merged stream with INSERT INTO k_merged SELECT id, meta, STRUCT(....) as output from k_data; The k_data stream is created over an existing topic, data, whose key is a message ID serialized with the String serde. The message ID is a UUID.
When I select rows from k_events, the id column has junk characters appended. I see the same when I run kafkacat against the underlying events topic. The merged and data topics, on the other hand, have plain UUID keys with no junk.
Interestingly, when I select AS_VALUE(id), the junk does not appear.
Why does the message key have junk characters appended?
Nothing is jumping out at me. If you can provide repro steps I’ll take a closer look. e.g., will this happen with a minimal repro where you create a stream, populate it with an INSERT statement, and then run a CREATE TABLE AS SELECT that groups by id? If not, at what point between that and your series of queries do the junk characters show up?
Here is what I have tried so far. No joy.
- Removed KEY_FORMAT='KAFKA' everywhere. This probably has no effect, because the key format defaults to ksql.persistence.default.format.key, which itself defaults to KAFKA.
- Ensured that the id column is STRING everywhere and never VARCHAR(STRING).
Here are the inconsistencies I observe.
- When I run LEN(id) against the final table, the output is 36. But with kafkacat or with provectus/kafka-ui (Open-Source Web UI for Apache Kafka Management, github.com), the key length is 44, i.e. 8 extra bytes are appended.
- When I run SELECT id at the ksql prompt in the ksqlDB container, I don’t see the extra bytes. I do see them with kafkacat and the Kafka UI.
Thinking of a simple repro for you…
The backing topics of the CREATE STREAM statements do not have the junk characters. The backing topic of the CREATE TABLE AS SELECT does. I mention the topics specifically because the ksql queries themselves do not show the junk.
I looked at the key of one message, and the extra bytes are 01 EF BF BD 20 5E 5D EF BF BD. I pasted the key into ASCII to Hex | Text to Hex Code Converter (rapidtables.com) with the character encoding set to UTF-8 (because we are in 2024?). I was too lazy to write a consumer application to hex-dump the key. Anyway, according to the Stack Overflow question "What is the format or encoding of a file with data like this?", EF BF BD is the UTF-8 encoding of the Unicode replacement character (U+FFFD).
Who is causing the replacement, and why?
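For anyone else who hits this: EF BF BD is what a lossy text conversion produces. When raw binary is decoded as UTF-8 with replacement, as many terminals and web converters do, every byte that isn't valid UTF-8 becomes U+FFFD, which re-encodes as EF BF BD. Bytes such as 01, 20, 5E and 5D are valid ASCII and pass through unchanged. Below is a minimal Python sketch of that round trip, with a lossless hex dump for comparison. The input bytes 8C and 90 are hypothetical placeholders for whatever non-ASCII bytes the real key held, and the helper names are illustrative.

```python
def show_as_utf8_text(raw: bytes) -> bytes:
    """Mimic a text tool: decode as UTF-8 (invalid bytes -> U+FFFD), then re-encode."""
    return raw.decode("utf-8", errors="replace").encode("utf-8")


def hex_dump(raw: bytes) -> str:
    """Lossless alternative: show every byte as two hex digits."""
    return raw.hex(" ")


# Hypothetical key suffix: 0x8C and 0x90 stand in for unknown non-ASCII bytes.
suffix = bytes([0x01, 0x8C, 0x20, 0x5E, 0x5D, 0x90])
print(show_as_utf8_text(suffix).hex(" ").upper())  # 01 EF BF BD 20 5E 5D EF BF BD
print(hex_dump(suffix))                            # 01 8c 20 5e 5d 90
```

Because the replacement discards the original byte values, a hex dump of the raw key (rather than a text rendering) is the only way to see what was actually stored.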
I can reproduce this and have shared it internally at Confluent so we can dig into it. It looks like a bug to me.
Here is the repro:
CREATE STREAM base (id STRING KEY, val STRING) WITH (KAFKA_TOPIC='base', PARTITIONS=1, KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');
CREATE TABLE derived WITH (KAFKA_TOPIC='derived', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') AS SELECT id, COUNT(*) FROM base WINDOW TUMBLING (SIZE 5 SECOND) GROUP BY id EMIT CHANGES;
INSERT INTO base(id, val) VALUES ('hello', 'dave');
INSERT INTO base(id, val) VALUES ('hello', 'chainhead');
kcat on base and derived topics:
$ kcat -b localhost:9092 -t base -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello,{"VAL":"dave"}
hello,{"VAL":"chainhead"}
% Reached end of topic base [0] at offset 2
$ kcat -b localhost:9092 -t derived -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello�
n,{"KSQL_COL_0":1}
hello�
nZ�,{"KSQL_COL_0":1}
% Reached end of topic derived [0] at offset 2
@chainhead it turns out that this is expected behavior, not junk characters. For a windowed aggregation, the key is made up of the original key plus the window itself. I can’t find a place in the ksqlDB docs that mentions this, but the Kafka Streams Windowed doc applies to this scenario:
If a KStream gets grouped and aggregated using a window-aggregation the resulting KTable is a so-called “windowed KTable” with a combined key type that encodes the corresponding aggregation window and the original record key.
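To make that concrete, here is a minimal Python sketch that splits such a key. It assumes the layout written by Kafka Streams' TimeWindowedSerializer, which ksqlDB presumably uses for a TUMBLING window with the KAFKA key format: the serialized original key followed by the window start as an 8-byte big-endian long of epoch milliseconds. That layout would account for the 44 = 36 + 8 bytes reported earlier in the thread. The UUID, the timestamp, and the function name are made-up illustrative values.

```python
import struct
from typing import Tuple

# Assumption: time-windowed key = <serialized key><window start, big-endian int64 ms>.
WINDOW_START_SIZE = 8


def split_time_windowed_key(raw: bytes) -> Tuple[str, int]:
    """Return (original key, window start in epoch ms) for a time-windowed key."""
    if len(raw) < WINDOW_START_SIZE:
        raise ValueError("key is too short to carry a window start timestamp")
    key_bytes = raw[:-WINDOW_START_SIZE]
    (window_start_ms,) = struct.unpack(">q", raw[-WINDOW_START_SIZE:])
    return key_bytes.decode("utf-8"), window_start_ms


# Hypothetical 36-character UUID key in a window starting at 2024-01-01T00:00:00Z.
uuid_key = "123e4567-e89b-12d3-a456-426614174000"
raw_key = uuid_key.encode("utf-8") + struct.pack(">q", 1704067200000)
print(len(raw_key))                      # 44
print(split_time_windowed_key(raw_key))  # ('123e4567-e89b-12d3-a456-426614174000', 1704067200000)
```

This also explains why SELECT id in ksqlDB looks clean: ksqlDB deserializes the windowed key and exposes only the original key in the id column.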
Much appreciated, @dtroiano, for the repro and description!
Since my key is a UUID, my downstream applications will take the first 36 bytes to trim the key.
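Taking the first 36 bytes works as long as every key is a UUID. A slightly more general sketch drops the trailing 8-byte window suffix instead, so it also handles keys of other lengths. Another option is to read the message_id value column (AS_VALUE(id)), which already carries the clean ID. The JSON field name MESSAGE_ID assumes that ksqlDB upper-cases unquoted identifiers (compare VAL and KSQL_COL_0 in the repro output). extract_message_id is a hypothetical helper, not part of any Kafka client library.

```python
import json
from typing import Optional

WINDOW_SUFFIX_SIZE = 8  # assumed: window start appended as an 8-byte big-endian long


def extract_message_id(raw_key: bytes, raw_value: Optional[bytes]) -> str:
    """Prefer the clean MESSAGE_ID value column; otherwise strip the window suffix."""
    if raw_value:
        value = json.loads(raw_value)
        if isinstance(value, dict) and isinstance(value.get("MESSAGE_ID"), str):
            return value["MESSAGE_ID"]
    # Fallback (e.g. tombstones or values without MESSAGE_ID): drop the last 8 bytes.
    if len(raw_key) < WINDOW_SUFFIX_SIZE:
        raise ValueError("key is too short to carry a window suffix")
    return raw_key[:-WINDOW_SUFFIX_SIZE].decode("utf-8")


key = b"hello" + bytes(8)  # hypothetical key with an all-zero window suffix
print(extract_message_id(key, None))                                    # hello
print(extract_message_id(key, b'{"MESSAGE_ID": "hello", "OUTPUT": []}'))  # hello
```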
Thank you!