Issue with Table-Table foreign key join

I have been following the example here for foreign key joins:

However, I want to extend it to join on a struct. When I change the example to use a struct as the join key, the sample code works fine, as shown below:

CREATE TABLE ordersstruct (
  id INT PRIMARY KEY,
  user_id STRUCT<u_id INT, customer_id INT>,
  value INT
) WITH (
  KAFKA_TOPIC = 'my-orders-topic',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 2
);

CREATE TABLE usersstruct (
  u_id STRUCT<u_id INT, customer_id INT> PRIMARY KEY,
  name VARCHAR,
  last_name VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'AVRO',
  KEY_FORMAT = 'AVRO',
  PARTITIONS = 3
);

CREATE TABLE orders_with_users_struct AS
SELECT * FROM ordersstruct JOIN usersstruct ON user_id = u_id
EMIT CHANGES;

INSERT INTO ordersstruct (id, user_id, value) VALUES (1, Struct(u_id := 1, customer_id := 1), 100);
INSERT INTO ordersstruct (id, user_id, value) VALUES (2, Struct(u_id := 1, customer_id := 1), 200);
INSERT INTO ordersstruct (id, user_id, value) VALUES (3, Struct(u_id := 1, customer_id := 3), 300);

INSERT INTO usersstruct (u_id, name, last_name) VALUES (Struct(u_id := 1, customer_id := 1), 'John', 'Smith');
INSERT INTO usersstruct (u_id, name, last_name) VALUES (Struct(u_id := 1, customer_id := 2), 'John2', 'Smith2');
INSERT INTO usersstruct (u_id, name, last_name) VALUES (Struct(customer_id := 3, u_id := 1), 'John3', 'Smith3');

However, when I use a similar setup with our own topics, the join does not return the results I expect. Our topics are generated by Debezium. I read them as streams, reshape each stream into the structure I need, and then create tables on top of the reshaped streams. A sample of the code is shown below:

-- Source stream over the Debezium topic (schema inferred from Schema Registry).
CREATE STREAM careerplan_stream
WITH (KAFKA_TOPIC='careerplan', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

-- Wrap the foreign key in a struct with the same shape as the person key.
CREATE STREAM careerplan_wrap_foreignkey_stream AS
SELECT rowkey, STRUCT(personpk := cp.after->personfk, tenantkey := cp.rowkey->tenantkey) AS personfk, cp.after->id
FROM careerplan_stream cp;

-- Left-side table, built on the topic written by the stream above.
CREATE TABLE careerplan_table
WITH (KAFKA_TOPIC='CAREERPLAN_WRAP_FOREIGNKEY_STREAM', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

select * from careerplan_wrap_foreignkey_stream emit changes;
select * from CAREERPLAN_TABLE emit changes;

-- Source stream over the Debezium person topic.
CREATE STREAM person_stream
WITH (KAFKA_TOPIC='person', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

-- Re-key the person stream by a struct of (personpk, tenantkey).
CREATE STREAM person_stream_simple AS
SELECT STRUCT(personpk := p.rowkey->personpk, tenantkey := p.rowkey->tenantkey), p.after->firstname, p.after->lastname
FROM person_stream p
PARTITION BY STRUCT(personpk := p.rowkey->personpk, tenantkey := p.rowkey->tenantkey);

-- Right-side table, built on the topic written by the stream above.
CREATE TABLE person_table
WITH (KAFKA_TOPIC='PERSON_STREAM_SIMPLE', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

select * from PERSON_TABLE emit changes;

CREATE TABLE careerplan_person_table AS
SELECT cp.rowkey, p.rowkey->personpk, p.firstname, p.lastname, cp.id AS careerplanid
FROM careerplan_table cp
LEFT JOIN person_table p ON p.rowkey = cp.personfk
EMIT CHANGES;

When I query the data in the tables, it looks correct.

However, when I run any kind of join (left or inner), I don’t get the expected result, which is that every careerplan record is matched to a person record.

Is there some mechanic of a table-table foreign key join that I’m not considering?

I have tried some alternatives (listed below) where the join works correctly. However, I’d like to understand why the foreign key join doesn’t work in this case with a struct.

  1. A table-table primary key join, after rekeying careerplan so that the person key is its primary key (won’t work if a user has more than one careerplan)
  2. Ignoring multi-tenancy and unwrapping the struct so the join compares integers rather than structs (i.e. personfk = personpk) [won’t work if we need to put multiple tenants in one topic]
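
For reference, alternative 2 looks roughly like the sketch below. It is a reconstruction rather than the exact statements I ran: every `*_flat_*` name is made up for this post, the WITH clauses simply mirror the ones in my struct-based code above, and I am assuming personfk and personpk have the same integer type.

```sql
-- Sketch of alternative 2: join on the plain integer instead of the struct.
-- All *_flat_* names are hypothetical; tenantkey is deliberately ignored here.

-- Left side: expose the foreign key as a plain (non-struct) value column.
CREATE STREAM careerplan_flat_stream AS
SELECT rowkey, cp.after->personfk AS personfk, cp.after->id AS id
FROM careerplan_stream cp;

CREATE TABLE careerplan_flat_table
WITH (KAFKA_TOPIC='CAREERPLAN_FLAT_STREAM', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

-- Right side: re-key the person stream by the integer primary key only.
CREATE STREAM person_flat_stream AS
SELECT p.rowkey->personpk AS personpk, p.after->firstname AS firstname, p.after->lastname AS lastname
FROM person_stream p
PARTITION BY p.rowkey->personpk;

CREATE TABLE person_flat_table
WITH (KAFKA_TOPIC='PERSON_FLAT_STREAM', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

-- Foreign key join: integer value column on the left, integer primary key on the right.
-- Assumes the inferred key column of person_flat_table is named ROWKEY, as in my tables above.
CREATE TABLE careerplan_person_flat_table AS
SELECT cp.rowkey, p.firstname, p.lastname, cp.id AS careerplanid
FROM careerplan_flat_table cp
LEFT JOIN person_flat_table p ON p.rowkey = cp.personfk
EMIT CHANGES;
```

Because tenantkey is dropped from the join condition, this only holds while each topic contains a single tenant.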

There is one alternative that I tried that didn’t work:

  1. Joining on a struct expression directly, like p.RowKey = Struct(personpk := cp.after->personfk, tenantkey := cp.rowkey->tenantkey) [I believe this is unsupported, as the foreign key must be a column and can’t be an expression]

As you are using AVRO, you might be hitting this issue: "Foreign key join with Schema Registry enabled key format on the right-side table may miss results" (Issue #8528 in confluentinc/ksql on GitHub).

We are actively working on a fix for it.

Thanks for the response! I will keep an eye on this ticket and try again once it is fixed.
