What?
I am trying to build a data pipeline that captures change data (CDC) from RDS PostgreSQL and streams it to a Kafka cluster on Confluent Cloud.
How?
I am running a self-managed Kafka Connect cluster in a Docker container on an EC2 instance in AWS.
Note: I can't use the Confluent-managed Debezium PostgreSQL connector directly, for security reasons.
Issue:
I am able to run the Connect cluster, and I can see that consumers are created on the Confluent Kafka cluster, but I don't see the topic to which the data should be streamed. The connector itself is running and reports no errors.
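For context, "running fine" here refers to what the Kafka Connect REST API reports. A minimal sketch of that check, assuming the worker is reachable on localhost:8083 as in the configuration call; the helper names `status_url` and `connector_status` are made up for illustration:

```shell
# Base URL of the Connect worker's REST API (assumption: same host/port as the config call).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the status endpoint URL for a given connector name.
status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Fetch the status document; it contains connector.state and one state per task.
connector_status() {
  curl -s "$(status_url "$1")"
}

# Usage (needs a running Connect worker):
#   connector_status source-debezium-pg-01
```

A connector can show RUNNING while a task is FAILED, so the per-task state in the response is worth reading as well.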
Configuration:
curl -i -X PUT -H "Content-Type:application/json" \
  http://localhost:8083/connectors/source-debezium-pg-01/config \
  -d '{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
    "value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
    "value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "tasks.max": "1",
    "database.hostname": "xxxxxx.amazonaws.com",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "XXXXX",
    "database.dbname": "dbaname",
    "database.server.name": "dbname1",
    "table.include.list": "test.tablename",
    "schema.whitelist": "myschema",
    "database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
    "database.history.kafka.topic": "dbname1.dbaname",
    "topic.creation.enable": "true",
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "3",
    "decimal.handling.mode": "double",
    "include.schema.changes": "true",
    "transforms": "unwrap,addTopicPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addTopicPrefix.regex": "(.*)",
    "transforms.addTopicPrefix.replacement": "pg-debezium-$1"
  }'
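To be explicit about which topic I am looking for: assuming Debezium's default naming of `<database.server.name>.<schema>.<table>` and the RegexRouter prefix from the config above, the name would be built roughly like this. The helper `expected_topic` is hypothetical and only illustrates the naming:

```shell
# Compose the topic name expected after the RegexRouter transform.
# Arguments: server name, schema, table, prefix added by the router.
# Assumption: Debezium's default <server>.<schema>.<table> naming is in effect.
expected_topic() {
  server="$1"
  schema="$2"
  table="$3"
  prefix="$4"
  printf '%s%s.%s.%s\n' "$prefix" "$server" "$schema" "$table"
}

# Values taken from the connector config above.
expected_topic dbname1 test tablename pg-debezium-
```

With the values above, this is the topic name I am checking for on the Confluent Cloud cluster.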
Note: I have followed this tutorial