How to filter on a key column in ksqlDB?

I have a source table that looks like this:

+------------------------------------------------------------+------------------------------------------------------------+
|ROWTIME                                                     |id                                                          |
+------------------------------------------------------------+------------------------------------------------------------+
|1718204168745                                               |"2337"                                                      |
|1718101864313                                               |"1337"                                                      |
|1718265930643                                               |"3337"                                                      |
|1718204113848                                               |2337                                                        |
|1718204231979                                               |"2337"                                                      |
|1718263392544                                               |3337                                                        |

The id column is the primary key.

I query the data and filter on the id:

SELECT RowTime, `id` FROM `Data` WHERE `id` = '"2337"';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
|1718204231979                                   |"2337"                                          |
Query terminated
SELECT RowTime, `id` FROM `Data` WHERE `id` = '"1337"';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
Query terminated
SELECT RowTime, `id` FROM `Data` WHERE `id` = '"3337"';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
|1718265930643                                   |"3337"                                          |
Query terminated
SELECT RowTime, `id` FROM `Data` WHERE `id` = '2337';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
Query terminated
SELECT RowTime, `id` FROM `Data` WHERE `id` = '"2337"';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
|1718204231979                                   |"2337"                                          |
Query terminated
SELECT RowTime, `id` FROM `Data` WHERE `id` = '3337';
+------------------------------------------------+------------------------------------------------+
|ROWTIME                                         |id                                              |
+------------------------------------------------+------------------------------------------------+
Query terminated

When I export the messages from the topic using Confluent Cloud, I see this:

partition	offset	timestamp	timestampType	key	value	headers	exceededFields
2	9	1718265930643	"CREATE_TIME"	"3337"	{"id":"3337"}	[]	null
5	18	1718263392544	"CREATE_TIME"	3337	{"id":"3337"}	[]	null
5	17	1718204231979	"CREATE_TIME"	"2337"	{"id":"2337"}	[]	null
0	15	1718204168745	"CREATE_TIME"	"2337"	{"id":"2337"}	[]	null
2	8	1718204113848	"CREATE_TIME"	2337	{"id":"2337"}	[]	null
3	34	1718203440401	"CREATE_TIME"	"1337"	""	[]	null

I have produced some messages using the Confluent Cloud UI and some using the Confluent CLI:

confluent kafka topic produce dev-streaming --parse-key --key-format string
Starting Kafka Producer. Use Ctrl-C to exit.
3337:{"id":"3337"}

Why do I have the same key in different partitions and why can’t I filter on some of the ids?

It looks like I can filter on the ids that I entered using the UI but not the ones I entered using the Confluent CLI.

Partitioning is not standardized in Kafka, and different producers may use different default partitioners. The Cloud UI uses the Java producer, while the Confluent CLI is based on librdkafka. By default the two therefore pick different partitions for the same key: the Java producer uses murmur2 hashing, while librdkafka uses CRC32.

ksqlDB also assumes that data is produced with murmur2 hashing, as used by the Java producer. It therefore finds the data written with the Cloud UI but misses the records written with the CLI, because it’s looking in the wrong partition…
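To make the mismatch concrete, here is a minimal Python sketch of how the two default partitioners map a key to a partition. `murmur2` is a port of the 32-bit murmur2 variant used by the Java client (the sign bit is masked off before the modulo), and the CRC32 path mirrors librdkafka's consistent partitioner via `zlib.crc32`. The partition count of 6 and the helper names are assumptions for illustration. The export in this thread shows keys both with and without surrounding quotes, so both byte strings are tried.

```python
import zlib

MASK32 = 0xFFFFFFFF
NUM_PARTITIONS = 6  # assumption: the thread only shows partitions 0..5


def murmur2(data: bytes) -> int:
    """32-bit murmur2 as used by the Java client, returned as an unsigned int."""
    seed, m, r = 0x9747B28C, 0x5BD1E995, 24
    length = len(data)
    h = (seed ^ length) & MASK32
    body_len = length - (length % 4)
    # Mix the input four bytes at a time (little-endian).
    for i in range(0, body_len, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Fold in the remaining 1 to 3 bytes, if any.
    tail = data[body_len:]
    if len(tail) == 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def java_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Partition chosen by murmur2 hashing (Java producer default for keyed records)."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


def librdkafka_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Partition chosen by CRC32 hashing (librdkafka default for keyed records)."""
    return (zlib.crc32(key) & MASK32) % num_partitions


if __name__ == "__main__":
    for key in (b"3337", b'"3337"'):
        print(key, "murmur2 ->", java_partition(key),
              "crc32 ->", librdkafka_partition(key))
```

Whenever the two functions disagree for a key, a lookup that assumes murmur2 goes to a partition that a CRC32-based producer never wrote to.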

You should ensure that data processed by ksqlDB is partitioned using murmur2 hashing.
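For librdkafka-based clients, one way to do this is the `partitioner` configuration property, whose `murmur2_random` value is documented by librdkafka as compatible with the Java producer's default partitioner. The sketch below assumes the confluent-kafka Python package is installed; the helper names are hypothetical, and the authentication settings that Confluent Cloud requires are omitted.

```python
def build_producer_config(bootstrap_servers: str) -> dict:
    """Producer settings that select the Java-compatible partitioner."""
    return {
        "bootstrap.servers": bootstrap_servers,
        # librdkafka defaults to a CRC32-based partitioner; override it here.
        "partitioner": "murmur2_random",
    }


def produce_one(config: dict, topic: str, key: str, value: str) -> None:
    """Send a single keyed record (needs the third-party confluent-kafka package)."""
    from confluent_kafka import Producer  # imported lazily: not in the stdlib

    producer = Producer(config)
    producer.produce(topic, key=key, value=value)
    producer.flush()  # block until the record is delivered
```

A call such as `produce_one(build_producer_config("broker:9092"), "dev-streaming", "3337", '{"id":"3337"}')` should then place the record in the partition a murmur2-based lookup expects; the broker address here is a placeholder.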


I suspected it might be a hash-related problem.
That explains it: ksqlDB hashes the string in my WHERE clause with murmur2, so it does not end up at the partition that CRC32 chose for the same key.

But why can I see the CRC32-hashed keys as strings in my table at all?

Is this documented in the ksqlDB documentation? I can’t remember having read anything about it there.

This should really be highlighted in the documentation.

I mean, I have used the vendor’s own tools to arrive at this self-inflicted mess.

Depends on the query… If you do a SELECT * FROM Data; ksqlDB just does a full table scan, i.e., it reads all data from all partitions. If you add a key = xxx condition, ksqlDB hashes xxx to find a single partition and only looks for the rows there (but if the hashing does not align, it looks in the “wrong” partition).
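The difference between the two access paths can be simulated with a toy model. This is only a sketch, not ksqlDB's implementation: `ToyTable` and both partitioner functions are hypothetical, and `shifted_partitioner` is a deliberate stand-in for "a different hash" that always disagrees with `crc32_partitioner`, so the outcome is deterministic.

```python
import zlib
from typing import Callable, Dict, List, Optional, Tuple

Partitioner = Callable[[bytes, int], int]


def crc32_partitioner(key: bytes, n: int) -> int:
    """The partitioner the reader assumes (stand-in for ksqlDB's assumption)."""
    return zlib.crc32(key) % n


def shifted_partitioner(key: bytes, n: int) -> int:
    """Hypothetical 'other' partitioner: never agrees with crc32_partitioner if n > 1."""
    return (zlib.crc32(key) + 1) % n


class ToyTable:
    def __init__(self, num_partitions: int, assumed: Partitioner) -> None:
        self.partitions: List[Dict[bytes, str]] = [{} for _ in range(num_partitions)]
        self.assumed = assumed

    def write(self, key: bytes, value: str, writer: Partitioner) -> None:
        # The producer, not the table, decides where a record lands.
        self.partitions[writer(key, len(self.partitions))][key] = value

    def scan(self) -> List[Tuple[bytes, str]]:
        # Full table scan: visit every partition.
        return [(k, v) for part in self.partitions for k, v in part.items()]

    def lookup(self, key: bytes) -> Optional[str]:
        # Keyed lookup: hash the key and check only that one partition.
        return self.partitions[self.assumed(key, len(self.partitions))].get(key)


table = ToyTable(6, assumed=crc32_partitioner)
table.write(b"2337", "aligned write", crc32_partitioner)
table.write(b"3337", "misaligned write", shifted_partitioner)

print(table.lookup(b"2337"))  # found: writer and reader agree on the partition
print(table.lookup(b"3337"))  # None: the reader checks a partition the writer never used
print(len(table.scan()))      # the scan still sees both rows
```

Writing the same key through both partitioners would also leave one copy in each of two partitions, which matches the duplicated keys visible in the topic export.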

Is this documented in the ksqlDB documentation? I can’t remember having read anything about it there.

I don’t think it is…

This should really be highlighted in the documentation.

Agreed. I’ll file a ticket for it.

I mean, I have used the vendor’s own tools to arrive at this self-inflicted mess.

:100:

ksqlDB should have a function that lets you specify the hash algorithm to use:

WHERE `id` = HASH('1337', CRC32) OR `id` = HASH('1337', MURMUR2)

Feel free to file a ticket: Issues · confluentinc/ksql · GitHub

But I believe it would need to be a property of the WITH clause in a CREATE STREAM or CREATE TABLE statement.

Wouldn’t that set the table or stream to just one hash kind?

As a data consumer I shouldn’t have to care about or control how the key hash is created, but because there is no standard I have to take it into account when I want to get the data, especially when different producers use different hash algorithms on the same topic.

Being able to set that on the query itself would be great.

Wouldn’t that set the table or stream to just one hash kind?

Well yes, but a single stream/table can only have one type of hash. Don’t confuse it with a query, which might process more than one input stream/table. But if the stream/table has the hash specified, the query can pick it up automatically, right?

especially when different producers use different hash algorithms on the same topic

That is something you should not do. It would break a lot of things. I agree that it is not ideal that it’s all so loose, but this is how Kafka works. It actually has nothing to do with ksqlDB.

Being able to set that on the query itself would be great.

Cf. my comment above. It should actually be simpler to set it on the stream/table definition, which allows the query to pick it up automatically. If you have more than one query on a single stream/table, you would also only need to specify it once, when defining the stream/table, instead of for every single query.
