Kafka JDBC Sink connector (Oracle) - nullable pk field in upsert mode

Hello everyone,

I am facing an issue with nullable fields in the primary-key configuration when using upsert mode in the JDBC Sink connector. I am syncing around 80 tables to an Oracle 19c database, and 12 of them have unique indexes that include nullable columns. The columns of these unique indexes are configured as the connector's key (pk.fields), but updating existing records fails whenever one of the nullable columns (e.g., COMPANYID__) is NULL.
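
For concreteness, the affected tables look roughly like this (a simplified sketch: the column types, the TELEPHONE_NO payload column and the index name are made up; only the key columns and the nullable COMPANYID__ reflect the real table):

CREATE TABLE TIA_NAME_TELEPHONE (
    NAME_ID_NO      NUMBER        NOT NULL,
    TELEPHONE_TYPE  VARCHAR2(10)  NOT NULL,
    COMPANYID__     NUMBER,                 -- nullable, yet part of the unique index
    TELEPHONE_NO    VARCHAR2(30)            -- hypothetical non-key payload column
);

CREATE UNIQUE INDEX TIA_NAME_TELEPHONE_UX
    ON TIA_NAME_TELEPHONE (NAME_ID_NO, TELEPHONE_TYPE, COMPANYID__);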

The issue lies in the MERGE statement generated by the JDBC Sink; the relevant part of its ON clause is shown below (problematic column: COMPANYID__):

) incoming 
ON (
    "TIA_NAME_TELEPHONE"."NAME_ID_NO" = incoming."NAME_ID_NO" AND 
    "TIA_NAME_TELEPHONE"."TELEPHONE_TYPE" = incoming."TELEPHONE_TYPE" AND 
    "TIA_NAME_TELEPHONE"."COMPANYID__" = incoming."COMPANYID__"
)
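
For context, the whole statement the connector builds has roughly this shape. This is a pretty-printed reconstruction with the made-up non-key column TELEPHONE_NO, not the exact text from my logs; only the ON clause above is quoted verbatim:

MERGE INTO "TIA_NAME_TELEPHONE" USING (
    SELECT ? "NAME_ID_NO", ? "TELEPHONE_TYPE", ? "COMPANYID__", ? "TELEPHONE_NO" FROM dual
) incoming
ON (
    "TIA_NAME_TELEPHONE"."NAME_ID_NO" = incoming."NAME_ID_NO" AND
    "TIA_NAME_TELEPHONE"."TELEPHONE_TYPE" = incoming."TELEPHONE_TYPE" AND
    "TIA_NAME_TELEPHONE"."COMPANYID__" = incoming."COMPANYID__"
)
WHEN MATCHED THEN UPDATE SET
    "TIA_NAME_TELEPHONE"."TELEPHONE_NO" = incoming."TELEPHONE_NO"
WHEN NOT MATCHED THEN INSERT (
    "TIA_NAME_TELEPHONE"."TELEPHONE_NO",
    "TIA_NAME_TELEPHONE"."NAME_ID_NO",
    "TIA_NAME_TELEPHONE"."TELEPHONE_TYPE",
    "TIA_NAME_TELEPHONE"."COMPANYID__"
) VALUES (
    incoming."TELEPHONE_NO",
    incoming."NAME_ID_NO",
    incoming."TELEPHONE_TYPE",
    incoming."COMPANYID__"
)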

When the nullable column is NULL, the equality comparison (=) never evaluates to TRUE (in SQL, NULL never compares equal to anything, not even another NULL), so the ON clause does not match the existing row and the update never happens. The ideal solution would be a condition that handles NULL values, like this:

) incoming
ON (
    "TIA_NAME_TELEPHONE"."NAME_ID_NO" = incoming."NAME_ID_NO" AND
    "TIA_NAME_TELEPHONE"."TELEPHONE_TYPE" = incoming."TELEPHONE_TYPE" AND
    "TIA_NAME_TELEPHONE"."COMPANYID__" IS NULL
) 
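
To spell out the Oracle behaviour behind this, here is a throwaway sketch (table and data entirely made up, only the column names are reused) showing that the equality form never finds a row whose key column is NULL, while an IS NULL predicate does:

CREATE TABLE NULL_KEY_DEMO (NAME_ID_NO NUMBER, COMPANYID__ NUMBER);
INSERT INTO NULL_KEY_DEMO VALUES (1, NULL);

-- Connector-style equality: a comparison against NULL is never TRUE, so no row matches
SELECT COUNT(*) FROM NULL_KEY_DEMO
WHERE NAME_ID_NO = 1 AND COMPANYID__ = CAST(NULL AS NUMBER);   -- 0

-- Explicit IS NULL predicate: the existing row is found
SELECT COUNT(*) FROM NULL_KEY_DEMO
WHERE NAME_ID_NO = 1 AND COMPANYID__ IS NULL;                  -- 1

That is exactly what the generated ON clause runs into whenever incoming."COMPANYID__" is NULL: the MERGE never takes the WHEN MATCHED branch.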

Unfortunately, that IS NULL form cannot be produced without substantial changes to the JDBC Sink code itself, because the connector builds the MERGE statement from the column names alone and always emits plain equality comparisons.

I attempted a workaround by adding an IS NULL alternative to each equality check:

) incoming ON (
    ("TIA_NAME_TELEPHONE"."NAME_ID_NO" = incoming."NAME_ID_NO" OR "TIA_NAME_TELEPHONE"."NAME_ID_NO" IS NULL) AND
    ("TIA_NAME_TELEPHONE"."TELEPHONE_TYPE" = incoming."TELEPHONE_TYPE" OR "TIA_NAME_TELEPHONE"."TELEPHONE_TYPE" IS NULL) AND
    ("TIA_NAME_TELEPHONE"."COMPANYID__" = incoming."COMPANYID__" OR "TIA_NAME_TELEPHONE"."COMPANYID__" IS NULL)
) 

While this solves the issue functionally, it severely degrades performance, presumably because the OR conditions prevent Oracle from using the unique index for the match; the sink becomes unusable, with a single upsert taking about a minute.
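
For anyone who wants to look at where the time goes, the access path of the workaround-style predicates can be inspected with a plain EXPLAIN PLAN. This continues the throwaway sketch above (still not my real table or data volume, just the shape of the check):

-- A unique index shaped like the real one, covering the nullable key column
CREATE UNIQUE INDEX NULL_KEY_DEMO_UX ON NULL_KEY_DEMO (NAME_ID_NO, COMPANYID__);

-- Plan for the OR / IS NULL form of the join predicates
EXPLAIN PLAN FOR
SELECT * FROM NULL_KEY_DEMO
WHERE (NAME_ID_NO = 1 OR NAME_ID_NO IS NULL)
  AND (COMPANYID__ = CAST(NULL AS NUMBER) OR COMPANYID__ IS NULL);

SELECT * FROM TABLE(DBMS_XPLAN.DISPLAY);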

Is there any way to handle nullable fields in the primary key during upsert operations without changing the source database or dropping indexes in the target database?

This is my current sink config (the RegexRouter keeps only the last segment of the topic, NAME_TELEPHONE, so table.name.format resolves to the TIA_NAME_TELEPHONE table seen in the MERGE above):

{
    "name": "TIA_TEST_jdbc_sink_stream_pk_fields_TIA.NAME_TELEPHONE",
    "config": {
        "auto.create": "false",
        "auto.evolve": "true",
        "batch.size": "2000",
        "connection.password": "***",
        "connection.url": "***",
        "connection.user": "***",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "delete.enabled": "true",
        "errors.deadletterqueue.context.headers.enable": "true",
        "errors.deadletterqueue.topic.name": "dlq.debezium.TIA_TEST_jdbc_sink_stream_pk_fields_TIA.NAME_TELEPHONE",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.max.retries": "10",
        "errors.retry.delay.max.ms": "900000",
        "errors.tolerance": "all",
        "insert.mode": "upsert",
        "pk.fields": "NAME_ID_NO,TELEPHONE_TYPE,COMPANYID__",
        "pk.mode": "record_key",
        "table.name.format": "TIA_${topic}",
        "tasks.max": "2",
        "topics": "debezium.TIA_TEST.TIA.NAME_TELEPHONE",
        "transforms": "unwrap,route",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$4",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    }    
}

Environment Details

  • Confluent Platform: 7.5
  • Confluent JDBC Sink connector: 10.5.9
  • OJDBC driver: ojdbc10 (19.25.0.0)
  • Target DB: Oracle 19c

Any suggestions or insights would be greatly appreciated!

Thanks.
Michael