Records missing after STREAM-TABLE JOIN

Hi everybody, I’m new to ksqlDB, so I’m not sure whether this is a bug or some complex behavior I don’t understand. I’m trying to join this stream:

CREATE OR REPLACE STREAM STREAM1
WITH (KAFKA_TOPIC='db.stream1', PARTITIONS=12, REPLICAS=3) AS SELECT
d.after->fid fid,
d.after after,
d.ts_ms timestamp
FROM DB_STREAM d
WHERE after IS NOT NULL
PARTITION BY d.after->fid
EMIT CHANGES;

with a table, using this query:

CREATE OR REPLACE STREAM STREAM2
WITH (KAFKA_TOPIC='db.stream2', PARTITIONS=12, REPLICAS=3) AS SELECT
r.after->customer customer,
n.fid fid,
n.name name
FROM STREAM1 r
INNER JOIN TABLE1 n on r.fid = n.fid
EMIT CHANGES;

But several records are missing from the result stream (even though corresponding IDs exist in both the stream and the table).

What baffles me is that if I do not use PARTITION BY when creating STREAM1,
or if I use INNER JOIN TABLE1 n on r.after->fid = n.fid instead of r.fid in the join query,
I get all expected records in the result stream.

Can someone please explain what might be the reason behind the described behavior?
Isn’t it good practice to re-key a stream (using PARTITION BY) before joining?

In general you need to consider two things:

  1. processing in ksqlDB has temporal semantics
  2. re-partitioning may introduce out-of-order data

I am not 100% sure what the root cause of the observed behavior is; however, you could try increasing the max.task.idle.ms configuration parameter.
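As a rough sketch of how that setting could be applied from the ksqlDB CLI: the `ksql.streams.` prefix is an assumption about how the property is forwarded to the underlying Kafka Streams application, and the value of 5000 ms is an arbitrary example, not a recommendation. The setting would need to be in place before the join query is created.

```sql
-- Assumption: properties with the ksql.streams. prefix are passed through to Kafka Streams.
-- The value is an arbitrary illustration; tune it to how far the inputs can lag each other.
SET 'ksql.streams.max.task.idle.ms' = '5000';

-- Re-create the join afterwards so the new persistent query picks up the setting.
CREATE OR REPLACE STREAM STREAM2
WITH (KAFKA_TOPIC='db.stream2', PARTITIONS=12, REPLICAS=3) AS SELECT
r.after->customer customer,
n.fid fid,
n.name name
FROM STREAM1 r
INNER JOIN TABLE1 n on r.fid = n.fid
EMIT CHANGES;
```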

For more background on how stream processing with ksqlDB works and what temporal semantics are, please check out the following Kafka Summit talks:


Hey Matthias, thank you for your reply.

I watched some of the lectures you posted, but I still struggle to understand why PARTITION BY can change the resulting stream in STREAM-TABLE joins.

Shouldn’t the result be predicated only on the event time of the stream and table records? I don’t think PARTITION BY changes any timestamp; it only moves a field from the value to the key in the Kafka topic.

I experimented with the max.task.idle.ms parameter and set it to very high values, but it didn’t help. I also made sure that I have the same number of partitions in the input stream, the table, and the output stream.

Can you give me a few pointers here?

Shouldn’t the result be predicated only on the event time of the stream and table records? I don’t think PARTITION BY changes any timestamp; it only moves a field from the value to the key in the Kafka topic.

The problem is that PARTITION BY introduces out-of-order data. The table currently has only a single version, so if there is out-of-order data on the stream side, it will join to the incorrect version (as the table will already be in the future). We plan to address this, but in the current implementation a stream-table join cannot handle out-of-order data nicely: it just joins the record to whatever table version is available.

What baffles me is that if I do not use PARTITION BY when creating STREAM1,
or if I use INNER JOIN TABLE1 n on r.after->fid = n.fid instead of r.fid in the join query,
I get all expected records in the result stream.

Not 100% sure about this. If you use r.after->fid = n.fid, ksqlDB should also repartition the data automatically… Can you double check that the timestamps are not modified when you do a manual repartitioning?
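One way to do that check, as a minimal sketch: print the record timestamp (the ROWTIME pseudo column) for a handful of rows on each side and compare them per fid. The LIMIT values and the time format are arbitrary choices for illustration.

```sql
-- Read the topics from the beginning so existing records are included.
SET 'auto.offset.reset' = 'earliest';

-- Timestamps of the re-keyed stream records.
SELECT fid, ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS ts
FROM STREAM1
EMIT CHANGES
LIMIT 20;

-- Timestamps of the table updates; a stream record only finds a match
-- if the table row for the same fid already exists when it is processed.
SELECT fid, ROWTIME, TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS ts
FROM TABLE1
EMIT CHANGES
LIMIT 20;
```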

The problem is that PARTITION BY introduces out-of-order data. The table currently has only a single version, so if there is out-of-order data on the stream side, it will join to the incorrect version (as the table will already be in the future). We plan to address this, but in the current implementation a stream-table join cannot handle out-of-order data nicely: it just joins the record to whatever table version is available.

OK. So is there a way to make sure that all records end up in the resulting stream? To give you more context, I need to join database tables that are streamed to Kafka topics using CDC. The problem is that they need to be joined on a foreign key, which is not supported in KSQL right now. So I decided to use stream-table joins instead.

Not 100% sure about this. If you use r.after->fid = n.fid, ksqlDB should also repartition the data automatically… Can you double check that the timestamps are not modified when you do a manual repartitioning?

Yes, they’re not modified; I use WITH TIMESTAMP semantics.

Why do you think so? I guess we are going down some rabbit hole here :)

So I decided to use stream-table joins instead.

Well, this won’t really work, because a stream-table join has totally different semantics… You cannot just swap one join for another… Btw: FK-joins are done and will be available soon.
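For orientation, a table-table foreign-key join might look roughly like the sketch below once a release that includes it is available. R_TABLE, its columns, and JOINED are hypothetical names, not objects from this thread: the sketch assumes the CDC topic has been declared as a TABLE keyed by its own primary key `id`, with `customer` and the foreign key `fid` as value columns.

```sql
-- Hypothetical sketch: R_TABLE, id, customer, and JOINED are illustrative names.
-- The left side joins on a non-key column (fid) against the primary key of TABLE1.
CREATE TABLE JOINED AS SELECT
r.id id,
r.customer customer,
n.name name
FROM R_TABLE r
INNER JOIN TABLE1 n ON r.fid = n.fid
EMIT CHANGES;
```

Unlike the stream-table join, a table-table join is updated when either side changes, so a late-arriving row in TABLE1 would still produce a result.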

It does not sound like a timestamp problem, but you are using the “wrong” join.

Well, I’m absolutely not sure of that. That was a question: is it possible to make sure that all records will be in the resulting stream? :)

In the current version of KSQL it’s not possible to join tables using an FK join, so I did this as a workaround, as suggested in GitHub issue #4424 (“Support non-key joins”) in confluentinc/ksql.

I guess this workaround for some reason doesn’t really work (missing records, even when stream timestamps are higher than table timestamps), and my previous question was: is it possible to make it work? Or do I just have to wait for a new KSQL release with FK joins?

Thanks again, for your reply:)

If your timestamps are “correct”, i.e., stream-record timestamps are larger than the corresponding table-record timestamps, no stream record should get dropped on the floor, assuming you are using the latest version (and maybe set the max.task.idle.ms config).

As you report that you are still missing records, even with correct timestamps, I am wondering what version you are using? Or maybe there is some other problem… Can you try a left-join to see what stream-records are missing (might also be good to understand how many)?
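A minimal sketch of that diagnostic, run as a transient query against the same STREAM1 and TABLE1: every stream record appears in the output, and those with a NULL name are the ones that found no table row at join time.

```sql
-- Read the topics from the beginning so existing records are included.
SET 'auto.offset.reset' = 'earliest';

-- LEFT JOIN keeps all stream records; name is NULL when no table row was available.
SELECT
r.fid fid,
r.after->customer customer,
n.name name
FROM STREAM1 r
LEFT JOIN TABLE1 n ON r.fid = n.fid
EMIT CHANGES;
```

Counting the rows with a NULL name against the total gives a sense of how many records the inner join drops.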

Personally, I am not a fan of using stream-table joins as a workaround, because of the different semantics, but I guess it could work for some cases.
