Hi,
I’m using KSQLDB on a Confluent 5.5.1 installation using docker containers.
I have a source connector that receives status messages from a server and saves them in a topic called STATUS. It looks like this:
ELEM |VALUE
-------+-----
Light1 |on
Light1 |off
Light3 |on
Light2 |off
What I want is to create an output topic, so a sink connector could send messages to another server. This topic will have
only a boolean value that will be true only if Light1, Light2 and Light3 are ‘on’, false otherwise.
So it would look like this:
ELEMENT|VALUE ALL_LIGHTS_ON
-------+----- -------------
Light1 |off -> [ ][ ][ ] -> false
Light2 |on [ ][O][ ]
Light2 |off [ ][ ][ ]
Light2 |on [ ][O][ ]
Light3 |on [ ][O][O]
Light1 |on -> [O][O][O] -> true
Having the state of ALL_LIGHTS_ON changing only two times, I expect the sink connector to only send two messages to the output server.
I modelled first a stream to read from my topic, it looks like this:
CREATE STREAM STR_STATUS (
ELEMENT VARCHAR,
VALUE VARCHAR
) WITH (
kafka_topic='STATUS',
value_format='JSON_SR'
);
Then I created a table to have pairs element value, having the last known value of every element:
CREATE TABLE TBL_STATUS AS
SELECT
ELEMENT,
LATEST_BY_OFFSET(VALUE) AS VALUE
FROM
STR_STATUS
GROUP BY ELEMENT
EMIT CHANGES;
But I’m pretty much stuck there, I don’t know how to create this condition. In regular SQL one may do something like that using self joins but my version of KSQLDB does not support this.
So I was wondering if this could be possible to achieve using KSQLDB (even upgrading to a newer version) or should I solve this problem in another part of the architecture? (For example filtering in the Sink connector every message that cannot meet the requirementes of the condition).
Any help is welcome, thanks.