Confluent HTTP Sink & Azure Event Hubs

Context
The goal is to stand up a Kafka Connect container running the Confluent HTTP Sink Connector so that the connector picks up JSON data from a given topic and sends it to an HTTP REST endpoint.

What I Have Working So Far
I managed to configure a set of local Docker containers and have a working container with Kafka Connect and the HTTP Sink Connector. The connector does what I expect: it picks up data from a given topic and fires an HTTP POST to a .NET REST endpoint, which responds with a 200 OK.

The Next Step
Now what I want to do is stand up a second Kafka Connect container with the HTTP Sink Connector, but this time, instead of sending the data to a custom-written .NET REST API, I want to send the data into an Azure Event Hub. I have configured an event hub and have been able to set up Postman so that I can post data (a JSON data model) to the event hub successfully. It took a bit of work to figure out how to properly format the Authorization header, but thanks to a few Stack Exchange articles I have it working.
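For reference, the working Postman call boils down to a plain HTTPS POST with a SAS token in the Authorization header. Here is a minimal sketch of the equivalent request in Java; the URL, token, and payload are placeholders rather than values from my setup:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SendEventSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: substitute your namespace, event hub, and a valid SAS token.
        String url = "https://<namespace>.servicebus.windows.net/<eventhub>/messages?api-version=2015-01";
        String sasToken = "SharedAccessSignature sr=...&sig=...&se=...&skn=...";
        byte[] payload = "{\"example\":\"event\"}".getBytes(StandardCharsets.UTF_8);

        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Authorization", sasToken);
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload);
        }
        // Event Hubs responds 201 Created when the event is accepted.
        System.out.println(conn.getResponseCode());
    }
}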

What I Am Struggling With
I have a second container running Kafka Connect and the HTTP Sink Connector. I configured the 'headers' property with the same headers I used for the .NET REST API endpoint, plus the Authorization header from the working Postman example. I didn't change any other HTTP Sink Connector settings from my first, working example.

The error I receive from the HTTP Sink Connector is a rather nebulous TLS error. The following is what gets dumped to the error results topic I configured for the connector.

{
    "Timestamp": "2021-07-27T21:03:56.259+00:00",
    "Topic": "<masked for privacy>",
    "Partition": 0,
    "Offset": 5,
    "Key": [
      110,
      117,
      108,
      108
    ],
    "Headers": {
      "input_record_offset": "5",
      "input_record_timestamp": "1627419835824",
      "input_record_partition": "0",
      "input_record_topic": "<masked for privacy>",
      "error_message": "Exception while processing HTTP request for a batch of 1 records.",
      "exception": "Error while processing HTTP request with Url : https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01, Error Message : Exception while processing HTTP request for a batch of 1 records., Exception : java.lang.IllegalArgumentException: TLS\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeBatchRequest(HttpWriterImpl.java:387)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.executeRequestWithBackOff(HttpWriterImpl.java:306)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.sendBatch(HttpWriterImpl.java:280)\n\tat io.confluent.connect.http.writer.HttpWriterImpl.write(HttpWriterImpl.java:182)\n\tat io.confluent.connect.http.HttpSinkTask.put(HttpSinkTask.java:62)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
      "response_content": null,
      "status_code": null,
      "payload": "{<masked for privacy>}",
      "reason_phrase": null,
      "url": "https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01"
    },
    "Message": "Retry time lapsed, unable to process HTTP request. HTTP Response code: null, Reason phrase: null, Url: https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01, Response content: null, **Exception: java.lang.IllegalArgumentException: TLS, Error message: Exception while processing HTTP request for a batch of 1 records.**"
  }

What I Have Tried to Fix the Error

  • Connected to the Kafka Connect container and performed a tcpdump, something I had done against the first example in order to debug it before I got everything working. This time I captured against port 80 and port 443. The port 80 capture recorded no packets (as expected); the port 443 capture recorded 6 packets but did not reveal anything I could interpret. I suspect what I was seeing was the TLS handshake that failed (see the handshake test sketch after this list).

  • After further reading on configuring for Azure Event Hubs, I configured three properties on the Kafka Connect worker. Those settings are supposed to say that when a connector is producing events it should use SASL. Here is what I added to my Kafka Connect configuration:

CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"

This change appears to have had zero impact, negative or positive: I continue to get the same error from the HTTP Sink Connector. In hindsight, I believe these CONNECT_PRODUCER_* settings govern the worker's own Kafka producer clients rather than the connector's outbound HTTPS request, which would explain why they had no effect.
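Since the tcpdump captures were hard to interpret, here is the handshake test mentioned above: a minimal sketch that attempts a TLS handshake against the namespace using the JVM defaults and prints what was negotiated (the hostname is a placeholder):

import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;

public class HandshakeTest {
    public static void main(String[] args) throws Exception {
        // Placeholder hostname; substitute your Event Hubs namespace.
        String host = "<namespace>.servicebus.windows.net";
        SSLSocketFactory factory = (SSLSocketFactory) SSLSocketFactory.getDefault();
        try (SSLSocket socket = (SSLSocket) factory.createSocket(host, 443)) {
            socket.startHandshake(); // throws SSLHandshakeException on failure
            System.out.println("Negotiated: " + socket.getSession().getProtocol()
                    + " / " + socket.getSession().getCipherSuite());
        }
    }
}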

I am wondering whether there are other ways I can debug what is going on, or whether someone has experience doing what I am attempting. I believe it is in fact feasible, but I think the trick in all this is getting the security configured just right. I don't understand why the Postman example works while the HTTP Sink Connector fails with a TLS exception.

Any advice, help, suggestions would be greatly appreciated!

For additional context, here is the Kafka Connect configuration from my docker-compose file and the configuration of the HTTP Sink Connector:
Kafka Connect from docker-compose

kafka-connect-azure:
    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: kafka-connect-azure
    container_name: kafka-connect-azure
    depends_on:
      - zookeeper
      - kafka
      - kafka-schema-registry
      - kafka-rest-proxy
    ports:
      - "8084:8084"
    environment:
      CONNECT_REST_PORT: 8084
      CONNECT_CLIENT_ID: "kafka-connect-azure"
      CONNECT_GROUP_ID: "kafka-connect-azure-group"
      CONNECT_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-azure"
      # required EH Kafka security settings
      # CONNECT_SECURITY_PROTOCOL: "PLAINTEXT"
      #CONNECT_SECURITY_PROTOCOL=SASL_SSL
      #CONNECT_SASL_MECHANISM=PLAIN
      #CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<this was taken from the Azure Event Hub portal Shared Access Policy for the Primary Key>"
      # CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: kafka-schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: kafka-schema-registry:8081
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-azure-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-azure-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-azure-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect-azure/jars,/usr/share/confluent-hub-components'
      #CONNECT_LOG4J_LOGGERS: "log4j.logger.io.confluent.connect.http.HttpSinkConnector=TRACE,org.apache.kafka.connect=TRACE"
    volumes:
      - ./connectors-azure:/etc/kafka-connect-azure/jars/

HTTP Sink Connector

{
  "name": "AzureEventHubSink",
  "config": {
    "auth.type": "NONE",
    "topics": "<masked for privacy>",
    "tasks.max": "3",
    "http.api.url": "https://<masked for privacy>.servicebus.windows.net/<masked for privacy>/messages?api-version=2015-01",
    "request.method": "POST",
    "request.body.format": "json",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "behavior.on.null.values": "fail",
    "behavior.on.error": "log",
    "headers": "Authorization: SharedAccessSignature <masked for privacy>",
    "header.separator": "|",
    "max.retries": 10,
    "retry.backoff.ms": 3000,
    "retry.on.status.codes": "408,500-",
    "max.poll.records": 1,
    "max.poll.interval.ms": 900000,
    "http.connect.timeout.ms": 30000,
    "http.request.timeout.ms": 30000,
    "batch.json.as.array": false,
    "batch.max.size": 1,
    "reporter.bootstrap.servers": "PLAINTEXT://kafka:29092",
    "reporter.result.topic.name": "<masked for privacy>",
    "reporter.result.topic.replication.factor": "1",
    "reporter.result.topic.partitions": 1,
    "reporter.result.topic.value.format": "json",
    "reporter.error.topic.name": "<masked for privacy>",
    "reporter.error.topic.replication.factor": "1",
    "reporter.error.topic.partitions": 1,
    "reporter.error.topic.value.format": "json",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "<masked for privacy>",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "confluent.topic.bootstrap.servers": "PLAINTEXT://kafka:29092",
    "confluent.topic.replication.factor": "1"
  }
}

To circle back on this in case anyone else runs into the same issue I was having: I figured out the solution for this particular use case.

The problem ended up being the TLS version that the HTTP Sink Connector is configured with by default. The default is 'TLS', which does not necessarily mean TLSv1.2; since the HTTP Sink Connector (and, I believe, Kafka Connect) runs on Java 1.8, where the default for 'TLS' is TLSv1.2, I pinned the version explicitly. Once I configured the following:

"https.ssl.protocol": "TLSv1.2",
"https.ssl.provider": "SunJSSE",

and ensured that I had a non-expired SAS, the call to the web API worked as expected.
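For what it's worth, those two values map directly onto Java's JSSE API. Here is a minimal sketch of the lookup the connector presumably performs under the hood (an assumption on my part, not confirmed from the connector's source):

import javax.net.ssl.SSLContext;

public class SslContextSketch {
    public static void main(String[] args) throws Exception {
        // "TLSv1.2" and "SunJSSE" mirror https.ssl.protocol and https.ssl.provider;
        // both must be names the JVM's security providers recognize.
        SSLContext ctx = SSLContext.getInstance("TLSv1.2", "SunJSSE");
        ctx.init(null, null, null); // nulls mean JVM-default key and trust managers
        System.out.println(ctx.getProtocol()); // prints TLSv1.2
    }
}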

While this is good, what I have only now come to realize is that the SAS token needs to be generated dynamically, because it embeds an expiration time and will eventually become an expired token.
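Generating the token itself is straightforward; the algorithm Azure documents is an HMAC-SHA256 over the URL-encoded resource URI plus an expiry timestamp, signed with the shared access policy key. A minimal sketch in Java; the class and method names are my own, and the inputs are placeholders:

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SasTokenSketch {
    // Builds "SharedAccessSignature sr=...&sig=...&se=...&skn=..." for Event Hubs.
    static String generateSasToken(String resourceUri, String policyName,
                                   String policyKey, long ttlSeconds) throws Exception {
        long expiry = System.currentTimeMillis() / 1000 + ttlSeconds;
        String encodedUri = URLEncoder.encode(resourceUri, "UTF-8");
        String stringToSign = encodedUri + "\n" + expiry;
        Mac hmac = Mac.getInstance("HmacSHA256");
        hmac.init(new SecretKeySpec(policyKey.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        String signature = Base64.getEncoder()
                .encodeToString(hmac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)));
        return String.format("SharedAccessSignature sr=%s&sig=%s&se=%d&skn=%s",
                encodedUri, URLEncoder.encode(signature, "UTF-8"), expiry, policyName);
    }

    public static void main(String[] args) throws Exception {
        // Placeholders; use your namespace URI, policy name, and primary key.
        System.out.println(generateSasToken(
                "https://<namespace>.servicebus.windows.net/<eventhub>",
                "<policy-name>", "<primary-key>", 3600));
    }
}

Something like this would have to run on a schedule and update the connector's headers property before the se= expiry passes, which is what pushed me toward OAuth.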

Now I am working on figuring out how to use AAD OAuth with the HTTP Sink Connector. I almost have it working but ran into a different problem; I am going to create a new post for it to see if someone can help me out.
