Hi,
This week, I published the first version of a Kafka Connect plugin that solves a need I often had: filtering records based on the contents of their key or value.
The existing Filter (Confluent) transform seems to fill that need, but its choice of JSONPath as the query language for conditions makes it very restrictive. Most severely, JSONPath’s conditional syntax only works on arrays, so it is not possible to express basic conditions like “the value of field foo must equal "bar"”.
This is why I created Kafka Connect JMESPath. This plugin integrates the JMESPath query language, which is much more expressive than JSONPath and makes it straightforward to express both simple and complex conditions, such as:
genre == 'Mystery'
author == 'Agatha Christie' && publishDate.year >= `1950`
contains(title, 'Murder')
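To make the semantics of these queries a little more concrete, here is a rough Java approximation of each condition as a hand-written check on a schemaless record value (a nested Map). This is not how the plugin evaluates queries, and the class and method names are hypothetical. In JMESPath, single quotes delimit a raw string literal and backticks a JSON literal, which is why 1950 is wrapped in backticks: it is compared as a number, not as a string. The sketch simplifies missing fields and type mismatches to false, whereas real JMESPath would yield null or, for contains() on an unsupported type, an error.

```java
import java.util.Map;

// Hypothetical hand-written equivalents of the three example JMESPath conditions,
// applied to a schemaless record value represented as a nested Map.
public class JmesPathConditionsByHand {

    // genre == 'Mystery'  ('...' is a raw string literal in JMESPath)
    static boolean isMystery(Map<String, Object> book) {
        return "Mystery".equals(book.get("genre"));
    }

    // author == 'Agatha Christie' && publishDate.year >= `1950`
    // (`1950` is a JSON literal, so the comparison is numeric)
    static boolean isChristieFrom1950On(Map<String, Object> book) {
        if (!"Agatha Christie".equals(book.get("author"))) {
            return false;
        }
        Object date = book.get("publishDate");
        if (!(date instanceof Map)) {
            return false; // simplification: a missing publishDate just fails the check
        }
        Object year = ((Map<?, ?>) date).get("year");
        return year instanceof Number && ((Number) year).doubleValue() >= 1950;
    }

    // contains(title, 'Murder')  (substring test when the subject is a string)
    static boolean titleMentionsMurder(Map<String, Object> book) {
        Object title = book.get("title");
        return title instanceof String && ((String) title).contains("Murder");
    }

    public static void main(String[] args) {
        Map<String, Object> book = Map.of(
            "title", "Sample Murder Mystery",
            "author", "Agatha Christie",
            "genre", "Mystery",
            "publishDate", Map.of("year", 1952));
        System.out.println(isMystery(book));            // true
        System.out.println(isChristieFrom1950On(book)); // true
        System.out.println(titleMentionsMurder(book));  // true
    }
}
```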
Another difference is that the JMESPath plugin does not define a new type of transform. Instead, it builds on Kafka Connect’s general transform predicate system, which, while most commonly combined with the Filter (Apache Kafka) transform for record filtering, can be used with any transform to apply it only conditionally. This makes the JMESPath predicates more widely applicable than the Confluent Filter transform.
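The predicate mechanism can be pictured with a simplified, dependency-free model: for each record, Kafka Connect tests the predicate (inverted if the transform’s negate option is set) and applies the transform only when the result is true; otherwise the record passes through unchanged. The Filter transform is then just a transform that drops every record it is applied to. This sketch uses plain maps instead of Kafka’s ConnectRecord and Predicate types, and all class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class ConditionalTransformSketch {

    // Simplified model of a predicated transform: records are plain maps, and a
    // transform returns null to drop a record, as a Kafka Connect transformation does.
    static List<Map<String, Object>> run(List<Map<String, Object>> records,
                                         UnaryOperator<Map<String, Object>> transform,
                                         Predicate<Map<String, Object>> predicate,
                                         boolean negate) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> record : records) {
            // Apply the transform only when (predicate matches) XOR negate;
            // otherwise pass the record through unchanged.
            Map<String, Object> result =
                predicate.test(record) != negate ? transform.apply(record) : record;
            if (result != null) {
                out.add(result);
            }
        }
        return out;
    }

    // Stand-in for the Filter transform: it drops every record it is applied to.
    static Map<String, Object> filter(Map<String, Object> record) {
        return null;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> books = List.of(
            Map.<String, Object>of("title", "Book A", "year", 1934),
            Map.<String, Object>of("title", "Book B", "year", 2021));
        // Hypothetical predicate standing in for a JMESPath query like: year < `2000`
        Predicate<Map<String, Object>> old = b -> ((Integer) b.get("year")) < 2000;
        // Keeps only Book B, because Filter is applied to (and drops) the matching Book A.
        System.out.println(run(books, ConditionalTransformSketch::filter, old, false));
    }
}
```

Swapping filter for any other UnaryOperator gives the “apply this transform only conditionally” case that the predicate system also supports.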
Example Usage:
{
  "connector.class": "FileStreamSink",
  "topics": "books",
  "file": "/tmp/books.txt",
  "tasks.max": "1",
  "transforms": "Filter",
  "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.Filter.predicate": "Old",
  "predicates": "Old",
  "predicates.Old.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Value",
  "predicates.Old.query": "publishDate.year < `2000`"
}
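If you run Kafka Connect in distributed mode, one way to deploy a config like this is to PUT it to the worker’s REST API at /connectors/<name>/config, which creates the connector or updates its configuration. A minimal sketch with Java’s built-in HTTP client, assuming a worker listening on localhost:8083 (the default REST port) and a hypothetical connector name of books-sink:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitConnectorConfig {

    // Assumption: a Kafka Connect worker in distributed mode with its REST API on localhost:8083.
    static final String CONNECT_URL = "http://localhost:8083";
    // Hypothetical connector name; PUT /connectors/{name}/config creates or updates the connector.
    static final String CONNECTOR_NAME = "books-sink";

    // The example config from above, unchanged.
    static final String CONFIG = """
        {
          "connector.class": "FileStreamSink",
          "topics": "books",
          "file": "/tmp/books.txt",
          "tasks.max": "1",
          "transforms": "Filter",
          "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
          "transforms.Filter.predicate": "Old",
          "predicates": "Old",
          "predicates.Old.type": "de.denisw.kafka.connect.jmespath.MatchesJMESPath$Value",
          "predicates.Old.query": "publishDate.year < `2000`"
        }
        """;

    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder()
            .uri(URI.create(CONNECT_URL + "/connectors/" + CONNECTOR_NAME + "/config"))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(CONFIG))
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(buildRequest(), HttpResponse.BodyHandlers.ofString());
        // Print the status code and the worker's JSON response for inspection.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

With this config, the Filter transform drops every record for which the Old predicate matches, so books published before 2000 never reach /tmp/books.txt.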
The JMESPath plugin is open-source (Apache License 2.0) and available on GitHub. The README contains the full documentation on how to use the transform predicates. I hope it will be useful to some of you. Feedback welcome!