I want to send data from my Postgres database to a Kafka topic through Kafka Connect, running everything on my local system with Docker images. If I use the image as-is, I won't be able to add my Postgres connection; I need to install the plugin that supports Postgres. How can I do that? Any blogs or resources would be really helpful.
Could you provide a little more info on the error you’re hitting? I don’t fully understand. Are you asking how to create a custom Docker image with the postgres connector installed?
Dave
Exactly. I want to create a custom Docker image with the Postgres connector installed, and I need a sample configuration to create a connector. Sorry for the late reply, @dtroiano.
Which connect image are you currently using? i.e., cp-kafka-connect or cp-server-connect or something else? Also, which specific connector? i.e., JDBC or Debezium PostgreSQL CDC?
You’d create a Dockerfile that runs the confluent-hub command to install, e.g.,
FROM confluentinc/cp-server-connect:7.3.2
RUN confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.0.1
This would install the connector in /usr/share/confluent-hub-components, so that path would have to be included in the CONNECT_PLUGIN_PATH environment variable.
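As a quick sanity check on that last point, here is a minimal shell sketch that tests whether a given directory is one of the comma-separated entries in a CONNECT_PLUGIN_PATH value. The function name plugin_path_includes is made up for illustration and is not part of any Confluent tooling. The match is a plain string comparison on whole entries, so an entry such as /usr/share/confluent-hub-components/* counts as a different path here; how the worker itself treats such an entry is not something this sketch tries to model.

```shell
# Hypothetical helper (an assumption for illustration, not a Confluent tool).
# Usage: plugin_path_includes "<CONNECT_PLUGIN_PATH value>" "<directory>"
# Exits 0 when <directory> is exactly one of the comma-separated entries.
plugin_path_includes() {
  # Wrap both sides in commas so that only whole entries can match.
  case ",$1," in
    *",$2,"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Example: report on the directory the Dockerfile above installs into.
hub_dir="/usr/share/confluent-hub-components"
if plugin_path_includes "/usr/share/java,$hub_dir" "$hub_dir"; then
  echo "plugin path includes $hub_dir"
else
  echo "plugin path is missing $hub_dir"
fi
```

Inside a running container, the first argument could be "$CONNECT_PLUGIN_PATH" instead of a literal string.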
For a full working example, I’d recommend the PostgreSQL CDC connect example here, or the JDBC source / sink examples depending on which connector you’re using.
HTH,
Dave
I am currently using this image Quay
Hi @dtroiano ,
kafka-connect:
  # image: confluentinc/cp-kafka-connect-base:5.5.3
  image: confluentinc/cp-kafka-connect:7.3.3
  hostname: kafka-connect
  container_name: kafka-connect
  volumes:
    - $PWD/connectors:/connectors
    - "./producers/debezium-debezium-connector-postgresql/:/usr/share/confluent-hub-components/debezium-debezium-connector-postgresql/"
    - "./consumers/confluentinc-kafka-connect-elasticsearch/:/usr/share/confluent-hub-components/confluentinc-kafka-connect-elasticsearch/"
  depends_on:
    - zookeeper
    - kafka
    - schema-registry
  ports:
    - "8085:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
    KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M"
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_ZOOKEEPER_CONNECT: "zookeeper:2181"
    CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.5.1.jar
    CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
    CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components/*"
    CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
  command:
    - bash
    - -c
    - |
      echo "Installing connector plugins"
      # Check latest version here: https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
      confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.1
      confluent-hub install --no-prompt debezium/debezium-connector-postgresql:2.0.1
      #
      echo "Launching Kafka Connect worker"
      /etc/confluent/docker/run &
      #
      sleep infinity
I don't know what's wrong with my configuration; I am not able to install the plugin.
If I curl the connector-plugins endpoint:
http://localhost:8085/connector-plugins
[{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"7.3.3-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"7.3.3-ccs"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ccs"}]
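The same check can be scripted. Below is a small shell sketch that reads a /connector-plugins response on stdin and reports whether a connector class is listed. plugin_listed is a hypothetical helper name, and the fixed-string match assumes the compact "class":"..." formatting seen in the response above.

```shell
# Hypothetical helper: succeed if the JSON on stdin lists the given connector class.
# Assumes compact JSON, i.e. "class":"<name>" with no spaces around the colon.
plugin_listed() {
  grep -F -q "\"class\":\"$1\""
}

# Sample taken from the response above, shortened to a single entry.
sample='[{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"7.3.3-ccs"}]'

if printf '%s\n' "$sample" | plugin_listed "io.debezium.connector.postgresql.PostgresConnector"; then
  echo "Debezium Postgres connector is loaded"
else
  echo "Debezium Postgres connector is NOT loaded"
fi
```

Against the live worker from this thread, the input would come from curl -s http://localhost:8085/connector-plugins piped into plugin_listed with the same class name.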
And if I try to register the connector anyway:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8085/connectors/ -d @register-postgres.json
HTTP/1.1 500 Internal Server Error
Date: Mon, 03 Apr 2023 13:06:38 GMT
Content-Type: application/json
Content-Length: 2352
Server: Jetty(9.4.48.v20220622)
{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.debezium.connector.postgresql.PostgresConnector, available connectors are: PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorCheckpointConnector, name='org.apache.kafka.connect.mirror.MirrorCheckpointConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorHeartbeatConnector, name='org.apache.kafka.connect.mirror.MirrorHeartbeatConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.mirror.MirrorSourceConnector, name='org.apache.kafka.connect.mirror.MirrorSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=sink, typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=sink, 
typeName='sink', location='file:/usr/share/java/confluent-control-center/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='7.3.3-ccs', encodedVersion=7.3.3-ccs, type=source, typeName='source', location='file:/usr/share/java/confluent-control-center/'}"}
My config file:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.servername": "127.0.0.1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver2",
    "schema.include.list": "inventory",
    "slot.name": "dbserver1inventory",
    "publication.autocreate.mode": "all_tables"
  }
}
@dtroiano please help me out, I am very confused.