The goal is to stand up a Kafka Connect container that is running the Confluent Http Sink Connector such that the connector will pick up JSON data from a given topic and send the data to an Http REST endpoint.
What I Have Working So Far
I managed to configure a set of local Docker containers and have a working container with Kafka Connect and the Http Sink Connector. The Http Sink Connector does as I expect: it picks up data from a given topic and sends an HTTP POST to a given .NET REST endpoint, which responds with a 200 OK.
The Next Step
Now I want to stand up a second Kafka Connect container with the Http Sink Connector, but this time, instead of sending the data to a custom-written .NET REST API, I want to send the data into an Azure Event Hub. I have configured an event hub and have been able to set up Postman such that I can post data (a JSON data model) to the event hub successfully. It took a bit of work to figure out how to properly format the Authorization header, but thanks to a few Stack Exchange articles I have it working.
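For readers unfamiliar with the Authorization header mentioned above: Event Hubs accepts a shared access signature (SAS) token over REST. The following is a minimal Python sketch of how such a token is commonly built, not the exact steps used in Postman. The function name `build_sas_token`, the namespace, the hub name, the policy name, and the key are all made-up placeholders.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse
from typing import Optional


def build_sas_token(resource_uri: str, key_name: str, key: str,
                    ttl_seconds: int = 3600, now: Optional[int] = None) -> str:
    """Build a 'SharedAccessSignature ...' value for the Authorization header.

    resource_uri, key_name and key are placeholders here; substitute the
    values from the event hub's Shared Access Policy.
    """
    # Expiry is a Unix timestamp (seconds) after which the token is rejected.
    expiry = int(now if now is not None else time.time()) + ttl_seconds
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    # The signed string is the URL-encoded resource URI, a newline, and the expiry.
    string_to_sign = f"{encoded_uri}\n{expiry}".encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))
    return (
        f"SharedAccessSignature sr={encoded_uri}"
        f"&sig={signature}&se={expiry}&skn={key_name}"
    )
```

A call such as `build_sas_token("https://example-namespace.servicebus.windows.net/example-hub", "ExamplePolicy", "<primary key>")` returns the full header value. Because the token carries an expiry, a value pasted into a long-running connector configuration will eventually stop being accepted.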
What I Am Struggling With
I have a second container running Kafka Connect and the Http Sink Connector. I configured the ‘headers’ property with the same headers that I used for the .NET REST API endpoint, but in addition I added the Authorization header that I used from the Postman working example. I didn’t change any other configuration settings with the Http Sink Connector from what I had working in my first example.
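To illustrate how several headers end up in the single ‘headers’ property, here is a small Python sketch. It assumes, based on my reading of the connector documentation, that entries take the form `Name:Value` and are joined by the `header.separator` character (`|` in my configuration). The helper `build_headers_property` and the `Content-Type` entry are hypothetical and only there for illustration.

```python
def build_headers_property(headers: dict, separator: str = "|") -> str:
    """Join HTTP headers into one string for the connector's 'headers' property.

    Assumes the 'Name:Value<separator>Name:Value' layout; the separator must
    not appear inside any header name or value, or the split would be wrong.
    """
    for name, value in headers.items():
        if separator in name or separator in value:
            raise ValueError(
                f"separator {separator!r} appears in header {name!r}; "
                "choose a different header.separator"
            )
    return separator.join(f"{name}:{value}" for name, value in headers.items())


# Hypothetical example values; the SAS token is shortened to a placeholder.
example = build_headers_property({
    "Content-Type": "application/json",
    "Authorization": "SharedAccessSignature sr=<uri>&sig=<sig>&se=<expiry>&skn=<policy>",
})
```

A SAS token contains `&`, `=` and `%` characters but no `|`, so the separator I configured should not collide with the Authorization value.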
The error that I receive from the Http Sink Connector is a rather nebulous TLS error. The following is what gets dumped to the error results topic that I have configured for the Http Sink Connector.
"Timestamp": "2021-07-27T21:03:56.259+00:00",
"Topic": "<masked for privacy>",
"Partition": 0,
"Offset": 5,
"Key": [
"Headers": {
"input_record_offset": "5",
"input_record_timestamp": "1627419835824",
"input_record_partition": "0",
"input_record_topic": "<masked for privacy>",
"error_message": "Exception while processing HTTP request for a batch of 1 records.",
"exception": "Error while processing HTTP request with Url : https://<masked for privacy><masked for privacy>/messages?api-version=2015-01, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : java.lang.IllegalArgumentException: TLS\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(\n\tat io.confluent.connect.http.HttpSinkTask.put(\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(\n\tat\n\tat java.util.concurrent.Executors$\n\tat\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(\n\tat java.util.concurrent.ThreadPoolExecutor$\n\tat\n",
"response_content": null,
"status_code": null,
"payload": "{<masked for privacy>}",
"reason_phrase": null,
"url": "https://<masked for privacy><masked for privacy>/messages?api-version=2015-01"
"Message": "Retry time lapsed, unable to process HTTP request. HTTP Response code: null, Reason phrase: null, Url: https://<masked for privacy><masked for privacy>/messages?api-version=2015-01, Response content: null, **Exception: java.lang.IllegalArgumentException: TLS, Error message: Exception while processing HTTP request for a batch of 1 records.**"
What I Have Tried to Fix the Error
I connected to the Kafka Connect container and ran tcpdump. I had done this before against the first example in order to debug it before I had it all working. In this case I ran tcpdump against port 80 and port 443. The capture against port 80 recorded no packets (as expected), and the capture against port 443 recorded 6 packets but did not reveal anything that I could interpret. I suspect what I was seeing was a TLS handshake that failed.
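A check that could complement the packet capture is to probe which TLS versions the endpoint will negotiate, independent of the connector. The Python sketch below is one way to do that under stated assumptions: the host name in the usage note is a placeholder, and `make_context` and `probe` are hypothetical helper names. It does not explain the `IllegalArgumentException` by itself; it only helps rule the server side in or out.

```python
import socket
import ssl


def make_context(version: ssl.TLSVersion) -> ssl.SSLContext:
    """Return a client context pinned to exactly one TLS protocol version."""
    ctx = ssl.create_default_context()
    ctx.minimum_version = version
    ctx.maximum_version = version
    return ctx


def probe(host: str, port: int = 443, timeout: float = 5.0) -> dict:
    """Attempt one handshake per TLS version and record the outcome."""
    results = {}
    for version in (ssl.TLSVersion.TLSv1_2, ssl.TLSVersion.TLSv1_3):
        try:
            with socket.create_connection((host, port), timeout=timeout) as raw:
                with make_context(version).wrap_socket(raw, server_hostname=host) as tls:
                    # tls.version() reports the protocol actually negotiated.
                    results[version.name] = tls.version()
        except (ssl.SSLError, OSError) as exc:
            results[version.name] = f"failed: {exc}"
    return results
```

Running `print(probe("example-namespace.servicebus.windows.net"))` from inside the Kafka Connect container's network (or from any host with Python available) shows which versions complete a handshake. If both succeed, the failure is more likely in how the connector's HTTP client is configured than in the endpoint.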
After further reading on configuring for Azure Event Hubs, I configured three properties in Kafka Connect. These settings are supposed to tell a connector to use SASL when it produces events. Here is what I added to my Kafka Connect configuration:
CONNECT_PRODUCER_SASL_JAAS_CONFIG: required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"
This change appears to have had no effect, positive or negative, because I continue to get the same error from the Http Sink Connector.
I am wondering if there are other ways that I can debug what is going on or if someone has any experience with doing what I am attempting to do. I believe it is in fact feasible, but I think the trick in all this is getting the security configured just right. I don’t understand why the Postman example works, but the Http Sink Connector fails with a TLS exception.
Any advice, help, suggestions would be greatly appreciated!
For additional context here is the configuration for Kafka Connect from my docker-compose file and the configuration of the Http Sink Connector:
Kafka Connect from docker-compose
image: confluentinc/cp-kafka-connect:5.4.0
hostname: kafka-connect-azure
container_name: kafka-connect-azure
- zookeeper
- kafka
- kafka-schema-registry
- kafka-rest-proxy
- "8084:8084"
CONNECT_CLIENT_ID: "kafka-connect-azure"
CONNECT_GROUP_ID: "kafka-connect-azure-group"
# required EH Kafka security settings
#CONNECT_SASL_MECHANISM=PLAIN required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
CONNECT_PRODUCER_SASL_JAAS_CONFIG: required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-azure-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-azure-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-azure-status
CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect-azure/jars,/usr/share/confluent-hub-components'
#CONNECT_LOG4J_LOGGERS: ",org.apache.kafka.connect=TRACE"
- ./connectors-azure:/etc/kafka-connect-azure/jars/
Http Sink Connector
"name": "AzureEventHubSink",
"config": {
"auth.type": "NONE",
"topics": "<masked for privacy>",
"tasks.max": "3",
"http.api.url": "https://<masked for privacy><masked for privacy>/messages?api-version=2015-01",
"request.method": "POST",
"request.body.format": "json",
"connector.class": "io.confluent.connect.http.HttpSinkConnector",
"key.converter": "",
"header.converter": "",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"behavior.on.null.values": "fail",
"behavior.on.error": "log",
"headers": "Authorization: SharedAccessSignature <masked for privacy>",
"header.separator": "|",
"max.retries": 10,
"": 3000,
"": "408,500-",
"max.poll.records": 1,
"": 900000,
"": 30000,
"": 30000,
"": false,
"batch.max.size": 1,
"reporter.bootstrap.servers": "PLAINTEXT://kafka:29092",
"": "<masked for privacy>",
"reporter.result.topic.replication.factor": "1",
"reporter.result.topic.partitions": 1,
"reporter.result.topic.value.format": "json",
"":"<masked for privacy>",
"reporter.error.topic.partitions": 1,
"reporter.error.topic.value.format": "json",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.tolerance": "all",
"": "<masked for privacy>",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true,
"confluent.topic.bootstrap.servers": "PLAINTEXT://kafka:29092",
"confluent.topic.replication.factor": "1"