Denormalise N:N tables

I would like to know what’s the best approach to denormalise an N:N table in ksqlDB?

For example, let’s explore the scenario where I have the following pipeline: SQL DB → Debezium → Kafka → ksqlDB. And the following tables exist on the SQL side:

CREATE TABLE dogs (
   id INT(11) NOT NULL AUTO_INCREMENT
  ,name VARCHAR(255)
  ,PRIMARY KEY (id)
);

CREATE TABLE owners (
   id INT(11) NOT NULL AUTO_INCREMENT
  ,name VARCHAR(255)
  ,PRIMARY KEY (id)
);

CREATE TABLE dogs_owners (
   ownerId INT(11)
  ,dogId INT(11)
  ,mainOwner TINYINT(1)
  ,PRIMARY KEY (ownerId, dogId)
);

As you can see, the dogs_owners table is an N:N table, where dogs can have multiple owners and owners can have multiple dogs. Following the CDC pipeline, each of these tables becomes a Kafka topic that receives every change made to the table (I’m using JSON encoding instead of Avro, if it makes any difference). At this point I would like to create tables in ksqlDB that reference those topics, like:

DROP TABLE IF EXISTS dogs;
CREATE TABLE dogs (
   key STRUCT<id INT> PRIMARY KEY
  ,name VARCHAR
) WITH (
  kafka_topic='dogs',
  key_format='json',
  value_format='json'
);

DROP TABLE IF EXISTS owners;
CREATE TABLE owners (
   key STRUCT<id INT> PRIMARY KEY
  ,name VARCHAR
) WITH (
  kafka_topic='owners',
  key_format='json',
  value_format='json'
);

DROP TABLE IF EXISTS dogs_owners;
CREATE TABLE dogs_owners (
   key STRUCT<ownerId INT, dogId INT> PRIMARY KEY
  ,mainOwner INT
) WITH (
  kafka_topic='dogs_owners',
  key_format='json',
  value_format='json'
);

Now I would like to create a denormalised view of those 3 tables, so I would have only one table with all the info. Like:

CREATE TABLE dogs_owners_denormalised AS
  SELECT
     owners.id AS ownerId
    ,owners.name AS ownerName
    ,dogs.id AS dogId
    ,dogs.name AS dogName
    ,dogs_owners.mainOwner
  FROM dogs_owners
  JOIN owners ON (
    owners.key->id = dogs_owners.key->ownerId
  )
  JOIN dogs ON (
    dogs.key->id = dogs_owners.key->dogId
  );

I understand that I cannot do this kind of join, as I need a full primary key match when working with tables. What is the ksqlDB approach when working with N:N tables for this scenario?

I would like to avoid using streams because in my real-world scenario (which has many more tables and relations) I would like to do aggregations on this final denormalised table, and the related data from each table could arrive at different times, so it’s hard to determine a window period for a stream join.

Thank you in advance!

I know you don’t want to use streams, but how about creating one stream that reacts to dogs updates and another one for owners? Each stream would have a join on the table from the other entity, which would help you generate a unique key for the dogs_owners_denormalized table.

If you have more tables in real life, you can just add a stream for each of them and, in each stream, join on all the other tables.

Then just create a table out of that destination topic…

Thank you Propster for your response :slightly_smiling_face:

So the idea would be:

DROP STREAM IF EXISTS dogs;
CREATE STREAM dogs (
   key STRUCT<id INT> KEY
  ,name VARCHAR
) WITH (
  kafka_topic='dogs',
  key_format='json',
  value_format='json'
);

DROP STREAM IF EXISTS owners;
CREATE STREAM owners (
   key STRUCT<id INT> KEY
  ,name VARCHAR
) WITH (
  kafka_topic='owners',
  key_format='json',
  value_format='json'
);

DROP TABLE IF EXISTS dogs_owners;
CREATE TABLE dogs_owners (
   key STRUCT<ownerId INT, dogId INT> PRIMARY KEY
  ,mainOwner INT
) WITH (
  kafka_topic='dogs_owners',
  key_format='json',
  value_format='json'
);

and for the main query:

CREATE STREAM dogs_owners_denormalised_stream AS
  SELECT
     owners.key AS ownerKey
    ,owners.name AS ownerName
    ,dogs.key AS dogKey
    ,dogs.name AS dogName
    ,dogs_owners.mainOwner
  FROM dogs_owners
  JOIN owners WITHIN 30 DAYS ON (
    owners.key = STRUCT(id := dogs_owners.key->ownerId)
  )
  JOIN dogs WITHIN 30 DAYS ON (
    dogs.key = STRUCT(id := dogs_owners.key->dogId)
  )
  PARTITION BY STRUCT(ownerId := dogs_owners.key->ownerId, dogId := dogs_owners.key->dogId);

CREATE TABLE dogs_owners_denormalised (
   key STRUCT<ownerId INT, dogId INT> PRIMARY KEY
  ,ownerKey STRUCT<id INT>
  ,ownerName VARCHAR
  ,dogKey STRUCT<id INT>
  ,dogName VARCHAR
  ,mainOwner INT
) WITH (
  kafka_topic='dogs_owners_denormalised_stream',
  key_format='json',
  value_format='json'
);

The problem that I have with this approach is the WITHIN <period> of the stream join, as the related information could be untouched for a long period of time (so no stream events) :thinking: Maybe the wrong piece here is trying to use a stream-focused database with something that doesn’t fit as stream data. Thanks again! I will step back and give some thought to the flow.

Hi @rafaeljusto,

Consider the stream-table-table pattern here, where dogs and owners would be the lookup TABLEs. You’d need to be careful of the “Important” note here:

ksqlDB currently provides best-effort on time synchronization, but there are no guarantees, which can cause missing results or leftRecord-NULL results.

In the case of the example, if a dog-owner tuple in the dogs_owners stream doesn’t yet have a corresponding dog and/or owner in the dogs / owners tables, the result row will either be missing (inner join) or be emitted with null dog and/or owner attributes (left join).

Example stream-table-table join code is below.

HTH,
Dave

DROP TABLE IF EXISTS dogs;
CREATE TABLE dogs (
   dog_id INT PRIMARY KEY
  ,name VARCHAR
) WITH (
  kafka_topic='dogs',
  key_format='json',
  value_format='json',
  partitions=1
);

DROP TABLE IF EXISTS owners;
CREATE TABLE owners (
   owner_id INT PRIMARY KEY
  ,name VARCHAR
) WITH (
  kafka_topic='owners',
  key_format='json',
  value_format='json',
  partitions=1
);

INSERT INTO dogs(dog_id, name) VALUES (1, 'Fido');
INSERT INTO dogs(dog_id, name) VALUES (2, 'Spot');
INSERT INTO dogs(dog_id, name) VALUES (3, 'Max');


INSERT INTO owners(owner_id, name) VALUES(1, 'Sue');
INSERT INTO owners(owner_id, name) VALUES(2, 'Bob');
INSERT INTO owners(owner_id, name) VALUES(3, 'Mary');



DROP STREAM IF EXISTS dogs_owners;
CREATE STREAM dogs_owners (
   owner_id INT KEY,
   dog_id INT KEY,
   main_owner_id INT
) WITH (
  kafka_topic='dogs_owners',
  key_format='json',
  value_format='json',
  partitions=1
);

INSERT INTO dogs_owners(owner_id, dog_id, main_owner_id) VALUES(1, 1, 1);
INSERT INTO dogs_owners(owner_id, dog_id, main_owner_id) VALUES(1, 2, 2);
INSERT INTO dogs_owners(owner_id, dog_id, main_owner_id) VALUES(2, 2, 2);
INSERT INTO dogs_owners(owner_id, dog_id, main_owner_id) VALUES(3, 3, 3);

SET 'auto.offset.reset'='earliest';

SELECT do.owner_id, do.dog_id, do.main_owner_id, d.dog_id, d.name, o.owner_id, o.name
FROM dogs_owners do
JOIN dogs d ON do.dog_id=d.dog_id 
JOIN owners o ON do.owner_id=o.owner_id
EMIT CHANGES;

+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+
|DO_OWNER_ID                 |DO_DOG_ID                   |MAIN_OWNER_ID               |D_DOG_ID                    |D_NAME                      |O_OWNER_ID                  |O_NAME                      |
+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+
|1                           |1                           |1                           |1                           |Fido                        |1                           |Sue                         |
|1                           |2                           |2                           |2                           |Spot                        |1                           |Sue                         |
|2                           |2                           |2                           |2                           |Spot                        |2                           |Bob                         |
|3                           |3                           |3                           |3                           |Max                         |3                           |Mary                        |
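
To see what the time-synchronization caveat looks like in practice, a LEFT JOIN variant of the query above may help. This is only a sketch: the extra row for dog_id 4 is a made-up example with no match in the dogs table, and the aliases dog_name and owner_name are my own. With LEFT JOIN, an unmatched lookup should surface as a row with null attributes rather than being silently dropped as it would be by the inner join.

```sql
-- Hypothetical extra row: dog_id 4 does not exist in the dogs table.
INSERT INTO dogs_owners(owner_id, dog_id, main_owner_id) VALUES(3, 4, 3);

-- LEFT JOIN keeps stream rows even when a lookup table has no match,
-- so missing dogs/owners show up as NULL columns.
SELECT do.owner_id, do.dog_id, d.name AS dog_name, o.name AS owner_name
FROM dogs_owners do
LEFT JOIN dogs d ON do.dog_id = d.dog_id
LEFT JOIN owners o ON do.owner_id = o.owner_id
EMIT CHANGES;
```

Rows with a null dog_name or owner_name are the ones worth checking for ordering problems between the topics.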

Actually, I was thinking about 2 separate “main queries” producing to the same topic, but looking back at it, you can probably simply create tables for dogs (dogs_t) and owners (owners_t) and then you only need a stream for dogs_owners (dogs_owners_s).

Your main query would look like this:

CREATE STREAM dogs_owners_denormalised_stream AS
  SELECT
     owners.key AS ownerKey
    ,owners.name AS ownerName
    ,dogs.key AS dogKey
    ,dogs.name AS dogName
    ,dogs_owners.mainOwner
  FROM dogs_owners_s dogs_owners
  JOIN owners_t owners ON (
    owners.key = STRUCT(id := dogs_owners.key->ownerId)
  )
  JOIN dogs_t dogs ON (
    dogs.key = STRUCT(id := dogs_owners.key->dogId)
  )
  PARTITION BY STRUCT(ownerId := dogs_owners.key->ownerId, dogId := dogs_owners.key->dogId);

Since you can’t have a new row in dogs_owners without already having inserted rows in dogs and owners, you shouldn’t run into any “null” value problems.
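
If the end goal is a table keyed by (owner, dog), one possible follow-up is to persist the join as a stream and then aggregate it down to the latest row per key. A minimal sketch, assuming the flat column names from the stream-table-table example earlier in the thread (owner_id, dog_id, main_owner_id) and a ksqlDB version that supports multi-column keys. The names dogs_owners_enriched and dogs_owners_denormalised_t are placeholders I made up:

```sql
-- Persist the stream-table-table join (hypothetical stream name).
-- The key of the last join (owner_id) is kept in the projection.
CREATE STREAM dogs_owners_enriched AS
  SELECT
     do.owner_id AS owner_id
    ,do.dog_id AS dog_id
    ,do.main_owner_id AS main_owner_id
    ,d.name AS dog_name
    ,o.name AS owner_name
  FROM dogs_owners do
  JOIN dogs d ON do.dog_id = d.dog_id
  JOIN owners o ON do.owner_id = o.owner_id
  EMIT CHANGES;

-- Keep only the latest enriched row per (owner_id, dog_id).
-- JSON key format is assumed so the two-column key can be serialized.
CREATE TABLE dogs_owners_denormalised_t WITH (key_format='json') AS
  SELECT
     owner_id
    ,dog_id
    ,LATEST_BY_OFFSET(main_owner_id) AS main_owner_id
    ,LATEST_BY_OFFSET(dog_name) AS dog_name
    ,LATEST_BY_OFFSET(owner_name) AS owner_name
  FROM dogs_owners_enriched
  GROUP BY owner_id, dog_id
  EMIT CHANGES;
```

Keep in mind that a stream-table join only fires on new records on the stream side, so a later rename in dogs or owners would not update rows already emitted. Those rows would only refresh when the corresponding dogs_owners record changes again.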

Oops… pretty much answered with the same solution as dtroiano… sorry for the duplicate! :stuck_out_tongue:

Thank you Dave and JP! :slightly_smiling_face: This approach will definitely help me out.
