Hi everyone, I'm trying to move data from MySQL to Postgres, but this error message shows up in the sink:

```
org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00
```
I want to fix this with connect-transformations (the Confluent Filter SMT), but I don't know how to capture the exception message and/or write a filter condition that excludes those rows.
I have been reading "JdbcSinkConnector - error on INSERT (invalid byte sequence)" but I cannot capture the `invalid byte sequence for encoding "UTF8": 0x00` error message.
Can you guys give me some expertise?
Thanks a lot
Hi @rmoff. For some time now I have been a big fan of yours. I have been paying a lot of attention to "Twelve Days of SMT" and the many tutorials about Kafka streaming in general that you have created.
In my current post I have been following your instructions in day 11 of SMT. I can achieve what the tutorial teaches; however, applying it to my problem (data from a MySQL source into Postgres via a sink) results in the error `org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00`.
I ask for your help and expertise… what should the condition be to handle the UTF8 error?
My sink JSON is something like:

```json
"transforms": "unwrap, filterStout",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.filterStout.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterStout.filter.condition": WHAT SHOULD THE CONDITION BE HERE TO CAPTURE "ERROR: invalid byte sequence for encoding \"UTF8\": 0x00"?,
"transforms.filterStout.filter.type": "exclude",
```
Good morning @rgurskis, I would really appreciate it if you could tell me exactly how to capture the `org.postgresql.util.PSQLException: ERROR: invalid byte sequence for encoding "UTF8": 0x00` message, i.e. how to filter on a value that starts with specific text and contains non-printable characters.
It was something you talked about in "JdbcSinkConnector - error on INSERT (invalid byte sequence) - #4 by rgurskis".
The sink connector is something like this:

```json
"transforms": "unwrap, filterStout",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.filterStout.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterStout.filter.condition": WHAT SHOULD THE CONDITION BE HERE TO CAPTURE "ERROR: invalid byte sequence for encoding \"UTF8\": 0x00"?,
"transforms.filterStout.filter.type": "exclude",
```
I am stuck with this and I would really appreciate your help.
Thanks a lot
Hi @cssalvador,
Please check my filter condition below. Hope it helps.
"transforms.filterRecords.filter.condition": "$[?(@.exceptionmsg =~ /Failed to evaluate expression.*[^[:print:]].*/)]",
"transforms.filterRecords.filter.type": "exclude",
"transforms.filterRecords.type": "io.confluent.connect.transforms.Filter$Value",
In this case `exceptionmsg` is the field in the topic that might contain invalid characters.
I'm looking for text that starts with "Failed to evaluate expression", followed by any number of characters, then anything that's not in the printable character class (`[^[:print:]]`), and then again followed by any characters. You will need to adjust it to your situation. To play around with the regex you can use online tools, such as https://regex101.com/
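If you want to sanity-check what that pattern matches outside of Connect, here is a minimal Python sketch. Note it is only an approximation: Python's `re` module has no POSIX `[:print:]` class, so the ASCII printable range `\x20-\x7e` is assumed as its stand-in.

```python
import re

# Rough Python equivalent of the JSONPath filter regex above.
# [^\x20-\x7e] approximates the POSIX [^[:print:]] class (assumption:
# we only care about ASCII printability, e.g. the 0x00 NUL byte).
pattern = re.compile(r"Failed to evaluate expression.*[^\x20-\x7e]", re.DOTALL)

matching = "Failed to evaluate expression: value contains \x00 here"
clean = "Failed to evaluate expression: perfectly printable value"

print(bool(pattern.search(matching)))  # True  -> record would be excluded
print(bool(pattern.search(clean)))     # False -> record passes through
```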
Hi @rgurskis, yeahhhh it finally worked!
I made it with something like this:

```json
"transforms.filterRecords.filter.condition": "$[?(@.description =~ /.[]./ || @.source_campaign =~ /.[]./ )]",
```
However, this only works when `description` and `source_campaign` are the only fields with these 0x00 characters.
How can I apply `filter.condition` to all fields in the Kafka topic without naming each field?
Something like `"transforms.filterRecords.filter.condition": "$[?(@.* …"`?
Well, I haven't done it myself, but `@.*` could work. You are welcome to try it.
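Outside of Connect, the record-level check this condition is trying to express (flag a record if *any* field value contains a non-printable character) can be sketched in Python. The field names below are made up for illustration, and `[^\x20-\x7e]` is again assumed as an ASCII approximation of "non-printable":

```python
import re

# Matches any character outside the printable ASCII range,
# e.g. the NUL byte 0x00 that Postgres rejects in text columns.
NON_PRINTABLE = re.compile(r"[^\x20-\x7e]")

def should_exclude(record: dict) -> bool:
    """True if any string field of the record contains a non-printable char."""
    return any(
        isinstance(value, str) and NON_PRINTABLE.search(value)
        for value in record.values()
    )

print(should_exclude({"description": "ok", "source_campaign": "bad\x00"}))  # True
print(should_exclude({"description": "ok", "source_campaign": "fine"}))     # False
```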
Hey man, @rgurskis, really appreciate your help. It seems to work.
However, when replacing the non-printable characters it seems to replace the whole field, not just the non-printable characters. Is there any way to replace only the non-printable characters and keep the valid ones in the same field?
That's how it works if you are using the `org.apache.kafka.connect.transforms.MaskField$Value` transformation: it replaces the entire field value with the "mask" value.
That approach was acceptable in my case. I was searching for a transformation that could replace a substring within the value, but unfortunately I could not find one. The alternative was to write my own transformation class, but I decided it wasn't worth it.
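For reference, a MaskField configuration along these lines might look like the fragment below. This is only a sketch: the field names are assumptions carried over from earlier in the thread, and the `replacement` option is only available in Kafka versions that support it (otherwise the field is masked with the type's default, e.g. an empty string).

```json
"transforms": "mask",
"transforms.mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.mask.fields": "description,source_campaign",
"transforms.mask.replacement": "<masked>"
```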
Really appreciate your help, man. Thanks a lot for your time.