Hello, we have a problem with connect-transforms. (connect-transformations | Confluent Hub)
We used Couchbase Source Connector and Connect Transforms to produce changed data, and we want to filter some data.
In our Couchbase, we have a bucket named cargo-bucket, and this bucket has 2 different JSON document model: CityDistrict and SupplierProperties.
Our 2 type document samples:
{
"districtId": 1059,
"cargoProviderId": 17,
"totalLimit": 111,
"active": true,
"_class": "CityDistrict",
"cityId": 10
}
{
"listingWhoPays": 0,
"returnCargoProviderId": 0,
"supplierId": 999,
"cargoProviderProperties": [
{
"whoPays": 1,
"cargoProviderId": 12
}
],
"active": true,
"_class": "SupplierProperties",
}
We want to get only SupplierProperties documents, so we applied a transform filter to connector config. We want to exclude CityDistrict document types.
{
"name": "custom-couchbase-connector",
"config": {
"name": "custom-couchbase-connector",
"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "ignoreDeletes,deserializeJson,ignoreNonSuppliers",
"transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
"transforms.ignoreNonSuppliers.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.ignoreNonSuppliers.filter.condition": "$[?(@.value.payload._class == 'CityDistrict')]",
"transforms.ignoreNonSuppliers.filter.type": "exclude",
"transforms.ignoreNonSuppliers.missing.or.null.behavior": "exclude",
"transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
"couchbase.seed.nodes": "...",
"couchbase.username": "...",
"couchbase.password": "...",
"couchbase.bucket": "cargo-bucket",
"couchbase.bootstrap.timeout": "10s",
"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
"couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
"couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
"couchbase.topic": "my-topic",
"couchbase.compression": "ENABLED",
"couchbase.persistence.polling.interval": "0",
"couchbase.flow.control.buffer": "128m"
}
}
However, this configuration cannot work successfully, all types documents are incoming to Kafka topic. Only ‘SupplierProperties’ documents should be produced to kafka topic.
Can you help us for this problem?