Inconsistent query results on Join Queries

Hi,

We are currently exploring the use of Confluent Community Platform v7.0.0 and have come across the following oddity when joining datasets in ksqlDB. We are running a 3-node cluster on hardened VMs, so locations such as /tmp have been repointed using environment variables.

The issue we have come across is that when using a join, the joined data does not always return. Unfortunately I am not in a position to provide any example datasets, as all the data has been torn down, but I will attempt to explain the issue as clearly as I can. The issue has been seen on at least two different queries, and both were slightly different.

This example may be pushing the use case of ksqlDB beyond what it is intended for:

stream = stream dataset with a data struct field
materialised_table = create table as select with configuration data

SELECT a.field, b.field
FROM stream AS a
JOIN materialised_table AS b ON b.key = COALESCE(a.data['nofield'], 'key_value');

The join here effectively creates a 1=1 join, so it should always return the field from the table. However, when running this query I noticed we would get either 100%, 99%, or 50% of the records returned. Changing the join to a left outer join would, as expected, return 100% of the a records, but with either 50% or 99% of the joined b records. Things get even weirder when changing the query as follows:

stream = stream dataset with a data struct field
materialised_table_object = create table as select with the same object type as the stream data
materialised_table_config = create table as select with configuration data

SELECT a.field, c.field
FROM stream AS a
JOIN materialised_table_object AS b ON a.id = b.id
JOIN materialised_table_config AS c ON c.key = COALESCE(a.data['nofield'], 'key_value');

Adding the additional join in first meant that 100% of the records joined, even without using b in the returned dataset. If, however, the order of the joins was swapped around, the same issue was experienced as above, where not all expected records are returned.

As mentioned, the config join is probably pushing the boundaries of the intention of joins. However, the same issue has also been encountered when using more standard joins, where adding an additional, non-referenced join into the query seems to ensure that all records are returned.

i.e.
SELECT ...
FROM stream
JOIN table_a ON stream.id = table_a.id
JOIN table_b ON stream.fk_if = table_b.id;

which would return 100% of table_b and 0% of table_a

vs

SELECT ...
FROM stream
JOIN table_c ON stream.id = table_c.id
JOIN table_a ON stream.id = table_a.id
JOIN table_b ON stream.fk_if = table_b.id;

which would return 100% of both a and b, where tables a and c hold different objects.

Any idea what could be causing this problem? The issue has been relatively straightforward to spot with a small test dataset, but if we were to move forward with something like this in production, it would prove very difficult to track an issue like this down. Are we just pushing the boundaries of joins inside of ksqlDB?

Thanks in advance for any help,

Leander

Could be related to timestamps. Please check out this talk: https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

Hope it helps.

Hi,

Thanks for the video, it was an interesting watch.

However, I don't think this is the cause; the explanation given in the video matches how I would expect the joins to work, yet the results I am seeing are non-deterministic.

I will try to provide a clearer example; however, without a lot of work I can't provide a generic dataset for testing. I am hoping I can be pointed in the right direction on something I am doing wrong before it gets that far.

event_stream = event stream of all changes from a CI system. All events are lumped together with a category that denotes the type of object that changed. The majority of the data is passed in as a JSON/struct object, which is defined in the AVRO schema as a string for both the key and the value.

lifecycle_table = CREATE TABLE AS SELECT data['id'] AS id, data['name'] AS lifecycle FROM event_stream WHERE category = 'lifecycle' EMIT CHANGES;

type_table = CREATE TABLE AS SELECT data['id'] AS id, data['name'] AS type FROM event_stream WHERE category = 'type' EMIT CHANGES;

Now, with the taps turned off (no new events flowing through), running the following query returns non-deterministic results:

SELECT a.data['id'], b.lifecycle, c.type
FROM event_stream a
LEFT OUTER JOIN lifecycle_table b ON a.data['lifecycle_id'] = b.id
LEFT OUTER JOIN type_table c ON a.data['type_id'] = c.id
WHERE category = 'CI';

This can return the following results:

1, retired, physical
2, retired, virtual
3, active, null
4, retired, null
5, maintenance, null

or just by immediately re-running the query with no new events:
1, retired, null
2, retired, null
3, active, null
4, retired, null
5, maintenance, null

If I change the order of the joins around, I have even seen the following:

SELECT a.data['id'], b.lifecycle, c.type
FROM event_stream a
LEFT OUTER JOIN type_table c ON a.data['type_id'] = c.id
LEFT OUTER JOIN lifecycle_table b ON a.data['lifecycle_id'] = b.id
WHERE category = 'CI';

1, null, null
2, null, null
3, null, null
4, null, null
5, null, null

It gets weirder yet by adding an extra materialised table into the equation:

another_table = CREATE TABLE AS SELECT data['id'] AS id, data['name'] AS name FROM event_stream WHERE category = 'someothervalidcategory' EMIT CHANGES;

then by running:

SELECT a.data['id'], b.lifecycle, c.type
FROM event_stream a
LEFT OUTER JOIN another_table d ON a.data['id'] = d.id
LEFT OUTER JOIN lifecycle_table b ON a.data['lifecycle_id'] = b.id
LEFT OUTER JOIN type_table c ON a.data['type_id'] = c.id
WHERE category = 'CI';

1, retired, physical
2, retired, virtual
3, active, virtual
4, retired, virtual
5, maintenance, physical

is returned.

However, again changing the query to the following:

SELECT a.data['id'], b.lifecycle, c.type
FROM event_stream a
LEFT OUTER JOIN another_table d ON a.data['id'] = d.id
LEFT OUTER JOIN type_table c ON a.data['type_id'] = c.id
LEFT OUTER JOIN lifecycle_table b ON a.data['lifecycle_id'] = b.id
WHERE category = 'CI';

(swapping b and c around in the order of the joins) would consistently return:
1, null, null
2, null, null
3, null, null
4, null, null
5, null, null

Based on this behaviour, the results produced are not deterministic.

The tables were defined with the offset set to earliest so that all event data gets processed.

When running the join query again, this is also being done with the offset set to earliest.

Is this then what's causing the oddity in the behaviour? Would overwriting and setting the timestamp to 0 on the materialised tables then potentially fix this problem, as suggested in the video, since those records would always be older than any of the streamed events? Would that make sense, given that all the data here is effectively coming from the same source topic, just sliced in different ways?

Thanks again for the help,

Leander

Did you verify the timestamps of your input data? Do the table records have smaller timestamps than the stream records? Also, did you increase the max.task.idle.ms config to allow ksqlDB to time-synchronize the inputs?

Also note that repartitioning (which happens as you join the stream to multiple tables using different columns) may result in out-of-order records in the stream.
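To make the repartition step concrete, here is a sketch using the stream and column names from the examples above (the derived stream name is hypothetical). ksqlDB repartitions implicitly via an internal topic whenever a join column is not the stream's key, and the same thing can be expressed explicitly with PARTITION BY:

```sql
-- Sketch: explicitly re-key the stream on one of the join columns.
-- Each join on a different column implies another repartition like this
-- behind the scenes, and the rewritten records can be interleaved
-- differently between runs, which is one source of out-of-order data.
CREATE STREAM event_stream_by_lifecycle AS
  SELECT *
  FROM event_stream
  PARTITION BY data['lifecycle_id']
  EMIT CHANGES;
```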

Is this then what's causing the oddity in the behaviour? Would overwriting and setting the timestamp to 0 on the materialised tables then potentially fix this problem, as suggested in the video?

Well, that might be possible (though not straightforward to do with ksqlDB compared to Kafka Streams), but it effectively disables time-synchronization. Not sure if you really want this.

Hi,

I haven't verified the timestamps yet, but I strongly suspect this could be the cause, due to the nature of how the vendor product treats its data and how it is being loaded into Kafka.

I have control over providing my own timestamp (at least for some of the data that I am materialising and joining on) on the way into Kafka, so I can definitely set that to 0. If I then define my streams/tables using my timestamp attribute as the timestamp, will the joining respect that value, or is it based on when the data was written to disk?

I am not too worried about forcing the timestamp to 0 as part of the data load for this particular dataset, as there is the potential for future events to give it a current timestamp if the data does ever change.

Currently max.task.idle.ms is set to 0. I have had a look at the description for this config item, but I don't understand what it is doing or how to work out what it should be instead.

Thanks,

Leander

will the joining respect that value or is it based on when the data was written to disk?

Yes, ksqlDB uses ROWTIME for this purpose.
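For reference, the way to have ROWTIME come from a payload column is the TIMESTAMP property in the WITH clause of the stream/table definition. A minimal sketch, where the column name event_ts (BIGINT, epoch milliseconds) and the topic name are assumptions:

```sql
-- Sketch: drive ROWTIME from a payload column instead of the broker
-- record timestamp; joins and windows then use this column.
CREATE STREAM event_stream_ts (
    event_ts BIGINT,
    category VARCHAR,
    data     MAP<VARCHAR, VARCHAR>
  ) WITH (
    KAFKA_TOPIC  = 'events',      -- assumed topic name
    VALUE_FORMAT = 'JSON',
    TIMESTAMP    = 'event_ts'
  );
```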

Currently max.task.idle.ms is set to 0. I have had a look at the description for this config item, but I don't understand what it is doing or how to work out what it should be instead.

Check out https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/ for details on the config (slides 34ff).
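As a starting point (the value below is illustrative, not a recommendation): Kafka Streams configs can be passed through ksqlDB with the ksql.streams. prefix, so the idle time can be raised per session with SET:

```sql
-- Illustrative: allow tasks to wait up to 1 second for data to arrive
-- on all join inputs before processing, so stream and table records
-- can be time-synchronized rather than processed in arrival order.
SET 'ksql.streams.max.task.idle.ms' = '1000';
```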