HTTP sink connector creation issue

I am trying to create my first HTTP sink connector. I created all the topics the connector needs (source, DLT, success response, and error response), but it is still not working. Error:
org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:
at io.confluent.connect.reporter.ReporterAdminClient.handleExecutionException(ReporterAdminClient.java:109)
at io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:57)
at io.confluent.connect.reporter.Reporter.createDestinationTopicsIfNeeded(Reporter.java:426)
at io.confluent.connect.reporter.Reporter.configure(Reporter.java:81)
at io.confluent.connect.http.HttpSinkTask.start(HttpSinkTask.java:50)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:325)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
at io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:53)
… 12 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics

My HTTP sink config:
{
  "topics": "topic-source",
  "name": "usergroupinfo-httpsink-connect",
  "request.method": "POST",
  "max.request.size": "1048576",
  "auth.username": "kafka",
  "value.converter.schemas.enable": "false",
  "batch.max.size": "1",
  "http.api.url": "http://xxx",
  "retries": "3",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "auth.password": "kafka",
  "auth.type": "basic",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "read.timeout.ms": "5000",
  "tasks.max": "3",
  "connection.timeout.ms": "5000",
  "connector.class": "io.confluent.connect.http.HttpSinkConnector",
  "bootstrap.servers": "xxx",
  "reporter.bootstrap.servers": "xxx",
  "request.body.format": "json",
  "retry.backoff.ms": "1000",
  "headers": "Content-Type:application/json",
  "reporter.result.topic.name": "Http.Success.Responses",
  "reporter.error.topic.name": "Http.Error.Responses"
}

I don’t understand why it tries to create a topic when all the topics already exist.
Br/

Hey @zeggarin, welcome :slight_smile:

Your Connect cluster is running fine, I assume?
Any errors in the logs?

best,
michael

Thanks mmuehlbeyer for your reply.
Yes, my cluster is healthy and running well.
Part of the log from my pod:
[2024-07-24 12:47:33,139] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,139] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Cancelled in-flight METADATA request with correlation id 208894 due to node -1 being disconnected (elapsed time since creation: 100ms, elapsed time since send: 100ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,139] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Cancelled in-flight INIT_PRODUCER_ID request with correlation id 208895 due to node -1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,139] WARN [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Bootstrap broker kafka:9090 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1232)
[2024-07-24 12:47:33,202] INFO [usergroupinfo-httpsink-connect|worker] [AdminClient clientId=usergroupinfo-httpsink-connect-license-manager] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,202] INFO [usergroupinfo-httpsink-connect|worker] [AdminClient clientId=usergroupinfo-httpsink-connect-license-manager] Cancelled in-flight METADATA request with correlation id 711 due to node -1 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, request timeout: 8319ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,206] WARN [dborxp2.5-postgre-outbox-connector|task-0|offsets] Couldn’t commit processed log positions with the source database due to a concurrent connector shutdown or restart (io.debezium.connector.common.BaseSourceTask:366)
[2024-07-24 12:47:33,307] INFO [usergroupinfo-httpsink-connect|task-0] [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,308] INFO [usergroupinfo-httpsink-connect|task-0] [Producer clientId=producer-1] Cancelled in-flight METADATA request with correlation id 208495 due to node -1 being disconnected (elapsed time since creation: 100ms, elapsed time since send: 100ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,308] INFO [usergroupinfo-httpsink-connect|task-0] [Producer clientId=producer-1] Cancelled in-flight INIT_PRODUCER_ID request with correlation id 208496 due to node -1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,308] WARN [usergroupinfo-httpsink-connect|task-0] [Producer clientId=producer-1] Bootstrap broker kafka:9090 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1232)
[2024-07-24 12:47:33,336] INFO [usergroupinfo-httpsink-connect|task-2] [Producer clientId=producer-3] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,336] INFO [usergroupinfo-httpsink-connect|task-2] [Producer clientId=producer-3] Cancelled in-flight METADATA request with correlation id 208642 due to node -1 being disconnected (elapsed time since creation: 100ms, elapsed time since send: 100ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,337] INFO [usergroupinfo-httpsink-connect|task-2] [Producer clientId=producer-3] Cancelled in-flight INIT_PRODUCER_ID request with correlation id 208643 due to node -1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,337] WARN [usergroupinfo-httpsink-connect|task-2] [Producer clientId=producer-3] Bootstrap broker kafka:9090 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1232)
[2024-07-24 12:47:33,340] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,340] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Cancelled in-flight METADATA request with correlation id 208897 due to node -1 being disconnected (elapsed time since creation: 100ms, elapsed time since send: 100ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,340] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Cancelled in-flight INIT_PRODUCER_ID request with correlation id 208898 due to node -1 being disconnected (elapsed time since creation: 0ms, elapsed time since send: 0ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,340] WARN [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-2] Bootstrap broker kafka:9090 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1232)
[2024-07-24 12:47:33,415] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-6] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient:1090)
[2024-07-24 12:47:33,415] INFO [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-6] Cancelled in-flight API_VERSIONS request with correlation id 12239 due to node -1 being disconnected (elapsed time since creation: 2ms, elapsed time since send: 2ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient:406)
[2024-07-24 12:47:33,415] WARN [usergroupinfo-httpsink-connect|task-1] [Producer clientId=producer-6] Bootstrap broker kafka.xxxx:433 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient:1232)

When I check the connector status using the Connect API, I get this:

{
  "name": "usergroupinfo-httpsink-connect",
  "connector": {
    "state": "UNASSIGNED",
    "worker_id": "connect-0.connect.xxx.local:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "connect-0.connect.xxx.local:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:\n\tat io.confluent.connect.reporter.ReporterAdminClient.handleExecutionException(ReporterAdminClient.java:109)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:57)\n\tat io.confluent.connect.reporter.Reporter.createDestinationTopicsIfNeeded(Reporter.java:426)\n\tat io.confluent.connect.reporter.Reporter.configure(Reporter.java:81)\n\tat io.confluent.connect.http.HttpSinkTask.start(HttpSinkTask.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:325)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:227)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics\n\tat java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)\n\tat java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005)\n\tat org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)\n\tat io.confluent.connect.reporter.ReporterAdminClient.createTopic(ReporterAdminClient.java:53)\n\t… 12 more\nCaused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics\n"
    }
  ],
  "type": "sink"
}

Br/
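A status response like the one above is easier to read once the innermost "Caused by" line is pulled out of each failed entry. The following is only a minimal sketch: the function name `summarize_failures` and the shortened sample trace are made up for illustration, and it assumes the status JSON has the same shape as the response pasted above.

```python
def summarize_failures(status: dict) -> list:
    """Return one line per FAILED connector/task with its innermost cause."""

    def root_cause(trace: str) -> str:
        # Stack frames start with a tab, so only the exception headers
        # begin with "Caused by:". The last one is the innermost cause.
        causes = [ln for ln in trace.splitlines() if ln.startswith("Caused by:")]
        if causes:
            return causes[-1][len("Caused by:"):].strip()
        # No nested cause: fall back to the first line of the trace.
        return trace.splitlines()[0].strip() if trace.strip() else "(no trace)"

    lines = []
    connector = status.get("connector", {})
    if connector.get("state") == "FAILED":
        lines.append("connector: " + root_cause(connector.get("trace", "")))
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            lines.append("task %s: %s" % (task["id"], root_cause(task.get("trace", ""))))
    return lines


# Shortened, hypothetical copy of the status shown above.
sample_status = {
    "name": "usergroupinfo-httpsink-connect",
    "connector": {"state": "UNASSIGNED"},
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "trace": (
                "org.apache.kafka.connect.errors.ConnectException: Unable to manage topics:\n"
                "\tat io.confluent.connect.reporter.Reporter.configure(Reporter.java:81)\n"
                "Caused by: java.util.concurrent.ExecutionException: wrapped\n"
                "Caused by: org.apache.kafka.common.errors.TimeoutException: "
                "Timed out waiting for a node assignment. Call: createTopics\n"
            ),
        }
    ],
    "type": "sink",
}

failures = summarize_failures(sample_status)
for line in failures:
    print(line)
```

Here the innermost cause is the admin client timing out while waiting for a broker, which points at connectivity rather than at the topics themselves.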

The sink connector tries to write data to the topics specified by

"reporter.result.topic.name": "Http.Success.Responses",
"reporter.error.topic.name": "Http.Error.Responses"

See the Configuration Reference for HTTP Sink Connector for Confluent Platform (Confluent Documentation) for reference.

Thanks mmuehlbeyer. I think my lab cluster was not OK. I created a new connector on dev, but now when I check the status I get this error:
{
  "name": "httpsink-connect",
  "connector": {
    "state": "FAILED",
    "worker_id": "connect-0.xxxx.cluster.local:8086",
    "trace": "org.apache.kafka.common.errors.TimeoutException: License topic could not be created\nCaused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics\n"
  },
  "tasks": [],
  "type": "sink"
}

I don’t know why the HTTP sink connector needs a license topic.

A license is needed for this connector (see here and documentation here).

It wouldn’t actually create the topic despite createTopics being in the stacktrace. It’d see that the topic exists and move on, but it still needs the admin client to connect to do that. I think that the reporter.* specific configs here are what you need (params dependent on how security is set up on your cluster).
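To make that concrete, here is a rough sketch of how the same client security settings might be copied onto the reporter and license clients. Treat all of it as assumptions to check against the Configuration Reference: the prefixes `reporter.admin.`, `reporter.producer.` and `confluent.topic.` are my understanding of how the connector passes client overrides, the helper `with_client_overrides` is made up for illustration, and `SASL_SSL` / `PLAIN` are placeholder values, not what this particular cluster uses.

```python
# Prefixes assumed to be honored by the connector for its internal clients.
CLIENT_PREFIXES = ("reporter.admin.", "reporter.producer.", "confluent.topic.")


def with_client_overrides(base: dict, client_props: dict) -> dict:
    """Return a copy of base with client_props added under each prefix.

    Keys that are already present in base are left untouched.
    """
    merged = dict(base)
    for prefix in CLIENT_PREFIXES:
        for key, value in client_props.items():
            merged.setdefault(prefix + key, value)
    return merged


# Subset of the connector config from the question.
base_config = {
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "reporter.bootstrap.servers": "xxx",
    "reporter.result.topic.name": "Http.Success.Responses",
    "reporter.error.topic.name": "Http.Error.Responses",
}

# Placeholder security settings; replace with whatever the brokers require.
security = {"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN"}

connector_config = with_client_overrides(base_config, security)
for key in sorted(connector_config):
    print(key, "=", connector_config[key])
```

The point of the sketch is only that the worker's own security settings are not automatically inherited by the reporter and license clients, so they have to be repeated under the connector-level prefixes.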

One other thing to check: if auth.type is basic then I would expect connection.user and connection.password params instead of auth.username and auth.password (see here). Where are you seeing those parameters documented or in an example?
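A quick way to spot this kind of mismatch before submitting a config is a small check like the one below. It is a sketch only: the function name `basic_auth_warnings` is made up, and the expected key names are simply the ones mentioned in this reply, so confirm them in the connector documentation.

```python
def basic_auth_warnings(config: dict) -> list:
    """Flag auth.* keys that look like they should be connection.* keys."""
    warnings = []
    if config.get("auth.type", "").lower() != "basic":
        return warnings
    # Key found in the posted config -> key expected for basic auth.
    expected = {
        "auth.username": "connection.user",
        "auth.password": "connection.password",
    }
    for found, wanted in expected.items():
        if found in config and wanted not in config:
            warnings.append("'%s' is set but '%s' is missing" % (found, wanted))
    return warnings


# Auth-related subset of the connector config from the question.
posted_config = {
    "auth.type": "basic",
    "auth.username": "kafka",
    "auth.password": "kafka",
}

for warning in basic_auth_warnings(posted_config):
    print(warning)
```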
