I have a KSQL stream ip_locator, backed by the Kafka topic locator, that selects two fields (ip and uaid) from the geolocation and user nested structures of the input stream, respectively. This stream works fine and emits AVRO-formatted values.
ksql> print 'locator' FROM BEGINNING LIMIT 1;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2022/04/4 11:13:33.131 Z, key: <null>, value: {"IP": "10.10.10.10", "UAID": "ff6204b2f1bf11eda05b0242ac120003"}, partition: 3
Topic printing ceased
CREATE STREAM ip_locator WITH(VALUE_FORMAT='AVRO',KEY_FORMAT='KAFKA',
KAFKA_TOPIC='locator',PARTITIONS=4, REPLICAS=4) AS
SELECT
geolocation->ip as ip,
user->uaid as uaid
FROM input
EMIT CHANGES;
Now I want to create a new stream ip_key that uses the same data from ip_locator but sets the Kafka record key to a composite structure containing the ip and uaid fields. For this, I use the STRUCT constructor to build the key.
CREATE STREAM ip_key
WITH (FORMAT='AVRO', KAFKA_TOPIC='locator.key', PARTITIONS=4,REPLICAS=4) AS
SELECT
STRUCT("ip":= ip, "uaid" := uaid) KEY,
AS_VALUE(ip) as IP,
AS_VALUE(uaid) as uaid
FROM ip_locator
PARTITION BY STRUCT("ip":= ip, "uaid" := uaid)
EMIT CHANGES;
However, when I try to print the Kafka topic locator.key that backs the new stream, the output says that the key format is not recognized:
ksql> print 'locator.key' FROM BEGINNING LIMIT 3;
Key format: does not match any supported format. It may be a STRING with encoding other than UTF8, or some other format.
Value format: AVRO
rowtime: 2022/04/4 11:13:33.131 Z, key: \x00\x00\x00\x00\x94\x02\x1C10.10.10.10\x02Pff6204b2f1bf11eda05b0242ac120003, value: {"IP": "10.10.10.10", "UAID": "ff6204b2f1bf11eda05b0242ac120003"}, partition: 3
Topic printing ceased
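One thing I noticed: the first five bytes of the printed key look like they could be the Confluent Schema Registry wire-format header (a zero magic byte followed by a 4-byte big-endian schema ID). Below is a minimal Python sketch to check this, assuming that framing applies here. The parse_header helper is a hypothetical name of my own, and the key bytes are copied from the output above.

```python
# Key bytes copied from the PRINT output above (sample data as posted).
raw_key = b"\x00\x00\x00\x00\x94\x02\x1c10.10.10.10\x02Pff6204b2f1bf11eda05b0242ac120003"


def parse_header(data: bytes) -> tuple[int, int, bytes]:
    """Split data into (magic byte, schema ID, remaining payload).

    Assumption: the bytes use the Confluent wire format, i.e. one magic
    byte followed by a 4-byte big-endian schema ID.
    """
    if len(data) < 5:
        raise ValueError("need at least 5 bytes for the header")
    magic = data[0]
    schema_id = int.from_bytes(data[1:5], byteorder="big")
    return magic, schema_id, data[5:]


magic, schema_id, payload = parse_header(raw_key)
print(f"magic={magic} schema_id={schema_id} payload={payload!r}")
```

For the key above this gives a magic byte of 0 and a schema ID of 148, which would be consistent with the key being Avro-serialized through Schema Registry rather than corrupted. I have not confirmed this against the registry.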
Thank you for the help.