Table not filtering by key

Hi,

I’m facing an issue where I’m surely doing something wrong, but I have no idea what.

I’ve created a table from a topic that I’m feeding from a Python microservice, and I am sending a key with each message. I created the table like this:

CREATE TABLE eb_chat_session (ID_PK VARCHAR PRIMARY KEY, ID_USER VARCHAR, EMAIL VARCHAR, ID_SESSION VARCHAR, TYPE_CHAT VARCHAR, QUESTION_INDEX VARCHAR, FROM_BOT BOOLEAN, FIRST_NAME VARCHAR) WITH (KAFKA_TOPIC='eb_chat_session', VALUE_FORMAT='JSON');

Description:

Name                 : EB_CHAT_SESSION
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : eb_chat_session (partitions: 25, replication: 1)
Statement            : CREATE TABLE EB_CHAT_SESSION (ID_PK STRING PRIMARY KEY, ID_USER STRING, EMAIL STRING, ID_SESSION STRING, TYPE_CHAT STRING, QUESTION_INDEX STRING, FROM_BOT BOOLEAN, FIRST_NAME STRING) WITH (KAFKA_TOPIC='eb_chat_session', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

 Field          | Type                           
-------------------------------------------------
 ID_PK          | VARCHAR(STRING)  (primary key) 
 ID_USER        | VARCHAR(STRING)                
 EMAIL          | VARCHAR(STRING)                
 ID_SESSION     | VARCHAR(STRING)                
 TYPE_CHAT      | VARCHAR(STRING)                
 QUESTION_INDEX | VARCHAR(STRING)                
 FROM_BOT       | BOOLEAN                        
 FIRST_NAME     | VARCHAR(STRING)                
-------------------------------------------------

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic eb_chat_session)

And then I’ve created a queryable table:

CREATE TABLE EB_SESSION_MAT_TABLE AS SELECT * FROM EB_CHAT_SESSION;

Now the problem:

When I try to filter by ID_PK, which is the key, I get nothing:

SELECT * FROM EB_SESSION_MAT_TABLE WHERE ID_PK = 'spaces/nMdVCYAAAAE';

But if I filter by ID_SESSION, which contains the same value as ID_PK, I get a row:

SELECT * FROM EB_SESSION_MAT_TABLE WHERE ID_SESSION = 'spaces/nMdVCYAAAAE';

I would like to understand why I’m not able to filter by the key.

An update here:

If I use ‘=’ it returns nothing, but if I use ‘!=’ or ‘<>’ it returns everything different from the specified value, as it should, of course:

SELECT * FROM EB_SESSION_MAT_TABLE WHERE ID_PK <> 'spaces/nMdVCYAAAAE';

Thanks

I am wondering if the key is not a plain string but a JSON string primitive — in other words, “foo” vs foo in the key.
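The difference can be sketched in Python (the key value here is just an example from the thread):

```python
import json

key = "spaces/nMdVCYAAAAE"

# Plain UTF-8 string key: exactly the characters, no quotes.
raw_bytes = key.encode("utf-8")

# JSON string primitive: the serialized form includes the quote characters,
# so the key bytes on the topic would differ from the plain string.
json_bytes = json.dumps(key).encode("utf-8")

print(raw_bytes)   # b'spaces/nMdVCYAAAAE'
print(json_bytes)  # b'"spaces/nMdVCYAAAAE"'
```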

Can you provide the contents of a message from the original topic?

Try something like this, which prints both the key and the value, using a bytes deserializer for the key in case there are any non-ASCII characters (e.g. someone stored Avro in the key and 0x00,0x00,0x00,0x00,0x01 exists in it, which the string deserializer wouldn’t show):

alias kcc='kafka-console-consumer \
        --bootstrap-server localhost:19092 \
        --key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer  \
        --property print.key=true \
        --property key.separator="|" \
        --from-beginning \
        --topic'

Hi Niel. Thanks for your answer.

I have some news about this problem, but first I’ll show the results you asked for:

When running the command you sent me I get this:

[appuser@broker ~]$ kafka-console-consumer --bootstrap-server broker:9092 --key-deserializer=org.apache.kafka.common.serialization.BytesDeserializer --property print.key=true --property key.separator="|" --from-beginning --topic eb_chat_session
spaces/2FD6GiBBBE|{"id_user": "1", "email": "email@email.com.br", "id_session": "spaces/2FD6GiBBBBE", "type_chat": "gchat1", "question_index": "", "from_bot": true, "first_name": "Marcel Fenerich"}

And an extra view when printing it from the ksqlDB CLI:

print 'eb_chat_session'
rowtime: 2021/12/29 13:16:32.411 Z, key: [sp@5200935470741143877/7017564189765429325], value:  {"id_user": "1", "email": "email@email.com.br", "id_session": "spaces/2FD6GiBBBBE", "type_chat": "gchat", "question_index": "", "from_bot": true, "first_name": "Marcel Fenerich"}, partition: 11

I realised that when using the function AS_VALUE() I am able to get the result, like this:

select * from EB_SESSION_MAT_TABLE where AS_VALUE(ID_PK) = 'spaces/2FD6GiBBBBE';

Another way it works is using the function ELT():

SELECT * FROM EB_SESSION_MAT_TABLE WHERE ELT(1,ID_PK) = 'spaces/2FD6GiBBBBE';

That would be OK for me, but unfortunately I cannot use this kind of function on keys when joining a table and a stream, for example:

SELECT * FROM eb_incoming_messages_stream EIMS JOIN eb_chat_session ECS ON AS_VALUE(ECS.ID_PK) = EIMS.ID_SESSION EMIT CHANGES;

Invalid join condition: stream-table joins require to join on the table's primary key. Got AS_VALUE(ECS.ID_PK) = EIMS.ID_SESSION.

Just to make it clear, I’m producing these keyed events from Python code using the confluent-kafka-python library:

_p.produce(topic=_topic,value=json.dumps(data).encode('utf-8'),callback=delivery_report,key=_key)

I made sure the _key variable is a string.

Cheers

The key of the topic looks odd to me:

key: [sp@5200935470741143877/7017564189765429325]

Is Python somehow sending this with an array serde?

I would have expected key: spaces/2FD6GiBBBBE

Can you check the datatype of _key that you are sending in with the producer?


Also make sure that the partitioning is correct on write. ksqlDB assumes that input data is partitioned using the default partitioning from the Java client. If you are using Confluent’s Python producer, it uses a different partitioning by default though and thus ksqlDB might not be able to find the record, as it would hash to a different partition.

Using AS_VALUE basically changes your query from a key-lookup (using hashing) to a full table scan, and that’s why the record is found (all shards will be queried). Similarly, the query using <> does a full table scan (instead of a key-based index lookup).
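As an illustration only (toy hash functions, not the real murmur2 or librdkafka algorithms), here is a sketch of why a mismatched partitioner breaks key lookups but not full scans:

```python
NUM_PARTITIONS = 25

# Toy stand-ins for two *different* hash functions; the real clients use
# murmur2 (Java) and consistent_random (librdkafka) respectively.
def producer_hash(key: str) -> int:
    return sum(key.encode("utf-8")) * 31

def lookup_hash(key: str) -> int:
    return sum(key.encode("utf-8"))

key = "spaces/nMdVCYAAAAE"
shards = [dict() for _ in range(NUM_PARTITIONS)]

# Write: the record lands in the shard chosen by the *producer's* hash.
shards[producer_hash(key) % NUM_PARTITIONS][key] = {"id_session": key}

# Key lookup: only the shard chosen by the *reader's* hash is checked -> miss.
hit = shards[lookup_hash(key) % NUM_PARTITIONS].get(key)
print(hit)  # None

# Full table scan (what AS_VALUE or <> forces): every shard is checked -> found.
scan = [v for shard in shards for v in shard.values() if v["id_session"] == key]
print(scan)  # [{'id_session': 'spaces/nMdVCYAAAAE'}]
```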

If this is the root cause, you should change the producer partitioner to match the Java one.

@nbuesing I just checked it out and it is a STRING.

@mjsax Thanks for explaining what is going on with the queries; it was very enlightening. Now, about the partitioning, I will check it out. I’m still not sure how to do that.

I was looking for some docs and I came up with this:

About Confluent’s Python producer, I understand that the default partitioner uses something like partition = hash(key) % num_partitions, which is cool.

About the Java client I was not able to find anything; this link, Overview (ksqldb-api-client 0.27.2 API), is not working at the moment, and I did not find this specific information in other places.

But when looking at the Python client’s API docs, I cannot find where I could change the default producer partitioner.

The topic I’m producing has 25 partitions.

Could you please help me understand how I could match these partitioner strategies?

About:

partition = hash(key) % num_partitions

The question is what hash function is used. The Java client uses murmur2 hashing by default, but the Python client is using consistent_random by default.

You can change the partitioner via the partitioner config by setting it to murmur2. Cf. librdkafka/CONFIGURATION.md at 2b76b65212e5efda213961d5f84e565038036270 · edenhill/librdkafka · GitHub
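For anyone curious, the Java client derives the partition as (murmur2(keyBytes) & 0x7fffffff) % numPartitions. Below is a Python sketch of that murmur2 hash, ported from the Java client; treat it as illustrative and verify against your own topic before relying on it:

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 as used by the Java Kafka client's default partitioner."""
    length = len(data)
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24
    h = (seed ^ length) & 0xFFFFFFFF
    i = 0
    # Mix 4 bytes at a time (little-endian), mirroring the Java version.
    while length - i >= 4:
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & 0xFFFFFFFF
        k ^= k >> r
        k = (k * m) & 0xFFFFFFFF
        h = (h * m) & 0xFFFFFFFF
        h ^= k
        i += 4
    # Handle the remaining 1-3 bytes (fall-through, like the Java switch).
    rem = length - i
    if rem == 3:
        h ^= data[i + 2] << 16
    if rem >= 2:
        h ^= data[i + 1] << 8
    if rem >= 1:
        h ^= data[i]
        h = (h * m) & 0xFFFFFFFF
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & 0xFFFFFFFF
    h ^= h >> 15
    return h

def default_partition(key: str, num_partitions: int) -> int:
    # Java: Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
    return (murmur2(key.encode("utf-8")) & 0x7FFFFFFF) % num_partitions

print(default_partition("spaces/2FD6GiBBBBE", 25))
```

If the partition this prints does not match the partition the Python producer actually wrote to (partition 11 in the print output above), the partitioners disagree.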

That is absolutely right.

The root of the problem was the partitioner.

When instantiating the producer object, you need to set the partitioner to murmur2:

producer_setup_data = {'bootstrap.servers': 'localhost:9092','partitioner':'murmur2'}
_p = Producer(producer_setup_data)

Thank you a lot.

