ksqlDB table with a condition that involves several messages of a stream

Thanks @rmoff and @gianlucanatali for your answers.

Before reading your answers, I had already made some progress on the problem.

Actually, I receive more than just the lights: I also receive the building where each light is placed, so I get something like this:

BUILDING |ELEMENT|VALUE
---------+-------+-----
Building1|Light1 |on
Building2|Light1 |broken
Building1|Light3 |off

So I thought the only way to filter on all three lights would be to arrange the information like this:

BUILDING |Light1|Light2|Light3
---------+------+------+------
Building1|on    |off   |off
Building2|off   |broken|off

In that shape, filtering is trivial: WHERE BUILDING = 'Building1' AND Light1 = 'on' AND Light2 = 'on' AND Light3 = 'on'
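To make the pivot concrete, here is a minimal plain-Python sketch of the idea (not ksqlDB code), assuming events arrive as (building, element, value) tuples in offset order. Keeping the last value seen per building and element is roughly what LATEST_BY_OFFSET with GROUP BY BUILDING does per light; the check then mirrors the WHERE condition above. The names `pivot_latest` and `all_on` are hypothetical.

```python
from collections import defaultdict


def pivot_latest(events):
    """Pivot (building, element, value) events into {building: {element: value}}.

    Later events overwrite earlier ones, which mimics "latest by offset"
    as long as the list is in offset order.
    """
    wide = defaultdict(dict)
    for building, element, value in events:
        wide[building][element] = value
    return dict(wide)


def all_on(wide, building, elements):
    """True only if every listed element of the building is currently 'on'."""
    row = wide.get(building, {})
    return all(row.get(element) == "on" for element in elements)


events = [
    ("Building1", "Light1", "on"),
    ("Building2", "Light1", "broken"),
    ("Building1", "Light3", "off"),
    ("Building1", "Light2", "on"),
    ("Building1", "Light3", "on"),  # Light3 later switches on
]
wide = pivot_latest(events)
print(wide["Building1"])  # {'Light1': 'on', 'Light3': 'on', 'Light2': 'on'}
print(all_on(wide, "Building1", ["Light1", "Light2", "Light3"]))  # True
```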

So I transformed the info this way:

CREATE STREAM STR_STATUS (
	BUILDING VARCHAR,
	ELEMENT VARCHAR,
	VALUE VARCHAR
) WITH (
	kafka_topic='STATUS',
	value_format='JSON_SR'
);

CREATE TABLE TBL_LIGHT1 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light1'
	GROUP BY BUILDING
	EMIT CHANGES;

CREATE TABLE TBL_LIGHT2 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light2'
	GROUP BY BUILDING
	EMIT CHANGES;
	
CREATE TABLE TBL_LIGHT3 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light3'
	GROUP BY BUILDING
	EMIT CHANGES;
	
CREATE TABLE TBL_ALL_LIGHTS_ON AS
	SELECT
		T1.BUILDING AS BUILDING,
		CASE
			WHEN T1.VALUE = 'on' AND T2.VALUE = 'on'
			THEN true
			ELSE false
		END AS ALL_LIGHTS_ON
	FROM TBL_LIGHT1 T1
	INNER JOIN TBL_LIGHT2 T2
	ON T1.BUILDING = T2.BUILDING
	EMIT CHANGES;
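What TBL_ALL_LIGHTS_ON computes can be simulated in plain Python (again a sketch, not ksqlDB code), assuming each per-light table is represented as a dict from BUILDING to its latest VALUE. The hypothetical `all_lights_on_join` performs an inner join on the building key and evaluates the same CASE expression.

```python
def all_lights_on_join(t1, t2):
    """Inner-join two {building: value} tables and compute ALL_LIGHTS_ON.

    Only buildings present in both tables appear in the result, as with
    an INNER JOIN; the boolean mirrors CASE WHEN ... THEN true ELSE false.
    """
    return {
        building: (t1[building] == "on" and t2[building] == "on")
        for building in t1
        if building in t2
    }


tbl_light1 = {"Building1": "on", "Building2": "broken", "Building3": "on"}
tbl_light2 = {"Building1": "on", "Building2": "off"}
print(all_lights_on_join(tbl_light1, tbl_light2))
# {'Building1': True, 'Building2': False}
```

Building3 is missing from the result: with an INNER JOIN, a building only shows up once every joined light has reported at least one value.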

Note that the join in the last table only covers two lights, because my current version of ksqlDB does not allow more joins (I think I can solve this simply by updating to a newer version; thanks for the tip about the Docker image, @rmoff!).

Note also the new BUILDING field: without it, you would need to do what @rmoff did (select 1 … group by 1).

That said, creating a table for every light seems like a bit of overkill, since there can be more elements such as doors, sensors, etc.
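One way to avoid a structure per element, sketched in plain Python, is to keep a single state keyed by (building, element), roughly analogous to one table grouped by both BUILDING and ELEMENT, and to derive the set of lights from the data itself so the number of lights does not have to be fixed. This assumes element names carry their type as a prefix (Light1, Door1, ...), which is a hypothetical naming convention; `update` and `all_of_kind` are hypothetical helpers.

```python
def update(state, building, element, value):
    """Record the latest value for one (building, element) pair."""
    state[(building, element)] = value


def all_of_kind(state, building, prefix, wanted="on"):
    """True if the building has at least one element with this prefix
    and every such element currently has the wanted value."""
    values = [
        value
        for (b, element), value in state.items()
        if b == building and element.startswith(prefix)
    ]
    return bool(values) and all(v == wanted for v in values)


state = {}
for event in [
    ("Building1", "Light1", "on"),
    ("Building1", "Light2", "on"),
    ("Building1", "Door1", "open"),
    ("Building2", "Light1", "on"),
    ("Building2", "Light2", "uninstalled"),
]:
    update(state, *event)

print(all_of_kind(state, "Building1", "Light"))  # True
print(all_of_kind(state, "Building2", "Light"))  # False
```

States other than 'on' and 'off' (such as 'uninstalled') need no special handling here: anything that is not the wanted value makes the check fail.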

I have yet to try what @gianlucanatali proposed, but with these tables I at least managed to create a table that tells me whether all the lights (well, two of them for now) are on.

So I connected a sink connector to it. I was hoping that only records whose value had changed would be sent, but in fact every record is sent.

I was thinking of writing a custom Transformation that checks whether the current value is the same as the last one and only lets the record through if they differ, but again I don’t know if there is an easier way to do this. I’m also concerned about concurrency once I add more brokers, Connect instances, or tasks to the connector…
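The check described above can be sketched as a small stateful filter in plain Python (this is not the Kafka Connect Transformation API), assuming records are keyed by BUILDING. The hypothetical `ChangeFilter` remembers the last value it forwarded per key and drops exact repeats.

```python
class ChangeFilter:
    """Forward a record only when its value differs from the last one
    forwarded for the same key."""

    _MISSING = object()  # sentinel so a first value of None is still forwarded

    def __init__(self):
        self._last = {}

    def should_forward(self, key, value):
        if self._last.get(key, self._MISSING) == value:
            return False  # same as the last forwarded value: drop it
        self._last[key] = value
        return True


f = ChangeFilter()
records = [
    ("Building1", True),
    ("Building1", True),
    ("Building2", False),
    ("Building1", False),
    ("Building1", False),
]
forwarded = [r for r in records if f.should_forward(*r)]
print(forwarded)
# [('Building1', True), ('Building2', False), ('Building1', False)]
```

Because the state lives in one process's memory, this only works if all records for a given key reach the same filter instance. In Kafka that usually means keying the topic by BUILDING, so each building's records land in one partition, which is consumed by a single task at a time. After a rebalance or restart the in-memory state starts empty, so the first record per key would be forwarded again.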

I will update this thread once I try @gianlucanatali’s idea, and I will take my question about the sink connector to the kafka-connect forum (see: Send messages to sink connector only if value has changed).

About the questions:

Is the total number of “lights” fixed?

No, and there could be more elements, such as doors. Their state can also be more complex than ‘on’ and ‘off’, for example ‘broken’ or ‘uninstalled’.

Do you have a test case that someone could use to reproduce the environment? Something like a docker-compose / script

I will try to provide that in the next update.

Thanks again for your help!