Missing records in STREAM-TABLE join

I noticed a similar issue has already been posted here, but I’m reposting because I’m experiencing slightly different behavior.

In my use case, I’m enriching a stream of ~300k events using N-way left joins with 4 tables. I’ve since narrowed the problem down to a single stream-table join on 1 partition, but I’m still unable to get all the events from the original stream.

CREATE STREAM STREAM1 (
  ID STRING,
  CID STRING
) WITH (
  KAFKA_TOPIC='db.stream1',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='JSON'
);

-- entire contents loaded before db.stream1
CREATE TABLE TABLE1 (
  ID STRING PRIMARY KEY,
  NAME STRING
) WITH (
  KAFKA_TOPIC='db.stream2',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='JSON'
);

CREATE STREAM STREAM2
WITH (
  KAFKA_TOPIC='db.stream3',
  KEY_FORMAT='KAFKA',
  VALUE_FORMAT='JSON'
) AS
SELECT * FROM STREAM1 p LEFT JOIN TABLE1 c ON p.cid = c.id
EMIT CHANGES;

Running select count(*) from STREAM1 results in 300K
Running select count(*) from STREAM2 results in 30K

What baffles me is changing STREAM2 to an inner join results in 60K records.

I would be grateful if anyone could point out what I’m doing wrong.

Not 100% sure, but can STREAM1 fields be NULL? You cannot join on a NULL key (or NULL value), and such a record would be dropped (even for a left join).
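
One way to check is to look for STREAM1 records whose join column is NULL. This is only a minimal sketch, assuming the STREAM1 definition from the question; the LIMIT value is arbitrary.

```sql
-- Read the topic from the beginning rather than only new records.
SET 'auto.offset.reset' = 'earliest';

-- Show a sample of records whose join column (CID) is NULL.
-- Records like these cannot be joined and would be dropped.
SELECT ID, CID
FROM STREAM1
WHERE CID IS NULL
EMIT CHANGES
LIMIT 10;
```

If this returns rows, NULL join keys are a likely explanation for at least part of the missing records.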

What baffles me is changing STREAM2 to an inner join results in 60K records.

That is indeed very strange…

-- entire contents loaded before db.stream1

How do you know this? In the end, the CREATE TABLE TABLE1 statement is only a schema definition; no data is processed yet. Data is processed only once you submit your join query, and then in timestamp order, interleaved across both inputs of the join.
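
As a rough spot check, you could compare record timestamps on both sides of the join. This sketch assumes the STREAM1 and TABLE1 definitions from the question and uses the ROWTIME pseudocolumn, which holds the record timestamp in epoch milliseconds; the LIMIT value is arbitrary, and a handful of records will not prove the ordering for the whole topic.

```sql
-- Read both topics from the beginning.
SET 'auto.offset.reset' = 'earliest';

-- Timestamps of the first few stream records.
SELECT ID, CID, ROWTIME
FROM STREAM1
EMIT CHANGES
LIMIT 5;

-- Timestamps of the first few table updates.
SELECT ID, NAME, ROWTIME
FROM TABLE1
EMIT CHANGES
LIMIT 5;
```

If stream records carry earlier timestamps than the table records they should match, the table was not "loaded first" from the join's point of view, regardless of when the data was physically written to the topics.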

Thank you Matthias for the quick reply.

CID in STREAM1 can indeed be null, which causes events to be dropped from the original stream. Is there a way to do this enrichment with a null CID without dropping events from STREAM1?

I think you could change the join predicate to

...ON IFNULL(p.cid, '') = c.id

basically replacing any NULL with the empty string (of course, this assumes that c.id is never the empty string; otherwise, you might get incorrect join results).

Thanks, I came to a similar solution using the COALESCE scalar function and assigning to an invalid CID value.
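
For anyone landing here later, a COALESCE-based workaround could look roughly like this. It is a sketch, not the exact statements used in this thread: the names STREAM1_FILLED and STREAM2_FILLED and the sentinel value 'UNKNOWN_CID' are hypothetical, and the sentinel must never occur as a real ID in TABLE1.

```sql
-- Step 1: derive a stream in which a NULL CID is replaced by a sentinel.
-- 'UNKNOWN_CID' is a made-up placeholder; pick a value that cannot match TABLE1.ID.
CREATE STREAM STREAM1_FILLED AS
SELECT ID, COALESCE(CID, 'UNKNOWN_CID') AS CID
FROM STREAM1
EMIT CHANGES;

-- Step 2: join on the now non-NULL column. Records carrying the sentinel
-- find no match, so the left join should emit them with NULL table columns.
CREATE STREAM STREAM2_FILLED AS
SELECT * FROM STREAM1_FILLED p LEFT JOIN TABLE1 c ON p.cid = c.id
EMIT CHANGES;
```

Downstream consumers would need to treat the sentinel as "no CID", for example by mapping it back to NULL.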

It is still strange that the inner join produces more results than the left join in my setup. It could be down to a test setup error between runs. But for now everything is working as expected.

I feel many would fall into this trap, expecting left joins to behave more like ANSI SQL. This could be better documented under stream-table joins here: Join Event Streams - ksqlDB Documentation


Glad it works now.

I agree on both points… For updating the docs, I will follow up internally.

As for changing the behavior, there is already a KIP for Kafka Streams (ksqlDB inherits this behavior from Kafka Streams): KIP-962: Relax non-null key requirement in Kafka Streams. If the KIP lands, ksqlDB might get the fix implicitly when its Kafka Streams dependency is updated to a newer version.
