Date Timestamp Transformation error

I have connector config as

{
    "name": "sink-data",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "table.name.format": "Product.PRODUCT_CATALOG",
        "connection.url": "jdbc:oracle:thin:@//**************:1521/Service_name1",
        "connection.user": "DBUser_Name",
        "connection.password": "DB_pwd",
        "topics": "topic_sink",
        "auto.create": "false",
        "auto.evolve": "false",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.field": "PRODUCT_ID",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",      
		"transforms": "TimestampConverter",
		"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
		"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
		"transforms.TimestampConverter.target.type": "Time",
        "transforms.TimestampConverter.target.field": "ADD_DATE"
    }
}

The data on topic is

{"schema":{"type":"struct","fields":[{"type":"int32","optional":true,"field":"PRODUCT_ID"},{"type":"int32","optional":true,"field":"PRODUCT_KEY"},{"type":"int32","optional":true,"field":"CATALOG_TYPE_KEY"},{"type":"int32","optional":true,"field":"ADD_CATALOG_KEY"},{"type":"string","optional":true,"field":"ADD_DATE"},{"type":"string","optional":true,"field":"ADD_CREATE_DATE"},{"type":"int32","optional":true,"field":"REMOVAL_CATALOG_KEY"},{"type":"string","optional":true,"field":"REMOVAL_DATE"},{"type":"string","optional":true,"field":"REMOVAL_CREATE_DATE"},{"type":"int32","optional":true,"field":"VERSION_NR"},{"type":"string","optional":true,"field":"VALID_FROM"},{"type":"string","optional":true,"field":"CATALOG_SOURCE"}],"optional":false,"name":"ReasonSinkJoin"},"payload":{"PRODUCT_ID":773,"PRODUCT_KEY":296140,"CATALOG_TYPE_KEY":1,"ADD_CATALOG_KEY":5,"ADD_DATE":"2004-12-31 00:00:00","ADD_CREATE_DATE":"2004-12-31 00:00:00","REMOVAL_CATALOG_KEY":null,"REMOVAL_DATE":null,"REMOVAL_CREATE_DATE":null,"VERSION_NR":1,"VALID_FROM":"2005-04-26 09:26:07.000000000","CATALOG_SOURCE":null}}

Error I received is

        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
        at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:547)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:498)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Schema Schema{ReasonSinkJoin:STRUCT} does not correspond to a known timestamp type format
        at org.apache.kafka.connect.transforms.TimestampConverter.timestampTypeFromSchema(TimestampConverter.java:469)
        at org.apache.kafka.connect.transforms.TimestampConverter.applyWithSchema(TimestampConverter.java:394)
        at org.apache.kafka.connect.transforms.TimestampConverter.apply(TimestampConverter.java:335)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$transformRecord$0(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:183)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:217)

What is I am missing. I tried all combination I could.

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.