Are there source and sink connector libraries that support DDL in PostgreSQL?

It seems that DDL is not supported by the connectors below, which are currently in use:
"connector.class": "io.debezium.connector.postgresql.PostgresConnector"
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector"

Source connector JSON

{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "172.19.0.4",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "1234",
    "database.dbname": "postgres",
    "database.server.name": "postgres",
    "slot.name": "source",
    "topic.prefix": "CDC",
    "table.include.list": "public.opr,public.opr_sys",
    "plugin.name": "pgoutput",
    "offset.commit.interval.ms": 1000
  }
}

Sink connector JSON

{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://172.19.0.5:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "1234",
    "slot.name": "sink",
    "topics.regex": "CDC.*",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "auto.create": "true",
    "auto.evolve": "true",
    "table.evolve.strategy": "drop-and-create",
    "pk.mode": "record_key",
    "unique.key": "id",
    "batch.size": 100,
    "transforms": "unwrap,route,TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3",
    "transforms.TimestampConverter.type": "com.github.howareyouo.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.fields": "*_dt",
    "record.interval.ms": 1000
  }
}

You've not shown the key or value converter values, but "auto.create": "true" will create the tables on the sink side.

You'll need to use record schemas on both the source and sink sides, though. Plain schemaless JSON carries no type information, so the sink cannot derive column types any more specific than BLOB or TEXT.
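As a minimal sketch of what that means in practice: with the plain JsonConverter that ships with Kafka Connect, setting schemas.enable to true makes every record carry an embedded schema that the sink can map to real column types. These are standard Connect converter properties and can go in the worker config or, as below, as per-connector overrides inside each connector's "config" block (shown here as a fragment to merge into your existing configs):

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

Apply this on both the source and the sink side so the JdbcSinkConnector sees typed fields; a Schema Registry with Avro, Protobuf, or JSON Schema converters achieves the same thing with less per-message overhead.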
