I’m creating a data conversion pipeline with MySQL as the SOURCE and Elasticsearch as the SINK. My source data is modeled in MySQL as follows:
```sql
CREATE TABLE `person` (
    `_uid` CHAR(36) NOT NULL PRIMARY KEY,
    `_created` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    `_updated` DATETIME(3) NULL,
    `_disabled` DATETIME(3) NULL,
    `name` VARCHAR(100) NOT NULL,
    `age` INT NOT NULL
);

CREATE TABLE `contact_type` (
    `_uid` CHAR(36) NOT NULL PRIMARY KEY,
    `value` VARCHAR(100) NOT NULL
);

CREATE TABLE `person_contact` (
    `_uid` CHAR(36) NOT NULL PRIMARY KEY,
    `_created` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
    `_updated` DATETIME(3) NULL,
    `_disabled` DATETIME(3) NULL,
    `is_default` TINYINT NOT NULL DEFAULT 0,
    `value` TEXT NOT NULL,
    `contact_type_uid` CHAR(36) NOT NULL,
    `person_uid` CHAR(36) NOT NULL,
    FOREIGN KEY (`contact_type_uid`) REFERENCES `contact_type` (`_uid`),
    FOREIGN KEY (`person_uid`) REFERENCES `person` (`_uid`)
);
```
To import the data into ksqlDB, I created a JDBC source connector:
```sql
CREATE SOURCE CONNECTOR "source-mysql" WITH (
    "connector.class" = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    "connection.url" = 'jdbc:mysql://mysql:3306/db',
    "connection.user" = 'root',
    "connection.password" = 'root',
    "mode" = 'bulk',
    "topic.prefix" = 'source-mysql-'
);
```
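With this configuration the JDBC source connector writes its records without a Kafka message key, so the `source-mysql-*` topics carry nothing that ksqlDB could treat as a key. A minimal sketch of one possible variant, used in place of the connector above rather than alongside it: it assumes the standard Apache Kafka Connect transforms `ValueToKey` and `ExtractField$Key` are available on the Connect worker, and the connector name `source-mysql-keyed` and the transform aliases `copyUid`/`extractUid` are hypothetical.

```sql
-- Sketch (hypothetical connector name): same settings as above, plus
-- transforms that set the Kafka record key to the "_uid" column.
CREATE SOURCE CONNECTOR "source-mysql-keyed" WITH (
    "connector.class"             = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    "connection.url"              = 'jdbc:mysql://mysql:3306/db',
    "connection.user"             = 'root',
    "connection.password"         = 'root',
    "mode"                        = 'bulk',
    "topic.prefix"                = 'source-mysql-',
    -- Write the key as a plain string so ksqlDB can read it with KEY_FORMAT='KAFKA'
    "key.converter"               = 'org.apache.kafka.connect.storage.StringConverter',
    "transforms"                  = 'copyUid,extractUid',
    -- Step 1: copy the "_uid" field from the value into a struct key
    "transforms.copyUid.type"     = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.copyUid.fields"   = '_uid',
    -- Step 2: unwrap that struct so the key is just the "_uid" string
    "transforms.extractUid.type"  = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractUid.field" = '_uid'
);
```

`ValueToKey` copies the field rather than moving it, so `_uid` also stays in the Avro value.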
Then I created streams pointing to the resulting topics, like this one:
```sql
CREATE STREAM "source-mysql-contact_type" (
    "_uid" STRING,
    "value" STRING
) WITH (
    KAFKA_TOPIC='source-mysql-contact_type',
    VALUE_FORMAT='AVRO'
);
```
Next, I’m trying to create a table from the stream, as in the example below:
```sql
CREATE TABLE "table-mysql-contact_type" WITH (
    KAFKA_TOPIC='table-mysql-contact_type',
    VALUE_FORMAT='AVRO'
) AS SELECT
    "_uid",
    LATEST_BY_OFFSET("value") AS "value"
FROM "source-mysql-contact_type"
GROUP BY "_uid"
EMIT CHANGES;
```
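In ksqlDB, the columns in the GROUP BY clause of a CREATE TABLE AS SELECT become the primary key of the resulting table, so this query should already key the table on `_uid`. A minimal way to check, using the standard DESCRIBE statement and a pull query; the `_uid` literal in the pull query is a hypothetical placeholder.

```sql
-- Show the schema of the table created above; key columns should be
-- marked "(primary key)" in the output.
DESCRIBE "table-mysql-contact_type";

-- Pull query: look up one row by its key (placeholder UUID, hypothetical).
SELECT *
FROM "table-mysql-contact_type"
WHERE "_uid" = '00000000-0000-0000-0000-000000000000';
```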
The problem is that with this query I can’t explicitly declare `_uid` as the PRIMARY KEY, the way it is defined in MySQL.
I also tried creating the table first and then populating it with an INSERT, like this:
```sql
CREATE TABLE "table-mysql-contact_type" (
    "_uid" STRING PRIMARY KEY,
    "value" STRING
) WITH (
    KAFKA_TOPIC='table-mysql-contact_type',
    VALUE_FORMAT='AVRO'
);

INSERT INTO "table-mysql-contact_type"
SELECT
    "_uid",
    LATEST_BY_OFFSET("value") AS "value"
FROM "source-mysql-contact_type"
GROUP BY "_uid"
EMIT CHANGES;
```
But I get this error:
```
Exception while preparing statement: INSERT INTO can only be used to insert
into a stream. table-mysql-contact_type is a table.
```
What do I need to do to create a TABLE that is populated from a STREAM while still being able to define its PRIMARY KEY?
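If the source topic is already keyed by `_uid` (for example, through a connector configured with the `ValueToKey` and `ExtractField$Key` transforms), a table can be declared directly over that topic with an explicit PRIMARY KEY, with no intermediate stream or INSERT INTO. A minimal sketch under stated assumptions: ksqlDB 0.15 or later (for the `KEY_FORMAT` property), a record key that is a plain string, and that the `_uid` field still present in the Avro value is not needed as a value column. The table name `contact_type_by_uid` is hypothetical.

```sql
-- Sketch (hypothetical table name): declare a table straight over the
-- connector's topic. Assumes each record key holds "_uid" as a plain string.
CREATE TABLE "contact_type_by_uid" (
    "_uid"  STRING PRIMARY KEY,  -- read from the Kafka record key
    "value" STRING               -- read from the Avro record value
) WITH (
    KAFKA_TOPIC  = 'source-mysql-contact_type',
    KEY_FORMAT   = 'KAFKA',
    VALUE_FORMAT = 'AVRO'
);
```

Because the connector runs in `bulk` mode, each poll re-publishes every row. The table keeps only the latest value per `_uid`, so repeated imports update existing rows rather than adding new ones.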