Postgres Source Connector Dockerfile

I want to send data from a Postgres DB to a Kafka topic through Kafka Connect, running everything on my local system with Docker images. If I use the stock image directly I can't add my Postgres connection; I need to install a plugin that supports Postgres. How can I do that? Any blogs or resources would be really helpful.

Hi @likhithkanigolla,

Could you provide a little more info on the error you’re hitting? I don’t fully understand. Are you asking how to create a custom Docker image with the postgres connector installed?

Dave

Exactly. I want to create a custom Docker image with the Postgres connector installed, and I need a sample configuration to create a connector. Sorry for the late reply, @dtroiano.

Hi @likhithkanigolla,

Which connect image are you currently using? i.e., cp-kafka-connect or cp-server-connect or something else? Also, which specific connector? i.e., JDBC or Debezium PostgreSQL CDC?

You’d create a Dockerfile that runs the confluent-hub command to install, e.g.,

FROM confluentinc/cp-server-connect:7.3.2

RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.0.1

This would install the connector in /usr/share/confluent-hub-components, so that path would have to be included in the CONNECT_PLUGIN_PATH environment variable.
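
As a rough sketch (the service layout and image tag here are illustrative, not a complete worker config), the custom image could then be referenced from a Compose file like this:

  kafka-connect:
    build: .                 # directory containing the Dockerfile above
    # or build and tag it separately and use: image: my-connect-postgres:7.3.2
    environment:
      # must cover the confluent-hub install location
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      # ...plus the usual bootstrap servers, group id, storage topics, etc.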

For a full working example, I’d recommend the PostgreSQL CDC connect example here, or the JDBC source / sink examples depending on which connector you’re using.

HTH,
Dave

I am currently using this image from Quay.

Hi @dtroiano ,

  kafka-connect:
#    image: confluentinc/cp-kafka-connect-base:5.5.3
    image: confluentinc/cp-kafka-connect:7.3.3
    hostname: kafka-connect
    container_name: kafka-connect
    volumes:
      - $PWD/connectors:/connectors
      - "./producers/debezium-debezium-connector-postgresql/:/usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/"
      - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/"
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    ports:
      - "8085:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/*"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # Check latest version here: https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.1
        confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.0.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

I don't know what's wrong with my configuration; I am not able to install the plugin.
If I curl the connector-plugins endpoint:

http://localhost:8085/connector-plugins
[{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.3.3-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.3.3-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ccs"}]

And when I try to register the connector:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8085/connectors/ -d @register-postgres.json
HTTP/1.1 500 Internal Server Error
Date: Mon, 03 Apr 2023 13:06:38 GMT
Content-Type: application/json
Content-Length: 2352
Server: Jetty(9.4.48.v20220622)

{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.debezium.connector.postgresql.PostgresConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"}

My config file:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.servername":"127.0.0.1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "topic.prefix": "dbserver2",
        "schema.include.list": "inventory",
        "slot.name":"dbserver1inventory",
        "publication.autocreate.mode":"all_tables"
    }
}

@dtroiano, please help me out; I am very confused.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.