SMT - Cast field to a different type AND preserve the original value

What if I have a field that I want to both cast to a different type AND preserve the original value?

Use case:
I have an Avro key which includes some nullable fields. I want to pipe these messages into Postgres using the JDBC Sink Connector, using the key as the primary key, so that I can use the “delete.enabled” configuration for the Sink Connector. All of that works, but it obliges me to make the nullable fields not-nullable (you can’t include nullable columns in a primary key).
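
For context, the sink setup looks something like this (a minimal sketch; the connector name, topic, connection details, and key field names are placeholders):

    {
      "name": "pg-sink",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "my-topic",
        "connection.url": "jdbc:postgresql://localhost:5432/mydb",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "pk.fields": "k1,k2",
        "delete.enabled": "true",
        "auto.create": "true"
      }
    }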

So one idea that came to mind was:

  • keep the fields as nullable strings in Avro
  • cast the fields to non-nullable strings in the Sink connector config (possible? see the config sketch below)
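
For reference, the stock Cast SMT can be declared in the sink config roughly as below, but as far as I can tell it only changes the declared type; there’s no option to make the target field non-optional or to substitute a value for null (the transform and field names here are illustrative):

    "transforms": "castKey",
    "transforms.castKey.type": "org.apache.kafka.connect.transforms.Cast$Key",
    "transforms.castKey.spec": "k2:string"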

But even if that were possible (I haven’t found a coalesce option for an SMT so far), it would have the undesired side effect of obscuring which messages were null before the cast vs. which messages happened to have genuine values matching the cast’s output.
Is there a way to copy the nullable string to another field in the message value, while also casting it to a not-nullable field within the key?

(I know I could achieve this via ksqlDB or Kafka Streams, piping the data into a secondary topic. I’m just curious if anyone knows of a way to achieve this via off-the-shelf SMT in Kafka Connect)

I’m not sure I quite follow. Can you illustrate with an example? How would you want to treat messages with a partial null key field?

Also, w.r.t. the not-null requirement on the key columns: is that something enforced by the sink connector?

It appears the not-null requirement is imposed by Postgres:

    org.postgresql.util.PSQLException: ERROR: null value in column "bed_number" violates not-null constraint

Here’s an example of what I’d like to do.

Suppose I have messages w/ Avro keys/values whose representations in JSON are roughly:

    {key: {k1: 'abc', k2: null}, value: {v1: 'def'}}
    {key: {k1: 'abc', k2: 'foo'}, value: {v1: 'def'}}
    {key: {k1: 'abc', k2: ''}, value: {v1: 'def'}}

I’d like the corresponding records, when piped into Postgres, to take this form (where k1 and k2 are used as the primary key):

    k1: 'abc', k2: '', v1: 'def', original_k2: null
    k1: 'abc', k2: 'foo', v1: 'def', original_k2: 'foo'
    k1: 'abc', k2: '', v1: 'def', original_k2: ''

^ k2 is coalesced to an empty string if null, to satisfy Postgres
^ original_k2 is added so I can tell whether k2 was null or not, pre-coalesce.

Another idea would be to insert the data as:

    k1: 'abc', k2: '',  k3_had_null_k2: true, v1: 'def'
    k1: 'abc', k2: 'foo', k3_had_null_k2: false, v1: 'def'
    k1: 'abc', k2: '', k3_had_null_k2: false, v1: 'def'

^ omit original_k2; instead, add a boolean to the primary key, dynamically assigned based on whether k2 (pre-coalesce) was null

The latter could be achieved if InsertField supported predicates based on key/value fields (instead of just topic name and record metadata).
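
For reference, predicates today wire up to an SMT roughly like this, but the predicates that ship with Kafka (TopicNameMatches, HasHeaderKey, RecordIsTombstone) only look at topic name, headers, or tombstone status (transform/predicate names below are placeholders; the field name comes from my example above):

    "transforms": "flagK2",
    "transforms.flagK2.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.flagK2.static.field": "k3_had_null_k2",
    "transforms.flagK2.static.value": "true",
    "transforms.flagK2.predicate": "onMyTopic",
    "predicates": "onMyTopic",
    "predicates.onMyTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.onMyTopic.pattern": "my-topic"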

Thanks, that makes more sense now with the example.

I think the bespoke logic that you’re looking at here will require either a custom Single Message Transform, or a Kafka Streams/ksqlDB stream processing application.

There is the Filter SMT from Confluent (not the same as the Apache Kafka Filter), which acts on records conditionally, but only to drop or include a record; it can’t be used to conditionally apply another SMT.
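
For illustration, a rough sketch of how that Filter SMT is configured (the transform name and JsonPath condition are placeholders based on your example; check the Confluent docs for the exact options in your version):

    "transforms": "filterExample",
    "transforms.filterExample.type": "io.confluent.connect.transforms.Filter$Value",
    "transforms.filterExample.filter.condition": "$[?(@.v1 == 'def')]",
    "transforms.filterExample.filter.type": "include",
    "transforms.filterExample.missing.or.null.behavior": "fail"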
