Splunk Sink Connector “FAILED” status

Hello guys,

I’m trying to send events from a test topic to Splunk. I created a simple connector as described in the docs (Splunk Sink Connector for Confluent Platform) and it works fine. But the connector for the topic with Avro data fails.

confluent-6.2.0/etc/kafka/connect-standalone.properties:

bootstrap.servers=127.0.0.1:9092
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=127.0.0.1:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/Users/merihbozbura/Documents/dev/confluent-6.2.0/share/java,/Users/merihbozbura/Documents/dev/confluent-6.2.0/share/confluent-hub-components

confluent-6.2.0/etc/schema-registry/connect-avro-standalone.properties:

bootstrap.servers=127.0.0.1:9092
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=127.0.0.1:8081
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java,/Users/merih/Documents/dev/confluent-6.2.0/share/java,/Users/merih/Documents/dev/confluent-6.2.0/share/confluent-hub-components

My connector:

{
  "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
  "confluent.topic.bootstrap.servers": "127.0.0.1:9092",
  "tasks.max": "1",
  "topics": "splunk_test",
  "splunk.indexes": "kafka",
  "splunk.hec.uri": "127.0.0.1:8889",
  "value.converter.schema.registry.url": "127.0.0.1:8081",
  "splunk.sourcetypes": "kafka",
  "confluent.topic.replication.factor": "1",
  "name": "splunk-merih",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "splunk.hec.token": "***********************"
}

Why does it fail to deserialize data for topic splunk_test to Avro? I can see my event in Control Center. Is there anything I’m missing?

/Users/merih/Documents/dev/confluent-6.2.0/bin/kafka-avro-console-producer --topic new_topic --bootstrap-server 127.0.0.1:9092 --property value.schema="$(< xxx.avsc)" < test_event_passed

PS: I had to remove the http:// prefixes from several URLs because of this warning (“Sorry, new users can only put 2 links in a post.”), so the URLs are fine in the actual configs.

confluent local services connect log:

Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2021-08-23 17:33:11,248] WARN purge offsets for closed partitions=[splunk_test-0] leaving offsets={} (com.splunk.kafka.connect.SplunkSinkTask:169)
[2021-08-23 17:33:11,276] ERROR WorkerSinkTask{id=splunk-merih-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic splunk_test to Avro: 
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
	at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2021-08-23 17:33:11,284] INFO LoadBalancer stopped (com.splunk.hecclient.LoadBalancer:193)
[2021-08-23 17:33:11,288] INFO kafka-connect-splunk task ends with config=splunkURI:http://127.0.0.1:8889, raw:false, ack:false, indexes:kafka, sourcetypes:kafka:test, sources:, headerSupport:false, headerCustom:, httpKeepAlive:true, validateCertificates:true, trustStorePath:, socketTimeout:60, eventBatchTimeout:300, ackPollInterval:10, ackPollThreads:2, maxHttpConnectionPerChannel:2, flushWindow:30, totalHecChannels:2, enrichment:, maxBatchSize:500, numberOfThreads:1, lineBreaker:, maxOutstandingEvents:1000000, maxRetries:-1, useRecordTimestamp:true, hecEventFormatted:false, trackData:false, headerSupport:false, headerCustom:, headerIndex:splunk.header.index, headerSource:splunk.header.source, headerSourcetype:splunk.header.sourcetype, headerHost:splunk.header.host, lbPollInterval:120 (com.splunk.kafka.connect.SplunkSinkTask:356)

It looks like the issue is related to the ‘magic byte’: your data does not adhere to the wire format that the Schema Registry deserializer expects. Check out this entry, which might help you troubleshoot your issue: java - Kafka Streams - SerializationException: Unknown magic byte - Stack Overflow

TL;DR: Check to make sure that what you are sending is actually serialized as Avro.
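For context, every value written by a Schema Registry-aware serializer is framed with a magic byte (0x00) followed by a 4-byte big-endian schema ID, and the AvroConverter rejects anything else with exactly this error. A minimal sketch of that framing check (the byte strings below are illustrative, not from your topic):

```python
import struct

MAGIC_BYTE = 0x00

def parse_wire_format(payload: bytes):
    """Return the Schema Registry schema ID if payload follows the
    Confluent wire format (magic byte 0x00 + 4-byte big-endian schema ID),
    otherwise None."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        return None
    return struct.unpack(">I", payload[1:5])[0]

# A value produced by kafka-avro-console-producer starts like this:
good = b"\x00\x00\x00\x00\x01" + b"avro-body"  # schema ID 1, then Avro data
bad = b'{"plain": "json"}'                     # no magic byte

parse_wire_format(good)  # -> 1
parse_wire_format(bad)   # -> None
```

If the first byte of your records’ values is anything other than 0x00, they were not produced with the Confluent Avro serializer.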


Looking at your kafka-avro-console-producer command and connector config, the topics are different (new_topic vs. splunk_test); is this deliberate?

Is it possible that there are some non-Avro messages on the topic the connector is reading from, perhaps from earlier testing? These would cause the connector to fail, even if later messages are valid.

You could validate this by (i) using kafka-avro-console-consumer to read back the topic contents as Avro and making sure they are all valid, or (ii) simply creating a new test topic from scratch, writing data to it, and changing the connector config to read from it.
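For (i), something like this should work (the path and schema registry URL are assumed from your earlier commands; adjust to your environment):

```shell
# Read the topic back as Avro. A non-Avro record will make the consumer
# throw the same "Unknown magic byte!" error, telling you where the bad
# data sits on the topic.
/Users/merih/Documents/dev/confluent-6.2.0/bin/kafka-avro-console-consumer \
  --topic splunk_test \
  --bootstrap-server 127.0.0.1:9092 \
  --property schema.registry.url=http://127.0.0.1:8081 \
  --from-beginning
```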


Edit:
As @mitchell-h says below, the actual error you’re hitting is from the Avro deserialiser reading data in the value of the message that doesn’t match its expected format. You can read more about this here.

org.apache.kafka.common.errors.SerializationException: Unknown magic byte! ← this is the actual error you’re hitting.

This generally means either that the key is Avro and you have the string converter configured for it, or that the message wasn’t serialized on the producer side with the Confluent Schema Registry-aware clients.

Or the producer didn’t serialize it with the correct serde.

I’m betting on the producer serializer, given the console producer command-line flags. I would need your producer configs to verify.
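For reference, a Schema Registry-aware producer configuration looks something like this (a hypothetical fragment; your actual client settings may differ):

```properties
# Hypothetical producer.properties fragment. The value serializer must be
# the Schema Registry-aware KafkaAvroSerializer, otherwise records will
# not carry the magic-byte + schema-ID framing the AvroConverter expects.
bootstrap.servers=127.0.0.1:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://127.0.0.1:8081
```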