Can I apply a filter on the fields of a nested JSON record? For example, CDC data from the Debezium MySQL connector comes with the structure {"before":{…},"after":{…}}, and I want to compare these updates over specific fields. My data is JSON, not Avro, and no schema is registered. Is it possible to apply such a filter? If yes, how can I address those nested JSON fields?
Thanks in advance.
A filter() can take any Predicate, so it's just up to you to write the code. If you have a KStream<Void, Json> and apply a filter, the Predicate will get a Json value, and thus you can access any nested field without any changes to the record. You can apply an arbitrarily complex evaluation that eventually returns true or false to keep or drop the record.
stream.filter((k, v) -> {
    // v will be the Json
    boolean returnValue;
    // write code to set `returnValue`
    return returnValue;
});
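To make the "access any nested field" part a bit more concrete, here is a minimal sketch. It assumes the JSON value has been deserialized into nested java.util.Map objects, which is only one possible representation (with a JSON library you would use its own tree type instead). The class NestedFields and its field() helper are hypothetical names made up for this example, not part of Kafka Streams.

```java
import java.util.Map;

class NestedFields {
    // Walks a path such as ("after", "name") through nested maps.
    // Returns null if any step along the path is missing or is not a map.
    static Object field(Map<String, Object> record, String... path) {
        Object current = record;
        for (String key : path) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }
}
```

Inside the Predicate you could then call something like NestedFields.field(v, "after", "name") and compare the result however you need.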
Yes @mjsax, thanks for the reply. My question is about a record like this:
{
  "before": {
    "id": 1,
    "name": "abc"
  },
  "after": {
    "id": 1,
    "name": "xyz"
  }
}
Now, if name is modified in the after field, I do not want to filter the record out, but if fields other than name are modified, I do want to filter that record out.
Can I do that, given that the nested field names are the same in both objects? Or do we need to flatten the record first?
Thank you.
Not sure what you mean by "if name is modified in after field"? Do you mean if after.name is different from before.name? If so, then yes, you can just write code like this (not real syntax; just illustrative):
if (v.before.name.equals(v.after.name)) return true; // name is not modified, keep the record
if (!v.before.otherField1.equals(v.after.otherField1)) return false; // otherField1 was modified, drop the record
// repeat the same line for all other fields
return true; // nothing was modified, keep the record
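For something closer to compilable Java, here is a rough sketch of one reading of the requirement: drop the record whenever any field other than name differs between before and after, and keep it otherwise. It assumes the value is a nested java.util.Map; CdcFilter and keep() are hypothetical names. Note that, unlike the illustrative lines above, it has no early return when name is unchanged, so a record whose name is unchanged but whose other fields changed is dropped. Adjust that if it is not what you want.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

class CdcFilter {
    // Returns true (keep) unless some field other than `ignoredField`
    // differs between the "before" and "after" objects.
    @SuppressWarnings("unchecked")
    static boolean keep(Map<String, Object> record, String ignoredField) {
        Object b = record.get("before");
        Object a = record.get("after");
        // If either side is missing or not an object, there is nothing
        // to compare; this sketch simply keeps such records.
        if (!(b instanceof Map) || !(a instanceof Map)) {
            return true;
        }
        Map<String, Object> before = (Map<String, Object>) b;
        Map<String, Object> after = (Map<String, Object>) a;

        // Compare every field that appears on either side, except the ignored one.
        Set<String> fields = new HashSet<>(before.keySet());
        fields.addAll(after.keySet());
        fields.remove(ignoredField);

        for (String f : fields) {
            if (!Objects.equals(before.get(f), after.get(f))) {
                return false; // a field other than `ignoredField` changed, drop
            }
        }
        return true; // only `ignoredField` changed, or nothing changed
    }
}
```

With a KStream<Void, Map<String, Object>> (again an assumed value type, which needs a matching JSON serde), this would plug in as stream.filter((k, v) -> CdcFilter.keep(v, "name")). Since before and after are separate nested objects, the identical field names do not clash and no flattening is needed.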