Azure Data Explorer (Kusto) Database connector error

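This is my first time using the Azure Data Explorer (Kusto) sink connector to send data from Kafka. The connector fails to reconfigure its tasks, and the worker log repeats the error "Internal request missing required signature".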

The required access configuration on the Azure side is already in place.

Below are the error log and the connector configuration.

[2024-04-18 16:48:09,257] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2134)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:48:09,258] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Failed to reconfigure connector's tasks (sink-fabric-kusto-sankhya-tgfest), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2042)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,258] INFO SinkConnectorConfig values: 
	config.action.reload = restart
	connector.class = com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = class io.confluent.connect.avro.AvroConverter
	name = sink-fabric-kusto-sankhya-tgfest
	predicates = []
	tasks.max = 3
	topics = [src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST]
	topics.regex = 
	transforms = []
	value.converter = class io.confluent.connect.avro.AvroConverter
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:370)
[2024-04-18 16:49:09,259] INFO EnrichedConnectorConfig values: 
	config.action.reload = restart
	connector.class = com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector
	errors.deadletterqueue.context.headers.enable = false
	errors.deadletterqueue.topic.name = 
	errors.deadletterqueue.topic.replication.factor = 3
	errors.log.enable = false
	errors.log.include.messages = false
	errors.retry.delay.max.ms = 60000
	errors.retry.timeout = 0
	errors.tolerance = none
	header.converter = null
	key.converter = class io.confluent.connect.avro.AvroConverter
	name = sink-fabric-kusto-sankhya-tgfest
	predicates = []
	tasks.max = 3
	topics = [src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST]
	topics.regex = 
	transforms = []
	value.converter = class io.confluent.connect.avro.AvroConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:370)
[2024-04-18 16:49:09,277] INFO 127.0.0.1 - - [18/Apr/2024:16:49:09 +0000] "POST /connectors/sink-fabric-kusto-sankhya-tgfest/tasks?forward=true HTTP/1.1" 400 74 "-" "kafka-connect" 5 (org.apache.kafka.connect.runtime.rest.RestServer:62)
[2024-04-18 16:49:09,277] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient:177)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:80)
	at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:104)
	at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:121)
	at org.apache.kafka.connect.runtime.rest.HerderRequestHandler.completeOrForwardRequest(HerderRequestHandler.java:126)
	at org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource.putTaskConfigs(InternalClusterResource.java:81)
	at jdk.internal.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
	at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
	at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:478)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:400)
	at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:256)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:235)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:191)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.Server.handle(Server.java:516)
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,287] ERROR Error forwarding REST request (org.apache.kafka.connect.runtime.rest.RestClient:177)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,290] INFO 127.0.0.1 - - [18/Apr/2024:16:49:09 +0000] "POST /connectors/sink-fabric-kusto-sankhya-tgfest/tasks HTTP/1.1" 400 74 "-" "kafka-connect" 25 (org.apache.kafka.connect.runtime.rest.RestServer:62)
[2024-04-18 16:49:09,291] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Request to leader to reconfigure connector tasks failed (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2134)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:09,291] ERROR [Worker clientId=connect-1, groupId=kafka-connect-cluster] Failed to reconfigure connector's tasks (sink-fabric-kusto-sankhya-tgfest), retrying after backoff. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2042)
org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Internal request missing required signature
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:162)
	at org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:116)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$publishConnectorTaskConfigs$45(DistributedHerder.java:2131)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
[2024-04-18 16:49:52,356] INFO [src-mssql-sankhya-cdc-tgfest|task-0|offsets] WorkerSourceTask{id=src-mssql-sankhya-cdc-tgfest-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)
[2024-04-18 16:50:02,357] INFO [src-mssql-sankhya-cdc-tgfest|task-0|offsets] WorkerSourceTask{id=src-mssql-sankhya-cdc-tgfest-0} Committing offsets for 8 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:233)

Connector configuration:

{
	"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
	"kusto.ingestion.url": "https://ingest-xxxxxxxxxxxxxxxx.z5.kusto.fabric.microsoft.com",
	"aad.auth.appid": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
	"consumer.override.bootstrap.servers": "xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092,xxx.xxx.xxx.xxx:9092",
	"flush.interval.ms": "30000",
	"aad.auth.authority": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
	"aad.auth.appkey": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
	"topics": "src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST",
	"tasks.max": "3",
	"flush.size.bytes": "1000",
	"name": "sink-fabric-kusto-sankhya-tgfest",
	"kusto.query.url": "https://trd-xxxxxxxxxxxxxxxx.z5.kusto.fabric.microsoft.com",
	"value.converter": "io.confluent.connect.avro.AvroConverter",
	"kusto.tables.topics.mapping": "[{'topic': 'src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST','db': 'kqldb', 'table': 'tgfes'}]",
	"key.converter": "io.confluent.connect.avro.AvroConverter"
}
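
For anyone reproducing this, here is a minimal Python sketch that sanity-checks the configuration above: it verifies that every topic listed in "topics" also has an entry in "kusto.tables.topics.mapping". The helper name unmapped_topics is hypothetical, and the check is only an assumption about what is worth ruling out first. It does not address the signature error itself.

```python
import ast


def unmapped_topics(config: dict) -> list:
    """Return subscribed topics with no entry in kusto.tables.topics.mapping."""
    subscribed = [t.strip() for t in config["topics"].split(",") if t.strip()]
    # The mapping in the posted config uses single quotes, so it is not strict
    # JSON; ast.literal_eval reads it as a Python list of dicts instead.
    mapping = ast.literal_eval(config["kusto.tables.topics.mapping"])
    mapped = {entry["topic"] for entry in mapping}
    return [t for t in subscribed if t not in mapped]


# Only the two relevant keys are copied from the posted configuration.
config = {
    "topics": "src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST",
    "kusto.tables.topics.mapping": (
        "[{'topic': 'src-mssql-sankhya.SANKHYA_PROD.sankhya.TGFEST',"
        "'db': 'kqldb', 'table': 'tgfes'}]"
    ),
}

print(unmapped_topics(config))  # prints []
```

An empty list means the topic names in the two settings agree, so a mismatch there can be ruled out.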
