ksqlDB table with a condition that involves several messages of a stream

Hi,

I’m using ksqlDB on a Confluent Platform 5.5.1 installation running in Docker containers.

I have a source connector that receives status messages from a server and saves them in a topic called STATUS. It looks like this:

ELEMENT|VALUE
-------+-----
Light1 |on
Light1 |off
Light3 |on
Light2 |off

What I want is an output topic that a sink connector can use to send messages to another server. This topic should carry
a single boolean value that is true only if Light1, Light2 and Light3 are all ‘on’, and false otherwise.

So it would look like this:

ELEMENT|VALUE                      ALL_LIGHTS_ON
-------+-----                      -------------
Light1 |off    ->  [ ][ ][ ]    -> false
Light2 |on         [ ][O][ ]
Light2 |off        [ ][ ][ ]
Light2 |on         [ ][O][ ]
Light3 |on         [ ][O][O]
Light1 |on     ->  [O][O][O]    -> true

Since ALL_LIGHTS_ON changes state only twice, I expect the sink connector to send only two messages to the output server.
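To pin down the expected behaviour, here is a minimal plain-Python model of it (a sketch of the semantics, not ksqlDB code). It keeps the last value per light, evaluates the condition after every message, and only emits when the result changes. `REQUIRED_LIGHTS` and `emit_on_change` are hypothetical names, and a light that has not reported yet is assumed to count as "not on".

```python
from typing import Dict, Iterable, List, Optional, Tuple

# Hypothetical: the set of lights that must all be 'on'.
REQUIRED_LIGHTS = {"Light1", "Light2", "Light3"}


def emit_on_change(messages: Iterable[Tuple[str, str]]) -> List[bool]:
    """Return the ALL_LIGHTS_ON values a change-only sink would receive."""
    latest: Dict[str, str] = {}  # last known value per element
    last_emitted: Optional[bool] = None
    emitted: List[bool] = []
    for element, value in messages:
        latest[element] = value
        # Assumption: a light we have not heard from yet counts as "not on".
        all_on = all(latest.get(light) == "on" for light in REQUIRED_LIGHTS)
        if all_on != last_emitted:  # only forward actual state changes
            emitted.append(all_on)
            last_emitted = all_on
    return emitted


# The sequence from the diagram above.
sequence = [
    ("Light1", "off"),
    ("Light2", "on"),
    ("Light2", "off"),
    ("Light2", "on"),
    ("Light3", "on"),
    ("Light1", "on"),
]
print(emit_on_change(sequence))  # [False, True]
```

Six input messages collapse to two output messages, which is exactly the behaviour the sink should see.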

First I modelled a stream to read from my topic:

CREATE STREAM STR_STATUS (
	ELEMENT VARCHAR,
	VALUE VARCHAR
) WITH (
	kafka_topic='STATUS',
	value_format='JSON_SR'
);

Then I created a table holding the last known value of every element (one ELEMENT/VALUE pair per row):

CREATE TABLE TBL_STATUS AS
	SELECT
		ELEMENT,
		LATEST_BY_OFFSET(VALUE) AS VALUE
	FROM
		STR_STATUS
	GROUP BY ELEMENT
	EMIT CHANGES;
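For reference, this is roughly what GROUP BY ELEMENT with LATEST_BY_OFFSET(VALUE) computes, modelled in plain Python over the sample STATUS messages (a sketch, not ksqlDB code; `latest_by_offset` is a hypothetical helper). The table keeps one row per element, and every input record produces an update for its key. ksqlDB may also buffer and collapse intermediate updates (record caching), so the real changelog can be shorter than this model; note that in this model an update is produced even when the value has not changed.

```python
from typing import Dict, Iterable, List, Tuple


def latest_by_offset(
    records: Iterable[Tuple[str, str]],
) -> Tuple[Dict[str, str], List[Tuple[str, str]]]:
    """Model GROUP BY ELEMENT + LATEST_BY_OFFSET(VALUE) over an ordered stream.

    Returns the final table state and the changelog of row updates
    (one update per input record, ignoring any caching).
    """
    table: Dict[str, str] = {}
    changelog: List[Tuple[str, str]] = []
    for element, value in records:  # records arrive in offset order
        table[element] = value  # a later offset overwrites the earlier value
        changelog.append((element, value))
    return table, changelog


status = [("Light1", "on"), ("Light1", "off"), ("Light3", "on"), ("Light2", "off")]
table, changelog = latest_by_offset(status)
print(table)  # {'Light1': 'off', 'Light3': 'on', 'Light2': 'off'}
```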

But I’m stuck there: I don’t know how to express this condition. In regular SQL one might do this with self-joins, but my version of ksqlDB does not support them.

So I was wondering whether this is possible in ksqlDB (even if it means upgrading to a newer version), or whether I should solve the problem in another part of the architecture (for example, by filtering out in the sink connector every message that does not meet the condition).

Any help is welcome, thanks.

N.b. New versions of ksqlDB are released frequently, and I’d generally recommend running the latest, particularly if you’re still at the development stage. There’s a Docker image, confluentinc/ksqldb-server:0.20.0, that you can use in place of the ksqlDB that ships with your Confluent Platform version.

I had a play around with your question. I think the route you’re taking is the right one; you would then need an aggregation over the table using COLLECT_SET, and then check whether the resulting set contained only ‘on’ (i.e. all the lights were on). However, that’s not possible in ksqlDB at the moment:

ksql> SELECT 1, COLLECT_SET(VALUE) FROM TBL_STATUS GROUP BY 1 EMIT CHANGES;
The aggregation function collect_set cannot be applied to a table source, only to a stream source.
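Conceptually, the COLLECT_SET check amounts to the following plain-Python sketch over the current contents of TBL_STATUS (not ksqlDB code; `REQUIRED` and `all_on_from_set` are hypothetical names). Because a set removes duplicates, three ‘on’ values collapse to a single ‘on’, so the test is really "the set of values is exactly {‘on’}". The set alone also cannot reveal a light that has never reported, so the sketch checks the keys as well.

```python
from typing import Dict, Set

# Hypothetical: lights that must be present and 'on'.
REQUIRED: Set[str] = {"Light1", "Light2", "Light3"}


def all_on_from_set(table: Dict[str, str]) -> bool:
    """Mimic COLLECT_SET(VALUE) aggregated over the whole table (GROUP BY 1)."""
    values = set(table.values())  # like COLLECT_SET: duplicates are dropped
    # A missing light does not show up in the set, so check the keys too.
    return REQUIRED.issubset(table.keys()) and values == {"on"}


print(all_on_from_set({"Light1": "on", "Light2": "on", "Light3": "on"}))  # True
print(all_on_from_set({"Light1": "on", "Light3": "on"}))  # False: Light2 unknown
```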

I’ll keep looking at this to see if I can think of another route, but it may be that you need to do this programmatically (e.g. with Kafka Streams).


Hi there, interesting use case!
Is the total number of “lights” fixed?
I haven’t had time to try this, so I’m not sure it helps, but what if:

  • You send the event as 1 or 0 (1 is on 0 is off)
  • In the GROUP BY you compute both a SUM and a COUNT (so you know how many lights are on and the total number of lights for a specific “room”)
  • You create a new stream from the topic created by the table, and filter for the condition SUM = COUNT (all lights on). You add a constant column to the stream that says TRUE
  • You create another stream from the topic created by the table, and filter for the condition SUM = 0 (no lights on). You add a constant column to the stream that says FALSE

Or maybe even a more sophisticated CASE statement…
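A plain-Python sketch of this SUM/COUNT idea, assuming each event carries a room (or building), a light name and a 1/0 state, and that SUM and COUNT are computed over the latest state per light rather than over every raw event (otherwise a repeated message would be counted twice). `room_summary` and `all_lights_on` are hypothetical names. As proposed, SUM = 0 only catches "no lights on"; for the original requirement (false whenever not all lights are on) a CASE-style check of SUM = COUNT covers both outcomes.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple


def room_summary(
    events: Iterable[Tuple[str, str, int]],
) -> Dict[str, Tuple[int, int]]:
    """Return (SUM, COUNT) per room, using the latest 1/0 state per light."""
    latest: Dict[Tuple[str, str], int] = {}
    for room, light, state in events:  # state: 1 = on, 0 = off
        latest[(room, light)] = state
    sums: Dict[str, int] = defaultdict(int)
    counts: Dict[str, int] = defaultdict(int)
    for (room, _light), state in latest.items():
        sums[room] += state
        counts[room] += 1
    return {room: (sums[room], counts[room]) for room in counts}


def all_lights_on(total: int, count: int) -> bool:
    # CASE WHEN SUM = COUNT THEN TRUE ELSE FALSE END
    return total == count


events = [
    ("Building1", "Light1", 1),
    ("Building1", "Light2", 1),
    ("Building1", "Light2", 0),
    ("Building2", "Light1", 1),
]
summary = room_summary(events)
print(summary)  # {'Building1': (1, 2), 'Building2': (1, 1)}
print({room: all_lights_on(*sc) for room, sc in summary.items()})
# {'Building1': False, 'Building2': True}
```

Building2 comes out true only because a single light has reported so far, which is why it matters whether the total number of lights is fixed.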
Do you have a test case that someone could use to reproduce the environment? Something like a docker-compose / script?


Thanks @rmoff and @gianlucanatali for your answers.

Before reading your answers I had made some progress on the problem.

Actually, I receive more than just the lights: I also receive the building where each light is placed, so I have something like this:

BUILDING |ELEMENT|VALUE
---------+-------+-----
Building1|Light1 |on
Building2|Light1 |broken
Building1|Light3 |off

So I thought the only way to filter on the three lights would be to pivot the information like this:

BUILDING |Light1|Light2|Light3
---------+------+------+------
Building1|on    |off   |off
Building2|off   |broken|off

That way, filtering is trivial: WHERE BUILDING = 'Building1' AND Light1 = 'on' AND Light2 = 'on' AND Light3 = 'on'
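The pivot and the filter can be modelled in plain Python like this (a sketch of the data shape, not ksqlDB syntax; `pivot` and `all_lights_on` are hypothetical helpers). A missing column, e.g. a light that has not reported yet, is treated as not ‘on’, just as a NULL column would fail the WHERE clause.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[str, str, str]  # (BUILDING, ELEMENT, VALUE)


def pivot(rows: Iterable[Row]) -> Dict[str, Dict[str, str]]:
    """Turn (building, element, value) rows into latest values per building."""
    result: Dict[str, Dict[str, str]] = {}
    for building, element, value in rows:
        result.setdefault(building, {})[element] = value  # later rows win
    return result


def all_lights_on(
    columns: Dict[str, str],
    lights: Tuple[str, ...] = ("Light1", "Light2", "Light3"),
) -> bool:
    # Equivalent of: Light1 = 'on' AND Light2 = 'on' AND Light3 = 'on'
    return all(columns.get(light) == "on" for light in lights)


rows = [
    ("Building1", "Light1", "on"),
    ("Building2", "Light1", "broken"),
    ("Building1", "Light3", "off"),
]
wide = pivot(rows)
print(wide)
# {'Building1': {'Light1': 'on', 'Light3': 'off'}, 'Building2': {'Light1': 'broken'}}
print(all_lights_on(wide["Building1"]))  # False
```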

So I transformed the info this way:

CREATE STREAM STR_STATUS (
	BUILDING VARCHAR,
	ELEMENT VARCHAR,
	VALUE VARCHAR
) WITH (
	kafka_topic='STATUS',
	value_format='JSON_SR'
);

CREATE TABLE TBL_LIGHT1 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light1'
	GROUP BY BUILDING
	EMIT CHANGES;

CREATE TABLE TBL_LIGHT2 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light2'
	GROUP BY BUILDING
	EMIT CHANGES;
	
CREATE TABLE TBL_LIGHT3 AS
	SELECT
		BUILDING,
		LATEST_BY_OFFSET(VALUE) VALUE
	FROM
		STR_STATUS
	WHERE
		ELEMENT = 'Light3'
	GROUP BY BUILDING
	EMIT CHANGES;
	
CREATE TABLE TBL_ALL_LIGHTS_ON AS
	SELECT
		T1.BUILDING AS BUILDING,
		CASE
			WHEN T1.VALUE = 'on' AND T2.VALUE = 'on'
			THEN true
			ELSE false
		END AS ALL_LIGHTS_ON
	FROM TBL_LIGHT1 T1
	INNER JOIN TBL_LIGHT2 T2
	ON T1.BUILDING = T2.BUILDING
	EMIT CHANGES;
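In plain Python, the TBL_ALL_LIGHTS_ON query behaves roughly like the sketch below (a model of the semantics, not ksqlDB code; `join_all_on` is a hypothetical name). An inner join only produces a row for buildings present in every joined table, so a building that has not yet reported Light2 gets no row at all rather than a false. Adding Light3 is one more table in the call, which corresponds to the third join.

```python
from typing import Dict


def join_all_on(*tables: Dict[str, str]) -> Dict[str, bool]:
    """Inner-join per-light tables keyed by BUILDING and apply the CASE expression."""
    if not tables:
        return {}
    # Only buildings present in every table survive an inner join.
    common = set(tables[0]).intersection(*tables[1:])
    return {
        building: all(table[building] == "on" for table in tables)
        for building in sorted(common)
    }


tbl_light1 = {"Building1": "on", "Building2": "off"}
tbl_light2 = {"Building1": "on"}
print(join_all_on(tbl_light1, tbl_light2))  # {'Building1': True}

tbl_light3 = {"Building1": "off"}
print(join_all_on(tbl_light1, tbl_light2, tbl_light3))  # {'Building1': False}
```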

Note that the join in the last table only covers two lights, because my current version of ksqlDB does not allow more than one join in a single query (I think I can solve this simply by upgrading to a newer version; thanks for the tip about the Docker image, @rmoff!).

Note also the new BUILDING field: without it, you would need to do what @rmoff did (SELECT 1 … GROUP BY 1).

Also, creating a table for every light seems like overkill, as there can be more elements such as doors, sensors, etc.

I have yet to try what @gianlucanatali proposed, but with these tables I at least managed to create a table that
tells me whether all the lights (well, two of them for now) are on.

So I connected a sink connector to it. I was hoping that only records whose value had changed would be sent, but actually
every record is sent.

I was thinking of writing a custom Transformation (SMT) that would check whether the current value is the same as the last value, and only let the record through if they differ, but again I don’t know whether there is an easier way to do this. I’m also concerned about concurrency with this approach once I add more brokers, Connect instances, or connector tasks…
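The "only forward a record when its value differs from the previous one" logic can be sketched in plain Python as a filter that remembers the last value per key. This is not the Kafka Connect Transformation API; `ChangeFilter` is a hypothetical class. It also shows where the concurrency concern comes from: the filter keeps in-memory state per key, so it only behaves correctly if every record for a given building is handled by the same instance, in order (e.g. assuming records are keyed by building and each partition is consumed by one task), and that state is lost if a partition moves to another task, which could let a repeated value through.

```python
from typing import Dict, Hashable, Iterable, List, Tuple


class ChangeFilter:
    """Pass (key, value) records through only when value differs from the last one for key."""

    def __init__(self) -> None:
        self._last: Dict[Hashable, object] = {}  # in-memory, per instance

    def accept(self, key: Hashable, value: object) -> bool:
        if key in self._last and self._last[key] == value:
            return False  # same as the previous value for this key: drop it
        self._last[key] = value
        return True

    def filter(
        self, records: Iterable[Tuple[Hashable, object]]
    ) -> List[Tuple[Hashable, object]]:
        return [(k, v) for k, v in records if self.accept(k, v)]


records = [
    ("Building1", False),
    ("Building1", False),
    ("Building2", True),
    ("Building1", True),
    ("Building1", True),
]
print(ChangeFilter().filter(records))
# [('Building1', False), ('Building2', True), ('Building1', True)]
```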

I will update this thread when I try @gianlucanatali’s idea, and I will take my question about the sink connector to the Kafka Connect forum (see: Send messages to sink connector only if value has changed).

About your questions:

Is the total number of “lights” fixed?

No, and there could be more elements such as doors. Their state can also be more complex than ‘on’ and ‘off’, e.g. ‘broken’ or ‘uninstalled’.

Do you have a test case that someone could use to reproduce the environment? Something like a docker-compose / script?

I will try to provide that in the next update.

Thanks again for your help!