ssg
27 September 2024 10:28
1
I want to fetch messages from RabbitMQ and pass them to Kafka. For this I tried to use the RabbitMQ Source Connector in my Kubernetes cluster, but it is not getting installed.
I have a Kafka Connect pod running, but when I run the connector pod, it gives this error:
“2024-09-26 08:43:58,201 ERROR IO error forwarding REST request: (org.apache.kafka.connect.runtime.rest.RestClient) [qtp1626478944-542]
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Connect Timeout”
I verified this using the Connect REST API. The plugin is listed by http://localhost:8083/connector-plugins, but I get an empty error when I call http://localhost:8083/connectors.
Can someone share the steps to configure this connector on a Kubernetes pod?
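The two REST checks described above can be repeated with a short script. This is a minimal sketch that assumes the Connect REST port is reachable at http://localhost:8083, as in the post (for example through a port-forward). The helper names `connect_url` and `get_json` are made up for this example; only the `/connector-plugins` and `/connectors` paths come from the post.

```python
import json
import urllib.error
import urllib.request

# Base URL taken from the post; adjust it if the REST port is exposed elsewhere.
BASE_URL = "http://localhost:8083"


def connect_url(base_url, path):
    """Join the Connect base URL and a REST path with exactly one slash."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def get_json(url, timeout=10.0):
    """GET a URL and return (status, body).

    status is None when the worker could not be reached at all;
    body is parsed JSON when possible, otherwise the raw text.
    """
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.status
            body = response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # The worker answered, but with an error status (4xx/5xx).
        status = err.code
        body = err.read().decode("utf-8", errors="replace")
    except OSError as err:
        # Connection refused, DNS failure, or timeout (URLError is an OSError).
        return None, str(err)
    try:
        return status, json.loads(body)
    except json.JSONDecodeError:
        return status, body


def main():
    # Print the status and body of both endpoints mentioned in the post.
    for path in ("connector-plugins", "connectors"):
        status, body = get_json(connect_url(BASE_URL, path))
        print(path, "->", status, body)


if __name__ == "__main__":
    main()
```

Printing the status code next to the body makes it easier to tell an empty connector list apart from an error response that has no readable message.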
Hey @ssg, welcome!
Would you mind sharing your config and your setup?
Are you running Confluent Platform?
ssg
28 September 2024 03:57
3
Hi @mmuehlbeyer
I have deployed Kafka using the Strimzi operator. I have created a custom image with the Confluent RabbitMQ Source Connector jar files and placed them in the /opt/kafka/plugins directory.
Kafka Connect manifest file:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: rabbitmq-connect-cluster-1
  namespace: messaging
  labels:
    strimzi.io/cluster: mqtt-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  image: simardocker/kafkaconnect:new
  replicas: 1
  bootstrapServers: 'mqtt-cluster-kafka-bootstrap:9092'
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    plugin.path: /opt/kafka/plugins
Kafka Connector manifest file:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-connector-mysql-1
  namespace: messaging
  labels:
    strimzi.io/cluster: rabbitmq-connect-cluster-1
spec:
  class: io.confluent.connect.rabbitmq.RabbitMQSourceConnector
  config:
    confluent.topic.bootstrap.servers: mqtt-cluster-kafka-bootstrap:9092
    kafka.topic: rabbitmq-topic
    rabbitmq.host: rabbitmq-cluster
    rabbitmq.port: 5672
    rabbitmq.queue: mqtt_test_queue_1 # Ensure this property is set
    rabbitmq.virtual.host: /
    rabbitmq.automatic.recovery.enabled: true
    rabbitmq.network.recovery.interval.ms: 10000
    rabbitmq.delivery.timeout.ms: 60000
    rabbitmq.prefetch.count: 1000
    rabbitmq.ack.mode: AUTO
    rabbitmq.auth.mechanism: EXTERNAL
    rabbitmq.username: default_user_XkRtTxF3k1Hp-xWcUOd
    rabbitmq.password: <oauth2 token>
    rest.advertised.host.name: rabbitmq-connect-cluster-1-connect
In RabbitMQ, the OAuth 2 plugin is enabled and token-based authorization is configured.
Is the advertised hostname resolvable?
Any other errors in the logs?
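One way to answer the resolvability question is a small DNS check run from a pod in the same namespace as the Connect worker. This is only a sketch: the default hostname is the `rest.advertised.host.name` value from the manifest above, port 8083 is taken from the URLs in the first post, and the function name `resolve_host` is made up for this example. Running it from a laptop outside the cluster would not tell you much, because cluster-internal names usually only resolve inside the cluster.

```python
import socket
import sys

# Value of rest.advertised.host.name from the posted manifest.
DEFAULT_HOST = "rabbitmq-connect-cluster-1-connect"


def resolve_host(hostname, port=8083):
    """Return the sorted IP addresses a hostname resolves to, or [] if none."""
    try:
        infos = socket.getaddrinfo(hostname, port, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        # The name could not be resolved from where this script runs.
        return []
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
    return sorted({info[4][0] for info in infos})


def main(argv):
    # Hostnames can be passed as arguments; otherwise check the advertised one.
    hostnames = argv[1:] or [DEFAULT_HOST]
    for hostname in hostnames:
        addresses = resolve_host(hostname)
        if addresses:
            print(hostname, "resolves to", ", ".join(addresses))
        else:
            print(hostname, "does not resolve from here")


if __name__ == "__main__":
    main(sys.argv)
```

If the name does not resolve, comparing it against the Service names that actually exist in the namespace would be a reasonable next step.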