How to create a TABLE from a STREAM with a PRIMARY KEY in ksqlDB

I’m building a data pipeline that has MySQL as the SOURCE and Elasticsearch as the SINK. My source data is modeled in MySQL like this:

CREATE TABLE `person` (
    `_uid`      CHAR(36) NOT NULL PRIMARY KEY,
    `_created`  DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    `_updated`  DATETIME(3) NULL,
    `_disabled` DATETIME(3) NULL,
    `name`      VARCHAR(100) NOT NULL,
    `age`       INT NOT NULL
);
CREATE TABLE `contact_type` (
    `_uid`  CHAR(36) NOT NULL PRIMARY KEY,
    `value` VARCHAR(100) NOT NULL
);
CREATE TABLE `person_contact` (
    `_uid`             CHAR(36) NOT NULL PRIMARY KEY,
    `_created`         DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    `_updated`         DATETIME(3) NULL,
    `_disabled`        DATETIME(3) NULL,
    `is_default`       TINYINT NOT NULL DEFAULT 0,
    `value`            TEXT NOT NULL,
    `contact_type_uid` CHAR(36) NOT NULL,
    `person_uid`       CHAR(36) NOT NULL,
    FOREIGN KEY (`contact_type_uid`) REFERENCES `contact_type` (`_uid`),
    FOREIGN KEY (`person_uid`) REFERENCES `person` (`_uid`)
);

To import the data into ksqlDB, I created a connector like this:

CREATE SOURCE CONNECTOR "source-mysql" WITH (
  "connector.class"     = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"      = 'jdbc:mysql://mysql:3306/db',
  "connection.user"     = 'root',
  "connection.password" = 'root',
  "mode"                = 'bulk',
  "topic.prefix"        = 'source-mysql-'
);
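
As far as I understand, records produced this way carry no key. If it matters, I believe the connector could be made to key each record by "_uid" using the standard Kafka Connect ValueToKey and ExtractField transforms, roughly like this (a sketch I have not verified against my setup; the connector name and transform aliases are placeholders):

CREATE SOURCE CONNECTOR "source-mysql-keyed" WITH (
  "connector.class"             = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"              = 'jdbc:mysql://mysql:3306/db',
  "connection.user"             = 'root',
  "connection.password"         = 'root',
  "mode"                        = 'bulk',
  "topic.prefix"                = 'source-mysql-',
  -- copy the _uid value field into the record key, then unwrap the
  -- single-field struct so the key becomes a plain string
  "transforms"                  = 'copyUid,extractUid',
  "transforms.copyUid.type"     = 'org.apache.kafka.connect.transforms.ValueToKey',
  "transforms.copyUid.fields"   = '_uid',
  "transforms.extractUid.type"  = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  "transforms.extractUid.field" = '_uid'
);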

Then I created the sources pointing to the topics, like this:

CREATE STREAM "source-mysql-contact_type" (
  "_uid" STRING,
  "value" STRING
) WITH (
  KAFKA_TOPIC='source-mysql-contact_type',
  VALUE_FORMAT='AVRO'
);
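
(As an aside, my understanding is that a stream can also be re-keyed with PARTITION BY before deriving a table from it; the stream name below is just illustrative:)

-- re-key the stream by _uid so anything derived from it inherits that key
CREATE STREAM "keyed-mysql-contact_type" AS
  SELECT *
  FROM "source-mysql-contact_type"
  PARTITION BY "_uid";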

Next I’m trying to create a table from the STREAM as in the example below:

CREATE TABLE "table-mysql-contact_type" WITH (
  KAFKA_TOPIC='table-mysql-contact_type',
  VALUE_FORMAT='AVRO'
) AS SELECT
  "_uid",
  LATEST_BY_OFFSET("value") AS "value"
FROM "source-mysql-contact_type"
GROUP BY "_uid"
EMIT CHANGES;

The problem is that with this query I can’t declare “_uid” as the PRIMARY KEY the way it is in MySQL.
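
(To see which column ksqlDB actually treats as the key, I’m inspecting the derived table’s schema with DESCRIBE, which flags the key column:)

-- lists the table's columns and marks the one used as its key
DESCRIBE "table-mysql-contact_type";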

I also tried creating the table first and then inserting into it, like this:

CREATE TABLE "table-mysql-contact_type" (
  "_uid" STRING PRIMARY KEY,
  "value" STRING
) WITH (
  KAFKA_TOPIC='table-mysql-contact_type',
  VALUE_FORMAT='AVRO'
);
INSERT INTO "table-mysql-contact_type"
SELECT
  "_uid",
  LATEST_BY_OFFSET("value") AS "value"
FROM "source-mysql-contact_type"
GROUP BY "_uid"
EMIT CHANGES;

But I get the following message:

Exception while preparing statement: INSERT INTO can only be used to insert 
into a stream. table-mysql-contact_type is a table.
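
For what it’s worth, my fallback idea is to declare the table directly on top of the source topic instead of deriving it from the stream. As I understand it, this only works if the topic is already keyed by "_uid" (e.g. via the connector transforms sketched above), and the table name here is just a placeholder:

-- only valid if the underlying topic's record key is the _uid value
CREATE TABLE "direct-mysql-contact_type" (
  "_uid" STRING PRIMARY KEY,
  "value" STRING
) WITH (
  KAFKA_TOPIC='source-mysql-contact_type',
  VALUE_FORMAT='AVRO'
);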

What do I need to do to create a TABLE that receives data from a STREAM and lets me define its PRIMARY KEY?
