Hi. I have a stream and a table.
I want to somehow join each new element in the stream with all elements from the table, with filtering.
Is it possible?
CREATE STREAM IF NOT EXISTS input (
  data VARCHAR,
  location STRUCT<longitude VARCHAR, latitude VARCHAR>,
  keywords VARCHAR
) WITH (KAFKA_TOPIC='sensors', PARTITIONS=1, REPLICAS=1, VALUE_FORMAT='JSON');
CREATE OR REPLACE TABLE previousData AS
  SELECT EXTRACTJSONFIELD(keywords, '$.key') AS unique_field,
         LATEST_BY_OFFSET(location, 1) AS location_array
  FROM input
  GROUP BY EXTRACTJSONFIELD(keywords, '$.key');
I need it to search the past data for objects that are within, e.g., 100 meters of the current object.
Or maybe, for each element from the input stream, I can get all elements from the table where
keywords.key != previous.unique_field AND CUSTOM_FIND_DISTANCE(location.longitude, location.latitude, previous.location.longitude, previous.location.latitude) <= 100
I found that joins can only match data by key, and I can’t use just a WHERE clause instead.
I want to get a new stream that contains my data from the stream plus all data from the table where keywords.key != unique_field and where a custom UDF reports that the distance between the stream element's location and the table element's location is less than some number.
For example, I want to check whether the current element from the stream is closer than 1000 meters to at least one element from the table with a different unique_field.
And in this case I need to receive 2 records instead of one:
I don’t think that is possible with ksqlDB. ksqlDB only supports equi-joins, and a stream-table join is an “enrichment join”, i.e., for each stream record there can be at most one output record.
It seems that what you need is a full cross-product, but ksqlDB does not support this.
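If doing the matching outside ksqlDB is an option, the cross-product with a filter could look roughly like the Python sketch below. This is only an illustration under assumptions, not a ksqlDB feature. `distance_m` is a hypothetical stand-in for CUSTOM_FIND_DISTANCE and assumes the haversine (great-circle) formula. `update_and_match` and the in-memory dict playing the role of the previousData table are also made up for the example, and reading from and writing to Kafka is left out entirely.

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius in meters


def distance_m(lon1, lat1, lon2, lat2):
    """Haversine distance in meters between two lon/lat points given in degrees.

    Hypothetical stand-in for CUSTOM_FIND_DISTANCE; accepts strings or numbers,
    because the stream declares longitude and latitude as VARCHAR.
    """
    lon1, lat1, lon2, lat2 = map(math.radians, map(float, (lon1, lat1, lon2, lat2)))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def update_and_match(table, key, location, max_distance_m=100.0):
    """Compare one stream record against every table entry, then upsert it.

    Returns a list of (key, location) pairs for all entries that have a
    different key and lie within max_distance_m of the new record, so one
    input record can produce zero, one, or many matches.
    """
    matches = [
        (other_key, other_loc)
        for other_key, other_loc in table.items()
        if other_key != key
        and distance_m(location["longitude"], location["latitude"],
                       other_loc["longitude"], other_loc["latitude"]) <= max_distance_m
    ]
    # Keep only the latest location per key, similar in spirit to LATEST_BY_OFFSET.
    table[key] = location
    return matches


if __name__ == "__main__":
    table = {}
    records = [
        ("a", {"longitude": "13.4050", "latitude": "52.5200"}),
        ("b", {"longitude": "13.4055", "latitude": "52.5200"}),
        ("c", {"longitude": "13.4060", "latitude": "52.5200"}),
    ]
    for key, location in records:
        for other_key, _ in update_and_match(table, key, location):
            print(f"{key} is within 100 m of {other_key}")
```

Every incoming record is compared against the whole table, so the cost grows with the table size. For a large table, some kind of spatial bucketing (for example by a geohash prefix) would probably be needed to reduce the number of comparisons.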