Kafka jdbc-sink-connector table.name.format

Hello. I’m trying to do a replication from my oracle database to postgre database and I’m using jdbc sink connector.

My use case is that I have only one topic for multiple tables and because of that I need to get the table name from the payload not from the topic.

I’ve tried this: “table.name.format”: “${source.schema}.${source.table}” but it’s not working. It tries to insert the records to the string itself, not its value. Is there any way to achive this or am I making a syntax error?

Thanks for any help.

The JDBC Sink Connector doesn’t support this. The ${topic} placeholder is the only special string that gets handled here.

Ideally a single message transform would do this for you and rewrite the topic based on the payload. I’m not sure if Debezium’s content-based router would work in this scenario but it’s worth a shot. I.e., would something like this work:

transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.source.table

If that’s a dead end, then some options are:

  1. write your own content-based routing SMT
  2. modify the data pipeline upstream so that each table has its own topic
  3. if you only have a few tables, you might consider a connector per table where each one filters accordingly. It wouldn’t be the most efficient solution but would likely be easier than 1 or 2. This blog points to a couple of ways to filter.