Aggregate on value from inside nested struct

Trying the follow
Want to count sales per terminal, window by hour, once got this working can add more fields like store name also…

CREATE TABLE json_sales_per_terminal_point WITH (KAFKA_TOPIC='json_sales_per_terminal_point',
	store->id as store_id,
	TerminalPoint as terminal_point,
    count(1) as sales_per_terminal
FROM json_salescompleted1
group by store->id , TerminalPoint

Example record

 	"InvoiceNumber": "1341243123341232",
	"SaleDateTime": "2024-06-11T16:53:39.911+02:00",
	"SaleTimetamp": "1718117619911",
	"Store" : {
		"Id": "2143412",
		"Name": "sdfgsjdjndnjdfgs"
	"Clerk": {
		"Id": "231",
		"Name": "grfvnowifgbvuwe"
	"TerminalPoint": "124",
			"Id": "234123412",
			"Name": "",
			"Brand": "fgtwruyergfd",
			"Category": "something",
			"Price": 12412.00,
			"Quantity": 3
	"Net": 442.23,
	"VAT": 10.00,
	"Total": 452.23,
	"PayDateTime": "2023-12-12-T13:22:37.000+02:00",
	"PayTimetamp": "1718117619911",
	"Paid": 452.23,
	"FinTransactionID": "42dfgt245wsdg34231rfwfg234234"


Key format does not support schema.
format: KAFKA
schema: Persistence{columns=[STORE_ID STRING KEY, TERMINAL_POINT STRING KEY], features=}
reason: The ‘KAFKA’ format only supports a single field. Got: [STORE_ID STRING KEY, TERMINAL_POINT STRING KEY]

tl;dr it should work if you change VALUE_FORMAT='JSON' to FORMAT='JSON' so that JSON is used for both the key and value.

See the docs here on how primary key gets determined. (Bit confusing that it’s under the Joins doc.) Basically, because you are grouping by store->id , TerminalPoint, they get used as the composite primary key. The problem is that the (default) KAFKA key format only supports primitive single field data types so that’s why you hit this error.

1 Like


knew my idea was sound… it was the how to build, for it to make sense.
made changes as per above and now getting the basic data.
Need to expand a bit, change the date format to something useful… and then imply an order of sort, oldest to newest and then within each window the store_id together and then terminals in order. or something like that.

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.