Hi all,
I’m encountering an issue using latest_by_offset() where empty events are sometimes generated in my stream. Here’s the scenario:
{'key': '0', 'x': 'A'}
{'key': '0', 'x': 'A'}
{'key': '0', 'x': 'B'}
{'key': '0', 'x': 'A'}
{'key': '0', 'x': 'A'}
I want to emit an event only when the value of x changes for a key, e.g. when 'A' changes to 'B' or vice versa. For the example above, I expect exactly two events:
{'key': '0', 'curr_x': 'B'}
{'key': '0', 'curr_x': 'A'}
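To make the expected behavior precise, here is a minimal Python sketch of the transition logic I'm after. It is only an illustration outside ksqlDB; the function name `transitions` and the in-memory `events` list are made up for this example.

```python
def transitions(events):
    """Yield one output event each time x differs from the previous x for the same key."""
    last_seen = {}  # key -> most recent x value seen for that key
    for event in events:
        key, x = event["key"], event["x"]
        # Emit only if this key was seen before and its value has changed.
        if key in last_seen and last_seen[key] != x:
            yield {"key": key, "curr_x": x}
        last_seen[key] = x


events = [
    {"key": "0", "x": "A"},
    {"key": "0", "x": "A"},
    {"key": "0", "x": "B"},
    {"key": "0", "x": "A"},
    {"key": "0", "x": "A"},
]

result = list(transitions(events))
print(result)
```

Repeated values produce nothing here, which is what I'd like the changed topic to look like as well.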
My current ksqlDB setup looks like this:
CREATE STREAM input_s (
    key VARCHAR KEY,
    x VARCHAR
) WITH (kafka_topic='input');

CREATE TABLE change_t
  WITH (kafka_topic='changed') AS
  SELECT
    s.key AS id,
    latest_by_offset(s.x) AS curr_x
  FROM input_s AS s
  GROUP BY s.key
  HAVING latest_by_offset(s.x) <> latest_by_offset(s.x, 2)[1];
After a value transition (e.g., the last event {'key': '0', 'x': 'A'}), an empty event is created in the changed topic. I would like to prevent this behavior and ensure only valid transitions are emitted.
(SELECT * FROM change_t EMIT CHANGES; shows a record with '<TOMBSTONE>' as the value for 'curr_x'.)
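For reference, this is a rough Python model of what I think is happening. It is not ksqlDB code, and it rests on an assumption I have not confirmed: that the table emits a tombstone for a key whose row previously satisfied the HAVING clause and no longer does. The function name `simulate_having` is hypothetical, and `None` stands in for the tombstone.

```python
def simulate_having(events):
    """Model a table with HAVING latest(x) <> latest_by_offset(x, 2)[1].

    ASSUMPTION: a key that previously passed HAVING and now fails it
    produces a tombstone (represented here as None).
    """
    window = {}      # key -> up to the two most recent x values, oldest first
    visible = set()  # keys whose row currently satisfies HAVING
    out = []
    for event in events:
        key = event["key"]
        window[key] = (window.get(key, []) + [event["x"]])[-2:]
        # Newest value compared with the oldest value in the 2-element window.
        passes = window[key][-1] != window[key][0]
        if passes:
            out.append((key, window[key][-1]))
            visible.add(key)
        elif key in visible:
            out.append((key, None))  # row drops out of the table
            visible.discard(key)
    return out


sample = [{"key": "0", "x": v} for v in ["A", "A", "B", "A", "A"]]
output = simulate_having(sample)
print(output)
```

Under that assumption, the model yields the two transitions I want plus one extra empty record for the final repeated 'A', which matches what I see in the changed topic.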
Does anyone have suggestions on how to fix this?
Thanks in advance for your help!
Best,
Oliver