Hello everyone,
I’m working in a project with Confluent with the Kafka Connect connector CDC Oracle and The connector serialize with io.confluent.connect.avro.AvroConverter. The problem is that in some Oracle tables there are some fields with Number Type without precision nor scale, for this reason the Schema registry is created in this way:
{
"name": "TEST_NUMBER",
"type": [
"null",
{
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
}
When I use the library confluent to deserialize data, the fields with Number type without precision nor scale are showed with bytes and no as decimal. After that I do a from_json with schema and the columns appear as null. I’m working with Spark 3.1.2
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.1.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.1.1</version>
</dependency>
¿Do you know if these library supported this feature?
¿Should I add anything more to get fine the values?
I added the images when dataframe is deserialized and when is used the funcion
from_json, also I added my code in case it can help.
Code:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.spark.sql.streaming.Trigger
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema.Parser
import org.apache.spark.sql.avro.SchemaConverters
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters.mapAsJavaMapConverter
object SparkAvroConfluent {
val schemaRegistryURL = "schemaRegistryURL"
val restService = new RestService(schemaRegistryURL)
val confluentRegistryApiKey = "confluentRegistryApiKey"
val confluentRegistrySecret = "confluentRegistrySecret"
val props = Map(
"basic.auth.credentials.source" -> "USER_INFO",
"schema.registry.basic.auth.user.info" -> s"$confluentRegistryApiKey:$confluentRegistrySecret",
"mode"->"PERMISSIVE"
).asJava
val schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, props)
private val kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
val topic = "topic_name"
private val avroSchema = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value").getSchema
private val sparkSchema = SchemaConverters.toSqlType(new Parser().parse(avroSchema))
def main(args: Array[String]) {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val group_id = "1"
val bootstrap_servers = "bootstrap_servers"
val kafOptions = Map(
"kafka.sasl.jaas.config" -> "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
"kafka.sasl.mechanism" -> "PLAIN",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.bootstrap.servers" -> bootstrap_servers,
"group.id" -> group_id,
"enable.auto.commit" -> "false",
"subscribe" -> topic,
"includeHeaders" -> "true",
//"subscribePattern"-> subscribePattern,
"startingOffsets" -> "earliest"
)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
DeserializerWrapper.deserializer.deserialize(bytes)
)
val kafkaDataFrame = spark.readStream.format("kafka").options(kafOptions).load()
println("avroSchema")
println(avroSchema)
def saveToDB = (df: Dataset[Row], batchId: Long) => {
df.show(false)
val valueDataFrame = df.selectExpr("""deserialize(value) AS message""")
valueDataFrame.show(false)
import org.apache.spark.sql.functions._
val formattedDataFrame = valueDataFrame.select(
from_json(col("message"), sparkSchema.dataType).alias("parsed_value"))
.select("parsed_value.*")
formattedDataFrame.show(false)
}
kafkaDataFrame
.writeStream
.format("console")
.outputMode("append")
.foreachBatch(saveToDB)
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.start()
.awaitTermination()
}
object DeserializerWrapper {
val deserializer = kafkaAvroDeserializer
}
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
this()
this.schemaRegistry = client
}
override def deserialize(bytes: Array[Byte]): String = {
val genericRecord = super.deserialize(bytes).asInstanceOf[GenericRecord]
genericRecord.toString
}
}
}