SQL Debezium source to topic to JDBC sink target

We are currently running a POC to prove we can replicate MS SQL Server tables from one server to another. We currently have CDC configured on the source table and have a Debezium connector configured. This data flows into a topic with no issues. Now using a JDBC sink connector, we want to read from the topic and replicate to the target. We currently have this working but we had to define EVERY COLUMN in the fields.whitelist property for this to work because it seemed to have other columns it was either looking for or trying to create. Is this normal and do we have to specify every column from the source that we want synched to target or is there a better way or a way to blacklist the fields?
Thanks for your assistance.

If I remember it right, Debezium wrappes sourced payload with an “envelope”, so there is a number of internal auxilary fields.

You should be able to manipulate the messages using different SMTs before sinking them to the target. Debezium provides a number of transformations for this, and usually you use Unwrap to pair it with JDBC Sink.

It should also be possible to use standard Flatten SMT, but its usage should probably be combined with fields.whitelist or fields.blacklist to achieve what you want to achieve.

Hope this helps!

Ok so do I remove fields before I put the data in the topic or do I remove fields before I insert into target?

Also I don’t see a fields.blacklist in the documentation…I hope I am wrong because that is exactly what I need.

Well, it is up to you when to apply the unwrapping, but enveloping of the data was implemented there for a reason actually, so I would recommend using it on the Sink connector side.

As of fields.blacklist - sorry for the confusion, what I meant was actually ReplaceField SMT, which allows you to whitelist or blacklist the fields…

1 Like

That ReplaceField did the trick!
Also had issues with the DateTime and found a setting for that too.
Thanks a bunch!

1 Like

You’re welcome! You may want to mark the answer which helped you as the “Solution” so it’s also usable for others!

One last question, my preference would be to eliminate those fields BEFORE putting them into the topic (storage reasons)…will that cause any issues? For example, the __deleted column is one that I am removing in the sink connector…if I would remove this from the source connector, would that create issues?

Technically, this is totally fine to do so - should not be a problem at all.

But from architecture perspective this may cause some issues in the future, please refer to the “Outbox pattern” to understand the potential hiccups.

I actually tried to do this and it didn’t work. The __deleted column must be something added after the fact or internal to Debezium. My ReplaceField function did not work and I still needed it for the target. This is fine though…all good. Thanks.

I do not know what exactly, but something is wrong.

SMTs should work fine on both Source and Sink connector sides. It is up to you where to use them. The field “being internal” is not relevant here, because after the field is added by the connector to a connector record, this field is always accessible for SMTs before being forwarded to the converter:

You can refer to this course, i.e. to understand Kafka Connect internals better: https://developer.confluent.io/learn-kafka/kafka-connect/how-connectors-work/

A potential reason why your ReplaceField SMT doesn’t work could be that your record structure is different to what you expect it to be at that time. I.e. you may apply some other SMT before, which already changes record structure, because SMTs are applied in a “chain” manner:

See more info here: https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/

Anyway, if you speak only about __deleted field, which you’re not interested in, why don’t you try using Unwrap SMT for this, which is intended for this and has delete.handling.mode configuration property, responsible exactly for this?

This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.