Kafka connect Timestampconverter not working

I am using Debezium to implement CDC for SQL Server database for which I am using SQL server connector. There are certain fields in the CDC message that is produced by Debezium that I am getting as Epoch values which I want to convert to the timestamp fields. There are also some (datetime) type fields in the database which also Debezium is converting to Epoch values and I want to convert those fields also to exactly how they are in the database as timestamp values. Below is my connector configuration:

{
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "name": "test-consumer-testing-47",
    "table.include.list": "dbo.table1",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "transforms": "unwrap,timestampConverter,Reroute,PartitionRouting",

    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "ts_ms,op,source.table,source.ts_ms,source.commit_lsn",
    "transforms.unwrap.delete.handling.mode": "rewrite",

    "transforms.timestampConverter.field": "last_upd_dt",
    "transforms.timestampConverter.target.type": "Timestamp",
    "transforms.timestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "time.precision.mode": "connect",


    "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.Reroute.topic.regex": "(.*)dbo(.*)",
    "transforms.Reroute.topic.replacement": "$1consumer-cdc-topic-47",

    "transforms.PartitionRouting.type": "io.debezium.transforms.partitions.PartitionRouting",
    "transforms.PartitionRouting.partition.payload.fields": "source.commit_lsn",
    "transforms.PartitionRouting.partition.topic.num": 3,
    
    
    "schema.history.internal.kafka.topic": "consumer-schema-history-topic-47",
    "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
    "schema.history.internal.store.only.captured.databases.ddl": true,
    "schema.history.internal.store.only.captured.tables.ddl": true,
    "topic.creation.default.cleanup.policy": "delete",  
    "topic.creation.default.partitions": "3",
    "topic.creation.default.replication.factor": "1",
    "topic.creation.default.retention.ms": "604800000",
    "topic.creation.enable": "true",
    "topic.prefix": "consumer-47",
    "snapshot.mode": "schema_only",
    "skip.messages.without.change": true,
    "decimal.handling.mode": "double"
  },
  "name": "test-consumer-testing-47"
}

Can someone please help on this?

Thanks

Hi there,
You’re saying it’s not working: are you getting some type of error?

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.