Hello everyone!
I need some tips and direction on how to deal with a situation.
I have a self-managed Kafka Connect cluster where I use a custom Single Message Transform (SMT) to add a “doc” string to my Avro schema.
It was working very well, but I ran into trouble with the SMT after trying to upgrade the cp-kafka-connect-base version from 5.5.0 to 7.0.1 (I had the same problem with 6.0.1).
Before
FROM confluentinc/cp-kafka-connect-base:5.5.0
After
FROM confluentinc/cp-kafka-connect-base:7.0.1
How my Avro schema looked with my custom SMT, before the upgrade:
{
"type": "record",
"name": "Envelope",
"namespace": "topic.order",
"fields": [
{
"name": "order_id",
"type": [
"null",
{
"type": "long",
"connect.doc": "Primary key with AI",
"connect.version": 1,
"connect.name": "order_id"
}
],
"default": null,
"doc": "Primary key with AI"
},
{
"name": "insert_at",
"type": [
"null",
{
"type": "string",
"connect.doc": "Order date in timestamp format",
"connect.version": 1,
"connect.name": "insert_at"
}
],
"default": null,
"doc": "Order date in timestamp format"
}
]
}
How it looks after updating cp-kafka-connect-base:
{
"type": "record",
"name": "Envelope",
"namespace": "topic.order",
"fields": [
{
"name": "order_id",
"type": [
"null",
{
"type": "long",
"connect.doc": "Primary key with AI",
"connect.version": 1,
"connect.name": "order_id"
}
],
"default": null
},
{
"name": "insert_at",
"type": [
"null",
{
"type": "string",
"connect.doc": "Date in timestamp format",
"connect.version": 1,
"connect.name": "insert_at"
}
],
"default": null
}
]
}
The big deal for me is the “doc” attribute.
I need the doc string to stay at the same level as “name” and “type”, not inside the type array. After the update, the SchemaBuilder behaviour seems to have changed.
Here is some of my SMT code.
BaseTransformation
public R apply(R record) {
initData(record.topic());
Schema schema = this.retrieveSchema(record.topic());
Struct toStruct = this.newStruct(record, schema);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
null,
schema,
toStruct,
record.timestamp());
}
Utils
public static void addField(SchemaBuilder builder, String name, Schema.Type type, String doc) {
builder.field(name, new SchemaBuilder(type).name(name).version(1).optional().doc(doc).build());
}
SchemaSet
private static final String SCHEMA_NAME = "kafka.mytest";
private static final int SCHEMA_VERSION = 1;
private static final String SCHEMA_DOC = "MyTest";
@Override
public Schema buildSchema() {
SchemaBuilder builder = SchemaBuilder.struct();
builder.name(SCHEMA_NAME);
builder.version(SCHEMA_VERSION);
builder.doc(SCHEMA_DOC);
addField(builder, "order_id", Schema.Type.INT64, "Primary Key with AI");
addField(builder, "insert_at", Schema.Type.STRING, "Date in timestamp format");
return builder.build();
}
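In case it helps with debugging, the conversion can be reproduced outside Connect by calling AvroData directly, which is the class the AvroConverter delegates to. This is only a diagnostic sketch, not a fix: it assumes io.confluent:kafka-connect-avro-converter (which provides AvroData) is on the classpath, and the class name DocPlacementCheck is mine. It builds a field schema the same way the SMT does and prints the resulting Avro schema, so running it against the 5.5.0 and 7.0.1 classpaths shows exactly where the doc ends up in each version:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.confluent.connect.avro.AvroData;

public class DocPlacementCheck {
    public static void main(String[] args) {
        // Same pattern as the SMT: an optional field whose inner schema
        // carries the doc string via SchemaBuilder.doc(...).
        Schema connectSchema = SchemaBuilder.struct()
                .name("kafka.mytest")
                .version(1)
                .doc("MyTest")
                .field("order_id", new SchemaBuilder(Schema.Type.INT64)
                        .name("order_id")
                        .version(1)
                        .optional()
                        .doc("Primary Key with AI")
                        .build())
                .build();

        // AvroData is what the AvroConverter uses internally;
        // 16 is the schema cache size.
        AvroData avroData = new AvroData(16);
        org.apache.avro.Schema avroSchema = avroData.fromConnectSchema(connectSchema);

        // Compare this output between versions to see whether the doc
        // lands on the Avro field or only inside the union type.
        System.out.println(avroSchema.toString(true));
    }
}
```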
My mysql connector config
{
"name": "local-test-0001",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "dbhost",
"database.port": "3306",
"database.user": "user",
"database.password": "pass",
"database.server.name": "mytest",
"database.include.list": "test",
"table.include.list": "test.orders",
"database.history.kafka.bootstrap.servers": "broker.intranet:9092",
"database.history.kafka.topic": "test.schema-history",
"snapshot.mode": "when_needed",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": false,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://myschemaregistry.intranet:8081",
"transforms": "unwrap,customConverter",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": true,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.customConverter.type": "kafka.custom.transform.BaseTransformation$Value"
}
}
Notice that I use value.converter: “io.confluent.connect.avro.AvroConverter”.
I am afraid this is the current behaviour of the AvroConverter: it takes the doc and adds it as a “connect.doc” parameter inside the type instead of keeping it as the Avro field-level doc.
I would like to know if there is some config switch that would copy the schema parameter to the documentation field itself.
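One thing that might be worth experimenting with (I am not sure it solves this exact case): the AvroConverter forwards its properties to AvroData, which has a connect.meta.data option controlling whether Connect metadata (connect.name, connect.version, connect.doc, defaults) is embedded in the Avro schema at all. In the connector config it would be prefixed with value.converter., something like:

```
"value.converter.connect.meta.data": "false"
```

Note that, as far as I understand it, this disables the connect.* properties rather than moving the doc back to the field level, so it may only help narrow down where the behaviour changed.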