This is my first time using the Azure Data Explorer (Kusto) sink connector to send data from Kafka to a Kusto database.
The required access configuration on the Azure side has already been done.
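The connector fails to reconfigure its tasks with the error "Internal request missing required signature". Below are the error log and the connector configuration.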
[2024-04-18 16:48:09,257] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2134)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:48:09,258] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Failed to reconfigure connector's tasks (sink-fabric-kusto-sankhya-tgfest), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2042)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,258] INFO SinkConnectorConfig values:
config.action.reload = restart
connector.class = com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.avro.AvroConverter
name = sink-fabric-kusto-sankhya-tgfest
predicates = []
tasks.max = 3
topics = [src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST]
topics.regex =
transforms = []
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.SinkConnectorConfig:370)
[2024-04-18 16:49:09,259] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = class io.confluent.connect.avro.AvroConverter
name = sink-fabric-kusto-sankhya-tgfest
predicates = []
tasks.max = 3
topics = [src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST]
topics.regex =
transforms = []
value.converter = class io.confluent.connect.avro.AvroConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:370)
[2024-04-18 16:49:09,277] INFO 127.0.0.1 - - [18/Apr/2024:16:49:09 +0000] "POST /connectors/sink-fabric-kusto-sankhya-tgfest/tasks?forward=true HTTP/1.1" 400 74 "-" "kafka-connect" 5 (org.apache.kafka.connect.runtime.rest.RestServer:62)
[2024-04-18 16:49:09,277] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient:177)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:80)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:104)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:121)
at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:126)
at org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource.putTaskConfigs(InternalClusterResource.java:81)
at jdk.internal.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:191)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,287] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient:177)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,290] INFO 127.0.0.1 - - [18/Apr/2024:16:49:09 +0000] "POST /connectors/sink-fabric-kusto-sankhya-tgfest/tasks HTTP/1.1" 400 74 "-" "kafka-connect" 25 (org.apache.kafka.connect.runtime.rest.RestServer:62)
[2024-04-18 16:49:09,291] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2134)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,291] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Failed to reconfigure connector's tasks (sink-fabric-kusto-sankhya-tgfest), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2042)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:52,356] INFO [src-mssql-sankhya-cdc-tgfest|task-0|offsets] WorkerSourceTask{id=src-mssql-sankhya-cdc-tgfest-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2024-04-18 16:50:02,357] INFO [src-mssql-sankhya-cdc-tgfest|task-0|offsets] WorkerSourceTask{id=src-mssql-sankhya-cdc-tgfest-0} Committing offsets for 8 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
{
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"kusto.ingestion.url": "https://ingest-xxxxxxxxxxxxxxxx.z5.kusto.fabric.microsoft.com",
"aad.auth.appid": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"consumer.override.bootstrap.servers": "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092",
"flush.interval.ms": "30000",
"aad.auth.authority": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"aad.auth.appkey": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
"topics": "src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST",
"tasks.max": "3",
"flush.size.bytes": "1000",
"name": "sink-fabric-kusto-sankhya-tgfest",
"kusto.query.url": "https://trd-xxxxxxxxxxxxxxxx.z5.kusto.fabric.microsoft.com",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"kusto.tables.topics.mapping": "[{'topic': 'src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST','db': 'kqldb', 'table': 'tgfes'}]",
"key.converter": "io.confluent.connect.avro.AvroConverter"
}