Confluent schema-less JSON to Avro

Hi there:
We are using the Event Hubs connector to connect to Confluent Kafka, but our input data appears to be schema-less JSON. We are therefore trying to use an SMT to attach a schema and convert each record to Avro. The connector works fine, but we still can't see the schema in Schema Registry. Is it possible to register the new schema in Schema Registry and convert the record to Avro through an SMT? Are there any documents about that? Thanks a lot

I guess it could work, but it's a bit tricky to get right. If you register the Avro schema in advance, you will know the id of the schema. In the SMT you can then transform the JSON to Avro binary and prepend 5 bytes: the first is a zero, and the next four are the big-endian byte representation of the schema id.
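For reference, the 5-byte prefix described above is Confluent's wire format: one magic byte (0) followed by the schema id as a 4-byte big-endian int. A minimal Scala sketch of building it (`prependHeader` is just an illustrative helper name, not part of any library):

```scala
import java.nio.ByteBuffer

// Confluent wire format: magic byte 0, 4-byte big-endian schema id, then the Avro payload.
def prependHeader(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
  ByteBuffer.allocate(5 + avroPayload.length)
    .put(0: Byte)       // magic byte
    .putInt(schemaId)   // ByteBuffer defaults to big-endian
    .put(avroPayload)
    .array()
```

Consumers using the Confluent Avro deserializer read these 5 bytes first to look the schema up in the registry, so the id must match what was registered.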

Hi gklijs:
Thanks a lot for your help. We now have a few questions. After transforming the data from JSON to Avro binary, we need to update two methods: schemaUpdateCache for the schema and newRecord for the new record. Since schemaUpdateCache can't recognize an Avro schema directly, do we still need to use the Schema from the org.apache.kafka.connect.data package? Will it appear in Avro format when it is updated in Schema Registry? Another question: after prepending the 5 bytes to the new record, can we directly pass the Avro binary as the updatedValue? We tried passing a GenericData.Record from org.apache.avro.generic.GenericData, but it doesn't seem to work. Many thanks for your help

Here are our methods:

  override protected def newRecord(record: R, updatedSchema: Schema, updatedValue: Any): R =
    record.newRecord(
      record.topic,
      record.kafkaPartition,
      record.keySchema,
      record.key,
      updatedSchema,   // valueSchema slot
      updatedValue,    // value slot
      record.timestamp)

 override def apply(record: R): R = applyWithSchema(record)

  private def applyWithSchema(record: R) = {
    // Parse the schema-less JSON value into a map
    val nrecord: util.Map[String, Object] =
      new ObjectMapper().readValue(record.value().toString, classOf[util.HashMap[String, Object]])

    val valueSchemaJson =
      s"""
    {
      "namespace": "com.avro.junkie",
      "type": "record",
      "name": "User2",
      "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber", "type": "int"},
        {"name": "favoriteColor", "type": "string"}
      ]
    }
  """

    val schemaParser = new Parser
    val valueSchemaAvro = schemaParser.parse(valueSchemaJson)
    print(valueSchemaAvro)
    val avroRecord = new GenericData.Record(valueSchemaAvro)

    // Hard-coded test values for now; these should come from nrecord
    avroRecord.put("name", "sam")
    avroRecord.put("favoriteNumber", 100)
    avroRecord.put("favoriteColor", "oreo")

    schemaUpdateCache.put(null, null)

    newRecord(record, null, avroRecord)
  }

  private def makeUpdatedSchema() = {
    val builder = SchemaBuilder.struct.name("json_schema")
      .field("id", Schema.STRING_SCHEMA)
      .field("battery", Schema.STRING_SCHEMA)

    builder.build()
  }

You currently aren't writing to binary. I do think that if you did, and prepended the correct bytes, it should play nicely with the other parts using Schema Registry.
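To illustrate what "writing to binary" could look like: a GenericData.Record can be serialized with the plain Avro API before the 5-byte header is prepended. This is only a sketch (`toAvroBinary` is a hypothetical helper name), and note that handing raw bytes to Connect as the record value would also require the converter in play to pass bytes through untouched:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

// Serialize a GenericRecord to raw Avro binary (no Confluent header yet).
def toAvroBinary(record: GenericRecord): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val writer = new GenericDatumWriter[GenericRecord](record.getSchema)
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(record, encoder)
  encoder.flush()
  out.toByteArray
}
```

The resulting byte array, with the magic byte and schema id prepended, is what the Confluent Avro deserializers expect on the wire.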

I think registering the schema in advance is the easiest way, since SMTs are stateless. But to be honest, I haven't worked with the combination of SMTs and Schema Registry before, so there might be easier ways.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.