We are using jdbc sink connector to read from an Avro topic and write to postgresql. We are getting the following error
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic CUSTOMER1 to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:124)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:88)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:519)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
… 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 0
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:261)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:372)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:203)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
… 17 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: User is denied operation on this server.; error code: 40301
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:301)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:371)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:840)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:813)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:417)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaForDeserialize(AbstractKafkaAvroDeserializer.java:361)
Below is our jdbc sink properties
“connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
“topics”: “CUSTOMER1”,
“table.name.format”: “cdc_${topic}”,
“connection.url”: “jdbc:postgresql://server”,
“connection.user”: “",
“connection.password”: "”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “io.confluent.connect.avro.AvroConverter”,
“value.converter.schema.registry.url”: “https://server.aws.confluent.cloud”,
“value.converter.basic.auth.credentials.source”: “USER_INFO”,
“value.converter.basic.auth.user.info”: “xx:yy”,
“pk.mode”: “record_key”,
“insert.mode”: “upsert”,
“delete.enabled”: “true”,
“auto.create”: “true”,
“auto.evolve”: “true”
We can easily reproduce the issue by making the following rest call (based on the connect log, sink connector also makes the same REST call)
curl -s -u xx:yy GET https://server.aws.confluent.cloud/schemas/ids/0?fetchMaxId=false&subject=CUSTOMER1-value
{“error_code”:40301,“message”:“User is denied operation on this server.”}
We tried to give all the required access to confluent cloud user, but no success. We are not using managed confluent cloud connector. Kafka connect is running on an EC2 machine. JDBC sink connector used to work fine previously on confluent cloud. We would appreciate any help here.