Dear All:
I followed the document "Add a key to data ingested through Kafka Connect using ksqlDB".
I created the sink connector below, but its task fails with the error shown in the DESCRIBE CONNECTOR output:
CREATE SINK CONNECTOR SINK_FOO_06 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:mysql://10.192.39.xx:3306/lrt_t',
'connection.user' = 'xxxx',
'connection.password' = 'xx_Admin',
'topics' = 'postgres_cities',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://localhost:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://localhost:8081',
'auto.create' = 'true',
'pk.mode' = 'record_key',
'pk.fields' = '',
'insert.mode' = 'upsert',
'delete.enabled' = 'true'
);
ksql> list connectors;
Connector Name | Type | Class | Status
----------------------------------------------------------------------------------------------------------------
JDBC_SOURCE_POSTGRES_01 | SOURCE | io.confluent.connect.jdbc.JdbcSourceConnector | RUNNING (1/1 tasks RUNNING)
SINK_FOO_06 | SINK | io.confluent.connect.jdbc.JdbcSinkConnector | WARNING (0/1 tasks RUNNING)
----------------------------------------------------------------------------------------------------------------
ksql> describe connector SINK_FOO_06;
Name : SINK_FOO_06
Class : io.confluent.connect.jdbc.JdbcSinkConnector
Type : sink
State : RUNNING
WorkerId : 10.192.15.69:8083
Task ID | State | Error Trace
-------------------------------------------------------------------------------------------------------------------------------------------------------------
0 | FAILED | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:506)
at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:302)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:160)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:172)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:530)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
-------------------------------------------------------------------------------------------------------------------------------------------------------------
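Reading the trace above: the failure is in WorkerSinkTask.convertKey, so it is the record key (not the value) that the AvroConverter cannot read. The Confluent Avro wire format starts with one magic byte (0) followed by a 4-byte big-endian schema ID, and the BufferUnderflowException is thrown from getInt while that header is being read. One possible explanation, which I have not confirmed, is that the keys on postgres_cities are not Avro at all but a short primitive (for example a 4-byte integer whose first byte happens to be 0). The sketch below is only an illustration of that header check in Python; read_schema_id is a hypothetical helper, not part of any Confluent library.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed Avro payload


def read_schema_id(payload: bytes) -> int:
    """Return the schema ID from a Confluent-framed payload.

    Hypothetical helper for illustration only. Raises ValueError when the
    payload is too short or does not start with the magic byte.
    """
    if len(payload) < 5:
        # The Java deserializer hits BufferUnderflowException in this case
        # (provided the first byte passed the magic-byte check).
        raise ValueError(
            f"only {len(payload)} bytes; need 1 magic byte + 4-byte schema ID"
        )
    # ">bi" = big-endian, one signed byte, one signed 32-bit integer.
    magic, schema_id = struct.unpack(">bi", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id


if __name__ == "__main__":
    # A properly framed payload: magic byte 0, schema ID 7, then Avro data.
    print(read_schema_id(b"\x00\x00\x00\x00\x07" + b"avro-bytes"))
    # A bare 4-byte big-endian integer key (value 1): too short for the header.
    try:
        read_schema_id(b"\x00\x00\x00\x01")
    except ValueError as err:
        print("not Avro-framed:", err)
```

If the keys really are plain primitives, the key.converter in the sink would presumably have to match how the key was written rather than use AvroConverter; that is an assumption to be checked against the actual topic contents.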