Junk in key of a table created from other streams

I am creating a table with:

CREATE TABLE k_events WITH (KAFKA_TOPIC='events', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') AS
  SELECT id, AS_VALUE(id) AS message_id, COLLECT_LIST(output) AS output
  FROM k_merged
  WINDOW TUMBLING (SIZE 5 SECOND)
  GROUP BY id
  EMIT CHANGES;

The k_merged stream is created as:

CREATE STREAM k_merged (id STRING KEY, meta STRUCT<...>, output STRUCT<...>)
  WITH (KAFKA_TOPIC='merged', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

Messages are inserted into the k_merged stream with:

INSERT INTO k_merged SELECT id, meta, STRUCT(...) AS output FROM k_data;

The k_data stream is created over an existing topic, data, whose key is a message ID serialized with the String serdes. The message ID is actually a UUID.

When I select rows from k_events, the id column has junk characters appended. I can confirm the same when I run kafkacat against the underlying events topic. I can also confirm that the merged and data topics have UUID keys with no junk data.

Interestingly, when I select AS_VALUE(id), the junk is not seen.

Why is the message key appended with junk characters?

@dtroiano any thoughts?

Nothing is jumping out at me. If you can provide repro steps, I'll take a closer look. For example, does this happen with a minimal repro where you create a stream, populate it with an INSERT statement, and then run a CREATE TABLE AS SELECT that groups by id? If not, at what point between that minimal case and your series of queries do the junk characters show up?

Here is what I have tried so far. No joy.

  1. Removed KEY_FORMAT='KAFKA' everywhere. This probably has no effect, because the key format then defaults to ksql.persistence.default.format.key, which itself defaults to KAFKA.
  2. Ensured that the id column is declared as STRING, and not VARCHAR(STRING), anywhere.

Here are the inconsistencies I observe.

  1. When I do LEN(id) on the final table, I get 36. But with kafkacat or with provectus/kafka-ui, I see the key length as 44, i.e. 8 extra bytes are appended (see the sketch after this list).
  2. When I do a SELECT id at the ksql prompt in the ksqlDB container, I don't see the extra bytes. I can see them with kafkacat and the Kafka UI.
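
For what it's worth, here is a minimal sketch of reading the raw key bytes straight off the topic, assuming the third-party kafka-python client (any consumer that skips key deserialization would do):

from kafka import KafkaConsumer  # assumption: the kafka-python package

# Print the serialized key bytes exactly as stored on the topic,
# together with their length.
consumer = KafkaConsumer(
    "events",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)
for msg in consumer:
    print(len(msg.key), msg.key)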

Thinking of a simple repro for you…

The backing topics of the CREATE STREAM statements do not have the junk characters. The backing topic of the CREATE TABLE AS SELECT shows the junk. I specifically mention the topics here because the ksql queries do not show the junk.

I looked at the key of one message and the extra bytes are 01 EF BF BD 20 5E 5D EF BF BD. I pasted them into the ASCII to Hex | Text to Hex Code Converter (rapidtables.com) with the character encoding set to UTF-8 (because we are in 2024?); I was too lazy to write a consumer application to hex-parse the key. Anyway, according to the Stack Overflow question "What is the format or encoding of a file with data like this?", EF BF BD is the UTF-8 replacement character.
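
As a sanity check on that, here is a minimal Python sketch; the raw suffix bytes below are hypothetical, since the converter has already destroyed the originals. Any byte that is not valid UTF-8 decodes to U+FFFD, and U+FFFD re-encodes as EF BF BD:

# Hypothetical raw suffix: 0x8F is not valid UTF-8 on its own.
raw = b"\x01\x8f\x20\x5e\x5d\x8f"

# Lenient decoding maps each invalid byte to U+FFFD (the replacement
# character); re-encoding as UTF-8 then produces EF BF BD.
text = raw.decode("utf-8", errors="replace")
print(text.encode("utf-8").hex(" "))  # 01 ef bf bd 20 5e 5d ef bf bd

This also means the converter output cannot tell us what the original bytes were; each EF BF BD may stand in for one or more raw bytes.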

Who is causing the replacement, and why…

I can repro this and have shared it internally at Confluent to dig into it. It looks like a bug to me.

Here is the repro:

CREATE STREAM base (id STRING KEY, val STRING) WITH (KAFKA_TOPIC='base', PARTITIONS=1, KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

CREATE TABLE derived WITH (KAFKA_TOPIC='derived', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON') AS SELECT id, COUNT(*) FROM base WINDOW TUMBLING (SIZE 5 SECOND) GROUP BY id EMIT CHANGES;

INSERT INTO base(id, val) VALUES ('hello', 'dave');
INSERT INTO base(id, val) VALUES ('hello', 'chainhead');

kcat on base and derived topics:

$  kcat -b localhost:9092 -t base -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello,{"VAL":"dave"}
hello,{"VAL":"chainhead"}
% Reached end of topic base [0] at offset 2

$  kcat -b localhost:9092 -t derived -K,
% Auto-selecting Consumer mode (use -P or -C to override)
hello�
n,{"KSQL_COL_0":1}
hello�
nZ�,{"KSQL_COL_0":1}
% Reached end of topic derived [0] at offset 2

@chainhead it turns out that this is expected behavior, not junk characters. For a windowed aggregation, the key is made up of the original key plus the window itself. I'm not finding a place in the ksqlDB docs that mentions this, but the Kafka Streams Windowed doc applies to this scenario:

If a KStream gets grouped and aggregated using a window-aggregation the resulting KTable is a so-called “windowed KTable” with a combined key type that encodes the corresponding aggregation window and the original record key.
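
Concretely (my reading of the time-windowed serde, offered as a sketch rather than gospel): the combined key on the output topic is the raw bytes of the original key followed by an 8-byte big-endian window start timestamp in epoch milliseconds, which lines up with the 8 extra bytes (36 vs. 44) observed above.

import struct

# Sketch of the time-windowed key layout written to the output topic:
# <original key bytes><8-byte big-endian window start, epoch millis>
def windowed_key(original_key: str, window_start_ms: int) -> bytes:
    return original_key.encode("utf-8") + struct.pack(">q", window_start_ms)

key = windowed_key("hello", 1700000000000)  # hypothetical window start
print(len(key) - len("hello"))              # 8 extra bytes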

Much appreciated, @dtroiano for the repro and description!

Since my key is a UUID, my downstream applications will take the first 36 bytes to trim the key.
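
In case it helps anyone else, a minimal sketch of that trim, assuming the layout sketched above (a 36-character UUID string followed by the 8-byte window start):

import struct

def split_windowed_key(raw_key: bytes) -> tuple[str, int]:
    # Slicing at a fixed 36 bytes only works because a UUID string is
    # always 36 characters (32 hex digits plus 4 hyphens).
    uuid_str = raw_key[:36].decode("utf-8")
    (window_start_ms,) = struct.unpack(">q", raw_key[36:44])
    return uuid_str, window_start_ms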

Thank you!
