ksqlDB table-table join on a STRUCT-type primary key

We have two compacted topics, C_USERS (key: AID) and A_USERS (key: account_name). Both have AVRO key and value schemas of type “record”. We want to left join them, take some attributes from the A_USERS topic, and write the result to a compacted target topic, USERS_ENRICHED (key: AID).

I am following this thread as a reference.

Please note: the “A_USERS” topic is produced by a transactional producer.

Please find the steps we performed:

A) For Topic: C_USERS

CREATE OR REPLACE STREAM DEV1_KSTREAM_C_USERS
  WITH (
    KAFKA_TOPIC='dev-1-C_USERS',
    FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_SCHEMA_ID=xxx
  );

CREATE OR REPLACE STREAM DEV1_KSTREAM_C_USERS_REKEYED
  WITH (
    KAFKA_TOPIC='dev-1-C_USERS_REKEYED',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_ID=xxx
  )
AS
  SELECT
    STRUCT(AID := ROWKEY->AID) AS ROWKEY,...<column_names>
  FROM DEV1_KSTREAM_C_USERS
  PARTITION BY STRUCT(AID := ROWKEY->AID)
EMIT CHANGES;

CREATE OR REPLACE TABLE DEV1_KTABLE_C_USERS
  WITH (
    KAFKA_TOPIC='dev-1-C_USERS_REKEYED',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_ID=xxx
  );

B) For Topic: A_USERS

SET 'processing.guarantee' = 'exactly_once_v2';
SET 'consumer.isolation.level' = 'read_committed';

CREATE OR REPLACE STREAM DEV1_KSTREAM_A_USERS
  WITH (
    KAFKA_TOPIC='dev-1-A_USERS',
    FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_SCHEMA_ID=xxx
  );

CREATE OR REPLACE STREAM DEV1_KSTREAM_A_USERS_REKEYED
  WITH (
    KAFKA_TOPIC='dev-1-A_USERS_REKEYED',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_ID=xxx
  )
AS
  SELECT
    STRUCT(`account_name` := ROWKEY->`account_name`) AS ROWKEY,..., <column_names>
  FROM DEV1_KSTREAM_A_USERS
  PARTITION BY STRUCT(`account_name` := ROWKEY->`account_name`)
EMIT CHANGES;

CREATE OR REPLACE TABLE DEV1_KTABLE_A_USERS
  WITH (
    KAFKA_TOPIC='dev-1-A_USERS_REKEYED',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_ID=xxx
  );

C) Join the two KTables

CREATE OR REPLACE TABLE DEV1_KDEV_C_USERS_EVENTS_TABLE_KSQL_C_USERS_A_USERS
  WITH (
    KAFKA_TOPIC='dev-1-USERS_ENRICHED',
    PARTITIONS=1,
    REPLICAS=3,
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED_KEY',
    KEY_SCHEMA_ID=xxx
  )
AS
  SELECT
    KT1.ROWKEY AS ROWKEY,
    KT1.AID AS AID,
    KT1.EMAIL_ADDRESS AS EMAIL_ADDRESS,
    KT1.PROFILE_CODE AS PROFILE_CODE,
    KT1.IS_DISABLED AS IS_DISABLED,
    KT2.`given_name` AS FIRST_NAME,
    KT2.`sn` AS LAST_NAME
  FROM
    DEV1_KTABLE_C_USERS KT1
  LEFT OUTER JOIN
    DEV1_KTABLE_A_USERS KT2
  ON
    STRUCT(AID := KT1.ROWKEY->AID) = STRUCT(`account_name` := KT2.ROWKEY->`account_name`)
EMIT CHANGES;

When I execute the JOIN query, I get the following error:

Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got STRUCT(AID:=KT1.ROWKEY->ADID) = STRUCT(account_name:=KT2.ROWKEY->account_name).

If I instead join directly on the key columns, with the change shown below:

CREATE OR REPLACE TABLE DEV1_KDEV_C_USERS_EVENTS_TABLE_KSQL_C_USERS_A_USERS
  WITH (
    KAFKA_TOPIC='dev-1-USERS_ENRICHED',
    PARTITIONS=1,
    REPLICAS=3,
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED',
    VALUE_SCHEMA_ID=xxx,
    KEY_FORMAT='AVRO',
    KEY_SCHEMA_FULL_NAME='c.z.kafka.USERS_ENRICHED_KEY',
    KEY_SCHEMA_ID=xxx
  )
AS
  SELECT
    KT1.ROWKEY AS ROWKEY,
    KT1.AID AS AID,
    KT1.EMAIL_ADDRESS AS EMAIL_ADDRESS,
    KT1.PROFILE_CODE AS PROFILE_CODE,
    KT1.IS_DISABLED AS IS_DISABLED,
    KT2.`given_name` AS FIRST_NAME,
    KT2.`sn` AS LAST_NAME
  FROM
    DEV1_KTABLE_C_USERS KT1
  LEFT OUTER JOIN
    DEV1_KTABLE_A_USERS KT2
  ON
    --STRUCT(AID := KT1.ROWKEY->AID) = STRUCT(`account_name` := KT2.ROWKEY->`account_name`)
    KT1.ROWKEY = KT2.ROWKEY
EMIT CHANGES;

then I get this error:

Could not determine output schema for query due to error: Invalid join condition: types don’t match. Got KT1.ROWKEY{STRUCT<AID STRING>} = KT2.ROWKEY{STRUCT<account_name STRING>}

Am I missing something here? Kindly help!

Please note: the same join works fine when the primary keys are of a primitive data type.
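
One direction that might be worth trying, going by the two error messages above: the first says the ON clause has to reference the right table's primary key column itself (not an expression built around it), and the second says the two ROWKEY types differ because the struct field names differ (AID vs. account_name). The following is an untested sketch of re-keying A_USERS so that its key is also STRUCT<AID STRING>. The names DEV1_KSTREAM_A_USERS_BY_AID, DEV1_KTABLE_A_USERS_BY_AID and the topic dev-1-A_USERS_BY_AID are hypothetical, and it assumes ksqlDB is allowed to infer and register the schemas (the KEY_SCHEMA_ID / VALUE_SCHEMA_ID properties are left out on purpose, since the existing key schema has a different field name).

```sql
-- Assumption: re-key A_USERS under a struct whose single field is named AID,
-- so the key type matches DEV1_KTABLE_C_USERS (STRUCT<AID STRING>).
-- Stream, table, and topic names below are hypothetical.
CREATE OR REPLACE STREAM DEV1_KSTREAM_A_USERS_BY_AID
  WITH (
    KAFKA_TOPIC='dev-1-A_USERS_BY_AID',
    FORMAT='AVRO'
  )
AS
  SELECT
    STRUCT(AID := ROWKEY->`account_name`) AS ROWKEY,
    `given_name`,
    `sn`
  FROM DEV1_KSTREAM_A_USERS
  PARTITION BY STRUCT(AID := ROWKEY->`account_name`)
EMIT CHANGES;

-- Table over the re-keyed topic; key and value schemas are assumed to be
-- inferred from Schema Registry.
CREATE OR REPLACE TABLE DEV1_KTABLE_A_USERS_BY_AID
  WITH (
    KAFKA_TOPIC='dev-1-A_USERS_BY_AID',
    FORMAT='AVRO'
  );

-- Join on the key columns directly; both sides are now STRUCT<AID STRING>.
-- The schema name / schema ID properties of the original statement are
-- omitted here and would need to be added back as appropriate.
CREATE OR REPLACE TABLE DEV1_KDEV_C_USERS_EVENTS_TABLE_KSQL_C_USERS_A_USERS
  WITH (
    KAFKA_TOPIC='dev-1-USERS_ENRICHED',
    FORMAT='AVRO'
  )
AS
  SELECT
    KT1.ROWKEY AS ROWKEY,
    KT1.AID AS AID,
    KT1.EMAIL_ADDRESS AS EMAIL_ADDRESS,
    KT1.PROFILE_CODE AS PROFILE_CODE,
    KT1.IS_DISABLED AS IS_DISABLED,
    KT2.`given_name` AS FIRST_NAME,
    KT2.`sn` AS LAST_NAME
  FROM
    DEV1_KTABLE_C_USERS KT1
  LEFT OUTER JOIN
    DEV1_KTABLE_A_USERS_BY_AID KT2
  ON
    KT1.ROWKEY = KT2.ROWKEY
EMIT CHANGES;
```

This only addresses the type mismatch; the two source tables would still need to be co-partitioned (same partition count) for a primary-key join, and I have not verified how this interacts with the fixed key schema of the target topic.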
