How to lower case in JDBC connect

I’m using the JDBC Sink Connector to stream data from Oracle to PostgreSQL.

In Oracle, both the table and column names are in UPPERCASE, whereas in PostgreSQL they are in LOWERCASE. As a result, the Kafka topics are created using UPPERCASE naming (e.g., main.ADMCMP). However, during insertion into PostgreSQL, I encounter the following error:

Error during write operation. Attempting rollback. (io.confluent.connect.jdbc.sink.JdbcDbWriter:89)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Table “SMRT-PROD”.“main”.“ADMCMP” is missing and auto-creation is disabled.

Is there any built-in functionality or available “.jar” plugin that can help automatically handle this uppercase/lowercase mismatch between Kafka topics and PostgreSQL table/column names?

Below is the configuration I’m currently using for the JDBC Sink Connector:

name=postgresql-sink-connector
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

Topics (case-sensitive; assume uppercase in Kafka, lowercase in PostgreSQL)

topics=ADMCMP,ADMCMPPROF

PostgreSQL connection

connection.url=jdbc:postgresql://psql-erp-stage-01:5432/SMRT-PROD
connection.user=gslpgadmin
connection.password=gmpl

Disable automatic schema creation and evolution

auto.create=false
auto.evolve=false

Insert settings

insert.mode=INSERT
pk.mode=none

Ensure no quoting so lowercase PostgreSQL table/column names match

quote.sql.identifiers=ALWAYS

Map topic to schema.table

table.name.format=${topic}
transforms.ReplaceTopic.regex

Unwrap Debezium message (strip metadata)

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=drop

Converters

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

did you try this

Yes, I’ve already gone through the official documentation thoroughly, but unfortunately, it didn’t provide a solution for this issue.

I also attempted to use the SMT “com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase”, but since I’m using Kafka 3.7, it appears to be incompatible or not supported, and therefore didn’t work.

I tried setting “quote.sql.identifiers=ALWAYS” in the JDBC Sink Connector, but that caused the generated SQL statement to use a format like “smrt-prod.main.ADMCMP”, which resulted in a cross-database reference error on PostgreSQL.

At this point, I’m stuck with the uppercase/lowercase mismatch issue and unable to proceed with the data insertion into PostgreSQL.

What error did you hit?

I’m not seeing any other existing publicly available SMTs to change topic case. There is also the ChangeCase SMT for keys and values as well as an analogous CaseTransform SMT, though the latter is based on an even older version of Kafka Connect so if you are hitting compatibility issues with the first one you’ll likely not have success with this latter one.

I’m encountering the following error:

“Unable to instantiate TimestampNow: Failed to invoke plugin constructor (org.apache.kafka.connect.runtime.isolation.ReflectionScanner:139)
java.lang.reflect.InvocationTargetException”

Based on our discussion, it seems that we can’t directly convert topic names to lowercase within the JDBC Connector itself.

So, what are the alternative approaches we can use to convert topic names to lowercase before they’re consumed by the sink connector?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.