Issue in using Timestamp Column for a Custom transform

Hey guys,
Please help me to solve this.

I am using the Debezium source connector to pull data from a SQL Server database.
Apart from other fields in the SQL table, there is a "CreatedDate" column of type datetime2(7).

Before writing to the topic, I have a custom transform (recordTimestamp) which expects the value of the "CreatedDate" column as a "java.sql.Timestamp".

The source connector produces the following error:
ERROR apply: record: field: CreatedDate was 'class java.lang.Long' but should be 'java.sql.Timestamp'.

So I tried applying the built-in Kafka TimestampConverter transform before the custom transform, as follows:

"transforms.convertCreatedDate.target.type": "Timestamp",
"transforms.convertCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertCreatedDate.field": "CreatedDate",
"transforms.convertCreatedDate.format": "yyyy-MM-dd HH:mm:ss.SSSSSSS",

The source connector now produces the following error:
ERROR apply: record: field: CreatedDate was 'class java.util.Date' but should be 'java.sql.Timestamp'

When I checked the schema, the type of the "CreatedDate" field is:

{
  "name": "CreatedDate",
  "type": {
    "connect.name": "org.apache.kafka.connect.data.Timestamp",
    "connect.version": 1,
    "logicalType": "timestamp-millis",
    "type": "long"
  }
}
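To see concretely what that schema means: "type": "long" with "logicalType": "timestamp-millis" says the wire value is epoch milliseconds, and Connect's logical Timestamp type surfaces it on the Java side as a java.util.Date, not a java.sql.Timestamp. A minimal stdlib sketch (the millis value below is made up for illustration):

```java
import java.time.Instant;
import java.util.Date;

public class TimestampMillisDemo {
    public static void main(String[] args) {
        // Hypothetical wire value: epoch milliseconds, as stored in the Avro long.
        long epochMillis = 1_700_000_000_000L;

        // Connect's logical Timestamp materializes this as java.util.Date...
        Date connectValue = new Date(epochMillis);

        // ...which is NOT a java.sql.Timestamp, hence the transform's type error.
        System.out.println(connectValue instanceof java.sql.Timestamp); // false
        System.out.println(Instant.ofEpochMilli(epochMillis));          // same instant as ISO-8601
    }
}
```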

This is my source connector config:
{
  "name": "debzium_sourceV2",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "connection.url": "------",
    "poll.interval.ms": "10000",
    "database.server.name": "RF",
    "database.hostname": "–",
    "database.user": "—",
    "database.dbname": "–",
    "database.password": "—",
    "table.include.list": "Invitation.sent.v1",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "dbHistoryFi",
    "tombstones.on.delete": "true",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.compression.type": "lz4",
    "topic.creation.default.cleanup.policy": "compact",
    "topic.creation.enable": "true",
    "errors.tolerance": "all",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "transforms": "unwrap,convertCreatedDate,recordTimestamp,dropPrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.operation.header": "true",
    "transforms.unwrap.add.headers": "db",
    "transforms.unwrap.add.fields": "op",
    "transforms.convertCreatedDate.target.type": "Timestamp",
    "transforms.convertCreatedDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convertCreatedDate.field": "CreatedDate",
    "transforms.convertCreatedDate.format": "yyyy-MM-dd HH:mm:ss.SSSSSSS",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "—(.*)v1",
    "transforms.dropPrefix.replacement": "$1zzz",
    "transforms.recordTimestamp.timestamp.column.name": "CreatedDate",
    "transforms.recordTimestamp.type": "-------- -.kafka.connect.transform.TimestampExtractor$Value"
  }
}

Which transform should be used to get the value of "CreatedDate" as a java.sql.Timestamp?
(From the docs, I understand that the Java type corresponding to org.apache.kafka.connect.data.Timestamp is java.util.Date.)
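One workaround, given that Connect's logical Timestamp only guarantees a java.util.Date: the custom transform could coerce whatever it receives into a java.sql.Timestamp itself instead of type-checking strictly. This is a sketch, not tested against your setup; TimestampCoercion and toSqlTimestamp are hypothetical names for illustration:

```java
import java.sql.Timestamp;
import java.util.Date;

// Hypothetical helper a custom SMT could call before its type check.
public class TimestampCoercion {
    public static Timestamp toSqlTimestamp(Object value) {
        if (value instanceof Timestamp) {
            return (Timestamp) value;                        // already the type we need
        }
        if (value instanceof Date) {                         // Connect logical Timestamp -> java.util.Date
            return new Timestamp(((Date) value).getTime());
        }
        if (value instanceof Long) {                         // raw epoch millis, no TimestampConverter applied
            return new Timestamp((Long) value);
        }
        throw new IllegalArgumentException(
            "Unsupported type for CreatedDate: " + value.getClass());
    }
}
```

Note the order of the instanceof checks: java.sql.Timestamp is itself a java.util.Date, so the Timestamp case must come first.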

Strangely, I noticed that with the same config properties there is no error if I use the JDBC source connector instead of Debezium.
That is, for the JDBC source connector, although the schema type is shown as

{
  "name": "CreatedDate",
  "type": {
    "connect.name": "org.apache.kafka.connect.data.Timestamp",
    "connect.version": 1,
    "logicalType": "timestamp-millis",
    "type": "long"
  }
}
I can use my custom transform; the value arrives as a "java.sql.Timestamp".

Does the JDBC connector internally convert the timestamp column type?
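A guess at why the behaviors differ (an assumption, not verified against either connector's source): the JDBC source connector reads values through java.sql.ResultSet.getTimestamp(), which returns a java.sql.Timestamp, and since java.sql.Timestamp extends java.util.Date that object can flow through as the logical Timestamp value unchanged. Debezium instead rebuilds the value from epoch millis as a plain java.util.Date. A strict type check like the one in the custom transform then behaves differently on the two paths:

```java
import java.sql.Timestamp;
import java.util.Date;

public class SubtypeDemo {
    public static void main(String[] args) {
        // java.sql.Timestamp IS-A java.util.Date, so a value that happens to be
        // a Timestamp (the presumed JDBC path) passes a java.sql.Timestamp check...
        Date fromJdbc = new Timestamp(0L);
        System.out.println(fromJdbc instanceof Timestamp);      // true

        // ...while a plain java.util.Date (the Debezium path) does not.
        Date fromDebezium = new Date(0L);
        System.out.println(fromDebezium instanceof Timestamp);  // false
    }
}
```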