Hi all
I need to move from Azure Stream Analytics to Kafka with KSQL.
What I need: I have a Kafka topic “sensors”. I need to process all of its messages and filter them, so that only the filtered messages go to another Kafka topic, “results”.
Here is my KSQL code for this:
CREATE STREAM IF NOT EXISTS input_1 (source VARCHAR, location STRUCT<longitude VARCHAR, latitude VARCHAR>, keywords VARCHAR) WITH (KAFKA_TOPIC='sensors', PARTITIONS=2, REPLICAS=1, VALUE_FORMAT='JSON');
CREATE STREAM IF NOT EXISTS output_1 (source VARCHAR, location STRUCT<longitude VARCHAR, latitude VARCHAR>, keywords VARCHAR) WITH (KAFKA_TOPIC='results', PARTITIONS=2, REPLICAS=1, VALUE_FORMAT='JSON');
INSERT INTO output_1 SELECT * FROM input_1 WHERE EXTRACTJSONFIELD(keywords, '$.id') = 'first' OR EXTRACTJSONFIELD(keywords, '$.id') = 'second';
It works, but I also need to take each incoming sensor record, look up the previous record that arrived earlier for the same sensor, compare the two, and send the new data only if another field has changed.
In Stream Analytics the code would look like this:
WITH AllData AS
(
SELECT
e1 as EventHub,
LAG (e1) OVER (PARTITION BY e1.keywords.id LIMIT DURATION(minute, 30) WHEN e1.keywords.id IS NOT NULL) as prevData
FROM
EventHub e1
)
SELECT
*
INTO
ServiceBus
FROM
AllData WHERE AllData.prevData IS NULL OR AllData.prevData.keywords.value <> AllData.EventHub.keywords.value;
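To make the requirement concrete, here is roughly the logic I am after, written as a small Python sketch rather than KSQL. The event shape (a ts timestamp plus keywords holding id and value) and the function name changed_only are only assumptions for illustration, and the sketch assumes events arrive in time order. It is not meant as a working Kafka solution, just a description of the expected behaviour.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=30)      # mirrors LIMIT DURATION(minute, 30)
ALLOWED_IDS = {"first", "second"}   # same filter as the KSQL WHERE clause


def changed_only(events):
    """Return events whose value differs from the previous event of the same id."""
    previous = {}   # sensor id -> (ts, value) of the last event seen for that id
    result = []
    for event in events:            # assumption: events arrive in time order
        sensor_id = event["keywords"].get("id")
        if sensor_id not in ALLOWED_IDS:
            continue
        value = event["keywords"].get("value")
        prev = previous.get(sensor_id)
        # No earlier event, or one older than the window, counts as "no previous".
        is_new = prev is None or event["ts"] - prev[0] > WINDOW
        if is_new or prev[1] != value:
            result.append(event)
        previous[sensor_id] = (event["ts"], value)
    return result
```

So for one sensor sending the values 1, 1, 2 within a few minutes, I would expect only the first and the third message to reach “results”.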
I tried the WINDOW TUMBLING function, but got no result.
Can someone please help me modify this part to meet the requirements above?
INSERT INTO output_1 SELECT * FROM input_1 WHERE EXTRACTJSONFIELD(keywords, '$.id') = 'first' OR EXTRACTJSONFIELD(keywords, '$.id') = 'second';
Thanks