Exclusion join query

How can I make an “exclusion join query”?

I’m trying to do something like this from traditional SQL

SELECT *
FROM topic1
WHERE NOT EXISTS (
  SELECT 1
  FROM topic2
  WHERE topic2.key1 = topic1.key1
  AND topic2.key2 = topic2.key2
  AND topic2.key2 = topic2.key2
)

In a “after the time window” kind of logic.

To give better perspective consider these records come into the two topics (topic is first column)

topic1,addressX,100,value1
topic2,addressX,100,value1
topic2,addressX,101,value2

The goal of this query is to identify “dangling mismatching ends” per address. So in this case for example, we want to identify that topic1 is missing addressX matching value (after some time period of course)

I tried doing this with a LEFT JOIN as well as a FULL OUTER JOIN but these don’t seem to wait for the other topic to get the data as defined by the window, as soon as one side has something the null variant join rows are output, so there’s no way to identify a dangling record.

Did you try a stream-stream windowed join with GRACE period?

This join should wait after grace period passed, before emitting left/right join results.

You need to be on the latest version though… Cf Announcing ksqlDB 0.23.1 - New Features and Updates

Hmm we’re on 0.21 right now but even so this is confusing to me. If the default grace period is 24h shouldn’t the LEFT/OUTER joins then hold on until that time? Did the actual logic change here?

UPDATED: ah nevermind, you’re right! I didn’t read past the section where the logical change is explained. Thank you, this would fix it, but we’ll need to update to 0.23 then.

1 Like

Correct. The old join behavior was weird, and thus we fixed it in 0.23.

We tried updating from Docker Hub which is the latest image released but the GRACE keyword is rejected on joins with mismatched input 'GRACE' expecting 'ON'

I noticed tho that the server version reported when using this docker image is 0.23.1-rc9 and the image was posted on Dec 15th 2021 almost a month before the announcement of v0.23.1? Is this a mis-release?

Not sure. Did you update both server and client?

Yes, we’re using Docker Hub and we’re on latest with CLI v7.0.1, Server v0.23.1-rc9

Here’s the error with the query I tried:

line 4:19: mismatched input 'GRACE' expecting 'ON'
Statement: SELECT t2.*
FROM topic1 t1
LEFT JOIN topic2 t2
WITHIN 10 MINUTES GRACE PERIOD 10 MINUTES
ON t1.field = t2.field
EMIT CHANGES;
Caused by: line 4:19: mismatched input 'GRACE' expecting 'ON'
Caused by: org.antlr.v4.runtime.InputMismatchException

Confluent 7.0 ships with ksqlDB 0.21 (cf Supported Versions and Interoperability for Confluent Platform | Confluent Documentation)

Can you try ksqlDB CLI 0.23.1: Docker Hub

That worked, but only syntax wise.

The query is now not eager, but it never returns the null cases.

E.g. if I have a single row in topic1 and nothing in topic2 the left join never returns anything. It returns matching records if there are any. I did WITHIN 10 SECONDS GRACE PERIOD 10 SECONDS and waited over a minute to be sure.

Is there something I’m missing about how this is suppose to behave? My understanding is that if grace period times out from the leftside record being “buffered” it’d output it with nulls for the right side values (in case of the left join)

The observed behavior is as expected. Note that time is tracked based on event-time, not wall-clock time. Left/Right join results are only emitted when stream-time (max observed event-time) passed window close time (ie, window-end time plus grace period). If you stop sending data, stream-time cannot advance and thus you won’t observe output.

I see. I just didn’t have long enough output on either of the topics. Is there any way to detect a stall when one side completely stops sending tho?

Time progress is tracked across both inputs for the join case. As long as one input topic has input data, time will advance.

This topic was automatically closed after 30 days. New replies are no longer allowed.