Conversation from Confluent Community Slack #connect channel. Copied here to make it available for others for reference.
Anggara Aji Prasetya:
Hello, how to filter what mesage must be sinked to other database by field condition? Example: I want to sink topic employee to mysql only field “education” = “bachelor”.
Neil Buesing @nbuesing :
Check out the apache connect single-message-transform (SMT), here is documentation from confluent
Anggara Aji Prasetya:
Thank you @nbuesing, I have an idea now how to do it
Robin Moffatt:
see also 🎄 Twelve Days of SMT 🎄 - Day 11: Predicate and Filter
Anggara Aji Prasetya:
Yes, I got an idea when I see this video on youtube, thank you mr @rmoff
Anggara Aji Prasetya:
I already did this SMT config
Here is the config script
"transforms" : "filterPNM",
"transforms.filterPNM.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterPNM.filter.condition" : "$[?(@.email =~ /.PNM*/)]",
"transforms.filterPNM.filter.type" : "include"
I want to filter only message with value of email begin with word “PNM”
the connector work well and I don’t have an error but it wonk sink to database, all of the message.
Oh Btw, for source connector I used debezium for sql server. is that a problem ?"
Anggara Aji Prasetya:
Oh I get it,
I have a wrong condition in filter
Instead of /.PNM*/
the correct one is /PNM.*/
and it’s worked
Neil Buesing:
@Anggara thanks for sharing your solution/outcome — this is what I love about the community — I have never done this specific SMT before, so nice to have another example to reference. If you haven’t shared this in the confluent forms, might be worth putting a post of it there (then it can be searched by others).