How can a Sink Connector reflect DDL?

Hi, I am using the Debezium MySQL Source Connector and the JDBC Sink Connector for MySQL.

I know that the options "auto.create": "true" and "auto.evolve": "true" handle creating tables and adding columns.

But I also need to reflect the source's DDL on the sink. When I drop table A at the source, the data for table A remains in the sink DB; I want that data to be removed along with the DROP TABLE.

I can't find any other way to handle DDL, especially DROP TABLE and MODIFY COLUMN TYPE.
If I point a sink connector at the schema history topic or the schema change topic, I get a lot of errors.

How can I reflect DDL with the sink connector?

Here are my configs.

{
    "name": "source-mysql",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "IP",
        "database.port": "PORT",
        "database.user": "USER",
        "database.password": "PW",
        "database.server.id": "10",
        "database.server.name": "test-server-name",
        "database.include.list": "debezium",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "schema-changes",
	"transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}
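
(The route transform just strips the three-part Debezium topic name down to its last segment, for example test-server-name.debezium.customers becomes customers, so each sink topic matches its table name; customers is only an illustrative name here.)
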
{
    "name": "sink-mysql",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "TOPICS (table name)",
        "connection.url": "jdbc:mysql://IP:PORT/SCHEMA",
        "connection.user": "USER",
	"connection.password": "PW",
	"transforms": "unwrap, flatten",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
	"transforms.flatten.type"       : "org.apache.kafka.connect.transforms.Flatten$Value",
	"transforms.flatten.delimiter"  : "_",
        "auto.create": "true",
        "auto.evolve": "true",
	"delete.enabled" : "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}
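
(As far as I understand, row-level deletes do propagate with this config: delete.enabled requires pk.mode to be record_key, which is set, and the unwrap transform keeps tombstones because drop.tombstones is false. It is only DDL such as DROP TABLE and MODIFY COLUMN that never reaches the sink.)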

When I use a plain sink connector config, reflecting DDL does not work either.

{
    "name": "sink-ddl",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "schema-changes, schema-history",
        "connection.url": "jdbc:mysql://IP:PORT/debezium",
        "connection.user": "USER",
	"connection.password": "PW"
    }
}

That is expected. Debezium reads CDC events from the table. If the source table is dropped, there are no more events to read from it. TRUNCATE commands are not sent, nor are "after": null records for all of the existing rows.

I would expect modifications of column types to be handled appropriately by the converter/serializer.

You may find better answers in Debezium support channels.

Hello,

I would expect modifications of column types to be handled appropriately by the converter/serializer.

Debezium's DML messages carry schema information, including column types, and Debezium also writes DDL messages to a schema change topic.
But the JDBC Sink Connector cannot handle either of these. Am I wrong?

If I want to reflect column type modifications in the sink database, is customizing the sink connector myself the only way?

Thanks.

The sink connector will only read the topics you've configured it for, and then use those records to run the respective insert/update/delete statements. I do not believe alter/add column statements are issued unless auto evolution is enabled, and even then they only reference the record schema, not any DDL generated by Debezium. More specifically, the JDBC sink is standalone and not coupled to Debezium's data structures.
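
If you really do need DDL to propagate, you would have to handle it outside the JDBC sink. Below is a rough, untested sketch of the idea: a small standalone program that consumes Debezium's schema change topic (which, as far as I know, is named after database.server.name, so test-server-name in your source config) and replays selected statements against the sink database. The "ddl" field name, the payload envelope, and the connection details are assumptions taken from your configs and my reading of the Debezium docs, so verify them for your Debezium version before relying on anything like this.

// Rough sketch only: replay Debezium schema change events against the sink DB by hand,
// since the JDBC sink connector ignores DDL. Topic name, field names, and credentials
// below are assumptions based on the configs in this thread.
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

public class SchemaChangeReplayer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "ddl-replayer");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             Connection sink = DriverManager.getConnection(
                     "jdbc:mysql://IP:PORT/SCHEMA", "USER", "PW")) {

            // Debezium publishes schema change events to a topic named after
            // database.server.name ("test-server-name" in the source config above).
            consumer.subscribe(List.of("test-server-name"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    if (record.value() == null) {
                        continue; // ignore tombstones
                    }
                    // With schemas.enable=true the event is wrapped in a
                    // {"schema": ..., "payload": ...} envelope; unwrap if present.
                    JsonNode root = mapper.readTree(record.value());
                    JsonNode payload = root.has("payload") ? root.get("payload") : root;
                    String ddl = payload.path("ddl").asText("");

                    // Only replay the statements the JDBC sink cannot handle.
                    // Note: the DDL text references the source schema name, so in
                    // practice it may need rewriting for the sink schema.
                    String upper = ddl.toUpperCase(Locale.ROOT).trim();
                    if (upper.startsWith("DROP TABLE") || upper.startsWith("ALTER TABLE")) {
                        try (Statement stmt = sink.createStatement()) {
                            stmt.execute(ddl);
                        }
                    }
                }
            }
        }
    }
}

Keep in mind that ordering between the schema change topic and the per-table data topics is not guaranteed, so blindly replaying DDL like this has sharp edges; it is only meant to illustrate that DDL handling has to live outside the JDBC sink.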


Thank you. The answer is clear.
