Help needed for Salesforce Platform Event Sink Connector POC

Hi there,

I am new to Kafka and Confluent Cloud. In the past few days I have made some progress and have been able to set up the managed Salesforce Platform Event Source and Salesforce Platform Event Sink connectors.

I can use the source connector to push messages from Salesforce into a Kafka topic without any problem. However, I am having issues sending data from the sink connector back to Salesforce.

I see that the sink connector cannot accept plain JSON; it needs JSON_SR or another Schema Registry-backed format. I am not sure of the best way to convert JSON data coming into a Kafka topic from different sources into the JSON_SR format.
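
For context, the sink's expected input format is part of the connector configuration; mine looks roughly like this (key and value strings here are illustrative placeholders rather than my exact configuration):

    {
      "connector.class": "SalesforcePlatformEventSink",
      "name": "SalesforcePlatformEventSinkConnector_0",
      "topics": "purchases",
      "input.data.format": "JSON_SR",
      "salesforce.platform.event.name": "MyEvent__e"
    }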

I have tried using a ksqlDB stream to convert JSON to JSON_SR. The stream appears to convert the data and save it into the new topic, but the sink does not seem to pick up the messages and send them to Salesforce. I cannot see this data in the DLQ, error, or success topics, so it looks like the events are never picked up by the sink connector. However, as soon as I manually produce a message into the topic the sink connector reads from, an "Unknown magic byte" exception message appears in the DLQ topic.
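
Roughly what I did in ksqlDB (stream and field names are simplified placeholders):

    -- Declare a stream over the raw JSON topic. The schema must be
    -- spelled out by hand, since plain JSON carries no registered schema.
    CREATE STREAM purchases_json (
        id VARCHAR,
        amount DOUBLE
    ) WITH (
        KAFKA_TOPIC = 'purchases',
        VALUE_FORMAT = 'JSON'
    );

    -- Re-serialize into a new topic as JSON_SR. This registers a JSON
    -- Schema in Schema Registry and prefixes each record with the magic
    -- byte and schema ID that the sink's converter expects.
    CREATE STREAM purchases_json_sr WITH (
        KAFKA_TOPIC = 'purchases_json_sr',
        VALUE_FORMAT = 'JSON_SR'
    ) AS SELECT * FROM purchases_json;

(Records in the converted topic can be inspected from the ksqlDB CLI with PRINT 'purchases_json_sr' FROM BEGINNING;.)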

Is a commit required from the ksqlDB stream for the sink connector to pick up the events?

I would really appreciate help with this.

Thank you very much.

Hi @ssmathur, I am linking the Slack thread between you and @nbommu1 where you discussed this question:
https://confluentcommunity.slack.com/archives/C5BRM0TN0/p1643804744942429

Better yet, I am copying it here so you and others can refer to it later:

Siddharth Mathur
Hi, is anyone using the SalesforcePlatformEventSinkConnector? I have managed to get it running on Confluent Cloud; however, messages from the Kafka topic are not reaching Salesforce. I am using kcat to produce messages into Kafka. Any help would be very much appreciated. Thank you.

Niranjan Bommu
How do you know it's not reaching Salesforce?

Niranjan Bommu
If you look at the DLQ of the sink connector, you will see a message with the exact error explaining why it is failing.

Siddharth Mathur
Oh, OK. Do I need to install a DLQ connector? I can see messages in the DLQ in the connector.

Siddharth Mathur
Thanks for your help, Niranjan. I very much appreciate it.
I can't find a log anywhere. Is it possible to check the request the connector sends out to Salesforce?

Niranjan Bommu
Nope. By default it creates a DLQ, which you can check in the UI.

Niranjan Bommu
Quick question: is your Confluent Cloud cluster dedicated?

Siddharth Mathur
I am using confluent.cloud

Siddharth Mathur
When I click on Messages for the DLQ in the UI, it takes me to the dlq-lcc-mjwd1 topic, and there is nothing in there.

Niranjan Bommu
Yeah, if the sink fails, the message will end up in the DLQ.

Siddharth Mathur
How can I debug where this could be failing?

Niranjan Bommu
What kind of cloud cluster do you have: Basic, Standard, or Dedicated?

Siddharth Mathur
basic

Niranjan Bommu
Are you able to see messages in the actual topic?

Siddharth Mathur
no

Siddharth Mathur
yes: using kcat

Siddharth Mathur
not in the UI anywhere

Niranjan Bommu
How are you checking messages in the UI?

Niranjan Bommu
What "Jump to offset" value are you setting?

Siddharth Mathur
I am not sure; I can't see it under Topics.

Siddharth Mathur
I have been looking at all the pages in the UI, but I have not found anything.

Niranjan Bommu
[screenshot from the original Slack thread, not captured in this copy]

Siddharth Mathur
Ah, OK, it was not set.

Niranjan Bommu
Try offset -2 and check each partition.

Niranjan Bommu
-2 means earliest, -1 means latest (the standard Kafka sentinel offsets).

Siddharth Mathur
great!

Siddharth Mathur
Thank you so much :pray:

Siddharth Mathur
I can now see all the messages that I pushed from Salesforce and kcat in the UI as well.
Now I need to find out what is blocking messages from going from Confluent to Salesforce via the sink connector.

Niranjan Bommu
Is there a way to check on the Salesforce side whether you are able to reach it from Confluent Cloud?

Siddharth Mathur
Yes, and the request is not reaching Salesforce.

Siddharth Mathur
With the offset change I can now see an exception in the DLQ topic.

Niranjan Bommu
Cool, what does it say?

Siddharth Mathur
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic

Niranjan Bommu
Ah, that explains it :slightly_smiling_face:

Siddharth Mathur
Here is the complete trace:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic purchases: 
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:170)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:228)
    at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
    ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:203)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:106)
    ... 20 more
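
For anyone finding this later: the root cause at the bottom of the trace, "Unknown magic byte!", means the sink's JSON_SR converter expected each record to begin with the Schema Registry wire-format framing (a 0x00 magic byte followed by a 4-byte schema ID) but found plain JSON instead, which is what kcat produces by default. One way to get a correctly framed test record is to insert a row through ksqlDB into the raw JSON stream and let the running conversion query re-serialize it; a sketch, reusing the placeholder stream names from the original post:

    -- Inserted into the plain-JSON input stream; the persistent CSAS
    -- query converts it and writes a JSON_SR record, complete with the
    -- magic byte and schema ID, to the topic the sink reads from.
    INSERT INTO purchases_json (id, amount) VALUES ('test-1', 9.99);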

How did you solve this? I am getting the same error.