Join stream to all table elements with filtering

Hi. I have a stream and a table.
For each new element in the stream, I want to somehow join it with all elements from the table that pass a filter.
Is that possible?

CREATE STREAM IF NOT EXISTS input (data VARCHAR, location STRUCT<longitude VARCHAR, latitude VARCHAR>, keywords VARCHAR) WITH (KAFKA_TOPIC='sensors', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON'); 

CREATE OR REPLACE TABLE previousData as SELECT EXTRACTJSONFIELD(keywords, '$.key') as unique_field, LATEST_BY_OFFSET(location, 1) as location_array FROM input GROUP BY EXTRACTJSONFIELD(keywords, '$.key'); 
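This Python sketch illustrates what `previousData` materializes. It shows the semantics only, not how ksqlDB executes the query. Each stream record carries its key inside the JSON string in `keywords`, and `extract_key` plays the role of `EXTRACTJSONFIELD(keywords, '$.key')`. Only the most recent location per key is kept, wrapped in a one-element list. This assumes that `LATEST_BY_OFFSET(location, 1)` yields an array of the latest value, which the column name `location_array` suggests. The function names are hypothetical.

```python
import json
from typing import Dict, Iterable, List


def extract_key(keywords: str) -> str:
    """Rough analogue of EXTRACTJSONFIELD(keywords, '$.key'): keywords is itself a JSON string."""
    return json.loads(keywords)["key"]


def materialize_previous_data(records: Iterable[dict]) -> Dict[str, List[dict]]:
    """Keep only the latest location per key, wrapped in a one-element list
    (assumed to mirror LATEST_BY_OFFSET(location, 1)). Later records overwrite earlier ones."""
    table: Dict[str, List[dict]] = {}
    for record in records:  # records are assumed to arrive in offset order
        table[extract_key(record["keywords"])] = [record["location"]]
    return table


if __name__ == "__main__":
    stream = [
        {"data": "a", "location": {"longitude": 1.0, "latitude": 2.0}, "keywords": '{"key":"123"}'},
        {"data": "b", "location": {"longitude": 3.0, "latitude": 4.0}, "keywords": '{"key":"123"}'},
        {"data": "c", "location": {"longitude": 5.0, "latitude": 6.0}, "keywords": '{"key":"234"}'},
    ]
    print(materialize_previous_data(stream))
```

The result is keyed by the extracted key. Any lookup into this table is therefore naturally a lookup by that key.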

I need this to search the past data for objects that are within, e.g., 100 meters of the current object.
Or maybe, for each element of the input stream, I could get all elements from the table where:

keywords.key != previous.unique_field AND CUSTOM_FIND_DISTANCE(location.longitude, location.latitude, previous.location.longitude, previous.location.latitude) <= 100
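`CUSTOM_FIND_DISTANCE` is a user-defined function whose implementation is not shown. This sketch assumes it computes the haversine great-circle distance in meters, with an assumed mean Earth radius of 6,371 km. The stream schema declares longitude and latitude as VARCHAR, so the sketch converts its arguments with `float()`. The name `find_distance` is hypothetical.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius; not taken from the original UDF


def find_distance(lon1, lat1, lon2, lat2) -> float:
    """Hypothetical stand-in for CUSTOM_FIND_DISTANCE: haversine distance in meters.

    Arguments may be numbers or numeric strings (the stream declares them as VARCHAR).
    """
    lon1, lat1, lon2, lat2 = (math.radians(float(v)) for v in (lon1, lat1, lon2, lat2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


if __name__ == "__main__":
    # Stream point vs. table row "765" from the example further down: just under 1 km apart.
    print(find_distance("85.778", "32.3268", 85.788, 32.3278))
```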

I found that joins can only match data by key, but I can't express this as a plain WHERE clause.

Thanks for your help!

I want to somehow join all elements from the table, with filtering, to each new element in the stream. Is it possible?

Not sure what you mean by this. Could you give an example with records for both the stream and the table, and the result you want?

Okay.
I have a table with records like:

  1. { "uniqueField": "123", "location_array": [{"longitude": 1.2134, "latitude": 2.3168}] }
  2. { "uniqueField": "234", "location_array": [{"longitude": 2.2134, "latitude": 3.3168}] }
  3. { "uniqueField": "345", "location_array": [{"longitude": 85.767, "latitude": 32.3169}] }
  4. { "uniqueField": "567", "location_array": [{"longitude": 85.768, "latitude": 32.3168}] }
  5. { "uniqueField": "765", "location_array": [{"longitude": 85.788, "latitude": 32.3278}] }

And I have a stream with data like:

{ "data": "text", "location_array": {"longitude": 85.778, "latitude": 32.3268}, "keywords": "{\"key\":\"765\"}" }

I want to get a new stream that contains my data from the stream together with all data from the table where keywords.key != uniqueField. A custom UDF that computes the distance between the location of the stream element and the location of the table element must also return less than some threshold.
In other words, I want to check whether the current stream element is closer than 1000 meters to at least one table element with a different uniqueField.
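The "at least one nearby element with a different key" check is an existence test rather than a join. This Python sketch of that predicate assumes the table fits in memory as a dict from `uniqueField` to its `location_array`, filled with the example table records above. As an assumption, the haversine distance stands in for the custom UDF, and `has_nearby_object` and `distance_m` are hypothetical names.

```python
import json
import math
from typing import Dict, List

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius in meters


def distance_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Haversine distance in meters (hypothetical stand-in for the custom UDF)."""
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def has_nearby_object(event: dict, table: Dict[str, List[dict]], max_m: float) -> bool:
    """True if at least one table row with a different key lies within max_m of the event."""
    key = json.loads(event["keywords"])["key"]  # keywords holds a JSON string
    here = event["location_array"]
    return any(
        distance_m(here["longitude"], here["latitude"], loc["longitude"], loc["latitude"]) <= max_m
        for unique_field, locations in table.items()
        if unique_field != key  # skip the event's own key
        for loc in locations
    )


TABLE = {  # the example table records from the question
    "123": [{"longitude": 1.2134, "latitude": 2.3168}],
    "234": [{"longitude": 2.2134, "latitude": 3.3168}],
    "345": [{"longitude": 85.767, "latitude": 32.3169}],
    "567": [{"longitude": 85.768, "latitude": 32.3168}],
    "765": [{"longitude": 85.788, "latitude": 32.3278}],
}
EVENT = {
    "data": "text",
    "location_array": {"longitude": 85.778, "latitude": 32.3268},
    "keywords": '{"key":"765"}',
}

if __name__ == "__main__":
    print(has_nearby_object(EVENT, TABLE, 1000), has_nearby_object(EVENT, TABLE, 2000))
```

With great-circle distances, the closest rows with a different key ("345" and "567") are roughly 1.5 km from the stream point. A 1000 m threshold therefore returns `False`, and 2000 m returns `True`. Row "765" is under 1 km away but is excluded because it has the same key.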

And in this case I need to receive two records instead of one:

  1. { "data": "text", "location_array": {"longitude": 85.778, "latitude": 32.3268}, "keywords": "{\"key\":\"765\"}", "uniqueField": "345", "location_array": [{"longitude": 85.767, "latitude": 32.3169}] }
  2. { "data": "text", "location_array": {"longitude": 85.778, "latitude": 32.3268}, "keywords": "{\"key\":\"765\"}", "uniqueField": "567", "location_array": [{"longitude": 85.768, "latitude": 32.3168}] }

But zero records should be returned when no table record satisfies the WHERE condition.

I also need an option to compare only with table objects whose uniqueField is, for example, "123".
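The requested behavior is a fan-out: one output record per matching table row, zero records if nothing matches, and optionally only rows with particular keys. This Python sketch illustrates those semantics in memory only and is not something ksqlDB runs. It assumes the table fits in a dict keyed by `uniqueField` and uses the haversine distance as a hypothetical stand-in for the custom UDF. The names `nearby_matches`, `only_keys`, and `previous_location_array` are made up for the example. With great-circle distances, rows "345" and "567" lie roughly 1.5 km from the stream point, so the usage passes 2000 m to reproduce the two expected records. With 1000 m, the function returns an empty list.

```python
import json
import math
from typing import Dict, List, Optional, Set

EARTH_RADIUS_M = 6_371_000.0  # assumed mean Earth radius in meters


def distance_m(lon1: float, lat1: float, lon2: float, lat2: float) -> float:
    """Haversine distance in meters (hypothetical stand-in for CUSTOM_FIND_DISTANCE)."""
    lon1, lat1, lon2, lat2 = map(math.radians, (lon1, lat1, lon2, lat2))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearby_matches(
    event: dict,
    table: Dict[str, List[dict]],
    max_m: float,
    only_keys: Optional[Set[str]] = None,
) -> List[dict]:
    """Pair one stream event with every table row (a cross-product), then filter.

    Returns one output record per matching row, so the result may be empty.
    The table's locations go under a hypothetical "previous_location_array" field,
    because a Python dict cannot hold two "location_array" keys.
    """
    key = json.loads(event["keywords"])["key"]
    here = event["location_array"]
    matches = []
    for unique_field, locations in table.items():
        if unique_field == key:
            continue  # keywords.key != uniqueField
        if only_keys is not None and unique_field not in only_keys:
            continue  # optional restriction, e.g. only_keys={"123"}
        if any(
            distance_m(here["longitude"], here["latitude"], loc["longitude"], loc["latitude"]) <= max_m
            for loc in locations
        ):
            matches.append({**event, "uniqueField": unique_field, "previous_location_array": locations})
    return matches


TABLE = {  # the example table records from the question
    "123": [{"longitude": 1.2134, "latitude": 2.3168}],
    "234": [{"longitude": 2.2134, "latitude": 3.3168}],
    "345": [{"longitude": 85.767, "latitude": 32.3169}],
    "567": [{"longitude": 85.768, "latitude": 32.3168}],
    "765": [{"longitude": 85.788, "latitude": 32.3278}],
}
EVENT = {
    "data": "text",
    "location_array": {"longitude": 85.778, "latitude": 32.3268},
    "keywords": '{"key":"765"}',
}

if __name__ == "__main__":
    for record in nearby_matches(EVENT, TABLE, 2000):
        print(record["uniqueField"], record["previous_location_array"])
```

Each stream record may produce any number of outputs, including none. That property separates this from an ordinary stream-table join.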

I don't think that is possible with ksqlDB. ksqlDB only supports equi-joins, and a stream-table join is an "enrichment join", i.e., for each stream record there can be at most one output record.
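This Python sketch illustrates the enrichment-join behavior described above. It is not ksqlDB's implementation. Each stream record does a single key lookup into the table, so it yields at most one output. A non-equality predicate such as a distance check gives the lookup nothing to fan out over. The function name `stream_table_join` is hypothetical.

```python
import json
from typing import Dict, Iterable, Iterator, List, Tuple


def stream_table_join(
    events: Iterable[dict], table: Dict[str, List[dict]]
) -> Iterator[Tuple[dict, List[dict]]]:
    """Inner stream-table (enrichment) join on key: at most one output per event."""
    for event in events:
        key = json.loads(event["keywords"])["key"]
        row = table.get(key)  # a single point lookup by key, never a scan of the table
        if row is not None:
            yield event, row


if __name__ == "__main__":
    table = {"765": [{"longitude": 85.788, "latitude": 32.3278}]}
    events = [{"keywords": '{"key":"765"}'}, {"keywords": '{"key":"999"}'}]
    for event, row in stream_table_join(events, table):
        print(event, row)
```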

It seems what you would need is a full cross-product, but ksqlDB does not support this.

