Should/Will Kafka Connect support schema evolution using Avro 1.10.X enum defaults?

We are using Kafka Connect to pipe data from Kafka topics into parquet files on HDFS and/or S3. Our Kafka data is serialised using Avro (schema registry).

At the moment we use two connectors for this:

  • HDFS2 Sink Connector
  • Amazon S3 Sink Connector

Up until recently we set “schema.compatibility” to “NONE” in our connectors, but this had the painful side effect that deploys of our application caused huge file explosions (lots of very small files in HDFS / S3). This happens because Kafka Connect creates a new file every time the schema ID of a log changes compared to the previous log. During deploys of our applications (which can take up to 20 minutes), logs with mixed schema IDs are inevitable, and given the huge amount of logs, file explosions of up to a million files weren’t uncommon.

To solve this problem we switched all our connectors’ “schema.compatibility” to “BACKWARD”, which should only create a new file when a higher schema ID is detected and deserialise all logs with the latest known schema ID. That way, only one new file should be created during a deploy.

An example connector config:

{
    "name": "hdfs-Project_Test_Subject",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "partition.duration.ms": "86400000",
        "topics.dir": "/user/kafka/Project",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "flush.size": "1000000",
        "schema.compatibility": "BACKWARD",
        "topics": "Project_Test_Subject",
        "timezone": "UTC",
        "hdfs.url": "hdfs://hadoophost:9000",
        "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
        "rotate.interval.ms": "7200000",
        "locale": "C",
        "hadoop.home": "/opt/hadoop",
        "logs.dir": "/user/kafka/_logs",
        "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "name": "hdfs-Project_Test_Subject",
        "errors.tolerance": "all",
        "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
        "path.format": "YYYY/MM/dd"
    }
}

However, we have lots of enum fields in our data records (Avro schemas) to which symbols (values) get added frequently, and this is causing our Kafka Connect connectors to FAIL with these kinds of errors:

Schema parameters not equal. 
source parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2}
and target parameters:
{io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2, io.confluent.connect.avro.Enum.value3=value3}

Since the Avro 1.10.X specification, enums support default values, which makes schema evolution possible even when adding symbols (values) to an enum. When testing our schemas for compatibility using the Schema Registry API we always get “is_compatible” => true, so in theory schema evolution should not be a problem.
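To illustrate, here is a minimal sketch of what such an enum and the BACKWARD check look like at the Avro level (the “Ablo.testfield” enum and its symbols just mirror the error above, they are not our real schemas):

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;

public class EnumDefaultCompatibilityCheck {
    public static void main(String[] args) {
        // Old version of the enum (two symbols), with an enum-level default.
        Schema older = new Schema.Parser().parse(
            "{\"type\":\"enum\",\"name\":\"testfield\",\"namespace\":\"Ablo\","
          + "\"symbols\":[\"value1\",\"value2\"],\"default\":\"value1\"}");

        // New version of the enum with an extra symbol added.
        Schema newer = new Schema.Parser().parse(
            "{\"type\":\"enum\",\"name\":\"testfield\",\"namespace\":\"Ablo\","
          + "\"symbols\":[\"value1\",\"value2\",\"value3\"],\"default\":\"value1\"}");

        // Reading data written with the old schema using the new schema is fine,
        // which lines up with the "is_compatible" => true answer from the registry.
        SchemaCompatibilityType result = SchemaCompatibility
            .checkReaderWriterCompatibility(newer, older)
            .getType();
        System.out.println(result); // COMPATIBLE

        // The enum-level default also covers the other direction: a reader that only
        // knows value1/value2 falls back to "value1" when it sees value3.
    }
}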

The error above is thrown in the SchemaProjector class, which is part of Kafka Connect, more specifically in its checkMaybeCompatible() function. It seems this function does not respect the Avro 1.10.X specification for enum schema evolution, and I’m not sure whether it is meant to. Is this something that is known / being worked on? As we currently don’t have any other route to fix this issue, and returning to the “NONE” schema compatibility is no option considering the file explosions, we’re kind of stuck here.

@GuusDeGraeve thank you for the well-formed question. Let me see if I can help find you an answer.

@rick do you have any update for us regarding this topic? If I need to provide some more information, please let me know.

Sorry @GuusDeGraeve, I haven’t been able to find an answer for you yet. I will continue to try.

Hi @GuusDeGraeve

I submitted the PR to Avro (back in 2018!) to have enum defaults added to the standard. I’ll take a look under the hood to see if I can figure out what’s going on in Kafka Connect.

Do you by chance happen to have a Java test of some sort that showcases this failure? If you do, it would be very useful for me to shortcut straight to debugging. In the meantime, I’ll see if I can figure it out.

Thanks!

One more question. Outside of this enum problem, it’s not clear to me why you’re getting a file explosion. In a given topic I’d expect to see a single schema, plus its evolved descendants. Is it that you have many interwoven evolutions, such that, for example, schemas 1, 2, and 3 are all present and intertwined in the event stream?

Or are you using a single stream with multiple different event types within it? I suspect the former, but I just wanted to confirm with you.

Hi @abellemare, thank you for your time!

To answer your questions:

I’m afraid I don’t have a quick test that showcases the failure, but I will try to go into a little more detail about our situation, which should also answer your second question.

This is a simplified overview of our data pipeline from application to AWS S3:
Application => Logstash (serialises to avro) => Kafka => Kafka Connect (S3 Sink Connector) => S3

Our Avro schemas are created within the application by our data engineers, and they are also pushed to the Schema Registry at that point. When a schema for a certain record changes and our application gets deployed, we get the file explosions. This is because we run on Kubernetes and use rolling deploys, which basically means we can have logs coming into the same Kafka topic serialised with different schema IDs while the deploy is running.

For example, a Kafka topic could have 10 messages in this order:

- {"id": "2918", "name": "Foo"} // serialised with Avro schema ID 11
- {"id": "2918", "name": "Foo", "gender": "Male"} // serialised with Avro schema ID 12
- {"id": "8372", "name": "Bar"} // serialised with Avro schema ID 11
- {"id": "6537", "name": "Bow"} // serialised with Avro schema ID 11
- {"id": "8372", "name": "Bar", "gender": "Male"} // serialised with Avro schema ID 12
- {"id": "6537", "name": "Bow", "gender": "Male"} // serialised with Avro schema ID 12
- {"id": "6537", "name": "Bow"} // serialised with Avro schema ID 11
- {"id": "3726", "name": "Bunny", "gender": "Male"} // serialised with Avro schema ID 12
- {"id": "9826", "name": "Lolly", "gender": "Female"} // serialised with Avro schema ID 12
- {"id": "1827", "name": "Cat", "gender": "Female"} // serialised with Avro schema ID 12

When our Kafka Connect S3 Sink Connector is configured with compatibility mode “NONE”, this sequence of messages generates exactly 5 new parquet files on S3, as the incoming schema ID changes 5 times. Given the huge amount of messages that go through our data stack, this quickly results in huge file explosions, which is why we would like to start using compatibility mode “BACKWARD”. That mode should only create 1 new parquet file for this sequence, as it always uses the newest known schema to deserialise the incoming messages.
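As far as I understand, the storage connectors implement “BACKWARD” by projecting records that were written with an older schema onto the latest schema they have seen, using Kafka Connect’s SchemaProjector. A minimal sketch of that projection for the “gender” example above (the Connect schema names here are made up):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.data.Struct;

public class BackwardProjectionSketch {
    public static void main(String[] args) {
        // Connect schema corresponding to Avro schema ID 11 (no "gender" field).
        Schema v11 = SchemaBuilder.struct().name("Project.TestSubject")
            .field("id", Schema.STRING_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();

        // Connect schema corresponding to Avro schema ID 12 ("gender" added as optional).
        Schema v12 = SchemaBuilder.struct().name("Project.TestSubject")
            .field("id", Schema.STRING_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .field("gender", Schema.OPTIONAL_STRING_SCHEMA)
            .build();

        Struct oldRecord = new Struct(v11).put("id", "2918").put("name", "Foo");

        // Because "gender" is optional, the projection succeeds, so every record in the
        // sequence can be written with schema v12 and only one file needs to be rolled.
        Struct projected = (Struct) SchemaProjector.project(v11, oldRecord, v12);
        System.out.println(projected.get("gender")); // null, the missing optional field
    }
}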

However, when changing the compatibility to “BACKWARD” we bumped into the issue of this forum topic. We have quite a few ENUM fields in our schemas, and symbols/values are frequently added to those enums. We have CI/CD checks that verify the compatibility of a new schema with the previous version before we push it to the registry and start using it. When adding symbols/values to an enum field, all these compatibility checks pass without a problem, which means schema evolution should be possible using the “BACKWARD” mode. Unfortunately, Kafka Connect seems to do its own internal check on schema parameters in the checkMaybeCompatible() function of the SchemaProjector class, as I posted earlier. This check is responsible for the failure we are getting in our connector:

Schema parameters not equal. 
source parameters: 
{io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2}
and target parameters:
{io.confluent.connect.avro.enum.default.testfield=null, io.confluent.connect.avro.Enum=Ablo.testfield, io.confluent.connect.avro.Enum.null=null, io.confluent.connect.avro.Enum.value1=value1, io.confluent.connect.avro.Enum.value2=value2, io.confluent.connect.avro.Enum.value3=value3}

If I understand correctly, Avro spec v1.10.X and above supports default values for enums, which should make schema evolution / projection possible even if enum symbols get added.

This works correctly in the following case, for example:

- {"id": "2726", "type": "Smile"} // Avro schema ID 11 (known enum values: smile, emotional)
- {"id": "181", "type": "Sad"} // Avro schema ID 12 (known enum values: smile, emotional, sad)
- {"id": "12", "type": "Emotional"} // Avro schema ID 12
- {"id": "18", "type": "Smile"} // Avro schema ID 12
- {"id": "26", "type": "Sad"} // Avro schema ID 12

Whenever a log comes in serialised with a schema ID older than the last known schema ID, however, we get the error above. This sequence of messages would cause that error, for example:

- {"id": "2726", "type": "Smile"} // Avro schema ID 11 (known enum values: smile, emotional)
- {"id": "181", "type": "Sad"} // Avro schema ID 12 (known enum values: smile, emotional, sad)
- {"id": "12", "type": "Emotional"} /* Avro schema ID 11 */ <== FAILURE
- {"id": "18", "type": "Smile"} // Avro schema ID 12
- {"id": "26", "type": "Sad"} // Avro schema ID 12
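A minimal sketch of how this could probably be reproduced outside the connector, using only Kafka Connect’s SchemaProjector and two Connect schemas carrying the enum parameters from the error output (untested sketch; the enum-default parameter is left out for brevity):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.errors.SchemaProjectorException;

public class EnumProjectionFailureSketch {
    public static void main(String[] args) {
        // Connect representation of the enum for Avro schema ID 11 (two known symbols).
        // The Avro converter maps enums to STRING schemas and stores the symbols as
        // schema parameters, as visible in the error output above.
        Schema older = SchemaBuilder.string().name("Ablo.testfield")
            .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
            .parameter("io.confluent.connect.avro.Enum.value1", "value1")
            .parameter("io.confluent.connect.avro.Enum.value2", "value2")
            .build();

        // Connect representation for Avro schema ID 12 (one extra symbol).
        Schema newer = SchemaBuilder.string().name("Ablo.testfield")
            .parameter("io.confluent.connect.avro.Enum", "Ablo.testfield")
            .parameter("io.confluent.connect.avro.Enum.value1", "value1")
            .parameter("io.confluent.connect.avro.Enum.value2", "value2")
            .parameter("io.confluent.connect.avro.Enum.value3", "value3")
            .build();

        try {
            // Projecting a value written with the older schema onto the newer one
            // throws, because the parameter maps are compared with Objects.equals().
            SchemaProjector.project(older, "value1", newer);
        } catch (SchemaProjectorException e) {
            System.out.println(e.getMessage()); // "Schema parameters not equal. ..."
        }
    }
}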

I hope this will help you understand the issue we’re having. It might be that we’re doing something wrong, or using a bad practice, but I don’t see any easy way for us to solve this ourselves.

Thanks again for the help.

Thanks for the great description @GuusDeGraeve!

I can see that checkMaybeCompatible() is indeed doing a strict equality comparison on the schema parameters, which in this case carry the enum symbols:

} else if (!Objects.equals(source.parameters(), target.parameters())) {
    throw new SchemaProjectorException("Schema parameters not equal. source parameters: "
        + source.parameters() + " and target parameters: " + target.parameters());
}

So any narrowing or expansion of any enum, for any schema, would indeed fail. It’s not clear to me why this must be the case.
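Just to illustrate the idea, and purely as a hypothetical sketch (this is not the actual Kafka code, nor a proposed patch): a more lenient comparison could tolerate parameters that only appear on the target side, which is exactly what happens when enum symbols are appended.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;

public class LenientParameterCheckSketch {
    // Hypothetical replacement for the strict Objects.equals() comparison above:
    // every parameter of the source schema must be present in the target schema with
    // the same value, but extra target parameters (e.g. newly added enum symbols) are
    // tolerated. A real fix would probably restrict this to the
    // "io.confluent.connect.avro.Enum.*" symbol parameters.
    static boolean parametersCompatible(Schema source, Schema target) {
        Map<String, String> sourceParams =
            source.parameters() == null ? Collections.<String, String>emptyMap() : source.parameters();
        Map<String, String> targetParams =
            target.parameters() == null ? Collections.<String, String>emptyMap() : target.parameters();
        return targetParams.entrySet().containsAll(sourceParams.entrySet());
    }
}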

Before we file a bug ticket, I have one more question: are you able to write the data to S3 in Avro format instead of Parquet?

I am wondering if there is an issue converting the Avro enums to Parquet, and whether that is why this particular check is failing, perhaps because Parquet does not have the same enum-default support that Avro does. I admit that I am not an expert in the Kafka Connect code, but if we can validate that the same exception occurs when writing the files directly in Avro format, that may narrow down the scope of the issue.

Thanks again for your time @abellemare, I’ll have to test this and get back to you on that. I’ll keep you posted.

Hi @abellemare, after testing this I can confirm the connector also fails when writing in Avro format, with the exact same error.

Hi @GuusDeGraeve
Thanks for checking that out! I think we should file a bug report now, because this definitely sounds like a bug to me. I would expect it to at least be able to write the data out in Avro format, given that the schemas are compatible. I think this is a limitation of Kafka Connect at the moment, and it seems like it might take a bit of refactoring to get this to work correctly.

Can you open a bug for this? https://issues.apache.org/jira/projects/KAFKA

Was there ever a bug report opened for this issue? We’re currently going through this exact problem, and it would be great to see if there have been any developments since this thread was active.

By the way, current Confluent Platform releases are using Avro 1.11.0 already.

Perhaps you can reproduce the issue in a unit test?

I am encountering the same issue with a protobuf schema (as noted, the issue in checkMaybeCompatible is not Avro-specific).

It might be fixable by making a change to ConnectEnum.java in confluentinc/schema-registry (schema-converter/src/main/java/io/confluent/connect/schema/ConnectEnum.java at commit 623215154f1c78ee1ba023c670d1235b656ea69a), but it’s not clear to me.

While this should be fixed upstream in Kafka itself, I think it’s possible to patch it here in kafka-connect-storage-common.

Regarding “upstream in Kafka itself”: you’d modify the respective converter / serializer in the Schema Registry or connector project. Upstream Kafka doesn’t have Avro/Protobuf support.

We’re having the exact same issue. Is there any solution yet? Thanks!