Problem converting a key from String to Avro format

Hi,

We are currently facing an issue serializing a key for a new topic.

In the process, we map the resulting key of a join (a String) to an Avro key using .selectKey():

```java
...
KStream
   .toTable()
   .leftJoin(...)
   .toStream()
   .selectKey((key, value) ->
      AvroKey
         .newBuilder()
         .setKey(key)
         .build()
   )
...
```

However, we are seeing an error during serialization saying that the actual key is still a String. The error message is as follows:

org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic eventOutputJoin-KSTREAM-TOTABLE-0000000004-repartition. A serializer (key: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: io.confluent.connect.avro.Key). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))).

How is this possible and how can we convert our String key to an Avro key?

Thank you.

Hi @sleepy.rain8974

From looking at the stack trace, there’s an issue writing to the repartition topic eventOutputJoin-KSTREAM-TOTABLE-0000000004-repartition.

Judging from the name, something upstream in your topology set the needsRepartition flag: any operation that could change the key (such as selectKey or map) marks the stream for repartitioning, even if the key ends up unchanged.

The KStream.toTable method adds a repartition node to the topology if this internal flag is true. Did you set or change the key on the stream before converting it to a KTable?
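To make the mechanism concrete, here is a heavily simplified, hypothetical model of the flag tracking described above. ToyStream, selectKey(), mapValues() and toTable() are illustrative stand-ins, not Kafka Streams classes; the sketch only assumes the behavior described in this reply: key-changing operations set the flag, value-only operations pass it through, and toTable() inserts a repartition step when it is set.

```java
import java.util.ArrayList;
import java.util.List;

public class RepartitionFlagModel {

    // Hypothetical, simplified stand-in for a DSL stream node.
    // It records the operations applied so far and whether a repartition is pending.
    static final class ToyStream {
        final boolean repartitionRequired;
        final List<String> nodes;

        ToyStream(boolean repartitionRequired, List<String> nodes) {
            this.repartitionRequired = repartitionRequired;
            this.nodes = nodes;
        }

        // Returns a new node with one more operation and the given flag value.
        private ToyStream next(String node, boolean flag) {
            List<String> copy = new ArrayList<>(nodes);
            copy.add(node);
            return new ToyStream(flag, copy);
        }

        // Any key-changing operation sets the flag, even if the lambda returns
        // the same key, because the DSL cannot inspect what the lambda does.
        ToyStream selectKey() {
            return next("SELECT-KEY", true);
        }

        // Value-only operations keep whatever flag the parent had.
        ToyStream mapValues() {
            return next("MAP-VALUES", repartitionRequired);
        }

        // Converting to a table adds a repartition step first if the flag is set.
        List<String> toTable() {
            List<String> result = new ArrayList<>(nodes);
            if (repartitionRequired) {
                result.add("REPARTITION");
            }
            result.add("TO-TABLE");
            return result;
        }
    }

    // A fresh source stream never needs repartitioning on its own.
    static ToyStream source() {
        return new ToyStream(false, List.of("SOURCE"));
    }

    public static void main(String[] args) {
        System.out.println(source().mapValues().toTable());
        System.out.println(source().selectKey().mapValues().toTable());
    }
}
```

Only the second pipeline, which calls selectKey(), gets a REPARTITION step before TO-TABLE. In a real topology that repartition topic is written with the serdes available at that point, which fall back to the defaults from StreamsConfig when none are passed explicitly.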

Hi @bbejeck ,

Thanks for your reply!

A bit more context: for the left join we are doing, both the left- and right-hand sides have String keys, but we would like the output to have an Avro-schema key.

We expect the operation we provide to create an Avro key, but the error says that the serializer (GenericAvroSerializer) cannot be applied to the data type String.

That is strange, because we apply the serializer to the Avro part and not to a String; the String keys are only the keys of the input topics.

Hopefully this gives more context to the problem that we are facing.

Do you understand what I mean?

Thanks again!

Avro keys with a single field are generally not recommended, for reasons related to the internal binary comparisons used in joins (Avro Utf8 values aren’t compared the same way as java.lang.String).
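A related byte-level illustration: Kafka Streams state stores work with serialized key bytes, and the default partitioner hashes those bytes, so the same logical key only lines up across topics if it is serialized the same way. The sketch below is a rough, hand-rolled approximation and does not call the real KafkaAvroSerializer: it assumes the Confluent framing (magic byte 0 plus a 4-byte schema ID) followed by Avro's binary encoding of a record whose only field is a string. The schema ID and key value are made-up examples.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class KeyBytesComparison {

    // What a plain String serde writes for a key: just the UTF-8 bytes.
    static byte[] stringKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Approximation of a Confluent-framed Avro record with a single string field:
    // magic byte 0, 4-byte big-endian schema ID, then the Avro binary encoding of
    // the string (zig-zag varint length followed by the UTF-8 bytes).
    // The schemaId argument is a hypothetical value, not a real registry ID.
    static byte[] avroSingleFieldKeyBytes(String key, int schemaId) {
        byte[] utf8 = key.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0); // magic byte
        out.write((schemaId >>> 24) & 0xFF);
        out.write((schemaId >>> 16) & 0xFF);
        out.write((schemaId >>> 8) & 0xFF);
        out.write(schemaId & 0xFF);
        writeZigZagVarLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Avro encodes int/long values with zig-zag encoding and a base-128 varint.
    static void writeZigZagVarLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    public static void main(String[] args) {
        byte[] asString = stringKeyBytes("order-42");
        byte[] asAvro = avroSingleFieldKeyBytes("order-42", 1);
        System.out.println("String serde bytes: " + Arrays.toString(asString));
        System.out.println("Avro key bytes:     " + Arrays.toString(asAvro));
        System.out.println("Byte-equal? " + Arrays.equals(asString, asAvro));
    }
}
```

The two encodings of "order-42" differ in length and content, so a topic keyed with the String serde and one keyed with an Avro key would not agree on the bytes being compared or partitioned.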

Is there a specific reason not to use the String Serde for your keys? There’s no requirement to use the same Serde for both keys and values.

There is a reason to use an Avro key with a single field: our client has defined the schema in the Schema Registry that way, and we are required to use it. We solved the issue, though, using the following approach:

```java
KStream
   .toTable()
   .leftJoin(...)
   .toStream()
   .selectKey((key, value) ->
      AvroKey
         .newBuilder()
         .setAvroKeyCustomSpecificKey(key)
         .build()
   )
```
