Better yet, copying it here so you & others can refer to it later:
Siddharth Mathur
Hi, is anyone using SalesforcePlatformEventSinkConnector connector? I have managed to get it running on confluent cloud, however, message from kafka topic are not reaching salesforce. I am using kcat to produce message in kafka. Any help will be very much appreciated. Thank you.
Niranjan Bommu
how do you know its not reaching salesforce?
Niranjan Bommu
if you look at the DLQ in sink connector, you will see message with exact error why it is failing.
Siddharth Mathur
Oh ok. Do I need to install DLQ connector? I can see messages in DLQ in connector
Siddharth Mathur
Thanks for your help Niranjan. I very much appreciate it.
I cant find log anywhere? Is it possible to check the request sent out by connector to salesforce?
Niranjan Bommu
nope, by default it creates DQL, that you can check in the UI.
Niranjan Bommu
quick question, you ccloud is dedicated?
Siddharth Mathur
I am using confluent.cloud
Siddharth Mathur
when I click on messages in DQL
on UI, it takes me to dlq-lcc-mjwd1
topic and there is nothing in there.
Niranjan Bommu
yeah, if sink fails then message will end up in DLQ.
Siddharth Mathur
how can I debug where this could be failing?
Niranjan Bommu
what kind of clcoud cluster you have, basic, standard, dedicated?
Siddharth Mathur
basic
Niranjan Bommu
are you able to see messages in the actual topic?
Siddharth Mathur
no
Siddharth Mathur
yes: using kcat
Siddharth Mathur
not in the UI anywhere
Niranjan Bommu
how you are checking messages in UI?
Niranjan Bommu
what is jump to offset you are setting?
Siddharth Mathur
I am not sure, I cant see it under topics
Siddharth Mathur
I have been looking at all the pages on the UI but I have not found anything
Niranjan Bommu
Siddharth Mathur
ah ok, it was not set
Niranjan Bommu
try offset -2 and check each partition.
Niranjan Bommu
-2
earliest, -1
latest
Siddharth Mathur
great!
Siddharth Mathur
Thank you so much
Siddharth Mathur
I see all the messages that I have pushed from Salesforce and Kcat in the UI as well.
Now need to find out what is the wall in message going from confluent to Salesforce using sink connector.
Niranjan Bommu
is there way to check in salesforce you are able to reach from conflunet cloud?
Siddharth Mathur
Yes, request is not reaching salesforce
Siddharth Mathur
with the offset change I can see an exception in the dlq topic
Niranjan Bommu
cool, what it says?
Siddharth Mathur
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic
Niranjan Bommu
ah, that explains
Siddharth Mathur
here is the complete trace that is there …
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic purchases:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:500)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:200)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:142)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:500)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:233)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:170)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:228)
at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:203)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:106)
... 20 more