Create Table without data aggregation

I just started to use the ksqlDB Confluent feature, and it stood out that it is not possible to proceed with the following command: CREATE TABLE AS SELECT A, B, C FROM [STREAM_A] [EMIT CHANGES];

I wonder why this is not possible or if there’s a way of doing it? Data aggregation here is feeling a heavy process to a simple solution.

The source is a STREAM and not a TABLE. The field types are:

  • String
  • Integers
  • Record

Let me share an example of the executed command that returns an error as a result.

CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , timestamp
    , servicename
    , content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
[EMIT CHANGES];

@rmoff - Ticket created here to continue the discussion started on Stack Overflow - HERE

Additionally to everything discussed in the stack overflow ticket, I’m starting to wonder if the concept of a Table in ksqlDB is the same as a table in, for example, BigQuery.
Would be able to clarify this, please?

Hey @dsboudart, thanks for bringing the question here. Let me re-post my StackOverflow answer for completeness and then we can take it from there.

There are two are different types of object in ksqlDB:

  • A STREAM is an unbounded series of events - just like a Kafka topic. The only difference is that a STREAM has a declared schema.
  • A TABLE is state, for a given key. It’s the same as KTable in Kafka Streams if you’re familiar with that.

Both are backed by Kafka topics.

So you can do this - note that it’s creating a STREAM not a TABLE

CREATE STREAM test_stream
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , timestamp
    , servicename
    , content->assignedcontent
FROM created_stream
WHERE content->assignedcontent IS NOT NULL;

If you really want to create a TABLE then use the LATEST_BY_OFFSET aggregation, assuming you’d using id as your key:

CREATE TABLE test_table
WITH (KEY_FORMAT='JSON',VALUE_FORMAT='AVRO')
AS
SELECT id
    , LATEST_BY_OFFSET(timestamp)
    , LATEST_BY_OFFSET(servicename)
    , LATEST_BY_OFFSET(content->assignedcontent)
FROM created_stream
WHERE content->assignedcontent IS NOT NULL
GROUP BY id;

So, over to you - perhaps you can explain why it is you think you need to create a TABLE and not a STREAM? Can you illustrate with actual data - what data have you got coming in, and how are you wanting to access it after processing with ksqlDB?

This topic was automatically closed after 30 days. New replies are no longer allowed.