Problem with SMT after updating the kafka-connect-base version

Hello everyone!
I need some tips and direction to deal with a situation.
I have a self-managed Kafka Connect cluster where I use a custom Single Message Transform (SMT) to add a “doc” string to my Avro schema.

It was working very well, but I have been having trouble with the SMT since I tried to upgrade the cp-kafka-connect-base version. I’m trying to update from 5.5.0 to 7.0.1 (I had the same problem with 6.0.1).

Before
FROM confluentinc/cp-kafka-connect-base:5.5.0
After
FROM confluentinc/cp-kafka-connect-base:7.0.1

How my Avro schema looked with my custom SMT (before the upgrade)

{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Primary key with AI",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null,
      "doc": "Primary key with AI"
    },
    {
      "name": "insert_at",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Order date in timestamp format",
          "connect.version": 1,
          "connect.name": "insert_at"
        }
      ],
      "default": null,
      "doc": "Order date in timestamp format"
    }
  ]
}

How it looks after the kafka-connect-base update

{
  "type": "record",
  "name": "Envelope",
  "namespace": "topic.order",
  "fields": [
    {
      "name": "order_id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Primary key with AI",
          "connect.version": 1,
          "connect.name": "order_id"
        }
      ],
      "default": null
    },
    {
      "name": "insert_at",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Date in timestamp format",
          "connect.version": 1,
          "connect.name": "insert_at"
        }
      ],
      "default": null
    }
  ]
}

The big problem for me is the “doc” attribute.
I need the doc string to stay at the same level as “name” and “type”, not inside the type array. After the update, the SchemaBuilder behaviour has changed.

Here is some of my SMT code.

BaseTransformation

public R apply(R record) {
    initData(record.topic());

    // Build the replacement schema (with docs) for this topic and
    // copy the record's value into a struct that uses it
    Schema schema = this.retrieveSchema(record.topic());
    Struct toStruct = this.newStruct(record, schema);

    return record.newRecord(
            record.topic(),
            record.kafkaPartition(),
            record.keySchema(),
            null, // the record key is passed as null
            schema,
            toStruct,
            record.timestamp());
}

Utils

// Adds an optional field whose schema carries its own name, version, and doc string
public static void addField(SchemaBuilder builder, String name, Schema.Type type, String doc) {
    builder.field(name, new SchemaBuilder(type).name(name).version(1).optional().doc(doc).build());
}

SchemaSet

private static final String SCHEMA_NAME = "kafka.mytest";
private static final int SCHEMA_VERSION = 1;
private static final String SCHEMA_DOC = "MyTest";

@Override
public Schema buildSchema() {
    SchemaBuilder builder = SchemaBuilder.struct();
    builder.name(SCHEMA_NAME);
    builder.version(SCHEMA_VERSION);
    builder.doc(SCHEMA_DOC);

    // Each field gets its own name, version, and doc via the helper above
    addField(builder, "order_id", Schema.Type.INT64, "Primary Key with AI");
    addField(builder, "insert_at", Schema.Type.STRING, "Date in timestamp format");

    return builder.build();
}

My MySQL connector config

{
  "name": "local-test-0001",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "dbhost",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "pass",
    "database.server.name": "mytest",
    "database.include.list": "test",
    "table.include.list": "test.orders",
    "database.history.kafka.bootstrap.servers": "broker.intranet:9092",
    "database.history.kafka.topic": "test.schema-history",
    "snapshot.mode": "when_needed",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://myschemaregistry.intranet:8081",
    "transforms": "unwrap,customConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": true,
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.customConverter.type": "kafka.custom.transform.BaseTransformation$Value"
  }
}

Notice that I use value.converter: “io.confluent.connect.avro.AvroConverter”.

I am afraid this is the current behaviour of the AvroConverter: it took the doc and added it as a parameter to the schema.

I would like to know if there is some config switch that would copy the schema parameter to the documentation field itself.
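
One way to check this outside a running connector is to run the Connect-to-Avro conversion directly. Here is a minimal sketch, assuming the AvroData API from the Confluent Avro converter; the field mirrors the one the SMT builds, and the class name is just illustrative:

import io.confluent.connect.avro.AvroData;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class DocPlacementCheck {
    public static void main(String[] args) {
        // A Connect schema with a field-level doc, like the one the SMT produces
        Schema connectSchema = SchemaBuilder.struct()
                .name("kafka.mytest")
                .field("order_id", SchemaBuilder.int64()
                        .name("order_id").version(1).optional()
                        .doc("Primary key with AI").build())
                .build();

        // Convert to Avro and print the schema to see where "doc" ends up
        AvroData avroData = new AvroData(100); // 100 = schema cache size
        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(connectSchema);
        System.out.println(avroSchema.toString(true));
    }
}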

Check out the new configuration “discard.type.doc.default”. It feels like it changes the behaviour of the doc setting in Avro, so I am speculating that might be the issue?

Wow! You are right!
I tried setting the config below when creating the connector, and the behaviour of the “doc” string is back to normal:
"value.converter.discard.type.doc.default": true
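
For context, Kafka Connect hands every value.converter.-prefixed property through to the converter, so in the connector config it sits next to the other converter settings from my config above:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://myschemaregistry.intranet:8081",
"value.converter.discard.type.doc.default": true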

Here is an image with the diff between version 3 and version 4 (current) of a field in my Avro schema after I changed the config.

Now, my concern is that this setting is marked as @Deprecated. :thinking:

I just checked the issues and found the fix that added this flag back.

The unit tests seem to be the best explanation:

Avro field doc maps to Connect schema doc in both-way conversion, and type doc will be dropped.

The fact that this flag was added to bring back the feature you needed really makes me wonder about the deprecation statement as well.
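
If you want to exercise the flag outside a running connector, here is a minimal sketch, assuming AvroDataConfig accepts the property as a plain map entry (the literal key comes from this thread; the field and class name are just examples):

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.avro.AvroDataConfig;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.HashMap;
import java.util.Map;

public class DiscardTypeDocCheck {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Key taken from this thread; it should restore the pre-upgrade doc placement
        props.put("discard.type.doc.default", true);

        AvroData avroData = new AvroData(new AvroDataConfig(props));

        Schema connectSchema = SchemaBuilder.struct()
                .name("kafka.mytest")
                .field("insert_at", SchemaBuilder.string()
                        .name("insert_at").version(1).optional()
                        .doc("Date in timestamp format").build())
                .build();

        // With the flag set, the doc should appear on the Avro field again
        System.out.println(avroData.fromConnectSchema(connectSchema).toString(true));
    }
}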

@nbuesing Thanks for your help!
For now, this will be enough to allow me to upgrade to 7.0.1 and solve some other issues on my kafka-connect cluster.
I was wondering: do you recommend I do something about it? Maybe open an issue about this for future releases?

I am not sure; this flag is new to me as well, and it seems it was added to allow the existing behaviour to remain, but the deprecation does concern me. Maybe open an issue with the project asking what the deprecation means?

Ok, I’ll do it.
Thanks again!