Confluent Kafka SMT Filter Problem

Hello, we have a problem with connect-transforms (the connect-transformations package on Confluent Hub).

We use the Couchbase Source Connector together with Connect Transforms to produce change data, and we want to filter some of it.

In our Couchbase we have a bucket named cargo-bucket, and this bucket contains two different JSON document models: CityDistrict and SupplierProperties.

A sample document of each type:

{
  "districtId": 1059,
  "cargoProviderId": 17,
  "totalLimit": 111,
  "active": true,
  "_class": "CityDistrict",
  "cityId": 10
}


{
  "listingWhoPays": 0,
  "returnCargoProviderId": 0,
  "supplierId": 999,
  "cargoProviderProperties": [
    {
      "whoPays": 1,
      "cargoProviderId": 12
    }
  ],
  "active": true,
  "_class": "SupplierProperties",
}

We want to keep only SupplierProperties documents, so we applied a Filter transform to the connector config to exclude the CityDistrict document type.

{
	"name": "custom-couchbase-connector",
	"config": {
		"name": "custom-couchbase-connector",
		"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
		"tasks.max": "1",
		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
		"value.converter": "org.apache.kafka.connect.json.JsonConverter",
		"transforms": "ignoreDeletes,deserializeJson,ignoreNonSuppliers",
		"transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
		"transforms.ignoreNonSuppliers.type": "io.confluent.connect.transforms.Filter$Value",
		"transforms.ignoreNonSuppliers.filter.condition": "$[?(@.value.payload._class == 'CityDistrict')]",
		"transforms.ignoreNonSuppliers.filter.type": "exclude",
		"transforms.ignoreNonSuppliers.missing.or.null.behavior": "exclude",
		"transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
		"couchbase.seed.nodes": "...",
		"couchbase.username": "...",
		"couchbase.password": "...",
		"couchbase.bucket": "cargo-bucket",
		"couchbase.bootstrap.timeout": "10s",
		"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
		"couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
		"couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
		"couchbase.topic": "my-topic",
		"couchbase.compression": "ENABLED",
		"couchbase.persistence.polling.interval": "0",
		"couchbase.flow.control.buffer": "128m"
	}
}
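
For completeness, we create the connector through the Kafka Connect REST API. A minimal sketch, assuming the JSON above is saved as connector.json and the Connect worker's REST interface is on the default localhost:8083 (both are assumptions, adjust to your environment):

	# Create the connector from the JSON payload above
	curl -X POST -H "Content-Type: application/json" \
	     --data @connector.json \
	     http://localhost:8083/connectors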

However, this configuration does not work: documents of both types keep arriving in the Kafka topic, although only SupplierProperties documents should be produced.

Can you help us with this problem?

Hi @kaantas , welcome to the forum!

I’m not so familiar with the Couchbase connector or transforms, but one thing I notice is the order in which you’re applying the SMTs:

"transforms": "ignoreDeletes,deserializeJson,ignoreNonSuppliers"

Perhaps try applying the Filter SMT (ignoreNonSuppliers) before deserializeJson, i.e.

"transforms": "ignoreDeletes,ignoreNonSuppliers,deserializeJson",

Hi @rmoff, thank you very much for your reply.

We tried changing the order of the transforms, but then an error occurred and the connector status became FAILED.
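
The failure details can be pulled from the connector's status endpoint in the Connect REST API; a minimal sketch, again assuming the worker's REST interface is on localhost:8083:

	# Fetch connector and task status; the "trace" field carries the stack trace
	curl -s http://localhost:8083/connectors/custom-couchbase-connector/status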

Stack trace is:

"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Only Map objects supported in absence of schema for [filtering record without schema], found: [B\n\tat io.confluent.connect.transforms.util.Requirements.requireMap(Requirements.java:30)\n\tat io.confluent.connect.transforms.Filter.shouldDrop(Filter.java:215)\n\tat io.confluent.connect.transforms.Filter.apply(Filter.java:158)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 11 more\n"

Looking back on what I’ve written, I think I was talking nonsense :confused: The "found: [B" in that DataException is the JVM's notation for a byte array: with ignoreNonSuppliers running before deserializeJson, the Filter SMT receives the raw, still-serialized value rather than the Map it requires.

You had the order correct to start with, i.e.

"transforms": "ignoreDeletes,deserializeJson,ignoreNonSuppliers",

Reading through the Couchbase connector docs, could you try using the com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler instead:

"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.DefaultSchemaSourceHandler",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"

This assumes you have a Schema Registry running on that URL and port.

I’m not familiar with this connector, and am somewhat grasping at straws. You may want to try posting over at the Couchbase forum too.

Hi @rmoff, thank you again for your help. We finally solved the problem.

We removed .value.payload from the filter condition, changing it to:
"transforms.ignoreNonSuppliers.filter.condition": "$[?(@._class != 'CityDistrict')]",

Filter$Value already operates at the level of the record's value, so the value.payload prefix is not necessary; the condition is evaluated directly against the deserialized document.

We also changed the filter type so that matching records (everything that is not a CityDistrict) are kept:
"transforms.ignoreNonSuppliers.filter.type": "include",

Our final config is:

{
	"name": "custom-couchbase-connector",
	"config": {
		"name": "custom-couchbase-connector",
		"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
		"tasks.max": "1",
		"key.converter": "org.apache.kafka.connect.storage.StringConverter",
		"value.converter": "org.apache.kafka.connect.json.JsonConverter",
		"transforms": "ignoreDeletes,deserializeJson,ignoreNonSuppliers",
		"transforms.ignoreDeletes.type": "com.couchbase.connect.kafka.transform.DropIfNullValue",
		"transforms.ignoreNonSuppliers.type": "io.confluent.connect.transforms.Filter$Value",
		"transforms.ignoreNonSuppliers.filter.condition": "$[?(@._class != 'CityDistrict')]",
		"transforms.ignoreNonSuppliers.filter.type": "include",
		"transforms.ignoreNonSuppliers.missing.or.null.behavior": "exclude",
		"transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
		"couchbase.seed.nodes": "...",
		"couchbase.username": "...",
		"couchbase.password": "...",
		"couchbase.bucket": "cargo-bucket",
		"couchbase.bootstrap.timeout": "10s",
		"couchbase.source.handler": "com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler",
		"couchbase.event.filter": "com.couchbase.connect.kafka.filter.AllPassFilter",
		"couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
		"couchbase.topic": "my-topic",
		"couchbase.compression": "ENABLED",
		"couchbase.persistence.polling.interval": "0",
		"couchbase.flow.control.buffer": "128m"
	}
}
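
To verify the filter, the topic can be inspected with the console consumer. A minimal sketch, assuming a broker on localhost:9092 (an assumption) and the couchbase.topic from the config above:

	# Consume from the beginning; only SupplierProperties documents
	# ("_class": "SupplierProperties") should appear, and no CityDistrict ones
	kafka-console-consumer --bootstrap-server localhost:9092 \
	    --topic my-topic --from-beginning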

Thanks again to Confluent for the help :slightly_smiling_face:


Glad it worked!

