Context
The goal is to stand up a Kafka Connect container running the Confluent Http Sink Connector so that the connector picks up JSON data from a given topic and sends it to an HTTP REST endpoint.
What I Have Working So Far
I managed to configure a set of local Docker containers and have a working container with Kafka Connect and the Http Sink Connector. The Http Sink Connector does what I expect: it picks up data from a given topic and fires an HTTP POST to a custom .NET REST endpoint, which responds with a 200 OK.
The Next Step
Now what I want to do is stand up a second Kafka Connect container with the Http Sink Connector, but this time, instead of sending the data to a custom-written .NET REST API, I want to send the data into an Azure Event Hub. I have configured an event hub and have been able to set up Postman so that I can POST data (a JSON data model) to the event hub successfully. It took a bit of work to figure out how to properly format the Authorization header, but thanks to a few Stack Exchange articles I have it working.
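For reference, the Authorization header that Postman sends is an Event Hubs SAS (Shared Access Signature) token. Here is a minimal sketch of how such a token can be generated, following the documented SAS format; the namespace, hub, policy name, and key in any usage are placeholders, not my real values:

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri: str, key_name: str, key: str, ttl_seconds: int = 3600) -> str:
    """Build an Event Hubs SharedAccessSignature value for the Authorization header."""
    # URL-encode the resource URI exactly as it will appear in the token's sr= field.
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    expiry = str(int(time.time()) + ttl_seconds)
    # Sign "<encoded-uri>\n<expiry>" with the shared access policy key using HMAC-SHA256.
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("utf-8"))
    return (
        f"SharedAccessSignature sr={encoded_uri}"
        f"&sig={signature}&se={expiry}&skn={key_name}"
    )
```

The returned string goes verbatim into the `Authorization` header, e.g. `generate_sas_token("https://mynamespace.servicebus.windows.net/myhub", "RootManageSharedAccessKey", "<primary key>")` (placeholder names).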
What I Am Struggling With
I have a second container running Kafka Connect and the Http Sink Connector. I configured the ‘headers’ property with the same headers that I used for the .NET REST API endpoint, but in addition I added the Authorization header that I used from the Postman working example. I didn’t change any other configuration settings with the Http Sink Connector from what I had working in my first example.
The error that I receive from the Http Sink Connector is a rather nebulous TLS error. The following is what gets dumped to the error results topic that I have configured for the Http Sink Connector.
{
  "Timestamp": "2021-07-27T21:03:56.259+00:00",
  "Topic": "<masked for privacy>",
  "Partition": 0,
  "Offset": 5,
  "Key": [
    110,
    117,
    108,
    108
  ],
  "Headers": {
    "input_record_offset": "5",
    "input_record_timestamp": "1627419835824",
    "input_record_partition": "0",
    "input_record_topic": "<masked for privacy>",
    "error_message": "Exception while processing HTTP request for a batch of 1 records.",
    "exception": "Error while processing HTTP request with Url : https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : java.lang.IllegalArgumentException: TLS\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:387)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:306)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:280)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:182)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:62)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
    "response_content": null,
    "status_code": null,
    "payload": "{<masked for privacy>}",
    "reason_phrase": null,
    "url": "https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01"
  },
  "Message": "Retry time lapsed, unable to process HTTP request. HTTP Response code: null, Reason phrase: null, Url: https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01, Response content: null, **Exception: java.lang.IllegalArgumentException: TLS, Error message: Exception while processing HTTP request for a batch of 1 records.**"
}
What I Have Tried to Fix the Error
- Connected to the Kafka Connect container and performed a tcpdump. I have done this in the past against the first example to debug the situation before I had it all working. In this case I performed a tcpdump against port 80 and port 443. The dump against port 80 captured no packets (as expected), and the capture against port 443 captured 6 packets but did not reveal anything that I could interpret. I suspect what I was seeing was the TLS handshake failing.
- After further reading on configuring for Azure Event Hubs, I configured three properties in the Kafka Connect worker. Those settings are supposed to tell the Connect worker to use SASL when producing events. Here is what I added to my Kafka Connect configuration:
CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"
This change appears to have had zero impact, negative or positive, because I continue to get the same error from the Http Sink Connector.
I am wondering if there are other ways I can debug what is going on, or if anyone has experience doing what I am attempting. I believe it is feasible, but I think the trick is getting the security configured just right. I don't understand why the Postman example works but the Http Sink Connector fails with a TLS exception.
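One further debugging idea I am considering, to separate "the container cannot complete a TLS handshake at all" from "the connector's HTTP client is misconfigured": a small probe run from inside the container. This is just a sketch; the host name passed in would be my actual Event Hubs namespace host (masked here), and it only tests the handshake, not authorization:

```python
import socket
import ssl


def probe_tls(host: str, port: int = 443, timeout: float = 5.0):
    """Attempt a TLS handshake with host:port.

    Returns (True, negotiated_protocol) on success, e.g. (True, "TLSv1.2"),
    or (False, error_message) if the TCP connect or handshake fails.
    """
    context = ssl.create_default_context()  # verifies the server certificate
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls:
                return True, tls.version()
    except (OSError, ssl.SSLError) as exc:
        return False, str(exc)
```

Running `probe_tls("<namespace>.servicebus.windows.net")` from inside the Kafka Connect container would confirm whether outbound TLS to the endpoint works independently of the connector.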
Any advice, help, suggestions would be greatly appreciated!
For additional context here is the configuration for Kafka Connect from my docker-compose file and the configuration of the Http Sink Connector:
Kafka Connect from docker-compose
kafka-connect-azure:
  image: confluentinc/cp-kafka-connect:5.4.0
  hostname: kafka-connect-azure
  container_name: kafka-connect-azure
  depends_on:
    - zookeeper
    - kafka
    - kafka-schema-registry
    - kafka-rest-proxy
  ports:
    - "8084:8084"
  environment:
    CONNECT_REST_PORT: 8084
    CONNECT_CLIENT_ID: "kafka-connect-azure"
    CONNECT_GROUP_ID: "kafka-connect-azure-group"
    CONNECT_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:29092"
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-azure"
    # required EH Kafka security settings
    # CONNECT_SECURITY_PROTOCOL: "PLAINTEXT"
    #CONNECT_SECURITY_PROTOCOL=SASL_SSL
    #CONNECT_SASL_MECHANISM=PLAIN
    #CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
    CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
    CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"
    # CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: kafka-schema-registry:8081
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: kafka-schema-registry:8081
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    # CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    # CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-azure-configs
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-azure-offsets
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-azure-status
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect-azure/jars,/usr/share/confluent-hub-components'
    #CONNECT_LOG4J_LOGGERS: "log4j.logger.io.confluent.connect.http.HttpSinkConnector=TRACE,org.apache.kafka.connect=TRACE"
  volumes:
    - ./connectors-azure:/etc/kafka-connect-azure/jars/
Http Sink Connector
{
  "name": "AzureEventHubSink",
  "config": {
    "auth.type": "NONE",
    "topics": "<masked for privacy>",
    "tasks.max": "3",
    "http.api.url": "https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01",
    "request.method": "POST",
    "request.body.format": "json",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "behavior.on.null.values": "fail",
    "behavior.on.error": "log",
    "headers": "Authorization: SharedAccessSignature <masked for privacy>",
    "header.separator": "|",
    "max.retries": 10,
    "retry.backoff.ms": 3000,
    "retry.on.status.codes": "408,500-",
    "max.poll.records": 1,
    "max.poll.interval.ms": 900000,
    "http.connect.timeout.ms": 30000,
    "http.request.timeout.ms": 30000,
    "batch.json.as.array": false,
    "batch.max.size": 1,
    "reporter.bootstrap.servers": "PLAINTEXT://kafka:29092",
    "reporter.result.topic.name": "<masked for privacy>",
    "reporter.result.topic.replication.factor": "1",
    "reporter.result.topic.partitions": 1,
    "reporter.result.topic.value.format": "json",
    "reporter.error.topic.name": "<masked for privacy>",
    "reporter.error.topic.replication.factor": "1",
    "reporter.error.topic.partitions": 1,
    "reporter.error.topic.value.format": "json",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<masked for privacy>",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "confluent.topic.bootstrap.servers": "PLAINTEXT://kafka:29092",
    "confluent.topic.replication.factor": "1"
  }
}
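One note on the `headers` property in the config above: the connector splits that single string on the configured `header.separator` (`|` here), so multiple headers are assembled into one value. A quick sketch with placeholder values (the SAS token is masked, as in my real config):

```python
# Each entry is one "Name: value" header; placeholder values only.
header_list = [
    "Content-Type: application/json",
    "Authorization: SharedAccessSignature <masked for privacy>",
]

# Join with the same character configured as "header.separator": "|"
headers_value = "|".join(header_list)
```

This is how I built the `headers` value for both the .NET endpoint and the Event Hub attempt, with the Authorization entry added only in the second case.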