OK, so I managed to solve it with a workaround. Instead of "fixing" the string values that cause the error, I'm replacing them with a constant value that is close enough. The field in question (exceptionmsg) holds an exception message. We don't need the exact message as long as the type of exception is known.
The solution involves two sink connectors:
connector 1 - original
This connector handles all records except the ones that cause the problem. I'm using a regular expression to exclude records whose exceptionmsg starts with a specific text and contains non-printable characters. Part of the config looks like this:
...
"transforms": "..., filterRecords",
"transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
"transforms.filterRecords.filter.type": "exclude",
"transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",
...
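To check which messages the exclude condition catches before deploying it, the predicate can be approximated outside Kafka Connect. Below is a minimal Python sketch, not the SMT itself. It assumes POSIX [:print:] means printable ASCII (0x20-0x7E), lets ".*" span line breaks (re.DOTALL), and assumes the condition has to match the whole value. The JsonPath/Java regex engine behind the Filter transformation may treat these details differently. The function name is_problem_message is hypothetical.

```python
import re

# Approximation of the filter condition's regex (assumptions):
# - [^[:print:]] is taken as "any character outside printable ASCII 0x20-0x7E"
# - re.DOTALL lets ".*" run across embedded line breaks
PROBLEM_MSG = re.compile(r"Failed to evaluate expression.*[^\x20-\x7E].*", re.DOTALL)


def is_problem_message(exceptionmsg):
    """Return True if connector 1 (filter.type = exclude) would drop this value."""
    if not isinstance(exceptionmsg, str):
        return False
    # fullmatch: assumes the whole value must match, so it has to start with the prefix
    return PROBLEM_MSG.fullmatch(exceptionmsg) is not None


if __name__ == "__main__":
    samples = [
        "Failed to evaluate expression: x + y",            # printable only
        "Failed to evaluate expression:\x00\x1b garbage",  # contains control characters
        "Timeout while reading \x07 data",                 # different prefix
    ]
    for msg in samples:
        print(repr(msg), "->", "excluded" if is_problem_message(msg) else "kept")
```

Note that with this approximation, any non-ASCII character (for example an accented letter) also counts as "non-printable".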
connector 2 - handle issues
This connector reads the same topic as the original connector, but it processes only the records that might cause problems on write, and it replaces their exceptionmsg with a constant text.
Config:
...
"transforms": "... , filterRecords, exceptionmsgReplace",
"transforms.exceptionmsgReplace.fields": "exceptionmsg",
"transforms.exceptionmsgReplace.replacement": "Failed to evaluate expression (full exception truncated by replication, see source database)",
"transforms.exceptionmsgReplace.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
"transforms.filterRecords.filter.type": "include",
"transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",
...
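The two connectors together split the topic using one shared condition: connector 1 excludes the matching records, and connector 2 includes only those records and overwrites exceptionmsg. Below is a minimal Python simulation of that split. It is not Kafka Connect code. The regex approximation is an assumption (printable = ASCII 0x20-0x7E, ".*" may span line breaks, whole-value match). The function names connector1/connector2 and the sample records are hypothetical. The replacement text is copied from the MaskField config above.

```python
import re

# Assumed approximation of the shared filter condition's regex
PROBLEM_MSG = re.compile(r"Failed to evaluate expression.*[^\x20-\x7E].*", re.DOTALL)

# Replacement text taken from connector 2's exceptionmsgReplace config
REPLACEMENT = (
    "Failed to evaluate expression "
    "(full exception truncated by replication, see source database)"
)


def matches_filter(record):
    msg = record.get("exceptionmsg")
    return isinstance(msg, str) and PROBLEM_MSG.fullmatch(msg) is not None


def connector1(records):
    """filterRecords with filter.type=exclude: pass every record the condition does not match."""
    return [r for r in records if not matches_filter(r)]


def connector2(records):
    """filterRecords with filter.type=include, then a MaskField-style replacement of exceptionmsg."""
    out = []
    for r in records:
        if matches_filter(r):
            masked = dict(r)  # copy so the input record stays untouched
            masked["exceptionmsg"] = REPLACEMENT
            out.append(masked)
    return out


if __name__ == "__main__":
    topic = [
        {"id": 1, "exceptionmsg": "Failed to evaluate expression: a / 0"},
        {"id": 2, "exceptionmsg": "Failed to evaluate expression:\x00\x1f junk"},
        {"id": 3, "exceptionmsg": "NullPointerException at line 12"},
    ]
    written1 = connector1(topic)
    written2 = connector2(topic)
    print("connector 1 writes ids:", [r["id"] for r in written1])
    print("connector 2 writes ids:", [r["id"] for r in written2])
    # every record is written by exactly one of the two connectors
    assert sorted(r["id"] for r in written1 + written2) == [1, 2, 3]
```

The split only holds because both connectors use exactly the same filter.condition and differ only in filter.type. If the two conditions drift apart, some records could be written twice or not at all.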
To filter on a field value I'm using Confluent's Filter transformation. It is not available out of the box in Kafka Connect, but it can be installed from Confluent Hub (connect-transformations).