JDBC sink connector stops consuming from partition

Hello everyone, I have an issue with a JDBC sink connector running in distributed mode: it stops consuming from some partitions under heavy load.

The load is about 6K per second. The value type is Protobuf and the key type is string. I have 6 topic partitions and 6 tasks configured. I am able to take a stack dump of the stuck process, and I consistently see the same stack trace pattern for the stuck threads. In the Connect logs I can see that a task misses its heartbeat 10 minutes after its last committed offset, and the tasks then start to rebalance across the 3 boxes.

Below are parts of the connector config and the stack trace. Any help is greatly appreciated!

"tasks.max": "6",
"batch.size": "500",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",

        at org.apache.kafka.connect.data.Values$Parser.isNext(Values.java:1251)
        at org.apache.kafka.connect.data.Values$Parser.canConsume(Values.java:1233)
        at org.apache.kafka.connect.data.Values$Parser.canConsume(Values.java:1229)
        at org.apache.kafka.connect.data.Values.canParseSingleTokenLiteral(Values.java:751)
        at org.apache.kafka.connect.data.Values.parse(Values.java:791)
        at org.apache.kafka.connect.data.Values.parseString(Values.java:391)
        at org.apache.kafka.connect.storage.SimpleHeaderConverter.toConnectHeader(SimpleHeaderConverter.java:64)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:516)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$3(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$$Lambda$581/0x0000000800627040.call(Unknown Source)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
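
A note on the trace: the stuck frames are in SimpleHeaderConverter, which is Connect's default header.converter and tries to infer a type for each record header by parsing its string value. They are not in the Protobuf value converter. One thing that could be tried is overriding the header converter at the connector level so that header values are passed through as raw bytes without any parsing. This is an untested assumption, not a confirmed fix, and it assumes the sink does not rely on typed header values. A minimal sketch, where the header.converter line is the only addition to the fragment above:

```
"tasks.max": "6",
"batch.size": "500",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
```

If the same threads still show up under Values.parse in a later stack dump, the override is not taking effect or the stall has a different cause.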