When I create a table and specify a primary key it is not being used

I create a table with

CREATE TABLE CUSTOMERS (IMAGENID STRING PRIMARY KEY, FQDN STRING) WITH (KAFKA_TOPIC='IMAGEN', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

My Topic has ImagenID variable
But when I look at the table, it has not used that and has instead used a different string for the ID which looks like a GUID

And example topic message is

{
  "ImagenID": "25AjTwFdI0hmHFL2hMJF",
  "UserID": 3275,
  "Category": "Search",
  "OrganisationID": 0,
  "Action": "View",
  "SubAction": null,
  "ObjectID": 0,
  "ObjectType": null,
  "ObjectOrg": 0,
  "FQDN": "www.someurl.com",
  "Referrer": "https://www.someurl.com/search/results",
  "timestamp": 1648115728000,
  "intField_0": 1,
  "intField_1": 0,
  "intField_2": 0,
  "intField_3": 0,
  "intField_4": 0,
  "intField_5": 0,
  "stringField_0": "search title",
  "stringField_1": null,
  "stringField_2": null,
  "stringField_3": null,
  "stringField_4": null,
  "stringField_5": null,
  "Values": [],
  "id": "2a4a3baa-51d6-42f1-ba4a-aea016c8b549",
  "_rid": "TpIGAKCPadX8Q3IEAAAAAw==",
  "_self": "dbs/TpIGAA==/colls/TpIGAKCPadU=/docs/TpIGAKCPadX8Q3IEAAAAAw==/",
  "_etag": "\"8400d9ea-0000-0d00-0000-623c40100000\"",
  "_attachments": "attachments/",
  "_ts": 1648115728,
  "_lsn": 75688459
}

I am trying to create a table which shows the latest FQDN for a given ImagenID

An example row from the table is (not the same row as record above)

|"\"849bd7a7-3ed3-48d3-9b20-dd5e521737c2\""                |www.someurl.com                                   |

If you’re creating the table against an existing topic then the PRIMARY KEY DDL is telling ksqlDB to use the key of the Kafka message as the value for IMAGENID field.

Thanks, although I am not sure I understand

So how can I make a table and choose the primary key?

It depends how that topic is populated. If you are populating it, then populate ImagenID into the key of the message instead of the value. If you don’t have that option then you can rekey it in ksqlDB:

CREATE STREAM CUSTOMERS_STREAM (IMAGENID STRING, FQDN STRING) WITH (KAFKA_TOPIC='IMAGEN', KEY_FORMAT='KAFKA', VALUE_FORMAT='JSON');

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE CUSTOMERS AS
    SELECT IMAGENID, LATEST_BY_OFFSET(FQDN) AS FQDN
    FROM CUSTOMERS_STREAM
    GROUP BY IMAGENID;

The key thing to remember (sorry…) is that a ksqlDB table represents the latest value(s) for a given key. The way that this is physically stored in Kafka is as a message on a topic in which the key is the message key. So if you are taking an existing topic, the message key must be the key of the table that you are declaring over it. If it’s not, you can use ksqlDB to wrangle it into the form you need.

Great. Thanks for your help

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.