I have a database in my Postgres. I want to connect the database to the Kafka topic, whatever changes are done to my database I am getting to my Kafka topic using debezium. But I want to send the Kafka topic messages to the elastic search. How can I achieve this? Steps would be appreciated.
My current docker file
version: "3.7"
services:
postgres:
image: postgres:10
ports:
- 5433:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=exampledb
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
kafka-connect:
container_name: kafka-connect
image: confluentinc/cp-kafka-connect-base:5.5.3
ports:
- "8083:8083"
depends_on:
- zookeeper
- kafka
volumes:
- $PWD/connect-plugins:/connect-plugins
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "connect"
CONNECT_CONFIG_STORAGE_TOPIC: connect-config
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
CONNECT_PLUGIN_PATH: /connect-plugins
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-3.3.0.jar
volumes:
- $PWD/connectors:/connectors
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
command:
- bash
- -c
- |
echo "Installing connector plugins"
# Check latest version here: https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.1
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
ports:
- 8081:8081
depends_on: [zookeeper, kafka]
kafdrop:
image: obsidiandynamics/kafdrop
restart: "no"
environment:
KAFKA_BROKERCONNECT: "kafka:9092"
# JVM_OPTS: "-Xms16M -Xmx512M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
ports:
- 9000:9000
depends_on:
- kafka
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
container_name: elasticsearch
ports:
- 9200:9200
environment:
ES_JAVA_OPTS: "-Xms1g -Xmx1g"
discovery.type: "single-node"
node.store.allow_mmap: "false"
How do I create the connector between my postgres-Kafka and Kafka-elastic search using connectors using the RestAPI