I have the following connector config:
{
"name": "sink-data",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"table.name.format": "Product.PRODUCT_CATALOG",
"connection.url": "jdbc:oracle:thin:@//**************:1521/Service_name1",
"connection.user": "DBUser_Name",
"connection.password": "DB_pwd",
"topics": "topic_sink",
"auto.create": "false",
"auto.evolve": "false",
"insert.mode": "upsert",
"pk.mode": "record_value",
"pk.field": "PRODUCT_ID",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.TimestampConverter.target.type": "Time",
"transforms.TimestampConverter.target.field": "ADD_DATE"
}
}
The data on the topic is:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"PRODUCT_ID"},{"type":"int32","optional":true,"field":"PRODUCT_KEY"},{"type":"int32","optional":true,"field":"CATALOG_TYPE_KEY"},{"type":"int32","optional":true,"field":"ADD_CATALOG_KEY"},{"type":"string","optional":true,"field":"ADD_DATE"},{"type":"string","optional":true,"field":"ADD_CREATE_DATE"},{"type":"int32","optional":true,"field":"REMOVAL_CATALOG_KEY"},{"type":"string","optional":true,"field":"REMOVAL_DATE"},{"type":"string","optional":true,"field":"REMOVAL_CREATE_DATE"},{"type":"int32","optional":true,"field":"VERSION_NR"},{"type":"string","optional":true,"field":"VALID_FROM"},{"type":"string","optional":true,"field":"CATALOG_SOURCE"}],"optional":false,"name":"ReasonSinkJoin"},"payload":{"PRODUCT_ID":773,"PRODUCT_KEY":296140,"CATALOG_TYPE_KEY":1,"ADD_CATALOG_KEY":5,"ADD_DATE":"2004-12-31 00:00:00","ADD_CREATE_DATE":"2004-12-31 00:00:00","REMOVAL_CATALOG_KEY":null,"REMOVAL_DATE":null,"REMOVAL_CREATE_DATE":null,"VERSION_NR":1,"VALID_FROM":"2005-04-26 09:26:07.000000000","CATALOG_SOURCE":null}}
The error I receive is:
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:547)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Schema Schema{ReasonSinkJoin:STRUCT} does not correspond to a known timestamp type format
at org.apache.kafka.connect.transforms.TimestampConverter.timestampTypeFromSchema(TimestampConverter.java:469)
at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:394)
at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:335)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)
What am I missing? I have tried every combination I could think of.