Hi, I am using the Debezium MySQL source connector together with the JDBC sink connector writing to MySQL.
I know that the options "auto.create": "true" and "auto.evolve": "true" handle creating tables and adding columns.
However, I also need to propagate the source's DDL to the sink. When I drop table A on the source, table A and its data remain in the sink database. I want that data to be removed along with the DROP TABLE.
I can't find any other way to handle DDL, especially DROP TABLE and MODIFY COLUMN TYPE.
When I point a sink connector at the schema history topic or the schema change topic, I get many errors.
How can I apply DDL changes through the sink connector?
Here are my configs.
{
  "name": "source-mysql",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "IP",
    "database.port": "PORT",
    "database.user": "USER",
    "database.password": "PW",
    "database.server.id": "10",
    "database.server.name": "test-server-name",
    "database.include.list": "debezium",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
{
  "name": "sink-mysql",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "TOPICS (table name)",
    "connection.url": "jdbc:mysql://IP:PORT/SCHEMA",
    "connection.user": "USER",
    "connection.password": "PW",
    "transforms": "unwrap, flatten",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": "_",
    "auto.create": "true",
    "auto.evolve": "true",
    "delete.enabled": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key"
  }
}
When I use a plain sink connector config that reads those topics directly, the DDL is not applied either:
{
  "name": "sink-ddl",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "schema-changes, schema-history",
    "connection.url": "jdbc:mysql://IP:PORT/debezium",
    "connection.user": "USER",
    "connection.password": "PW"
  }
}
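For reference, one alternative I am considering is a small custom consumer instead of the JDBC sink for the DDL topic. As far as I understand, each Debezium schema change event carries the statement in a "ddl" field next to "databaseName". Below is a minimal sketch in Python of just the parsing step, assuming the JSON converter with schemas enabled (fields wrapped in "payload"). The function names and the sample event are hypothetical, and the sketch does not connect to Kafka or MySQL, so the field layout should be checked against the events actually produced.

```python
import json
import re

# Matches statements that begin with DROP TABLE (case-insensitive).
_DROP_TABLE = re.compile(r"^\s*DROP\s+TABLE\b", re.IGNORECASE)


def extract_ddl(event_value: str):
    """Return (database_name, ddl) from a schema change event value, or None.

    Assumes the event is JSON; with schemas enabled the fields sit under
    "payload", otherwise they are expected at the top level.
    """
    event = json.loads(event_value)
    payload = event.get("payload", event) if isinstance(event, dict) else None
    if not payload or not payload.get("ddl"):
        return None
    return payload.get("databaseName"), payload["ddl"]


def is_drop_table(ddl: str) -> bool:
    """True if the DDL statement starts with DROP TABLE."""
    return bool(_DROP_TABLE.match(ddl))


# Hypothetical sample event, reduced to the two fields used above.
sample = json.dumps(
    {"payload": {"databaseName": "debezium", "ddl": "DROP TABLE `A`"}}
)
```

A consumer loop would call extract_ddl on each record value and, for statements it decides to forward (for example those where is_drop_table is true), execute them against the sink database with a separate MySQL client.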