Databricks Spark with Schema Registry

Hi there,
I’m trying to build a Spark-based Kafka producer and consumer in Databricks that sends and consumes Avro data, and I want to use the Schema Registry URL for mapping the data schema.
Unfortunately I’m not able to find a good example of this, so I would appreciate any tested Scala/Python code with Spark.
Thanks for your time.

Cheers
Abhi

Here’s a couple of references that should help:


Hi, you can also find examples in this recent blog post: Build Streaming Data Pipelines with Confluent, Databricks, and Azure
It also covers using a secured Schema Registry!
Let us know how it goes!


Thanks for your response and for sharing the latest blog on this, it will really help the user community.
In our Kafka cluster, the Schema Registry has an HTTPS URL. I have the following questions:

  1. The user has read/write permission on the Kafka topic. To use the Schema Registry, do we need to explicitly grant read/write permission to the user as well?

  2. I was referring to https://docs.confluent.io/platform/current/schema-registry/schema_registry_ccloud_tutorial.html#:~:text=To%20manually%20register%20the%20schema,to%20define%20the%20new%20schema, where it is mentioned to set

schema.registry.basic.auth.user.info= <SR API KEY>:<SR API SECRET>

Could you please tell me whether we have to configure this when writing a producer/consumer from Databricks?

If so, what is the correct parameter name for Databricks? For Kafka options in Databricks we add the kafka. prefix, as in kafka.bootstrap.servers.
Thank you,
Abhishek

Hi @abhietc31 , unfortunately the Databricks implementation doesn’t support a secured Schema Registry yet, so you need to use the Schema Registry client manually to decode the messages. This is described in the blog post I linked above (see the getSchema(id) function), but I am pasting the code here for reference:

from confluent_kafka.schema_registry import SchemaRegistryClient

# Confluent Cloud Schema Registry client, authenticated with the SR API key/secret
schema_registry_conf = {
    'url': schemaRegistryUrl,
    'basic.auth.user.info': '{}:{}'.format(confluentRegistryApiKey, confluentRegistrySecret)}

schema_registry_client = SchemaRegistryClient(schema_registry_conf)

import pyspark.sql.functions as fn
from pyspark.sql.types import StringType

# Extracts the 4-byte big-endian schema id from the Confluent wire format as a string
binary_to_string = fn.udf(lambda x: str(int.from_bytes(x, byteorder='big')), StringType())

clickstreamTestDf = (
  spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.ssl.endpoint.identification.algorithm", "https")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("subscribe", confluentTopicName)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .withColumn('key', fn.col("key").cast(StringType()))
  .withColumn('fixedValue', fn.expr("substring(value, 6, length(value)-5)"))
  .withColumn('valueSchemaId', binary_to_string(fn.expr("substring(value, 2, 4)")))
  .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', 'valueSchemaId','fixedValue')
)
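# For context, the substring offsets above follow the Confluent wire format
# (SQL substring is 1-based): byte 1 is a magic 0x00, bytes 2-5 hold the schema id
# (big-endian), and the Avro-encoded payload starts at byte 6.
# Quick plain-Python illustration; the schema id 100042 below is made up:
example_record = bytes([0]) + (100042).to_bytes(4, byteorder="big") + b"<avro body>"
example_schema_id = int.from_bytes(example_record[1:5], byteorder="big")  # what binary_to_string extracts
example_body = example_record[5:]                                         # what 'fixedValue' keeps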


import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro

# Intended for foreachBatch: parses each micro-batch, grouping rows by their schema id
def parseAvroDataWithSchemaId(df, epoch_id):
  cachedDf = df.cache()
  
  fromAvroOptions = {"mode":"FAILFAST"}
  
  def getSchema(id):
    return str(schema_registry_client.get_schema(id).schema_str)

  distinctValueSchemaIdDF = cachedDf.select(fn.col('valueSchemaId').cast('integer')).distinct()

  for valueRow in distinctValueSchemaIdDF.collect():

    currentValueSchemaId = sc.broadcast(valueRow.valueSchemaId)
    currentValueSchema = sc.broadcast(getSchema(currentValueSchemaId.value))
    
    filterValueDF = cachedDf.filter(fn.col('valueSchemaId') == currentValueSchemaId.value)
    
    filterValueDF \
      .select('topic', 'partition', 'offset', 'timestamp', 'timestampType', 'key', from_avro('fixedValue', currentValueSchema.value, fromAvroOptions).alias('parsedValue')) \
      .write \
      .format("delta") \
      .mode("append") \
      .option("mergeSchema", "true") \
      .save(deltaTablePath)

display(clickstreamTestDf)
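
The (df, epoch_id) signature above is meant to be passed to foreachBatch. A minimal sketch of how to wire it up (the checkpointPath variable below is a placeholder):

# Run parseAvroDataWithSchemaId on every micro-batch; checkpointPath is a placeholder
(clickstreamTestDf
  .writeStream
  .option("checkpointLocation", checkpointPath)
  .foreachBatch(parseAvroDataWithSchemaId)
  .queryName("clickstreamTestFromConfluent")
  .start())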

That’s a very nice explanation, I really loved it.
The blog is written very thoughtfully.
I’m getting stuck on the Spark–Kafka producer side, which is missing in the blog, so I can’t just test the consumer.
Could you please suggest how to use the Schema Registry and to_avro to produce Avro data into a Kafka topic, and how to use the Schema Registry for mapping the schema while producing?
Thanks,
Abhishek

Hi gianlucanatali, did you get a chance to look at my request above?

Hi @abhietc31 , I didn’t try the producer yet, but maybe you can have a look at this example Python code?
https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/python.html
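
If you prefer to stay in Spark, below is an untested sketch of what a to_avro-based producer could look like. Everything not shown earlier in this thread is an assumption for illustration: sourceDf, the ad_id key column, and the subject name are placeholders, and the manually prepended 5-byte header mirrors the Confluent wire format that the consumer code above strips off. It also assumes the DataFrame columns line up exactly with the schema registered under the <topic>-value subject.

import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import to_avro

# Look up the registered value schema for the topic (subject "<topic>-value")
latest = schema_registry_client.get_latest_version("{}-value".format(confluentTopicName))
value_schema_str = latest.schema.schema_str

# Confluent wire format header: magic byte 0x00 + 4-byte big-endian schema id
header = bytearray([0]) + latest.schema_id.to_bytes(4, byteorder="big")

(sourceDf  # placeholder DataFrame whose columns match the registered schema
  .select(
    fn.col("ad_id").cast("string").alias("key"),  # placeholder key column
    fn.concat(
      fn.lit(header),
      to_avro(fn.struct([fn.col(c) for c in sourceDf.columns]), value_schema_str)
    ).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", confluentBootstrapServers)
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret))
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("topic", confluentTopicName)
  .save())

Note that fn.concat only returns binary when all inputs are binary (spark.sql.function.concatBinaryAsString is false by default), and the key column is optional for the Kafka sink.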
Let’s see if someone else in the community has tried this already and can share their experience.
Keep us posted!


Hi
@abhietc31 @gianlucanatali … Has anyone tried a producer code example for the above request? We are looking for exactly that: an example producer, in either JSON or Avro format, from Databricks using PySpark.
Could someone help me out here?

I tried various things, but I am unable to get my app to consume the topic correctly. I am using PySpark Structured Streaming, and for some reason from_avro() doesn’t deserialize the stream correctly. For example, this is how the output looks:

+---+--------+---------------------------------------+
|key|topic   |value                                  |
+---+--------+---------------------------------------+
|0  |ad_topic|{, , -1, 84035, 1.5673984559807316E-76}|
|1  |ad_topic|{, , -1, 84035, 1.567405013908699E-76} |

On the Confluent UI, the messages are published correctly:

{
  "ad_id": "48413",
  "timestamp": "2023-01-01 00:23:57",
  "clicks": 1,
  "views": 2,
  "costs": 67.24
}

Any clue as to what I am missing here?
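
One thing worth checking, based on the consumer code earlier in this thread: output like the above is typical when the raw Confluent-framed bytes, including the 5-byte magic-byte/schema-id header, are passed straight to from_avro. A minimal sketch of stripping that header first, where rawKafkaDf and ad_schema_str are assumed placeholders for the Kafka stream and the registered value schema:

import pyspark.sql.functions as fn
from pyspark.sql.avro.functions import from_avro

# Sketch only: drop the 5-byte Confluent header before handing the bytes to from_avro
parsed = (rawKafkaDf
  .withColumn("avroBody", fn.expr("substring(value, 6, length(value)-5)"))
  .select("key", "topic",
          from_avro(fn.col("avroBody"), ad_schema_str, {"mode": "FAILFAST"}).alias("value")))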