Single Message Transform - Predicate & Filter

Conversation from the Confluent Community Slack #connect channel, copied here so that others can reference it.

Anggara Aji Prasetya:
Hello, how can I filter which messages get sunk to another database based on a field condition? Example: I want to sink the topic employee to MySQL only where the field “education” = “bachelor”.

Neil Buesing @nbuesing :
Check out the Apache Kafka Connect Single Message Transform (SMT); here is the documentation from Confluent

Anggara Aji Prasetya:
Thank you @nbuesing, I have an idea now how to do it :smiley:

Robin Moffatt:
see also 🎄 Twelve Days of SMT 🎄 - Day 11: Predicate and Filter

Anggara Aji Prasetya:
Yes, I got an idea when I saw this video on YouTube, thank you Mr. @rmoff

Anggara Aji Prasetya:
I already did this SMT config
Here is the config script

"transforms" : "filterPNM",
"transforms.filterPNM.type" : "io.confluent.connect.transforms.Filter$Value",
"transforms.filterPNM.filter.condition" : "$[?(@.email =~ /.PNM*/)]",
"transforms.filterPNM.filter.type" : "include"

I want to keep only the messages where the value of email begins with “PNM”.
The connector works well and I don’t get an error, but it won’t sink any of the messages to the database.
Oh, by the way, for the source connector I used Debezium for SQL Server. Is that a problem?

Anggara Aji Prasetya:
Oh I get it,
I had the wrong condition in the filter.
Instead of /.PNM*/ the correct one is /PNM.*/ and it worked :smiley:
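
To see why the first pattern matched nothing, here is a minimal Python sketch comparing the two regular expressions. It is an illustration only: the sample email values are made up, and it assumes the filter needs the regex to match the whole field value, which is an assumption about the JsonPath engine behind the SMT rather than something confirmed in this thread. In /.PNM*/ the dot means "any one character" and the star applies only to the M, so a value has to look like one character, then "PN", then zero or more "M". In /PNM.*/ the value has to start with the literal "PNM" and may continue with anything.

```python
import re

# Hypothetical sample values for the "email" field (not from the original thread).
emails = ["PNM_budi@example.com", "xPN", "budi@example.com"]

wrong = re.compile(r".PNM*")  # any one character, then "PN", then zero or more "M"
fixed = re.compile(r"PNM.*")  # literal "PNM", then any characters

def matching(pattern, values):
    """Return the values that the pattern matches in full (assumed semantics)."""
    return [v for v in values if pattern.fullmatch(v)]

print(matching(wrong, emails))  # only the odd value "xPN" passes
print(matching(fixed, emails))  # only the email that begins with "PNM" passes
```

Even with a "match anywhere" search instead of a full match, /.PNM*/ would still reject an email that begins with "PNM", because the pattern needs some character in front of "PN".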

Neil Buesing:
@Anggara thanks for sharing your solution/outcome. This is what I love about the community: I have never done this specific SMT before, so it is nice to have another example to reference. If you haven’t shared this in the Confluent forums, it might be worth putting a post there (then it can be searched by others).
