Enum in Kafka sink connector

I am working on an MSSQL sink connector which simply saves data from a topic to a table. My topic is bound to an Avro schema. For one of the fields, I am using an enum.

Now when my connector saves data to the table, it saves the enum name. However, I want it to save both the enum's numeric value and the enum name.

E.g., if I have the enum below for car models, the connector saves Hatchback/Sedan/SUV. I want to save the values 1/2/3 as well. The two values can go into different columns.

enum Model {
    Hatchback: 1,
    Sedan: 2,
    SUV: 3
}
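In the Avro schema bound to the topic, the field would be declared roughly like this (a sketch with illustrative names; note that an Avro enum itself carries only symbol names, not numeric values):

    {
      "name": "model",
      "type": {
        "type": "enum",
        "name": "Model",
        "symbols": ["Hatchback", "Sedan", "SUV"]
      }
    }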

How can we achieve this?

Are you using an MSSQL sink connector or writing one? I'm asking because I'm not aware of any official MSSQL sink connector. Or are you using the JDBC sink connector?

Sorry for not mentioning it explicitly; I mean the JDBC sink connector.

No worries. Try setting the property value.converter.enhanced.avro.schema.support to true. According to the documentation:

Enable enhanced Avro schema support in the Avro Converter. When set to true, this property preserves Avro schema package information and Enums when going from Avro schema to Connect schema. This information is added back in when going from Connect schema to Avro schema.

Source: Using Kafka Connect with Schema Registry — Confluent Documentation

Here is an example:
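Something along these lines in the sink connector configuration (a sketch; the Schema Registry URL is a placeholder for your own):

    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    value.converter.enhanced.avro.schema.support=true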


I have enabled this setting but the connector still saves only the enum text in the table. Any idea where I can find the additional info saved by the connector when this setting is true?

Hi @ankit.11tyagi: I doubt the enum type is supported, based on the data conversion table provided here: JDBC Sink Connector for Confluent Platform — Confluent Documentation

The enum type seems to be supported to some extent, as my connector saved the enum text in varchar columns in the table. It is the numeric value associated with the enum that is not being saved.

@ankit.11tyagi: My apologies, Ankit, if I am going too far. The supported JDBC conversions (https://docs.microsoft.com/en-us/sql/connect/jdbc/understanding-data-type-conversions) do not mention enum; hence I assume the connector's behaviour here is undefined.

I think the limitation here is that the internal structure used by Kafka Connect to represent records, known as a Struct, doesn't support enums with associated values, which is why the converter captures the enum symbol and simply forces a toString() on it. You can check this behaviour by taking a look at the Schema class that governs how each Struct is modeled.

It seems to me then that you have two options:

  1. Using an SMT in the sink connector: perhaps you can change the Struct schema on the fly by applying a custom SMT that splits the enum value into two new fields. Chris Matta from Confluent wrote a very nice blog about this.

  2. Model the enum as a complex type: if you have the chance, you can change the inbound Avro schema to model the entity as a complex type with two distinct fields instead of using an enum. This option may cause some friction at first, but in the long run it seems to be the right thing to do.
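To illustrate option 1, here is a minimal sketch of the field-splitting a custom SMT would perform. This is plain Java that simulates the record value as a Map rather than a real Transformation<R> implementation, and the symbol-to-code table and field names are assumptions taken from the example enum:

```java
import java.util.HashMap;
import java.util.Map;

public class EnumSplitSketch {

    // Assumed mapping from enum symbol to its numeric code (from the example enum).
    private static final Map<String, Integer> MODEL_CODES = new HashMap<>();
    static {
        MODEL_CODES.put("Hatchback", 1);
        MODEL_CODES.put("Sedan", 2);
        MODEL_CODES.put("SUV", 3);
    }

    // Simulates what the SMT's apply() would do to a record value:
    // replace the single "model" field with "model_name" and "model_code".
    public static Map<String, Object> apply(Map<String, Object> value) {
        Map<String, Object> out = new HashMap<>(value);
        Object model = out.remove("model");
        if (model != null) {
            String name = model.toString();
            out.put("model_name", name);
            out.put("model_code", MODEL_CODES.get(name));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("id", 42);
        record.put("model", "Sedan");

        Map<String, Object> transformed = apply(record);
        System.out.println(transformed.get("model_name")); // Sedan
        System.out.println(transformed.get("model_code")); // 2
    }
}
```

A real SMT would do the same thing against the Connect Struct and also rebuild the Schema to declare the two new fields, so that the JDBC sink maps them to two columns.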




Thanks @riferrei for the information. You are right that option #2 will give more control. However, I will have to check with the team managing the topic schema whether they can implement this change, as it does not seem to be backward compatible.


This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.