RabbitMQ Source Connector with Kafka not getting installed on Kubernetes Cluster

I want to fetch messages from RabbitMQ and pass them to Kafka. For this I tried to use RabbitMQ Source Connector in my Kubernetes cluster, but it is not getting installed.

I have kafka connect pod running and when I run the connector pod, it gives error
“2024-09-26 08:43:58,201 ERROR IO error forwarding REST request: (org.apache.kafka.connect.runtime.rest.RestClient) [qtp1626478944-542]
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Connect Timeout”

I verified using connect api. The plugin is listed in http://localhost:8083/connector-plugins api but I get empty error when use http://localhost:8083/connectors

Can someone help in sharing the steps to configure this connector on Kubernetes pod

hey @ssg weclcome :slight_smile:

would you mind sharing your config and your setup?
are you running confluent platform?

Hi @mmuehlbeyer

I have deployed Kafka using Strimzi operator. I have created a custom image with Confluent RabbitMQ Source Connector jar files and placed them in /opt/kafka/plugins directory

Kafka Connect manifest file

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: rabbitmq-connect-cluster-1
  namespace: messaging
  labels:
    strimzi.io/cluster: mqtt-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: simardocker/kafkaconnect:new
  replicas: 1
  bootstrapServers: 'mqtt-cluster-kafka-bootstrap:9092'
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    plugin.path: /opt/kafka/plugins

Kafka Connector manifest file:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-connector-mysql-1
  namespace: messaging
  labels:
    strimzi.io/cluster: rabbitmq-connect-cluster-1
spec:
  class: io.confluent.connect.r`Preformatted text`abbitmq.RabbitMQSourceConnector
  config:
    confluent.topic.bootstrap.servers: mqtt-cluster-kafka-bootstrap:9092
    kafka.topic: rabbitmq-topic
    rabbitmq.host: rabbitmq-cluster
    rabbitmq.port: 5672
    rabbitmq.queue: mqtt_test_queue_1  # Ensure this property is set
    rabbitmq.virtual.host: /
    rabbitmq.automatic.recovery.enabled: true
    rabbitmq.network.recovery.interval.ms: 10000
    rabbitmq.delivery.timeout.ms: 60000
    rabbitmq.prefetch.count: 1000
    rabbitmq.ack.mode: AUTO
    rabbitmq.auth.mechanism: EXTERNAL
    rabbitmq.username: default_user_XkRtTxF3k1Hp-xWcUOd
    rabbitmq.password: <oauth2 token>
    rest.advertised.host.name: rabbitmq-connect-cluster-1-connect

In RabbitMQ, oauth2 plugin is enabled and token based authorization is configured

is the advertised hostname resolvable ?

any other errors in the logs?