I am using Kafka connect debezium to replicate data from one database postgres instance to another database postgres instance. Here are my questions regarding the implementation.
Kafka Connect configuration provided below
-
I wish to replicate a set of specific tables. I am using the attribute
"database.whitelist": "public.tablex",
but I am noting that all tables that exist in the database instance are being replicated. Am I using the wrong configuration to restrict the tables I want picked up by my connector?{ "name": "connector-ginger", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "post-aws-dev.whatever.com", "database.port": "5432", "database.user": "ginger", "database.password": "hocuspocusfocus", "database.dbname": "mock_db", "database.server.name": "dev", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "false", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "false", "database.whitelist": "public.tablex", "database.history.kafka.topic": "schema-changes.tablex", "plugin.name": "wal2json", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081" } }
-
For each of the tables I wish to sink data on the destination side would it be possible to do this with one configuration? Or, do I need a sink config for each table/kafka.topic?
For e.g can I define multiple topics using this attribute
"topics": "dev.public.tablex",
{ "name": "sink-postgres", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "dev.public.tablex", "connection.url": "jdbc:postgresql://post-whatever.com", "connection.user" : "ginger", "table.name.format": "public.tablex", "connection.password" : "hocuspocusfocus", "insert.mode": "upsert", "pk.mode": "record_value", "pk.fields": "pkOnTableId", "auto.create": "true", "delete.enabled": "false", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://localhost:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://localhost:8081" } }