We have 2 compact topics (C_USERS [key - AID] and A_USERS [key - account_name]) both having AVRO Key & Value Schema with “record” type, and we want to do left join, take some attributes from A_USERS topic and put the data to target compacted topic (USERS_ENRICHED [key - AID])
And I am following this thread as reference.
Please Note: The “A_USERS” topic has been produced by a Transactional Producer.
Please find the steps we performed:
A) For Topic: C_USERS
CREATE OR REPLACE STREAM DEV1_KSTREAM_C_USERS WITH (KAFKA_TOPIC='dev-1-C_USERS', FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_SCHEMA_ID=xxx);
CREATE OR REPLACE STREAM DEV1_KSTREAM_C_USERS_REKEYED WITH (KAFKA_TOPIC='dev-1-C_USERS_REKEYED', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_FORMAT='AVRO', KEY_SCHEMA_ID=xxx) AS SELECT STRUCT(AID := ROWKEY->AID) AS ROWKEY,...<column_names> FROM DEV1_KSTREAM_C_USERS PARTITION BY STRUCT(AID := ROWKEY->AID) EMIT CHANGES;
CREATE OR REPLACE TABLE DEV1_KTABLE_C_USERS WITH (KAFKA_TOPIC='dev-1-C_USERS_REKEYED', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_FORMAT='AVRO', KEY_SCHEMA_ID=xxx);
B) For Topic: A_USERS
SET 'processing.guarantee' = 'exactly_once_v2';
SET 'consumer.isolation.level' = 'read_committed';
CREATE OR REPLACE STREAM DEV1_KSTREAM_A_USERS WITH (KAFKA_TOPIC='dev-1-A_USERS', FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_SCHEMA_ID=xxx);
CREATE OR REPLACE STREAM DEV1_KSTREAM_A_USERS_REKEYED WITH (KAFKA_TOPIC='dev-1-A_USERS_REKEYED', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_FORMAT='AVRO', KEY_SCHEMA_ID=xxx) AS SELECT STRUCT(`account_name` := ROWKEY->`account_name`) AS ROWKEY,..., <column_names> FROM DEV1_KSTREAM_A_USERS PARTITION BY STRUCT(`account_name` := ROWKEY->`account_name`) EMIT CHANGES;
CREATE OR REPLACE TABLE DEV1_KTABLE_A_USERS WITH (KAFKA_TOPIC='dev-1-A_USERS_REKEYED', VALUE_FORMAT='AVRO', VALUE_SCHEMA_ID=xxx, KEY_FORMAT='AVRO', KEY_SCHEMA_ID=xxx);
C) Now Join 2 KTables
CREATE OR REPLACE TABLE
DEV1_KDEV_C_USERS_EVENTS_TABLE_KSQL_C_USERS_A_USERS
WITH (
KAFKA_TOPIC='dev-1-USERS_ENRICHED',
PARTITIONS=1,
REPLICAS=3,
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED',
VALUE_SCHEMA_ID=xxx,
KEY_FORMAT='AVRO',
KEY_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED_KEY',
KEY_SCHEMA_ID=xxx
)
AS
SELECT
KT1.ROWKEY AS ROWKEY,
KT1.AID AS AID,
KT1.EMAIL_ADDRESS AS EMAIL_ADDRESS,
KT1.PROFILE_CODE AS PROFILE_CODE,
KT1.IS_DISABLED AS IS_DISABLED,
KT2.`given_name` AS FIRST_NAME,
KT2.`sn` AS LAST_NAME
FROM
DEV1_KTABLE_C_USERS KT1
LEFT OUTER JOIN
DEV1_KTABLE_A_USERS KT2
ON
STRUCT(AID := KT1.ROWKEY->AID) = STRUCT(`account_name` := KT2.ROWKEY->`account_name`)
EMIT CHANGES;
When I am trying to execute the JOIN query, I am getting the below error:
Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got STRUCT(AID:=KT1.ROWKEY->ADID) = STRUCT(account_name:=KT2.ROWKEY->account_name).
And, if I join with the below changes -
CREATE OR REPLACE TABLE DEV1_KDEV_C_USERS_EVENTS_TABLE_KSQL_C_USERS_A_USERS
WITH (
KAFKA_TOPIC='dev-1-USERS_ENRICHED',
PARTITIONS=1,
REPLICAS=3,
VALUE_FORMAT='AVRO',
VALUE_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED',
VALUE_SCHEMA_ID=xxx,
KEY_FORMAT='AVRO',
KEY_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED_KEY',
KEY_SCHEMA_ID=xxx
)
AS
SELECT
KT1.ROWKEY AS ROWKEY,
KT1.AID AS AID,
KT1.EMAIL_ADDRESS AS EMAIL_ADDRESS,
KT1.PROFILE_CODE AS PROFILE_CODE,
KT1.IS_DISABLED AS IS_DISABLED,
KT2.`given_name` AS FIRST_NAME,
KT2.`sn` AS LAST_NAME
FROM
DEV1_KTABLE_C_USERS KT1
LEFT OUTER JOIN
DEV1_KTABLE_A_USERS KT2
ON
--STRUCT(AID := KT1.ROWKEY->AID) = STRUCT(`account_name` := KT2.ROWKEY->`account_name`)
KT1.ROWKEY=KT2.ROWKEY
EMIT CHANGES;
And I am getting error:
Could not determine output schema for query due to error: Invalid join condition: types don’t match. Got KT1.ROWKEY{STRUCT<
AID
STRING>} = KT2.ROWKEY{STRUCT<account_name
STRING>}
Am I missing something here? Kindly help!
PLEASE NOTE: It’s working fine while I am doing the join on Primary Keys with Primitive Datatype