Concatenated composite keys (ksqlDB v0.11) and JDBC Connector

Hi!

I’m using ksqlDB version 0.11 (I cannot upgrade to a newer version at the moment) and want to replicate data from a table into MySQL using the JDBC Sink connector. ksqlDB v0.11 does not support multiple key columns in a table, and my data needs to be grouped using a GROUP BY expression with multiple columns.

Using this statement I create the table:

    CREATE TABLE estads AS SELECT 
                STID AS stid, 
                ASIG AS asig, 
                COUNT(*) AS np, 
                MIN(NOTA) AS min, 
                MAX(NOTA) AS max, 
                AVG(NOTA) AS med, 
                LATEST_BY_OFFSET(FECHREG) AS fechreg 
        FROM estads_stm GROUP BY stid, asig EMIT CHANGES;

The resulting table has the following schema:

    Name                 : ESTADS
     Field      | Type
    ---------------------------------------------
     KSQL_COL_0 | VARCHAR(STRING)  (primary key)
     NP         | BIGINT
     MIN        | DOUBLE
     MAX        | DOUBLE
     MED        | DOUBLE
     FECHREG    | VARCHAR(STRING)

As you can see, the two key columns (stid and asig) have been merged into a single field called KSQL_COL_0, which is the expected behavior for version 0.11. The problem is that I need to use the JDBC Sink connector to replicate the data into a MySQL table with the following schema:

    +---------+--------------+------+-----+-------------------+-----------------------------+
    | Field   | Type         | Null | Key | Default           | Extra                       |
    +---------+--------------+------+-----+-------------------+-----------------------------+
    | stid    | varchar(15)  | NO   | PRI | NULL              |                             |
    | asig    | varchar(10)  | NO   | PRI | NULL              |                             |
    | np      | smallint(6)  | YES  |     | NULL              |                             |
    | min     | decimal(5,2) | YES  |     | NULL              |                             |
    | max     | decimal(5,2) | YES  |     | NULL              |                             |
    | med     | decimal(5,2) | YES  |     | NULL              |                             |
    | fechreg | timestamp    | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
    +---------+--------------+------+-----+-------------------+-----------------------------+

I don’t know how to “unmerge” the automatically generated KSQL_COL_0 in order to tell the JDBC connector that both stid and asig are primary keys in the MySQL table. Any ideas on how to manage this? I know that since ksqlDB version 0.15 this is no longer a problem, as multiple key columns are supported, but as I said, upgrading is not an option in my case.

Thanks! :slight_smile:

The best way might be to use “single message transforms” (SMTs). They allow you to specify a simple per-record transformation that is applied before the data is written into MySQL.

Check out the docs for more details: “Get started with Single Message Transforms for self-managed connectors” (Confluent Documentation).
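
For the sink side, here is a rough sketch of what the connector definition could look like once stid and asig are available as separate fields in the record value (whether through an SMT or some other step). That availability is an assumption: the schema above only exposes KSQL_COL_0. The connector name, connection URL, credentials, and topic name are placeholders I made up; `insert.mode`, `pk.mode`, and `pk.fields` are regular JDBC Sink connector settings.

```sql
-- Sketch only. Assumptions (not from the original post):
--   * the record value carries STID and ASIG as separate fields
--   * the table's topic is named ESTADS and its value has a schema (e.g. Avro)
--   * connector name, URL, user and password are placeholders
CREATE SINK CONNECTOR estads_mysql_sink WITH (
  'connector.class'     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url'      = 'jdbc:mysql://localhost:3306/mydb',
  'connection.user'     = 'user',
  'connection.password' = 'password',
  'topics'              = 'ESTADS',
  'table.name.format'   = 'estads',
  'key.converter'       = 'org.apache.kafka.connect.storage.StringConverter',
  'insert.mode'         = 'upsert',
  'pk.mode'             = 'record_value',
  'pk.fields'           = 'STID,ASIG'
);
```

With `pk.mode` set to `record_value`, the connector takes the primary key columns from the listed value fields rather than from the Kafka record key, so the merged KSQL_COL_0 would not need to reach MySQL at all.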
