Hello. I’m trying to do a replication from my oracle database to postgre database and I’m using jdbc sink connector.
My use case is that I have only one topic for multiple tables and because of that I need to get the table name from the payload not from the topic.
I’ve tried this: “table.name.format”: “${source.schema}.${source.table}” but it’s not working. It tries to insert the records to the string itself, not its value. Is there any way to achive this or am I making a syntax error?
Thanks for any help.
The JDBC Sink Connector doesn’t support this. The ${topic}
placeholder is the only special string that gets handled here.
Ideally a single message transform would do this for you and rewrite the topic based on the payload. I’m not sure if Debezium’s content-based router would work in this scenario but it’s worth a shot. I.e., would something like this work:
transforms=route
transforms.route.type=io.debezium.transforms.ContentBasedRouter
transforms.route.language=jsr223.groovy
transforms.route.topic.expression=value.source.table
If that’s a dead end, then some options are:
- write your own content-based routing SMT
- modify the data pipeline upstream so that each table has its own topic
- if you only have a few tables, you might consider a connector per table where each one filters accordingly. It wouldn’t be the most efficient solution but would likely be easier than 1 or 2. This blog points to a couple of ways to filter.