Kafka Connect Debezium Postgres - Source and Sink Configurations

I am using Kafka Connect with Debezium to replicate data from one Postgres database instance to another. I have two questions about the implementation.

My Kafka Connect configurations are provided below.

  1. I want to replicate only a specific set of tables. I am using the attribute
    "database.whitelist": "public.tablex",
    but all tables in the database instance are being replicated. Am I using the wrong configuration to restrict which tables my connector picks up?

    {
    "name": "connector-ginger",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "post-aws-dev.whatever.com",
        "database.port": "5432",
        "database.user": "ginger",
        "database.password": "hocuspocusfocus",
        "database.dbname": "mock_db",
        "database.server.name": "dev",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "database.whitelist": "public.tablex",
        "database.history.kafka.topic": "schema-changes.tablex",
        "plugin.name": "wal2json",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
    }
    
  2. Can I sink data into all of the destination tables with one configuration, or do I need a separate sink config for each table/Kafka topic?

    For example, can I define multiple topics using the attribute "topics": "dev.public.tablex"?

    {
    "name": "sink-postgres",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "dev.public.tablex",
        "connection.url": "jdbc:postgresql://post-whatever.com",
        "connection.user": "ginger",
        "table.name.format": "public.tablex",
        "connection.password": "hocuspocusfocus",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "pkOnTableId",
        "auto.create": "true",
        "delete.enabled": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
    }
    }
    

Which Debezium version are you using? According to the recent docs, the config key should be: table.include.list
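
As a minimal sketch, assuming a Debezium release that uses the include-list naming (older releases called the same setting table.whitelist), the filter line in the source connector config would change as shown below. The Postgres connector does not appear to define a "database.whitelist" property, which would explain why the setting had no effect and every table was captured. The value is a comma-separated list of regular expressions matched against schemaName.tableName; only the changed line is shown and the rest of the posted config is assumed to stay the same.

    "table.include.list": "public.tablex",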

https://debezium.io/documentation/reference/1.6/connectors/postgresql.html
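
On the second question, a single sink connector can usually cover several tables, since Kafka Connect accepts a comma-separated list in "topics" (or a pattern in "topics.regex"). The fragment below is only a sketch of that idea, not a tested config: the second topic dev.public.tabley is hypothetical, the RegexRouter transform is an assumption added here to strip the dev.public. prefix so that "table.name.format" can derive each table name from its topic, and "pk.mode": "record_key" is swapped in for the posted "record_value"/"pk.fields" pair because one hard-coded key column would not fit every table. The remaining properties are assumed to stay as posted.

    "topics": "dev.public.tablex,dev.public.tabley",
    "transforms": "unwrap,route",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "dev\\.public\\.(.*)",
    "transforms.route.replacement": "$1",
    "table.name.format": "public.${topic}",
    "pk.mode": "record_key",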

Thank you. I will report back and let you know if this works.
