Create Table without data aggregation

I just started to use the ksqlDB Confluent feature, and it stood out that it is not possible to proceed with the following command: CREATE TABLE AS SELECT A, B, C FROM [STREAM_A] [EMIT CHANGES];

I wonder why this is not possible or if there’s a way of doing it? Data aggregation here is feeling a heavy process to a simple solution.

The source is a STREAM and not a TABLE. The field types are:

  • String
  • Integers
  • Record

Let me share an example of the executed command that returns an error as a result.

CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , timestamp
    , servicename
    , content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
[EMIT CHANGES];

@rmoff - Ticket created here to continue the discussion started on Stack Overflow - HERE

Additionally to everything discussed in the stack overflow ticket, I’m starting to wonder if the concept of a Table in ksqlDB is the same as a table in, for example, BigQuery.
Would be able to clarify this, please?

Hey @dsboudart, thanks for bringing the question here. Let me re-post my StackOverflow answer for completeness and then we can take it from there.

There are two are different types of object in ksqlDB:

  • A STREAM is an unbounded series of events - just like a Kafka topic. The only difference is that a STREAM has a declared schema.
  • A TABLE is state, for a given key. It’s the same as KTable in Kafka Streams if you’re familiar with that.

Both are backed by Kafka topics.

So you can do this - note that it’s creating a STREAM not a TABLE

CREATE STREAM test_stream
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , timestamp
    , servicename
    , content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL;

If you really want to create a TABLE then use the LATEST_BY_OFFSET aggregation, assuming you’d using id as your key:

CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , LATEST_BY_OFFSET(timestamp)
    , LATEST_BY_OFFSET(servicename)
    , LATEST_BY_OFFSET(content->assignedcontent)
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
GROUP BY id;

So, over to you - perhaps you can explain why it is you think you need to create a TABLE and not a STREAM? Can you illustrate with actual data - what data have you got coming in, and how are you wanting to access it after processing with ksqlDB?