I’m running some tests with Kafka and KSQL and I want to know how to deal with duplicate keys. Let me show you my test case:
First, I created a topic named account, keyed by “id”, and pushed some test data with the same key to the topic.
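The test data looked roughly like this (illustrative values): several JSON records produced with the same key:

key: 7, value: {"number":"0001","BRANCH":"main","bank":"acme","balance":100.0,"owner":"alice"}
key: 7, value: {"number":"0001","BRANCH":"main","bank":"acme","balance":200.0,"owner":"alice"}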
After that, I created a table with this command:
CREATE TABLE accounts
(ID VARCHAR PRIMARY KEY, number VARCHAR, BRANCH VARCHAR, bank VARCHAR, balance DOUBLE, owner VARCHAR)
WITH (KAFKA_TOPIC='account', VALUE_FORMAT='JSON');
The problem is that a query like "SELECT * FROM accounts WHERE id = '7' EMIT CHANGES;" shows repeated rows.
What do I need to do to keep only the latest row for each primary key?
That table really does contain only one row for each unique key value. When you inspect the table by running that SELECT at the ksqlDB CLI (which I assume is what you’re doing; set me straight if that’s not the case), you will see updates to that row (say, where id=7) as the row in the table changes. Those updates are not additional rows with the same key, but the same row emitted again as its non-key values change.
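For example (illustrative values; the exact CLI formatting varies by version), if two events with key 7 have been produced to the topic, the push query prints one result row per update:

+---+-------+-------+-----+--------+------+
|ID |NUMBER |BRANCH |BANK |BALANCE |OWNER |
+---+-------+-------+-----+--------+------+
|7  |0001   |main   |acme |100.0   |alice |
|7  |0001   |main   |acme |200.0   |alice |

Both lines describe the same table row; the second simply supersedes the first.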
I understand the explanation. Yes, I’m using the ksqlDB CLI and Control Center to test my queries. In the ksqlDB CLI I closed the terminal and opened it again, and this time the SELECT returned the row twice, even though no update was pushed to the topic.
It seems to me that the “primary key” is not behaving the way I’d expect. See my screenshot: no update occurred during this SELECT, yet two rows appear.
As Tim said, when you use the EMIT CHANGES clause, you do not query a table snapshot; instead, you get the changelog stream of the table, from its initial empty state up to now (a so-called push query). Thus, the result of your query is a stream of updates to the table!
ksqlDB also supports key lookups against table snapshots: for this case, you need to materialize the input topic by reading it as a STREAM and applying the LATEST_BY_OFFSET aggregation function. Afterwards, you can issue so-called pull queries against the result table of the stream aggregation query.
-- create an input STREAM from the topic
-- note: for a STREAM you use KEY instead of PRIMARY KEY
CREATE STREAM accountStream
(ID VARCHAR KEY, number VARCHAR, BRANCH VARCHAR, bank VARCHAR, balance DOUBLE, owner VARCHAR)
WITH (KAFKA_TOPIC='account', VALUE_FORMAT='JSON');
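-- note: a new persistent query starts from the latest offsets by default;
-- to aggregate the records that are already in the topic, you may need to
-- set this before running the CREATE TABLE statement
SET 'auto.offset.reset' = 'earliest';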
-- aggregate the stream into a table
-- (note: if the accounts table from the question still exists,
-- drop it first or pick a different name here)
CREATE TABLE accounts AS
SELECT
id,
LATEST_BY_OFFSET(number) AS number,
LATEST_BY_OFFSET(BRANCH) AS BRANCH,
LATEST_BY_OFFSET(bank) AS bank,
LATEST_BY_OFFSET(balance) AS balance,
LATEST_BY_OFFSET(owner) AS owner
FROM accountStream
GROUP BY id;
-- issue a pull query,
-- i.e., a single-row lookup against the latest table snapshot
SELECT * FROM accounts WHERE id = '7';
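To verify the behavior, you could push one more event for the same key through the stream and re-run the pull query (the values below are made up); it should still return exactly one row, now carrying the latest values:

-- insert a new event for key '7' (hypothetical values)
INSERT INTO accountStream (id, number, BRANCH, bank, balance, owner)
VALUES ('7', '0001', 'main', 'acme', 300.0, 'bob');

-- the pull query still returns a single row, with the updated values
SELECT * FROM accounts WHERE id = '7';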