Salesforce Platform Event Sink Connector

Hi all,
I am trying to produce an event into a topic (AVRO Scheme) and then I want to use Kafka Salesforce Platform Event Sink connector to generate a platform event on Salesforce CRM. Now, I am using REST Api to produce my message as shown in this screen:

Message is sent to my topic (related to the connector), but I am seeing that message is sent to the connector DLQ because of a Deserialization error:


[
{
“key”: “Header-1”,
“stringValue”: “Header-1”
},
{
“key”: “Header-2”,
“stringValue”: “Header-2”
},
{
“key”: “__connect.errors.topic”,
“stringValue”: “topic_salesforce_avro”
},
{
“key”: “__connect.errors.partition”,
“stringValue”: “1”
},
{
“key”: “__connect.errors.offset”,
“stringValue”: “15”
},
{
“key”: “__connect.errors.connector.name”,
“stringValue”: “lcc-3r660w”
},
{
“key”: “__connect.errors.task.id”,
“stringValue”: “0”
},
{
“key”: “__connect.errors.stage”,
“stringValue”: “VALUE_CONVERTER”
},
{
“key”: “__connect.errors.class.name”,
“stringValue”: “io.confluent.connect.avro.AvroConverter”
},
{
“key”: “__connect.errors.exception.class.name”,
“stringValue”: “org.apache.kafka.connect.errors.DataException”
},
{
“key”: “__connect.errors.exception.message”,
“stringValue”: "Failed to deserialize data for topic topic_salesforce_avro to Avro: "
},
{
“key”: “__connect.errors.exception.stacktrace”,
“stringValue”: “org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic topic_salesforce_avro to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)\n\tat java.base/java.lang.Thread.run(Thread.java:831)\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.(AbstractKafkaAvroDeserializer.java:323)\n\tat io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:164)\n\tat io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\t… 17 more\n”
}
]


Could anyone help me with this issue? I want to know if there are some example using AVRO Schema, my need is to generate a platform event on Salesforce once the message is produced into my topic.

Thanks in advance,
Gianluca

Hi @Gianloko,

Avro isn’t yet supported in the Produce REST API. Try using org.apache.kafka.connect.json.JsonConverter in the connector config. Or, write Avro-formatted example records, e.g., using kafka-avro-console-producer. Give one of those a shot and report back on how it goes.

HTH,
Dave

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.