"Cannot repartition a TABLE source" problem in a ksqlDB join

I want to join two tables, or a stream and a table, but I get this error: "Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column MARKETER_ID instead of [ID]"

Not sure what your question is, but currently ksqlDB only supports joins on the primary key of the table (cf. Join Event Streams - ksqlDB Documentation). I agree that the error message is not very good…

You can only repartition a STREAM, using the PARTITION BY clause. Thus, one workaround is to first create a STREAM, repartition it, and then create a TABLE from the repartitioned result.
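A minimal sketch of that workaround, assuming a source topic `my_table_topic` with columns `id` and `name` (all names here are illustrative, not from the original thread):

```sql
-- 1. Read the topic as a STREAM (not a TABLE), so it can be repartitioned.
CREATE STREAM source_stream (id BIGINT, name VARCHAR)
  WITH (KAFKA_TOPIC = 'my_table_topic', VALUE_FORMAT = 'AVRO');

-- 2. Repartition the stream on the column you want to join on.
CREATE STREAM repartitioned_stream AS
  SELECT * FROM source_stream PARTITION BY id;

-- 3. Read the repartitioned stream's backing topic as a TABLE keyed by ID.
--    (REPARTITIONED_STREAM is the default name of the backing topic.)
CREATE TABLE repartitioned_table (id BIGINT PRIMARY KEY, name VARCHAR)
  WITH (KAFKA_TOPIC = 'REPARTITIONED_STREAM', VALUE_FORMAT = 'AVRO');
```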

Also note: in the upcoming 0.19 release, ksqlDB adds foreign-key joins and thus lifts the restriction to primary-key joins.


@mjsax thanks for your response. I've entered all the necessary information here.

I am using ksqlDB version 0.18, and my topics are created from MySQL tables with Debezium in AVRO format.
The relationship in the MySQL database is that all users are in the user table, and the users who have the marketer role are in the marketer table.


ksql> show topics;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 database.mlm.marketer       | 1          | 1
 database.mlm.marketer_info  | 1          | 1
 database.mlm.user           | 1          | 1
 default_ksql_processing_log | 1          | 1
 quickstart-avro-config      | 1          | 1
 quickstart-avro-offsets     | 25         | 1
 quickstart-avro-status      | 5          | 1
 schema-changes.database     | 1          | 1
---------------------------------------------------------------
ksql> describe user_stream;

Name                 : USER_STREAM
 Field          | Type
----------------------------------
 ID             | BIGINT
 ROLE_ID        | VARCHAR(STRING)
 EMAIL          | VARCHAR(STRING)
 MOBILE         | VARCHAR(STRING)
 USERNAME       | VARCHAR(STRING)
 NAME           | VARCHAR(STRING)
 FAMILY         | VARCHAR(STRING)
 VERIFY_CODE    | INTEGER
 MOBILE_VERIFY  | INTEGER
 STATUS         | INTEGER
 PASSWORD       | VARCHAR(STRING)
 REMEMBER_TOKEN | VARCHAR(STRING)
 WALLET         | INTEGER
 VOUCHER        | INTEGER
 CREATED_AT     | BIGINT
 UPDATED_AT     | VARCHAR(STRING)
 PIC            | VARCHAR(STRING)
----------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> describe marketer_table;

Name                 : MARKETER_TABLE
 Field         | Type
------------------------------------------------
 MARKETER_ID   | VARCHAR(STRING)  (primary key)
 ID            | BIGINT
 PARENT_ID     | INTEGER
 LEVEL         | INTEGER
 ANCESTRY      | VARCHAR(STRING)
 MARKETER_CODE | VARCHAR(STRING)
 SUBSET        | INTEGER
 UPDATED_AT    | VARCHAR(STRING)
------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> select * from user_stream as u left join marketer_table as m on u.id = m.id emit changes;
Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column MARKETER_ID instead of [ID]
ksql> select * from user_stream as u left join marketer_table as m on u.id = m.marketer_id emit changes;
Invalid join. Key types differ: BIGINT vs STRING

Your join condition is on u.id = m.id, but the primary key of your table is marketer_id. For a stream-table join, you can only do lookups into the table using the primary key.

Invalid join. Key types differ: BIGINT vs STRING

Also, if you have different data types, you would need to cast the stream field to the type of the table’s primary key in the join condition: ON CAST(u.id AS VARCHAR) = m.marketer_id.
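Applied to the stream and table shown above, the suggested join would look like this (sketch only):

```sql
-- Cast the stream's BIGINT id to VARCHAR so it matches the
-- table's VARCHAR primary key MARKETER_ID.
SELECT *
  FROM user_stream AS u
  LEFT JOIN marketer_table AS m
    ON CAST(u.id AS VARCHAR) = m.marketer_id
  EMIT CHANGES;
```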

@mjsax the marketing table returns null results.
Why do we have to define the primary key when creating tables in ksqlDB, while the table in MySQL itself already has one?
And in this case, when we put both IDs in the join, we get the following error:
Cannot repartition a TABLE source. If this is a join, make sure that the criteria uses the TABLE's key column MARKETER_ID instead of [ID]

ksqlDB needs a primary key to ensure that Kafka's log-compaction mechanism works.

ksqlDB is not a relational database, and it's also still a fairly new project. Furthermore, it uses a distributed runtime (in contrast to MySQL). Thus, there are a few restrictions we need to apply (or that we just did not get to implement yet).

For example, we only support simple join conditions (like ON a = b); complex join conditions (like ON a = b AND c = d) are not supported yet.

the marketing table returns null results

Could be related to temporal semantics. Check out this talk for details: https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/


Hi @mjsax,
is there a possibility to join in the below scenario? I have:

select …
from table_a a
left join table_b b on a.PK = b.PK
left join table_c c on c.PK = b.PK
left join table_a as a_alias on c.FK = a.PK

(Table c has a FK which links back to table a.)

Because you join on the PK already, you can just add an additional WHERE c.FK = a.PK – but not sure if this is semantically really what you want?
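The suggestion above could be sketched like this (the column list is elided as in the question, and table/column names are taken from it):

```sql
-- Instead of a fourth join back to table_a, filter on the FK directly.
SELECT ...
  FROM table_a a
  LEFT JOIN table_b b ON a.PK = b.PK
  LEFT JOIN table_c c ON c.PK = b.PK
  WHERE c.FK = a.PK
  EMIT CHANGES;
```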

I still can’t figure out where to use the table and where to use the stream.

It depends on your data and use case. In the end, it's a different semantic interpretation of your data.

A STREAM is used to model independent events. For example, if you have click-stream data: each time a user clicks on something, the click is recorded, and the event is independent of all other events (ie, clicks). Another example could be credit-card transactions. To this end, you can see a STREAM as "stateless", because all events are independent of each other.

On the other hand, a TABLE is used to model state, ie, everything that is true at the same point in time. For example, you could have an account balance per user, or a user table with user profile information. The point is that there is a primary key that identifies the user and assigns a single row to the user. A user has only one account balance at any point in time, never multiple.

Of course, the account balance might change over time: if a user has $100 today, she may have $150 tomorrow. This implies that every row is valid for a "time interval". Events that you model as a STREAM, on the other hand, don't have a "valid interval" but a scalar "happened-at timestamp". For example, a user may deposit $100 today, and $50 tomorrow. The second deposit does not "replace/update" the first deposit; both events are independent. On the other hand, the corresponding account balance changes from $100 to $150 (in case you compute a derived "balance TABLE" from the "deposit STREAM").
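For illustration, such a derived balance TABLE could be computed from a deposit STREAM roughly like this (stream, topic, and column names are made up for the example):

```sql
-- A stream of independent deposit events.
CREATE STREAM deposits (user_id VARCHAR KEY, amount INT)
  WITH (KAFKA_TOPIC = 'deposits', VALUE_FORMAT = 'JSON', PARTITIONS = 1);

-- The current balance per user: an aggregation over the event stream.
-- Each new deposit event updates the corresponding row in the table.
CREATE TABLE balance AS
  SELECT user_id, SUM(amount) AS total
  FROM deposits
  GROUP BY user_id;
```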

So in a table, will the old record get deleted and only the new record be kept?

If you read a topic as a table, the underlying record key acts as PRIMARY KEY and thus, if you have two records with the same key, a later record will “replace” the former record.

The underlying topic is append only, but Kafka will use “log compaction” to delete “old versions” eventually.
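As a concrete sketch (topic and field names assumed, not from the thread), two records with the same key behave like an upsert when the topic is read as a TABLE:

```sql
CREATE TABLE users (id VARCHAR PRIMARY KEY, name VARCHAR)
  WITH (KAFKA_TOPIC = 'users', VALUE_FORMAT = 'JSON');

-- If the topic contains (id='1', name='Alice') followed by
-- (id='1', name='Alicia'), a pull query against the table
-- returns only the latest row for that key: ('1', 'Alicia').
```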
