I've just started using ksqlDB on Confluent, and I noticed that a command of the following form does not work: CREATE TABLE AS SELECT A, B, C FROM [STREAM_A] [EMIT CHANGES];
Why is this not possible, and is there another way of doing it? Having to aggregate the data feels like a heavy process for such a simple requirement.
The source is a STREAM and not a TABLE. The field types are:
String
Integers
Record
Here is an example of a command I ran that returns an error:
CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
, timestamp
, servicename
, content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
[EMIT CHANGES];
@rmoff - I created this topic to continue the discussion we started on Stack Overflow.
In addition to everything discussed in the Stack Overflow thread, I'm starting to wonder whether the concept of a table in ksqlDB is the same as a table in, for example, BigQuery.
Would you be able to clarify this, please?
Hey @dsboudart, thanks for bringing the question here. Let me re-post my Stack Overflow answer for completeness and then we can take it from there.
There are two different types of object in ksqlDB:
A STREAM is an unbounded series of events - just like a Kafka topic. The only difference is that a STREAM has a declared schema.
A TABLE is state, for a given key. It’s the same as KTable in Kafka Streams if you’re familiar with that.
Both are backed by Kafka topics.
So you can do this - note that it’s creating a STREAM not a TABLE
CREATE STREAM test_stream
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
, timestamp
, servicename
, content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL;
If you really want to create a TABLE then use the LATEST_BY_OFFSET aggregation, assuming you're using id as your key:
CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
, LATEST_BY_OFFSET(timestamp)
, LATEST_BY_OFFSET(servicename)
, LATEST_BY_OFFSET(content->assignedcontent)
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
GROUP BY id;
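If it helps to see the difference outside ksqlDB, here is a rough plain-Python model of the two queries above. It is not ksqlDB code and does not use any ksqlDB API: the function names and the sample events are made up for illustration, and list order stands in for Kafka offset order. It also assumes LATEST_BY_OFFSET skips null values, which is my understanding of its default behaviour.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]

# Hypothetical sample events; list order stands in for offset order.
SAMPLE_EVENTS: List[Event] = [
    {"id": "a", "timestamp": 1, "servicename": "billing", "assignedcontent": "x"},
    {"id": "b", "timestamp": 2, "servicename": "search", "assignedcontent": None},
    {"id": "a", "timestamp": 3, "servicename": "billing", "assignedcontent": "y"},
]


def filter_stream(events: List[Event]) -> List[Event]:
    """Model of the CREATE STREAM query: every matching event is kept."""
    return [e for e in events if e["assignedcontent"] is not None]


def latest_by_offset(events: List[Event], key: str = "id") -> Dict[Any, Event]:
    """Model of the CREATE TABLE query: one row per key, latest value per column."""
    table: Dict[Any, Event] = {}
    for event in filter_stream(events):  # the WHERE clause applies first
        row = table.setdefault(event[key], {})
        for column, value in event.items():
            # Assumption: null values do not overwrite an earlier value.
            if column != key and value is not None:
                row[column] = value
    return table


if __name__ == "__main__":
    print(filter_stream(SAMPLE_EVENTS))     # stream: both events for id "a"
    print(latest_by_offset(SAMPLE_EVENTS))  # table: a single row for id "a"
```

The stream version keeps both events for id "a", while the table version collapses them into one row holding the most recent values. That is why ksqlDB asks for an aggregation and a GROUP BY when you build a TABLE from a STREAM: it needs to know how to reduce many events per key to a single state.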
So, over to you - perhaps you can explain why it is you think you need to create a TABLE and not a STREAM? Can you illustrate with actual data - what data have you got coming in, and how are you wanting to access it after processing with ksqlDB?