LAG function as in Stream Analytics

Hi all
Need to move out from azure stream analytics to Kafka with KSQL

What I need: I have a Kafka topic “sensors”. I need to process all its messages, filter them, and send only the filtered ones to another Kafka topic “results”.
Here is my KSQL code for this

CREATE STREAM IF NOT EXISTS input_1 (source VARCHAR, location STRUCT<longitude VARCHAR, latitude VARCHAR>, keywords VARCHAR) WITH (KAFKA_TOPIC='sensors', PARTITIONS=2, REPLICAS=1, VALUE_FORMAT='JSON');           
CREATE STREAM IF NOT EXISTS output_1 (source VARCHAR, location STRUCT<longitude VARCHAR, latitude VARCHAR>, keywords VARCHAR) WITH (KAFKA_TOPIC='results', PARTITIONS=2, REPLICAS=1, VALUE_FORMAT='JSON');
INSERT INTO output_1 SELECT * FROM input_1 WHERE EXTRACTJSONFIELD(keywords, '$.id') = 'first' OR EXTRACTJSONFIELD(keywords, '$.id') = 'second';

It works, but I also need to compare each incoming sensor reading with the previous record from the same sensor, and send the new data on only if a particular field has changed.

in Stream Analytics it will be code like


      WITH AllData AS
      (
      SELECT
          e1 as EventHub,
          LAG (e1) OVER (PARTITION BY e1.keywords.id LIMIT DURATION(minute, 30) WHEN e1.keywords.id IS NOT NULL) as prevData
      FROM
          EventHub e1
      )

      SELECT
            *
      INTO
          ServiceBus
      FROM
    AllData WHERE AllData.prevData IS NULL OR AllData.prevData.keywords.value <> AllData.EventHub.keywords.value;

I tried the WINDOW TUMBLING function but got no result.
Can someone please help me modify this part to meet the requirements above?

INSERT INTO output_1 SELECT * FROM input_1 WHERE EXTRACTJSONFIELD(keywords, '$.id') = 'first' OR EXTRACTJSONFIELD(keywords, '$.id') = 'second';

Thanks

Hi @pavlenkoxx,

LATEST_BY_OFFSET might work well for this use case, where you ask for the latest 2 records for each sensor ID.

A completely riffed, untested starting point 🙂 :

SELECT
  sensor_id,
  LATEST_BY_OFFSET(keyword, 2) AS keyword_array
FROM stream
GROUP BY sensor_id
EMIT CHANGES;

You’d then have keyword_array[1] and keyword_array[2] (ksqlDB arrays are one-indexed) to compare and use for filtering.
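As a rough, untested sketch of the full pattern (the stream and column names `sensors_stream`, `sensor_id`, and `keyword` are placeholders for whatever your actual schema uses), you could materialize the table and then filter where the two latest values differ:

```sql
-- Keep the last two keyword values per sensor (column names are assumptions)
CREATE TABLE last_two AS
  SELECT sensor_id,
         LATEST_BY_OFFSET(keyword, 2) AS keyword_array
  FROM sensors_stream
  GROUP BY sensor_id
  EMIT CHANGES;

-- Emit only rows where the value changed between the last two records,
-- or where this is the first record for the sensor (array has one entry).
-- Note: ksqlDB arrays are one-indexed.
SELECT sensor_id, keyword_array
FROM last_two
WHERE ARRAY_LENGTH(keyword_array) < 2
   OR keyword_array[1] <> keyword_array[2]
EMIT CHANGES;
```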

Note that this wouldn’t have a duration limit like the Stream Analytics query does.

HTH,
Dave


Hey Dave, I tested exactly this and it works fine:

CREATE STREAM IF NOT EXISTS sensors (
  source VARCHAR, 
  location STRUCT<longitude VARCHAR, latitude VARCHAR>, 
  keywords VARCHAR) 
  WITH (KAFKA_TOPIC='sensors', PARTITIONS=2, REPLICAS=3, VALUE_FORMAT='JSON');

CREATE OR REPLACE TABLE last_and_previous_sensors as
SELECT source, LATEST_BY_OFFSET(location, 2), LATEST_BY_OFFSET(keywords, 2)
FROM sensors WINDOW TUMBLING (SIZE 10 MINUTES)
GROUP BY source;

insert into sensors (source, location, keywords) values ('building', STRUCT( longitude:= 'AAA', latitude := 'BBB'), 'primary');
insert into sensors (source, location, keywords) values ('building', STRUCT( longitude:= 'BBB', latitude := 'CCC'), 'secondary');
insert into sensors (source, location, keywords) values ('building', STRUCT( longitude:= 'CCC', latitude := 'DDD'), 'primary');
insert into sensors (source, location, keywords) values ('building', STRUCT( longitude:= 'DDD', latitude := 'EEE'), 'primary');

Now, if you run

select * from last_and_previous_sensors;

You’ll get

{
  "SOURCE": "building",
  "WINDOWSTART": 1681398000000,
  "WINDOWEND": 1681398600000,
  "KSQL_COL_0": [
    {
      "LONGITUDE": "BBB",
      "LATITUDE": "CCC"
    },
    {
      "LONGITUDE": "DDD",
      "LATITUDE": "EEE"
    }
  ],
  "KSQL_COL_1": [
    "secondary",
    "primary"
  ]
}
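Building on that output, a follow-up query along these lines (untested sketch; `KSQL_COL_1` is the auto-generated name for the keywords array, and ksqlDB arrays are one-indexed) could emit only sensors whose keywords value changed between the two records:

```sql
-- Filter to sensors whose last two keywords values differ
SELECT SOURCE, KSQL_COL_1
FROM last_and_previous_sensors
WHERE ARRAY_LENGTH(KSQL_COL_1) = 2
  AND KSQL_COL_1[1] <> KSQL_COL_1[2]
EMIT CHANGES;
```

Adding explicit aliases (e.g. `LATEST_BY_OFFSET(keywords, 2) AS keywords_array`) in the table definition would give you friendlier column names than `KSQL_COL_0`/`KSQL_COL_1`.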
