SchemaAndValue to Kafka Connect Struct not working with AvroConverter

Hi there,
We are using JsonConverter inside an SMT to convert the data from a String to a SchemaAndValue. The schema is nested, and the conversion to SchemaAndValue succeeds. However, when we use the schema and value from that SchemaAndValue to build the new record in the SMT, with the Avro converter as the value converter, serialization fails. When we build the schema and struct with SchemaBuilder instead, it works fine. We compared the two: the classes and the contents are the same, yet the first one raises errors. Can anyone give us a hand? Thanks.
Here is our example:

    val input = """{
      "schema": {
         "type": "struct", "optional": false, "version": 1, "fields": [

         { "field": "id", "type": "string", "optional": true },

         { "field": "batters","type": "array", "items": { "type" : "struct", "fields":
                                  [{ "field": "bid", "type": "string" }, { "field": "type", "type": "string" }] }}

         ] },
         "payload": {
         "id": "1",
         "batters": [{ "bid": "1001", "type":"parise" }]
         }
                     }""".getBytes()


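    // JsonConverter with default settings, configured as a value converter (isKey = false)
    val converter = new JsonConverter()
    converter.configure(Collections.emptyMap(), false)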

    val schemaAndValue = converter.toConnectData("foo", input)

The newRecord call in the SMT:

    newRecord(record, schemaAndValue.schema, schemaAndValue.value)

The errors:

 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:297)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:323)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:247)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            | 	at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic array_nested_12 :
connect            | 	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91)
connect            | 	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:297)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            | 	... 11 more
connect            | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
connect            | Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
connect            | 	at org.apache.avro.Schema$Names.put(Schema.java:1511)
connect            | 	at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:782)
connect            | 	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:943)
connect            | 	at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1102)
connect            | 	at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:971)
connect            | 	at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:955)
connect            | 	at org.apache.avro.Schemas.toString(Schemas.java:46)
connect            | 	at org.apache.avro.Schemas.toString(Schemas.java:30)
connect            | 	at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:140)
connect            | 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
connect            | 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
connect            | 	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
connect            | 	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
connect            | 	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:143)
connect            | 	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
connect            | 	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:297)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            | 	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:297)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:323)
connect            | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:247)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            | 	at java.lang.Thread.run(Thread.java:748)
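
The innermost cause is the `SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault`. As far as we can tell, `ConnectDefault` is the record name the Avro converter falls back to when a Connect struct schema has no name. Our JSON envelope names neither struct, whereas the SchemaBuilder version further down names both. Below is a sketch of the same input with names added. The envelope's optional `"name"` field is accepted by JsonConverter, but reusing `json_schema` and `com.example.User` here is our assumption, and we have not confirmed that this removes the error:

```scala
// Same envelope as in our example, with a "name" added to each struct schema.
// The two names are assumptions copied from the SchemaBuilder version.
val namedInput = """{
  "schema": {
    "type": "struct", "name": "json_schema", "optional": false, "version": 1,
    "fields": [
      { "field": "id", "type": "string", "optional": true },
      { "field": "batters", "type": "array",
        "items": {
          "type": "struct", "name": "com.example.User",
          "fields": [
            { "field": "bid", "type": "string" },
            { "field": "type", "type": "string" }
          ]
        }
      }
    ]
  },
  "payload": {
    "id": "1",
    "batters": [{ "bid": "1001", "type": "parise" }]
  }
}""".getBytes("UTF-8")
```

Passing `namedInput` to `converter.toConnectData` in place of `input` should show whether the missing names are what the Avro converter objects to.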

We compared the two ways of creating the schema and struct (SchemaBuilder and SchemaAndValue):


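    import java.util
    import java.util.Collections
    import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
    import org.apache.kafka.connect.json.JsonConverter

    // Schema of one array element: a named struct with two required string fields
    val arrayvalueSchema = SchemaBuilder.struct().name("com.example.User")
      .field("bid", Schema.STRING_SCHEMA)
      .field("type", Schema.STRING_SCHEMA).build()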

    val arrayschema = SchemaBuilder.array(
      arrayvalueSchema
    ).build()

    // Top-level named struct holding the id and the array of batters
    val builder = SchemaBuilder.struct().name("json_schema")
      .field("id", Schema.STRING_SCHEMA)
      .field("batters", arrayschema)

    val oldschema = builder.build()


    val arrayelementstruct1 = new Struct(arrayvalueSchema)
      .put("bid", "001" )
      .put("type", "test1")

    val arrayelementstruct2 = new Struct(arrayvalueSchema)
      .put("bid", "002" )
      .put("type", "test2")

    val array = util.Arrays.asList(arrayelementstruct1,arrayelementstruct2)


    val oldstruct = new Struct(oldschema)
      .put("id","01")
      .put("batters",array)



    // ----------------------------------------------------------------------

    val converter = new JsonConverter()

    val input = """{
      "schema": {
         "type": "struct", "optional": false, "version": 1, "fields": [

         { "field": "id", "type": "string", "optional": true },

         { "field": "batters","type": "array", "items": { "type" : "struct", "fields":
                                  [{ "field": "bid", "type": "string" }, { "field": "type", "type": "string" }] }}

         ] },
         "payload": {
         "id": "1",
         "batters": [{ "bid": "1001", "type":"parise" }]
         }
                     }""".getBytes()


    converter.configure(Collections.emptyMap(), false)

    val schemaAndValue = converter.toConnectData("foo", input)

    val jsSchema = converter.asJsonSchema(schemaAndValue.schema())

    // val struct = requireStruct(schemaAndValue.value(),"testing")

    val schema = schemaAndValue.schema()

The results are the same.
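
Our comparison only looked at the classes and the field contents. A minimal sketch of a deeper check, assuming the difference might be in schema metadata such as names: the helper below (`describe` is a hypothetical name, not part of Kafka Connect) lists the type and name of every nested schema, so that `oldschema` and `schema` can be compared line by line.

```scala
import org.apache.kafka.connect.data.Schema
// JavaConverters is deprecated in Scala 2.13 (scala.jdk.CollectionConverters replaces it)
import scala.collection.JavaConverters._

// Hypothetical helper: one line per schema node, showing its path, type and name.
def describe(s: Schema, path: String = "$"): Seq[String] = {
  val self = s"${path}: type=${s.`type`()}, name=${s.name()}"
  val children: Seq[String] = s.`type`() match {
    case Schema.Type.STRUCT =>
      // Recurse into every field of the struct
      s.fields().asScala.toSeq.flatMap(f => describe(f.schema(), s"${path}.${f.name()}"))
    case Schema.Type.ARRAY =>
      // Recurse into the element schema
      describe(s.valueSchema(), s"${path}[]")
    case Schema.Type.MAP =>
      describe(s.keySchema(), s"${path}{key}") ++ describe(s.valueSchema(), s"${path}{value}")
    case _ =>
      Seq.empty
  }
  self +: children
}
```

Usage would be `describe(oldschema).foreach(println)` followed by `describe(schema).foreach(println)`. Any node where one side prints a name and the other prints `null` is a difference that our earlier comparison did not look at.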


Many thanks again