Apply a filter to messages from a Kafka topic before the sink connector picks them up

I have a topic "RInvite.V1" populated from a persistent query, with the following columns:

  • sent_ID
  • RStatusId

I also have a JDBC sink connector that inserts rows from this topic into a “RstatusHistory” table.

I want a filter to be applied before the insert into “RstatusHistory”, i.e. do not insert a new row if the table already has a record with the same sent_ID and RStatusId.

How can I stop some messages coming from the topic?

Looks like you are talking about completely different things. First, you posted in the ksqlDB section, but your question is about Kafka Connect. Then, you ask how to filter records before they are picked up by the connector, but you also want to base that filter on the data already written to your target database.

So, all in all, it is not possible to answer your question precisely, because it is not quite clear what you want to achieve.

Still, a few hints:

  • If you need to filter records really before the Sink connector picks up the data, you can do this with a Kafka Streams app or a ksqlDB query - as a result you will produce another topic that contains only the needed records. The only thing is that neither a Kafka Streams app nor ksqlDB knows anything about your target database tables, so it is impossible to base your filter on the data you already have there.
  • If you need to filter records “on-the-fly” within the Sink connector, then you need a filtering SMT (Single Message Transformation). Not sure any standard SMT would cover your case; you’ll probably need to implement something specific to your needs. Filtering based on database data is also not possible here.
  • If you really need to filter records based on the data already in the target database table, then you will have to implement this inside your database. For this you could use a staging table, where all incoming data is written, and a target table holding the filtered data. Writing to the target table, with the required filtering, may be implemented using e.g. a database trigger.
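For the first approach, here is a minimal ksqlDB sketch. The topic and column names are taken from your question; the filtered topic name, the value format, and the WHERE condition are all assumptions for illustration - note the condition can only reference fields in the record itself, never database state:

```sql
-- Create a stream over the original topic (schema assumed from the question)
CREATE STREAM RINVITE_V1 (sent_ID VARCHAR, RStatusId VARCHAR)
  WITH (KAFKA_TOPIC='RInvite.V1', VALUE_FORMAT='JSON');

-- Produce a new topic containing only the records that pass the filter;
-- you would then point the sink connector at this topic instead of RInvite.V1
CREATE STREAM RINVITE_FILTERED
  WITH (KAFKA_TOPIC='RInvite.Filtered.V1') AS
  SELECT sent_ID, RStatusId
  FROM RINVITE_V1
  WHERE RStatusId IS NOT NULL  -- example condition; database lookups are not possible here
  EMIT CHANGES;
```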
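For the last approach, here is a rough PostgreSQL-style sketch of the staging-table idea. Table and column names come from your question; the staging table name and the trigger/function names are made up, and other databases will have different trigger syntax:

```sql
-- Staging table that the sink connector writes to, plus the filtered target table
CREATE TABLE RstatusHistory_staging (sent_ID VARCHAR, RStatusId VARCHAR);
CREATE TABLE RstatusHistory (
  sent_ID   VARCHAR,
  RStatusId VARCHAR,
  PRIMARY KEY (sent_ID, RStatusId)
);

-- Trigger function: copy each staged row into the target table,
-- skipping rows whose (sent_ID, RStatusId) pair already exists
CREATE FUNCTION copy_to_history() RETURNS trigger AS $$
BEGIN
  INSERT INTO RstatusHistory (sent_ID, RStatusId)
  VALUES (NEW.sent_ID, NEW.RStatusId)
  ON CONFLICT (sent_ID, RStatusId) DO NOTHING;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER filter_history
  AFTER INSERT ON RstatusHistory_staging
  FOR EACH ROW EXECUTE FUNCTION copy_to_history();
```

The primary key on (sent_ID, RStatusId) is what makes the ON CONFLICT clause work; without it you would need an explicit NOT EXISTS check in the trigger instead.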

Hope this helps!
