[Schema Registry][AVRO] Error deserializing NUMBER type column without precision or scale - Oracle

Hello everyone,

I’m working on a project with Confluent using the Kafka Connect Oracle CDC connector, which serializes with io.confluent.connect.avro.AvroConverter. The problem is that some Oracle tables have NUMBER columns defined without precision or scale, so the field ends up registered in Schema Registry like this:

    {
      "name": "TEST_NUMBER",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 127,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "127"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }
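For context, Avro’s decimal logical type stores the unscaled value as a two’s-complement big-endian byte array, with the scale carried only in the schema. A minimal sketch of decoding such a field by hand, assuming the scale of 127 from the schema above (decodeDecimal is a hypothetical helper, not a library call):

    import java.math.{BigDecimal, BigInteger}
    import java.nio.ByteBuffer

    // Hypothetical helper: rebuild the BigDecimal from the raw Avro bytes.
    // The byte array holds the two's-complement unscaled value; the scale
    // (127 here) comes from the field's schema, not from the payload.
    def decodeDecimal(buffer: ByteBuffer, scale: Int = 127): BigDecimal = {
      val bytes = new Array[Byte](buffer.remaining())
      buffer.get(bytes)
      new BigDecimal(new BigInteger(bytes), scale)
    }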

When I use the Confluent libraries to deserialize the data, the NUMBER fields without precision or scale show up as bytes rather than as decimals. When I then apply from_json with the schema, those columns come out as null. I’m working with Spark 3.1.2 and these dependencies:

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>7.1.1</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.1.1</version>
        </dependency>

Do you know whether these libraries support this feature?
Should I add anything else to get the values correctly?

I’ve attached screenshots of the DataFrame after deserialization and after applying from_json, plus my code in case it helps.

[Image: after_from_json]

Code:

import org.apache.spark.sql.{Dataset, Row, SparkSession}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.spark.sql.streaming.Trigger
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema.Parser
import org.apache.spark.sql.avro.SchemaConverters

import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.mapAsJavaMapConverter

object SparkAvroConfluent {

  // Schema Registry connection details (credentials are placeholders)
  val schemaRegistryURL = "schemaRegistryURL"
  val restService = new RestService(schemaRegistryURL)
  val confluentRegistryApiKey = "confluentRegistryApiKey"
  val confluentRegistrySecret = "confluentRegistrySecret"
  val props = Map(
    "basic.auth.credentials.source" -> "USER_INFO",
    "schema.registry.basic.auth.user.info" -> s"$confluentRegistryApiKey:$confluentRegistrySecret",
    "mode" -> "PERMISSIVE"
  ).asJava
  val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
  private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
  val topic = "topic_name"

  // Fetch the latest value schema for the topic and convert it to a Spark SQL type
  private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
  private val sparkSchema = SchemaConverters.toSqlType(new Parser().parse(avroSchema))

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val group_id = "1"
    val bootstrap_servers = "bootstrap_servers"
    
    // Kafka source options (credentials and servers are placeholders)
    val kafOptions = Map(
      "kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
      "kafka.sasl.mechanism" -> "PLAIN",
      "kafka.security.protocol" -> "SASL_SSL",
      "kafka.bootstrap.servers" -> bootstrap_servers,
      "group.id" -> group_id,
      "enable.auto.commit" -> "false",
      "subscribe" -> topic,
      "includeHeaders" -> "true",
      //"subscribePattern"-> subscribePattern,
      "startingOffsets" -> "earliest"
    )

    // Register a UDF that runs the Confluent deserializer and returns the
    // Avro payload as a JSON string (via GenericRecord.toString)
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      DeserializerWrapper.deserializer.deserialize(bytes)
    )

    val kafkaDataFrame = spark.readStream.format("kafka").options(kafOptions).load()

    println("avroSchema")
    println(avroSchema)

    // Per-micro-batch handler: deserialize, then parse the JSON with the converted schema
    def saveToDB = (df: Dataset[Row], batchId: Long) => {
      df.show(false)

      val valueDataFrame = df.selectExpr("""deserialize(value) AS message""")
      valueDataFrame.show(false)

      import org.apache.spark.sql.functions._
      val formattedDataFrame = valueDataFrame.select(
        from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
        .select("parsed_value.*")
      formattedDataFrame.show(false)
    }

    kafkaDataFrame
      .writeStream
      .outputMode("append")
      .foreachBatch(saveToDB) // foreachBatch is the sink, so no format() is needed
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .start()
      .awaitTermination()

  }
  // Kept in an object so each executor JVM initializes the deserializer
  // locally instead of serializing it inside the UDF closure
  object DeserializerWrapper {
    val deserializer = kafkaAvroDeserializer
  }

  // Thin wrapper that returns the decoded GenericRecord as a JSON string
  class AvroDeserializer extends AbstractKafkaAvroDeserializer {
    def this(client: SchemaRegistryClient) {
      this()
      this.schemaRegistry = client
    }

    override def deserialize(bytes: Array[Byte]): String = {
      val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
      genericRecord.toString
    }
  }
}

Hi everyone,
This issue was solved as of Oracle CDC connector version 2.0.0 by adding the property numeric.mapping with the value best_fit_or_decimal.

https://docs.confluent.io/kafka-connect-oracle-cdc/current/configuration-properties.html
Explanation from the Confluent connector documentation:
“Use best_fit_or_decimal if NUMERIC columns should be cast to Connect’s primitive type based upon the column’s precision and scale. If the precision and scale exceed the bounds for any primitive type, Connect’s DECIMAL logical type will be used instead.”
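For anyone else hitting this, the property just goes in the connector configuration. A minimal sketch (the connector class and the surrounding values here are assumptions for illustration, not my exact config):

    {
      "name": "oracle-cdc-source",
      "config": {
        "connector.class": "io.confluent.connect.oracle.cdc.OracleCdcSourceConnector",
        "numeric.mapping": "best_fit_or_decimal",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
      }
    }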

This way, when an Oracle column is NUMBER without precision or scale, the field is registered in the schema as double. The connector will only fall back to the DECIMAL logical type if the value exceeds what a double can represent.

My newly registered schema is (note that TEST_NUMBER and TEST_NUMBER_DECIMAL are now double):

    {
      "type": "record",
      "name": "ConnectDefault",
      "namespace": "io.confluent.connect.avro",
      "fields": [
        { "name": "STFAMPRO", "type": ["null", "string"], "default": null },
        { "name": "CHFAMPRO", "type": ["null", "string"], "default": null },
        { "name": "TEST_NUMBER", "type": ["null", "double"], "default": null },
        { "name": "TEST_NUMBER_DECIMAL", "type": ["null", "double"], "default": null },
        { "name": "table", "type": ["null", "string"], "default": null },
        { "name": "SCN_CMD", "type": ["null", "string"], "default": null },
        { "name": "OP_TYPE_CMD", "type": ["null", "string"], "default": null },
        { "name": "op_ts", "type": ["null", "string"], "default": null },
        { "name": "current_ts", "type": ["null", "string"], "default": null },
        { "name": "row_id", "type": ["null", "string"], "default": null },
        { "name": "username", "type": ["null", "string"], "default": null }
      ]
    }
I will close this issue. Thanks everyone!
