Here’s a great Slack thread to stay informed about in-place schema updates. Thanks to Michael Drogalis and @almog for their help!
Copying the thread here for posterity.
Maulik Parikh Hi All, we have a Debezium CDC topic which uses schema-registry. Say we have built a KSQL table on top of that CDC topic. Now when CDC topic has a new version of schema, what’s the best way to have the ksql table reflect that change in schema (e.g. a new field is added)… Can we expect this to be automatic schema evolution on ksql side as well?
Michael Drogalis This is something we hope to make automatic one day, but today you would run a CREATE OR REPLACE command on your stream or table. https://docs.ksqldb.io/en/latest/how-to-guides/update-a-running-persistent-query/
Maulik Parikh Thanks @Michael Drogalis! Is this an uncommon use case that I described?
Michael Drogalis No it’s not uncommon, it’s just something we need to do.
Maulik Parikh Hi @Michael Drogalis, it seems the CREATE OR REPLACE suggestion, applied to the table so that it uses v2 of the CDC topic schema, runs into an error
Maulik Parikh Sequence to reproduce the issue:
1. Have a MySQL table CDC topic from Debezium (this will have schema v1)
2. Let some data pump in
3. Run the CREATE TABLE KSQL query: CREATE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');
4. Alter the MySQL table and add one more column of type String at the end
5. Observe that the Debezium History topic has the DDL change event
6. Observe that the actual CDC topic has schema v2 when the first new record is created on that table
7. Now apply the CREATE OR REPLACE statement: CREATE OR REPLACE TABLE TBL_NAME WITH (KAFKA_TOPIC='CDC_TOPIC_NAME', VALUE_FORMAT='AVRO', KEY_FORMAT='AVRO');
Maulik Parikh This is the error : Cannot upgrade data source: DataSource 'CUSTOMERS_CDC_TBL' (The following columns are changed, missing or reordered: ['__OP' STRING, '__TABLE' STRING, '__SOURCE_TS_MS' BIGINT, '__TS_MS' BIGINT, '__DELETED' STRING])
Michael Drogalis Do you see the schema actually change in SR?
Maulik Parikh yes
Maulik Parikh We can see in Control Center with the new version as well (on the topic)
Michael Drogalis @almog might have an idea.
Almog Gavra Maulik Parikh can you paste the original schema for TBL_NAME that you get back from a DESCRIBE command?
Almog Gavra and also, to help debug, can you share the original and new schemas in SR?
Maulik Parikh KTable Original Schema
Name : CUSTOMERS_CDC_TBL
Field | Type
-------------------------------------------------------------
ROWKEY | STRUCT<ID INTEGER> (primary key)
ID | INTEGER
PROPERTY_ID | INTEGER
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
COUNT_RESERVATIONS | INTEGER
IS_REPEAT_GUEST | INTEGER
EMAIL | VARCHAR(STRING)
BIRTHDAY | INTEGER
CPF | VARCHAR(STRING)
PHONE | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
CELL_PHONE | VARCHAR(STRING)
STREET | VARCHAR(STRING)
NUMBER | VARCHAR(STRING)
COMPLEMENT | VARCHAR(STRING)
NEIGHBORHOOD | VARCHAR(STRING)
CITY | VARCHAR(STRING)
STATE | VARCHAR(STRING)
ZIP | VARCHAR(STRING)
COUNTRY | VARCHAR(STRING)
RG | VARCHAR(STRING)
ISSUE_DATE | INTEGER
ISSUER | VARCHAR(STRING)
ISSUER_STATE | VARCHAR(STRING)
USER_ID | INTEGER
ADDRESS1 | VARCHAR(STRING)
ADDRESS2 | VARCHAR(STRING)
ID_PHOTO | INTEGER
DELETED | INTEGER
STATUS_ID | INTEGER
STATUS_NAME | VARCHAR(STRING)
DOCUMENT_TYPE | VARCHAR(STRING)
DOCUMENT_NUMBER | VARCHAR(STRING)
DOCUMENT_ISSUE_DATE | INTEGER
DOCUMENT_ISSUING_COUNTRY | VARCHAR(STRING)
DOCUMENT_EXPIRATION_DATE | INTEGER
LAST_CHANGE | BIGINT
IS_OPT_IN | INTEGER
OPT_IN_HASH | VARCHAR(STRING)
EMAIL_HASH | VARCHAR(STRING)
IS_ANONYMIZED | INTEGER
GUEST_TAX_ID_NUMBER | VARCHAR(STRING)
COMPANY_NAME | VARCHAR(STRING)
COMPANY_TAX_ID_NUMBER | VARCHAR(STRING)
IS_MERGED | INTEGER
__OP | VARCHAR(STRING)
__TABLE | VARCHAR(STRING)
__SOURCE_TS_MS | BIGINT
__TS_MS | BIGINT
__DELETED | VARCHAR(STRING)
-------------------------------------------------------------
Almog Gavra ah ok I think i see what’s going on
Almog Gavra it looks like Debezium will add the metadata columns at the end of the schema no matter what (__OP, __TABLE, …) which means that if you create an additional field, even if it’s at the end, it will in practice still be in the middle of the schema
Almog Gavra unfortunately that looks to me like CREATE OR REPLACE won’t work with debezium unless you can somehow configure it to register the schema in a way that ksql likes
Almog Gavra do you mind creating a GitHub ticket with that information? we’ll prioritize it accordingly
Maulik Parikh as you can imagine this is going to be blocker for several others who are adopting ksql along with Debezium…
Maulik Parikh almog, Michael Drogalis - here’s the GitHub : https://github.com/confluentinc/ksql/issues/8148 Pls LMK if any further info needed
Michael Drogalis Thanks! Yeah this is good to surface, we’ll try to get on this one quickly. Debezium is mega popular.
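To make Almog’s explanation concrete, here is a minimal Python sketch of the kind of check the error message implies: every existing column must keep its position and type, and new columns may only be appended at the end. This is an illustration only, not ksqlDB’s actual implementation. The function name blocking_columns and the column NEW_COL are hypothetical, and the schema is shortened to a few of the columns from the DESCRIBE output above.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (column name, column type)


def blocking_columns(old: List[Column], new: List[Column]) -> List[Column]:
    """Return the old columns that are changed, missing or reordered in `new`.

    Assumption: an in-place upgrade is only accepted when the old schema is
    an exact prefix of the new one, so the result must be empty.
    """
    return [
        col
        for i, col in enumerate(old)
        if i >= len(new) or new[i] != col
    ]


# Shortened version of the table schema from the thread.
old_schema: List[Column] = [
    ("ID", "INTEGER"),
    ("IS_MERGED", "INTEGER"),
    ("__OP", "STRING"),
    ("__TABLE", "STRING"),
    ("__SOURCE_TS_MS", "BIGINT"),
    ("__TS_MS", "BIGINT"),
    ("__DELETED", "STRING"),
]

# What Debezium registers after the ALTER: the new source column (hypothetical
# name NEW_COL) lands before the metadata columns, which stay at the end.
inserted = old_schema[:2] + [("NEW_COL", "STRING")] + old_schema[2:]

# What the upgrade check would accept: the new column truly at the end.
appended = old_schema + [("NEW_COL", "STRING")]

print([name for name, _ in blocking_columns(old_schema, inserted)])
print(blocking_columns(old_schema, appended))
```

With the inserted layout, the five Debezium metadata columns all shift by one position, which lines up with the columns listed in the error message. With the appended layout nothing shifts, so nothing would block the upgrade.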