Equivalent of SQL Union in KSQLDB

Hello,
I have a requirement to create a table by creating multiple aggregates on a KSQL stream.
For example:

CREATE TABLE AGGREGATES AS
SELECT GENDER AS ATTRIBUTE, COUNT(*) AS COUNTS FROM PERSON_STREAM GROUP BY GENDER
UNION
SELECT CITY AS ATTRIBUTE, COUNT(*) AS COUNTS FROM PERSON_STREAM GROUP BY CITY

I cannot figure out how something like this can be achieved in ksqlDB. I do not want to create a separate table for each attribute. Thanks a lot in advance.

Hi @n.kant, I think one way to solve this is to create your “aggregated” stream first and use the INSERT INTO ... SELECT syntax to feed it from all the different queries you wish to UNION. Then you can run your CREATE TABLE with GROUP BY on top of that stream. Have a look at this tutorial: How to merge many streams into one stream using ksqlDB

Let us know how it goes!

Hi Gianluca,
Thanks a lot for coming back to me on this. I thought I would bring in a simple example to demonstrate what we are trying to achieve.

SELECT * FROM SAMPLESTREAM EMIT CHANGES;

EMPID EMPNAME EMPCITY EMPDEPARTMENT
1 Peter London Dev
1 Sam London Dev
3 Nick London Dev
4 Arthur Oxford Test
4 Dave Oxford Test
5 Michelle Oxford Test
6 Ramesh Oxford Test
7 Mariam Oxford Test

What we want to achieve in a single ksqlDB table is:

ATTRIBUTE_TYPE ATTRIBUTE_NAME COUNTS
CITY London 3
CITY Oxford 5
DEP Dev 3
DEP Test 5

In a typical database environment we would write:

SELECT 'CITY' AS ATTRIBUTE_TYPE, EMPCITY AS ATTRIBUTE_NAME, COUNT(*) AS COUNTS FROM SAMPLESTREAM GROUP BY EMPCITY
UNION
SELECT 'DEPT' AS ATTRIBUTE_TYPE, EMPDEPARTMENT AS ATTRIBUTE_NAME, COUNT(*) AS COUNTS FROM SAMPLESTREAM GROUP BY EMPDEPARTMENT

I am able to create two separate KSQL tables:

CREATE TABLE CITY_COUNT
AS
SELECT 'CITY' AS ATTRIBUTE_TYPE
,EMPCITY AS ATTRIBUTE_NAME
,COUNT(*) AS COUNTS
FROM SAMPLESTREAM GROUP BY EMPCITY
EMIT CHANGES;

Results:

ATTRIBUTE_TYPE ATTRIBUTE_NAME COUNTS
CITY London 3
CITY Oxford 5

and

CREATE TABLE DEPT_COUNT
AS
SELECT 'DEPT' AS ATTRIBUTE_TYPE
,EMPDEPARTMENT AS ATTRIBUTE_NAME
,COUNT(*) AS COUNTS
FROM SAMPLESTREAM GROUP BY EMPDEPARTMENT
EMIT CHANGES;

Result:

ATTRIBUTE_TYPE ATTRIBUTE_NAME COUNTS
DEPT Dev 3
DEPT Test 5

Based on your suggestion, I am not sure how to create an aggregated stream from SAMPLESTREAM, because when I try to create one using the query below:

CREATE STREAM CITY_COUNT_STREAM
AS
SELECT 'CITY' AS ATTRIBUTE_TYPE
,EMPCITY AS ATTRIBUTE_NAME
,COUNT(*) AS COUNTS
FROM SAMPLESTREAM GROUP BY EMPCITY
EMIT CHANGES;

I get the error

“Could not determine output schema for query due to error: Invalid result type. Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.”

Look forward to hearing from you.

Hi @n.kant !
One way to do this is to first create the stream (without the GROUP BY) and then create a table out of that stream with the GROUP BY. Something like:

CREATE STREAM all_attrs (ATTRIBUTE_TYPE VARCHAR, ATTRIBUTE_NAME VARCHAR)
WITH (kafka_topic='all_attrs', partitions=3, value_format='avro');

INSERT INTO all_attrs SELECT 'CITY' AS ATTRIBUTE_TYPE, EMPCITY AS ATTRIBUTE_NAME FROM SAMPLESTREAM;

INSERT INTO all_attrs SELECT 'DEP' AS ATTRIBUTE_TYPE, EMPDEPARTMENT AS ATTRIBUTE_NAME FROM SAMPLESTREAM;
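
A note on the statements above, assuming default settings: persistent queries in ksqlDB start reading from the latest offset, so rows already present in SAMPLESTREAM would not be copied into all_attrs. A minimal sketch of one way to handle that in a CLI session is to set the offset reset property before running the INSERT INTO statements, then check the merged stream with a push query (the LIMIT value is arbitrary):

```sql
-- Run this before the INSERT INTO statements so they also pick up existing rows.
SET 'auto.offset.reset' = 'earliest';

-- Once both INSERT INTO queries are running, inspect the merged stream.
-- Each source row should show up twice: once as CITY and once as DEP.
SELECT ATTRIBUTE_TYPE, ATTRIBUTE_NAME FROM all_attrs EMIT CHANGES LIMIT 10;
```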

Then you can create the table with the counts:

CREATE TABLE COUNT_BY_ATTRTYPE
AS
SELECT ATTRIBUTE_TYPE
,ATTRIBUTE_NAME
,COUNT(*) AS COUNTS
FROM all_attrs GROUP BY ATTRIBUTE_TYPE, ATTRIBUTE_NAME
EMIT CHANGES;

I didn’t test the code, so there could very well be typos. Let us know how it goes!
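
One thing that may come up, depending on the ksqlDB version: grouping by two columns gives the table a two-column key, and the default KAFKA key format is documented as supporting only a single key column. If the CREATE TABLE above is rejected for that reason, a variant worth trying is to pick a key format that handles multiple columns, such as JSON. This is an untested sketch and assumes a version that accepts the KEY_FORMAT property:

```sql
-- Assumption: a ksqlDB version with multi-column key support and KEY_FORMAT.
CREATE TABLE COUNT_BY_ATTRTYPE
WITH (KEY_FORMAT='JSON')
AS
SELECT ATTRIBUTE_TYPE
,ATTRIBUTE_NAME
,COUNT(*) AS COUNTS
FROM all_attrs GROUP BY ATTRIBUTE_TYPE, ATTRIBUTE_NAME
EMIT CHANGES;

-- Push query to watch the combined counts as they update.
SELECT ATTRIBUTE_TYPE, ATTRIBUTE_NAME, COUNTS FROM COUNT_BY_ATTRTYPE EMIT CHANGES;
```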
