Kafka-connect-HDFS

Hi! I can’t download the HDFS Sink connector!
Actually, I’m trying to install a Kafka connector in Docker, and I need to download the JAR of the kafka-connect-hdfs connector before running it. When I go to HDFS 3 Sink Connector | Confluent Hub, the download button is not clickable. Could someone help me, please?

Here is the configuration of the service:

kafka-connect:
  image: confluentinc/cp-kafka-connect:latest
  container_name: kafka-connect
  volumes:
    - ./connector:/usr/share/java
  environment:
    CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
    CONNECT_GROUP_ID: 'kafka-connect-group'
    CONNECT_CONFIG_STORAGE_TOPIC: 'kafka-connect-config'
    CONNECT_OFFSET_STORAGE_TOPIC: 'kafka-connect-offset'
    CONNECT_STATUS_STORAGE_TOPIC: 'kafka-connect-status'
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_PLUGIN_PATH: '/usr/share/java'
  ports:
    - 8083:8083

hey @Damilola

I would recommend installing it via the command line.

Add the following to your compose file:

command:
        - bash
        - -c
        - |
          echo "Installing Kafka Connect hdfs"
          confluent-hub install --no-prompt confluentinc/kafka-connect-hdfs3:1.1.25
          #
          echo "Launching Kafka Connect worker"
          /etc/confluent/docker/run &
          #
          echo "Waiting for Kafka Connect to start listening on 0.0.0.0:8083 ⏳"
          while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://0.0.0.0:8083/connectors)
            echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
              break
            fi
            sleep 5
          done
          sleep infinity
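
Once the loop reports 200, you can confirm the plugin was picked up by listing the worker’s installed connector plugins (run from the host, assuming the 8083 port mapping above):

curl -s localhost:8083/connector-plugins | grep -i hdfs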

best,
michael

Okay. Thanks
I’ll try it

It works. Thanks

But I have another question.
This is a snippet from my docker-compose. How do I connect kafka-connect to HDFS (the namenode)? Or is it during the copy that I specify the URL of HDFS? Here (localhost:50070)

  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    container_name: namenode
    volumes:
      - namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
    networks:
      - elk

  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    container_name: datanode
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
    networks:
      - elk

  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    hostname: connect
    depends_on:
      - schema_registry
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java/kafka-connect-*
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    ports:
      - 8083:8083
    command: 
      - bash
      - -c
      - |
        confluent-hub install confluentinc/kafka-connect-hdfs:10.2.1
        /etc/confluent/docker/run
    networks:
      - elk

It’s done via the connector configuration of Kafka Connect.

See HDFS 3 Sink Connector Configuration Properties | Confluent Documentation for reference.
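
For example, since kafka-connect and the namenode share a Docker network, the connector config can point hdfs.url at the namenode by its service name. A minimal sketch, assuming the HDFS 2 connector class you are installing and the namenode’s default RPC port 8020 (topic name and flush.size are illustrative):

# hypothetical connector config; adjust topic and flush.size to your setup
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://namenode:8020
flush.size=3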

best,
michael

Hello Sir
I have another problem with kafka-connect:
Caused by: java.io.IOException: Failed to bind to 0.0.0.0/0.0.0.0:8083
Here is the service for kafka-connect:

kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    hostname: connect
    depends_on:
      - schema_registry
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java/
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    ports:
      - 8083:8083
    volumes:
      - hadoopconf:/usr/local/hadoop-conf
    command: 
      - bash
      - -c
      - |
        echo "Installing Kafka Connect hdfs"
        confluent-hub install confluentinc/kafka-connect-hdfs:10.2.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run
    networks:
      - elk

When I ran:

sudo lsof -i tcp:8083

I saw two processes running. Even after killing one of them, I got the same error.

In the browser, opening localhost:8083/connectors, I got: ["hdfs-sink"]

Here is my quickstart.properties:

connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=hdfs://namenode:8020
flush.size=3

I’d appreciate any help.

Actually, here is the full content of my docker-compose file:

version: '3'

services:
  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    container_name: spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
    volumes:
      - spark_volume:/spark
    networks:
      - elk
  spark-worker-1:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
      - "SPARK_WORKER_CORES=2"
      - "SPARK_WORKER_MEMORY=4g"
    networks:
      - elk
  zeppelin:
    image: apache/zeppelin:0.10.1
    container_name: apache-zeppelin
    depends_on:
      - spark-master
    ports:
      - "8085:8080"
    volumes:
      - ./notebook:/notebook
      - ./conf:/conf
      - ./logs:/logs
      - spark_volume:/opt/zeppelin/spark
    environment:
      - "SPARK_HOME=/opt/zeppelin/spark"
      - "SPARK_MASTER=spark://spark-master:7077"
    networks:
      - elk

  Elasticsearch:
    image: elasticsearch:7.16.2
    container_name: elasticsearch
    volumes:
    - elastic_data:/usr/share/elasticsearch/data/
    environment:
      discovery.type: single-node    
    ports:
    - '9200:9200'
    - '9300:9300'
    networks:
      - elk

  Kibana:
    image: kibana:7.16.2
    container_name: kibana
    ports:
    - '5601:5601'
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200  
    depends_on:
      - Elasticsearch  
    networks:
      - elk
  
  zookeeper:
    image: 'confluentinc/cp-zookeeper:latest'
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181   
    networks:
      - elk
  

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092  
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
    networks:
      - elk

  namenode:
    image: bde2020/hadoop-namenode:2.0.0-hadoop2.7.4-java8
    container_name: namenode
    volumes:
      - namenode:/hadoop/dfs/name
      - hadoopconf:/etc/hadoop
    environment:
      - CLUSTER_NAME=test
    env_file:
      - ./hadoop-hive.env
    ports:
      - "50070:50070"
    networks:
      - elk

  datanode:
    image: bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8
    container_name: datanode
    volumes:
      - datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop-hive.env
    environment:
      SERVICE_PRECONDITION: "namenode:50070"
    ports:
      - "50075:50075"
    networks:
      - elk

  kafka-connect:
    image: confluentinc/cp-kafka-connect:latest
    container_name: kafka-connect
    hostname: connect
    depends_on:
      - schema_registry
      - kafka
      - zookeeper
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8082'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java/
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    ports:
      - 8083:8083
    volumes:
      - hadoopconf:/usr/local/hadoop-conf
    command: 
      - bash
      - -c
      - |
        echo "Installing Kafka Connect hdfs"
        confluent-hub install confluentinc/kafka-connect-hdfs:10.2.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run
    networks:
      - elk

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.0.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - kafka-connect
      - kafka
      - zookeeper
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      - elk

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8082:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092

    networks:
      - elk
volumes:
    spark_volume:
    elastic_data:
    kafka_data:
    zookeeper_data: {}
    namenode:
    datanode:
    hadoopconf:

networks:
  elk:

I suggest you debug one container at a time

docker compose up kafka-connect
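
A leftover container from an earlier run can also keep the port bound; a quick way to check (container name taken from your compose file):

docker ps --filter publish=8083    # anything still publishing 8083?
docker rm -f kafka-connect         # if so, remove it and bring the service back up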

Sounds like you’re running something else on port 8083, so you could try changing the port mapping as well.
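
For example, keeping 8083 inside the container but exposing it on a different host port (8084 here is just an arbitrary free port):

ports:
  - 8084:8083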

And the quickstart file is only for standalone Connect mode, not for posting to the Connect REST API.
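
To illustrate the difference, a rough sketch: standalone mode takes the properties file on the command line, while the distributed mode that the cp-kafka-connect image runs takes the same settings as JSON via the REST API (file paths and host are assumptions):

# standalone mode: connector properties are passed as files
connect-standalone worker.properties quickstart.properties

# distributed mode: post the equivalent JSON to the worker
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors \
  --data '{"name": "hdfs-sink", "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1", "topics": "test_hdfs",
    "hdfs.url": "hdfs://namenode:8020", "flush.size": "3"}}'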

I’ll also note that Spark itself can consume from Kafka and write to both HDFS and Elasticsearch.

Okay! Thank you for your answer!
I’ll try it, and if it doesn’t work, I’ll use Spark to consume directly from Kafka and write to HDFS.

Best,