Creating a table with a defined key contained in structured JSON

Good morning, I’m trying to create a KSQL table from a structured JSON topic.
A sample message looks like this:

{
  "MessageType": "TrailerMaintenanceInfo",
  "Payload": {
    "TrailerCompany": "99",
    "TrailerNumber": "AA109T",
    "LastMaintenanceDate": "2021-11-08T00:00:00Z",
    "UtcDateTimeOfEventMessage": "2022-12-21T20:05:39Z"
  }
}

I’d like my key to be a concatenation of the TrailerCompany and TrailerNumber fields, something like 99*AA109T.

Does anyone know of a way to do this within the table creation?
One of my many attempts was as follows: (this didn’t work)

CREATE TABLE tbl_TrailerChanges
(
   MessageType VARCHAR
  ,Payload STRUCT <
     TrailerCompany VARCHAR
    ,TrailerNumber VARCHAR
    ,LastMaintenanceDate VARCHAR
    ,UtcDateTimeOfEventMessage VARCHAR
  >
) WITH (
   KAFKA_TOPIC = 'dev.DataChange.Trailers'
  ,VALUE_FORMAT = 'JSON'
  ,KEY = 'Payload->TrailerCompany' + '*' + 'Payload->TrailerNumber'
);

I also added PRIMARY KEY to the TrailerNumber line, just to see if I could get part of the key defined, but it appears you cannot define a Primary Key from data inside a structure.

Thanks for your help.

You would need to process the data as a stream first: use PARTITION BY on the concatenation of those two fields and write the result to another topic. Then create the table on that topic with KEY_FORMAT='KAFKA'.

In ksqlDB, creating a STREAM or TABLE implies that you process the data as-is. A schema definition must follow what is in the topic; it does not (and cannot) alter your data layout. Your schema definition tells ksqlDB how the data is actually laid out, so that ksqlDB can access it.

If you want to define a TABLE with a certain PK, the corresponding data must be in the record's key to begin with. Thus, in your case, as already mentioned by @shubhamshirur, you need to pre-process the data. Define a STREAM that follows the existing data layout, issue a CREATE STREAM AS SELECT (CSAS) query that writes the data in the desired layout into a second topic, and then create the TABLE from that second topic.
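
The three steps could look roughly like the sketch below. It is not a tested solution, and it assumes a reasonably recent ksqlDB version that supports KEY_FORMAT and expressions in PARTITION BY. The names str_TrailerChanges, str_TrailerChanges_keyed, TrailerKey, and the output topic dev.DataChange.Trailers.keyed are made up for illustration; only the source topic and the field names come from the question.

```sql
-- Step 1: a STREAM that mirrors the layout already in the topic.
-- No key column is declared because the key data is not in the record key.
-- (str_TrailerChanges is a hypothetical name.)
CREATE STREAM str_TrailerChanges (
  MessageType VARCHAR,
  Payload STRUCT<
    TrailerCompany VARCHAR,
    TrailerNumber VARCHAR,
    LastMaintenanceDate VARCHAR,
    UtcDateTimeOfEventMessage VARCHAR
  >
) WITH (
  KAFKA_TOPIC = 'dev.DataChange.Trailers',
  VALUE_FORMAT = 'JSON'
);

-- Step 2: CSAS that re-keys each record by the concatenated value and
-- writes it to a second topic (topic name and TrailerKey are assumptions).
-- The aliased expression in the projection must match the PARTITION BY
-- expression so that the new key column gets the name TrailerKey.
CREATE STREAM str_TrailerChanges_keyed
  WITH (
    KAFKA_TOPIC = 'dev.DataChange.Trailers.keyed',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON'
  ) AS
  SELECT
    Payload->TrailerCompany + '*' + Payload->TrailerNumber AS TrailerKey,
    MessageType,
    Payload
  FROM str_TrailerChanges
  PARTITION BY Payload->TrailerCompany + '*' + Payload->TrailerNumber
  EMIT CHANGES;

-- Step 3: the TABLE on the re-keyed topic, where the key is now
-- physically present in the record key.
CREATE TABLE tbl_TrailerChanges (
  TrailerKey VARCHAR PRIMARY KEY,
  MessageType VARCHAR,
  Payload STRUCT<
    TrailerCompany VARCHAR,
    TrailerNumber VARCHAR,
    LastMaintenanceDate VARCHAR,
    UtcDateTimeOfEventMessage VARCHAR
  >
) WITH (
  KAFKA_TOPIC = 'dev.DataChange.Trailers.keyed',
  KEY_FORMAT = 'KAFKA',
  VALUE_FORMAT = 'JSON'
);
```

With the sample message from the question, the record written to the second topic should carry the key 99*AA109T. Note that if either field is NULL the concatenated key is NULL as well, so you may want to filter such records out in the CSAS query.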
