Postgres-Kafka-Elastic Search

I have a database in my Postgres. I want to connect the database to the Kafka topic, whatever changes are done to my database I am getting to my Kafka topic using debezium. But I want to send the Kafka topic messages to the elastic search. How can I achieve this? Steps would be appreciated.
My current docker file

version: "3.7"
services:
  postgres:
    image: postgres:10
    ports:
      - 5433:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=exampledb
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
  kafka-connect:
    container_name: kafka-connect
    image: confluentinc/cp-kafka-connect-base:5.5.3
    ports:
      - "8083:8083"
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - $PWD/connect-plugins:/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
      CONNECT_PLUGIN_PATH: /connect-plugins
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-3.3.0.jar

    volumes:
      - $PWD/connectors:/connectors
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command: 
      - bash 
      - -c 
      - |
        echo "Installing connector plugins"
        # Check latest version here: https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.0.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run & 
        #
        sleep infinity
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
   
  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      KAFKA_BROKERCONNECT: "kafka:9092"
      # JVM_OPTS: "-Xms16M -Xmx512M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    ports:
      - 9000:9000
    depends_on:
      - kafka
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
    container_name: elasticsearch
    ports:
      - 9200:9200
    environment:
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"
      node.store.allow_mmap: "false"

How do I create the connector between my postgres-Kafka and Kafka-elastic search using connectors using the RestAPI

hey @likhithkanigolla

did you consider using

best,
michael

hey, @mmuehlbeyer thanks for the reply. the document you shared shows me the connection between Kafka and the elastic search. How can I make one connector between my postgres and the Kafka topic to receive messages?
Also, any video tutorials would be more helpful for me. Please share if you have any

hey @likhithkanigolla

ok I see
take a look at
https://debezium.io/documentation/reference/stable/connectors/postgresql.html

best,
michael

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.