FYI for KSQL and Debezium users!

Here’s a great Slack thread to stay informed about in-place schema updates. Thanks to Michael Drogalis and @almog for their help!

Copying the thread here for posterity. :slight_smile:


Maulik Parikh Hi All, we have a Debezium CDC topic which uses Schema Registry. Say we have built a KSQL table on top of that CDC topic. Now, when the CDC topic gets a new version of the schema, what’s the best way to have the KSQL table reflect that change (e.g. a new field is added)? Can we expect automatic schema evolution on the KSQL side as well?

Michael Drogalis This is something we hope to make automatic one day, but today you would run a CREATE OR REPLACE command on your stream or table. https://docs.ksqldb.io/en/latest/how-to-guides/update-a-running-persistent-query/

Maulik Parikh Thnx @Michael Drogalis! Is this an uncommon use case that I described?

Michael Drogalis No it’s not uncommon, it’s just something we need to do.

Maulik Parikh hi @Michael Drogalis, it seems the suggested CREATE OR REPLACE on the table, to pick up v2 of the CDC topic schema, runs into an error

Maulik Parikh Sequence to reproduce the issue:
1. Have a MySQL table CDC topic from Debezium (this will have schema v1)
2. Let some data pump in
3. Run the CREATE TABLE KSQL query:
   CREATE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');
4. Alter the MySQL table and add one more column of type String at the end
5. Observe that the Debezium History topic has the DDL change event
6. Observe that the actual CDC topic has schema v2 when the first new record is created on that table
7. Now, apply the CREATE OR REPLACE statement:
   CREATE OR REPLACE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');

Maulik Parikh This is the error: Cannot upgrade data source: DataSource 'CUSTOMERS_CDC_TBL' (The following columns are changed, missing or reordered: ['__OP' STRING, '__TABLE' STRING, '__SOURCE_TS_MS' BIGINT, '__TS_MS' BIGINT, '__DELETED' STRING])

Michael Drogalis Do you see the schema actually change in SR?

Maulik Parikh yes

Maulik Parikh We can see in Control Center with the new version as well (on the topic)

Michael Drogalis @almog might have an idea.

Almog Gavra Maulik Parikh can you paste the original schema for TBL_NAME that you get back from a DESCRIBE command?

Almog Gavra and also, to help debug, can you share the original and new schemas in SR?

Maulik Parikh KTable Original Schema

Name : CUSTOMERS_CDC_TBL 
Field                     | Type 
------------------------------------------------------------- 
ROWKEY                    | STRUCT<ID INTEGER> (primary key)
ID                        | INTEGER 
PROPERTY_ID               | INTEGER
FIRST_NAME                | VARCHAR(STRING)
LAST_NAME                 | VARCHAR(STRING)
COUNT_RESERVATIONS        | INTEGER
IS_REPEAT_GUEST           | INTEGER
EMAIL                     | VARCHAR(STRING)
BIRTHDAY                  | INTEGER
CPF                       | VARCHAR(STRING)
PHONE                     | VARCHAR(STRING)
GENDER                    | VARCHAR(STRING)
CELL_PHONE                | VARCHAR(STRING)
STREET                    | VARCHAR(STRING)
NUMBER                    | VARCHAR(STRING)
COMPLEMENT                | VARCHAR(STRING)
NEIGHBORHOOD              | VARCHAR(STRING)
CITY                      | VARCHAR(STRING)
STATE                     | VARCHAR(STRING)
ZIP                       | VARCHAR(STRING)
COUNTRY                   | VARCHAR(STRING)
RG                        | VARCHAR(STRING)
ISSUE_DATE                | INTEGER
ISSUER                    | VARCHAR(STRING)
ISSUER_STATE              | VARCHAR(STRING)
USER_ID                   | INTEGER
ADDRESS1                  | VARCHAR(STRING)
ADDRESS2                  | VARCHAR(STRING)
ID_PHOTO                  | INTEGER
DELETED                   | INTEGER
STATUS_ID                 | INTEGER
STATUS_NAME               | VARCHAR(STRING)
DOCUMENT_TYPE             | VARCHAR(STRING)
DOCUMENT_NUMBER           | VARCHAR(STRING)
DOCUMENT_ISSUE_DATE       | INTEGER
DOCUMENT_ISSUING_COUNTRY  | VARCHAR(STRING)
DOCUMENT_EXPIRATION_DATE  | INTEGER
LAST_CHANGE               | BIGINT
IS_OPT_IN                 | INTEGER
OPT_IN_HASH               | VARCHAR(STRING)
EMAIL_HASH                | VARCHAR(STRING)
IS_ANONYMIZED             | INTEGER
GUEST_TAX_ID_NUMBER       | VARCHAR(STRING)
COMPANY_NAME              | VARCHAR(STRING)
COMPANY_TAX_ID_NUMBER     | VARCHAR(STRING)
IS_MERGED                 | INTEGER
__OP                      | VARCHAR(STRING)
__TABLE                   | VARCHAR(STRING)
__SOURCE_TS_MS            | BIGINT
__TS_MS                   | BIGINT
__DELETED                 | VARCHAR(STRING)
-------------------------------------------------------------

Almog Gavra ah ok I think i see what’s going on

Almog Gavra it looks like Debezium will add the metadata columns (__OP, __TABLE, …) at the end of the schema no matter what, which means that an additional field, even if you add it at the end of the table, will in practice still end up in the middle of the schema

Almog Gavra unfortunately that looks to me like CREATE OR REPLACE won’t work with Debezium unless you can somehow configure it to register the schema in a way that ksql likes :confused:
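
To make the explanation above concrete, here is a minimal Python sketch of a positional column check. It is only a simplified model, assuming that CREATE OR REPLACE requires every existing column to keep its position and only allows new columns to be appended; it is not ksqlDB's actual implementation. The column `NEW_COL` and the helper `incompatible_columns` are hypothetical, and the schema is shortened to a few columns from the DESCRIBE output.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (name, type)


def incompatible_columns(old: List[Column], new: List[Column]) -> List[Column]:
    """Return the old columns that no longer sit at the same position in `new`.

    Assumption: an upgrade is only accepted when the old column list is a
    prefix of the new one (new columns appended at the very end).
    """
    return [
        col
        for i, col in enumerate(old)
        if i >= len(new) or new[i] != col
    ]


# Shortened version of the original table schema: table columns first,
# then the Debezium metadata columns at the end.
old_schema: List[Column] = [
    ("ID", "INTEGER"),
    ("IS_MERGED", "INTEGER"),
    ("__OP", "STRING"),
    ("__TABLE", "STRING"),
    ("__SOURCE_TS_MS", "BIGINT"),
    ("__TS_MS", "BIGINT"),
    ("__DELETED", "STRING"),
]

# The new MySQL column (hypothetical NEW_COL) is last in the table, but it
# lands before the metadata columns in the registered schema.
new_schema = old_schema[:2] + [("NEW_COL", "STRING")] + old_schema[2:]

# A schema where the new column is truly appended at the end.
appended_schema = old_schema + [("NEW_COL", "STRING")]

print(incompatible_columns(old_schema, new_schema))
print(incompatible_columns(old_schema, appended_schema))
```

Under this model, the first call reports exactly the five metadata columns named in the error message, because each of them shifted one position to the right, while the second call reports nothing.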

Almog Gavra do you mind creating a GitHub ticket with that information? we’ll prioritize it accordingly

Maulik Parikh as you can imagine this is going to be a blocker for several others who are adopting ksql along with Debezium…

Maulik Parikh almog, Michael Drogalis - here’s the GitHub issue: https://github.com/confluentinc/ksql/issues/8148 Pls LMK if any further info is needed

Michael Drogalis Thanks! Yeah this is good to surface, we’ll try to get on this one quickly. Debezium is mega popular.
