Hello guys,
I’m trying to send events from a test topic to Splunk. I created a simple connector as described in the docs (Splunk Sink Connector for Confluent Platform) and it works fine. But the connector for a topic with Avro data fails.
confluent-6.2.0/etc/kafka/connect-standalone.properties:
bootstrap.servers=127.0.0.1:9092
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=127.0.0.1:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/Users/merihbozbura/Documents/dev/confluent-6.2.0/share/java,/Users/merihbozbura/Documents/dev/confluent-6.2.0/share/confluent-hub-components
confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties:
bootstrap.servers=127.0.0.1:9092
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=127.0.0.1:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java,/Users/merih/Documents/dev/confluent-6.2.0/share/java,/Users/merih/Documents/dev/confluent-6.2.0/share/confluent-hub-components
My connector:
{
"connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
"confluent.topic.bootstrap.servers": "127.0.0.1:9092",
"tasks.max": "1",
"topics": "splunk_test",
"splunk.indexes": "kafka",
"splunk.hec.uri": "127.0.0.1:8889",
"value.converter.schema.registry.url": "127.0.0.1:8081",
"splunk.sourcetypes": "kafka",
"confluent.topic.replication.factor": "1",
"name": "splunk-merih",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"splunk.hec.token": "***********************"
}
Why does it fail to deserialize data for topic splunk_test to Avro? I can see my event in Control Center. Is there anything I am missing?
/Users/merih/Documents/dev/confluent-6.2.0/bin/kafka-avro-console-producer --topic new_topic --bootstrap-server 127.0.0.1:9092 --property value.schema="$(< xxx.avsc)" < test_event_passed
PS: I had to remove the http:// prefix from several URLs because of this warning ("Sorry, new users can only put 2 links in a post."), so the URLs are fine in the actual configs.
Output of confluent local services connect log:
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2021-08-23 17:33:11,248] WARN purge offsets for closed partitions=[splunk_test-0] leaving offsets={} (com.splunk.kafka.connect.SplunkSinkTask:169)
[2021-08-23 17:33:11,276] ERROR WorkerSinkTask{id=splunk-merih-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic splunk_test to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2021-08-23 17:33:11,284] INFO LoadBalancer stopped (com.splunk.hecclient.LoadBalancer:193)
[2021-08-23 17:33:11,288] INFO kafka-connect-splunk task ends with config=splunkURI:http://127.0.0.1:8889, raw:false, ack:false, indexes:kafka, sourcetypes:kafka:test, sources:, headerSupport:false, headerCustom:, httpKeepAlive:true, validateCertificates:true, trustStorePath:, socketTimeout:60, eventBatchTimeout:300, ackPollInterval:10, ackPollThreads:2, maxHttpConnectionPerChannel:2, flushWindow:30, totalHecChannels:2, enrichment:, maxBatchSize:500, numberOfThreads:1, lineBreaker:, maxOutstandingEvents:1000000, maxRetries:-1, useRecordTimestamp:true, hecEventFormatted:false, trackData:false, headerSupport:false, headerCustom:, headerIndex:splunk.header.index, headerSource:splunk.header.source, headerSourcetype:splunk.header.sourcetype, headerHost:splunk.header.host, lbPollInterval:120 (com.splunk.kafka.connect.SplunkSinkTask:356)
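For anyone reading the trace: as far as I understand, "Unknown magic byte!" means the value bytes did not start with the header that the Schema Registry serializers write (a zero byte followed by a 4-byte big-endian schema ID). Here is a minimal Python sketch of that check. The helper name and the sample bytes are made up for illustration; they are not data from my topic and this is not the converter's actual code.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Schema Registry wire format


def describe_value(raw: bytes) -> str:
    """Report whether raw record bytes look like Schema Registry framed data."""
    if len(raw) < 5:
        return "too short for the 5-byte header"
    # 1 unsigned byte (magic) + 4-byte big-endian unsigned int (schema ID)
    magic, schema_id = struct.unpack(">BI", raw[:5])
    if magic != MAGIC_BYTE:
        return f"unknown magic byte {magic:#04x}"
    return f"framed, schema id {schema_id}"


# Hypothetical sample values (assumptions, for illustration only).
framed = b"\x00" + (42).to_bytes(4, "big") + b"\x02hi"
plain_json = b'{"event": "test"}'

print(describe_value(framed))
print(describe_value(plain_json))
```

So a record written by a plain string or JSON producer would fail this check, while one written by kafka-avro-console-producer should pass it.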