Thanks @rmoff and @gianlucanatali for your answers.
Before reading your answers I had already made some progress on the problem.
Actually I receive more than the lights: I also receive the buildings where those lights are placed, so I get something like this:
BUILDING |ELEMENT|VALUE
---------+-------+-----
Building1|Light1 |on
Building2|Light1 |broken
Building1|Light3 |off
So I thought that the only way I could filter by the three lights would be if the information were arranged this way:
BUILDING |Light1|Light2|Light3
---------+------+------+------
Building1|on    |off   |off
Building2|off   |broken|off
That way it is trivial to filter: WHERE BUILDING = 'Building1' AND Light1 = 'on' AND Light2 = 'on' AND Light3 = 'on'
So I transformed the info this way:
CREATE STREAM STR_STATUS (
    BUILDING VARCHAR,
    ELEMENT VARCHAR,
    VALUE VARCHAR
) WITH (
    kafka_topic='STATUS',
    value_format='JSON_SR'
);
CREATE TABLE TBL_LIGHT1 AS
    SELECT
        BUILDING,
        LATEST_BY_OFFSET(VALUE) VALUE
    FROM
        STR_STATUS
    WHERE
        ELEMENT = 'Light1'
    GROUP BY BUILDING
    EMIT CHANGES;
CREATE TABLE TBL_LIGHT2 AS
    SELECT
        BUILDING,
        LATEST_BY_OFFSET(VALUE) VALUE
    FROM
        STR_STATUS
    WHERE
        ELEMENT = 'Light2'
    GROUP BY BUILDING
    EMIT CHANGES;
CREATE TABLE TBL_LIGHT3 AS
    SELECT
        BUILDING,
        LATEST_BY_OFFSET(VALUE) VALUE
    FROM
        STR_STATUS
    WHERE
        ELEMENT = 'Light3'
    GROUP BY BUILDING
    EMIT CHANGES;
CREATE TABLE TBL_ALL_LIGHTS_ON AS
    SELECT
        T1.BUILDING AS BUILDING,
        CASE
            WHEN T1.VALUE = 'on' AND T2.VALUE = 'on'
            THEN true
            ELSE false
        END AS ALL_LIGHTS_ON
    FROM TBL_LIGHT1 T1
    INNER JOIN TBL_LIGHT2 T2
        ON T1.BUILDING = T2.BUILDING
    EMIT CHANGES;
Note that the join in the last table covers only two lights, because my current version of ksqlDB does not allow more joins (I think I can solve this simply by updating to a newer version; thanks for the tip about the Docker image, @rmoff!).
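Assuming a ksqlDB version that accepts more than one join in a single statement, the three-light version of the table above might look like the sketch below. The table name TBL_ALL_LIGHTS_ON_3 is hypothetical (chosen only to avoid clashing with the two-light table), and the statement has not been run against the setup described here.

```sql
-- Sketch only: requires a ksqlDB release that supports multi-way joins.
-- TBL_ALL_LIGHTS_ON_3 is a hypothetical name.
CREATE TABLE TBL_ALL_LIGHTS_ON_3 AS
    SELECT
        T1.BUILDING AS BUILDING,
        CASE
            WHEN T1.VALUE = 'on' AND T2.VALUE = 'on' AND T3.VALUE = 'on'
            THEN true
            ELSE false
        END AS ALL_LIGHTS_ON
    FROM TBL_LIGHT1 T1
    INNER JOIN TBL_LIGHT2 T2
        ON T1.BUILDING = T2.BUILDING
    INNER JOIN TBL_LIGHT3 T3
        ON T1.BUILDING = T3.BUILDING
    EMIT CHANGES;
```

Because all three joins are inner joins, a building only appears in the result once each of its three lights has reported at least one status.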
Note also the new BUILDING field: without it you would need to do as @rmoff did (select 1 … group by 1).
Also, it seems like a bit of overkill to create a table for every light, as there can be more elements like doors, sensors, etc.
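One possible way to avoid a table per light is to pivot inside a single aggregation, with one conditional LATEST_BY_OFFSET per element. This is only a sketch: the names TBL_LIGHTS, LIGHT1, LIGHT2 and LIGHT3 are hypothetical, and it relies on two assumptions worth verifying on the ksqlDB version in use, namely that a CASE without ELSE yields NULL and that LATEST_BY_OFFSET skips NULL inputs by default.

```sql
-- Sketch only: one table holding the latest status of each light per building.
-- Assumes CASE without ELSE returns NULL and LATEST_BY_OFFSET ignores NULLs,
-- so each column only tracks records for its own element.
CREATE TABLE TBL_LIGHTS AS
    SELECT
        BUILDING,
        LATEST_BY_OFFSET(CASE WHEN ELEMENT = 'Light1' THEN VALUE END) AS LIGHT1,
        LATEST_BY_OFFSET(CASE WHEN ELEMENT = 'Light2' THEN VALUE END) AS LIGHT2,
        LATEST_BY_OFFSET(CASE WHEN ELEMENT = 'Light3' THEN VALUE END) AS LIGHT3
    FROM STR_STATUS
    GROUP BY BUILDING
    EMIT CHANGES;

-- The filter from the pivoted layout above then becomes a plain WHERE clause.
SELECT BUILDING
FROM TBL_LIGHTS
WHERE LIGHT1 = 'on' AND LIGHT2 = 'on' AND LIGHT3 = 'on'
EMIT CHANGES;
```

This still needs one column per element, so it reduces the number of tables and joins but does not by itself handle an open-ended set of elements.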
I have yet to try what @gianlucanatali proposed, but with these tables I managed at least to create a table which tells me whether all lights (well, at the moment two of them) are on.
So I connected a sink connector to it. I was hoping that only records whose value had changed would be sent, but actually all records are sent.
I was thinking of writing a custom transformation that would check whether the current value is the same as the last value, and only send the record if they are different, but again I don’t know if there is some easier way to do this. I’m also concerned about concurrency with this approach once I add more brokers, Connect instances, tasks in the connector…
I will update this thread when I try @gianlucanatali's idea, and I will take my question about the sink connector to the kafka-connect forum (see: Send messages to sink connector only if value has changed).
About the questions:
Is the total number of “lights” fixed?
No, and there could be more elements like doors, etc. Their state could also be more complex than ‘on’ and ‘off’, such as ‘broken’ or ‘uninstalled’.
Do you have a test case that someone could use to reproduce the environment? Something like a docker-compose / script
I will try to provide that in the next update.
Thanks again for your help!