I have been following the foreign-key join example here:
However, I want to extend it to join on a struct. When I change the sample so that the join key is a struct, it works fine, as shown below:
```sql
CREATE TABLE ordersstruct (
  id INT PRIMARY KEY,
  user_id STRUCT<u_id INT, customer_id INT>,
  value INT
) WITH (
  KAFKA_TOPIC = 'my-orders-topic',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 2
);

CREATE TABLE usersstruct (
  u_id STRUCT<u_id INT, customer_id INT> PRIMARY KEY,
  name VARCHAR,
  last_name VARCHAR
) WITH (
  KAFKA_TOPIC = 'my-users-topic',
  VALUE_FORMAT = 'AVRO',
  KEY_FORMAT = 'AVRO',
  PARTITIONS = 3
);

CREATE TABLE orders_with_users_struct AS
  SELECT * FROM ordersstruct JOIN usersstruct ON user_id = u_id
  EMIT CHANGES;

INSERT INTO ordersstruct (id, user_id, value) VALUES (1, STRUCT(u_id := 1, customer_id := 1), 100);
INSERT INTO ordersstruct (id, user_id, value) VALUES (2, STRUCT(u_id := 1, customer_id := 1), 200);
INSERT INTO ordersstruct (id, user_id, value) VALUES (3, STRUCT(u_id := 1, customer_id := 3), 300);
INSERT INTO usersstruct (u_id, name, last_name) VALUES (STRUCT(u_id := 1, customer_id := 1), 'John', 'Smith');
INSERT INTO usersstruct (u_id, name, last_name) VALUES (STRUCT(u_id := 1, customer_id := 2), 'John2', 'Smith2');
INSERT INTO usersstruct (u_id, name, last_name) VALUES (STRUCT(customer_id := 3, u_id := 1), 'John3', 'Smith3');
```
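The joined result of this working example can be read back in the ksqlDB CLI with a push query. This is a sketch: the `SET` statement only affects the current CLI session and makes the query start from the earliest offset, so rows written before the query started are included.

```sql
-- Read existing records, not just new ones, in this CLI session.
SET 'auto.offset.reset' = 'earliest';

-- Each order row is emitted together with the user whose
-- STRUCT(u_id, customer_id) primary key equals the order's user_id.
SELECT * FROM orders_with_users_struct EMIT CHANGES;
```

Given the inserts above, orders 1 and 2 should join to John Smith and order 3 to John3 Smith3; John2 Smith2 (key `(1, 2)`) is not referenced by any order, so it should not appear in this inner join.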
However, when I use a similar setup with our own topics, I get unexpected results. Our topics are produced by Debezium; I read each topic into a stream, reshape it into the structure I need, and then create tables from the resulting topics. A sample of the code is shown below:
```sql
CREATE STREAM careerplan_stream
  WITH (KAFKA_TOPIC='careerplan', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

CREATE STREAM careerplan_wrap_foreignkey_stream AS
  SELECT Rowkey,
         STRUCT(personpk := cp.after->personfk, tenantkey := cp.rowkey->tenantkey) AS personfk,
         cp.after->id
  FROM careerplan_stream cp;

CREATE TABLE CAREERPLAN_TABLE
  WITH (KAFKA_TOPIC='CAREERPLAN_WRAP_FOREIGNKEY_STREAM', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

SELECT * FROM careerplan_wrap_foreignkey_stream EMIT CHANGES;
SELECT * FROM CAREERPLAN_TABLE EMIT CHANGES;

CREATE STREAM Person_Stream
  WITH (KAFKA_TOPIC='person', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

CREATE STREAM PERSON_STREAM_SIMPLE AS
  SELECT STRUCT(personpk := p.rowkey->personpk, tenantkey := p.rowkey->tenantkey),
         p.after->firstname,
         p.after->lastname
  FROM PERSON_STREAM p
  PARTITION BY STRUCT(personpk := p.rowkey->personpk, tenantkey := p.rowkey->tenantkey);

CREATE TABLE PERSON_TABLE
  WITH (KAFKA_TOPIC='PERSON_STREAM_SIMPLE', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO', PARTITIONS=4);

SELECT * FROM PERSON_TABLE EMIT CHANGES;

CREATE TABLE CAREERPLAN_PERSON_TABLE AS
  SELECT cp.ROWKEY, p.rowkey->personpk, p.firstname, p.lastname, cp.id AS careerplanid
  FROM CAREERPLAN_TABLE cp
  LEFT JOIN PERSON_TABLE p ON p.RowKey = cp.personfk
  EMIT CHANGES;
```
When I query the data in the tables, it looks correct.
However, when I run any type of join (left or inner), I don't get the expected result, which is that every careerplan record is mapped to a person record.
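Since each table looks correct on its own, one way to narrow the problem down is to compare the two sides of the join directly: the schemas ksqlDB inferred for each table (including key and value formats) and a few raw records from each backing topic, keys included. This is a diagnostic sketch only; it uses the object and topic names from the code above and makes no claim about what the output will show.

```sql
-- Show column names and types, key/value formats, and the backing topic.
-- The PERSONFK value column of CAREERPLAN_TABLE is the foreign key that is
-- matched against the ROWKEY primary key of PERSON_TABLE.
DESCRIBE CAREERPLAN_TABLE EXTENDED;
DESCRIBE PERSON_TABLE EXTENDED;

-- Print a few records (key and value) from each backing topic,
-- starting at the earliest offset.
PRINT 'CAREERPLAN_WRAP_FOREIGNKEY_STREAM' FROM BEGINNING LIMIT 5;
PRINT 'PERSON_STREAM_SIMPLE' FROM BEGINNING LIMIT 5;
```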
Is there some aspect of table-table foreign-key joins that I'm not considering?
There are some alternatives (listed below) that I have tried where the join works correctly. However, I'd like to understand why the foreign-key join doesn't work in this case with a struct.
- A table-table join on the primary key, after rekeying careerplan so that the person key is its primary key (this won't work if a user has more than one careerplan).
- Ignoring multi-tenancy and unwrapping the struct to do an integer comparison instead of a struct comparison (i.e., `personfk = personpk`) (this won't work if we need to put multiple tenants in one topic).
There is one alternative that I have tried that didn't work:
- Joining directly on a struct expression, like `p.RowKey = Struct(personpk := cp.after->personfk, tenantkey := cp.rowkey->tenantkey)` (I believe this is unsupported, because the foreign key must be a column and can't be an expression).