Timestamp converter not working when used with SMT

I'm using a timestamp converter together with the SMT "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
but the Unix timestamp fields in the topic messages are not converted to the format I want.
Here is my example Kafka connector configuration:

{
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "converters": "timestampConverter",
  "auto.create.topics.enable": "false",
  "topic.creation.enable": "true",
  "topic.creation.default.partitions": "3",
  "topic.prefix": "topic-prefix-1",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.compression.type": "gzip",
  "topic.creation.default.file.delete.delay.ms": "432000000",
  "topic.creation.default.cleanup.policy": "delete",
  "topic.creation.default.retention.ms": "432000000",
  "timestampConverter.debug": "false",
  "timestampConverter.format.date": "YYYY-MM-dd",
  "timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss'Z'",
  "timestampConverter.format.time": "HH:mm:ss",
  "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
  "tombstones.on.delete": "false",
  "mongodb.connection.string": "xxxxxx",
  "mongodb.name": "xxxxxx",
  "mongodb.user": "xxxxxx",
  "mongodb.password": "xxxxxx",
  "mongodb.authSource": "xxxxxx",
  "mongodb.connection.mode": "xxxxxx",
  "database.include.list": "xxxxxx",
  "name": "topic-name",
  "collection.include.list": "xxxxxx",
  "schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092",
  "schema.history.internal.kafka.topic": "compare-payment-installment-mongo-schema",
  "transforms": "unwrap, ReplaceField, RenameField, transform-name",
  "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.ReplaceField.exclude": "source",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
  "transforms.unwrap.collection.expand.json.payload": "true",
  "transforms.unwrap.add.fields": "op,ts_ms",
  "transforms.unwrap.add.fields.prefix": "",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "_id:id",
  "transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.transform-name.field": "ts_ms",
  "transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
  "transforms.transform-name.target.type": "string",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "capture.mode": "change_streams_update_full_with_pre_image",
  "snapshot.mode": "never",
  "capture.scope": "database",
  "tasks.max": "1"
}
How can I solve this problem?

Thank you

Could you share the JSON that winds up in Kafka? Wondering if the unwrap SMT is prepending underscores, i.e., adding __ts_ms.

Thanks a lot for answering.

I have no problem with ts_ms, but some fields inside the array objects are not converted from Unix timestamps to the date-time format I need.

Here is an example message from Kafka:

{
  "id": "65b8b659746b00aa48cd63f7",
  "field1": "0003",
  "field2": "06",
  "field3": "0014316",
  "field4": "10",
  "fieldDate1": 1707400383000,
  "fieldArray": [
    {
      "rowId": 17,
      "fieldDate2": 1653504044000,
      "field5": "RHE6505000023",
      "fieldDate3": 1653436800000,
      "field6": "06",
      "uuid": "db16fb95-86e6-42e5-ad6c-57221ce773ad"
    }
  ],
  "op": "u",
  "ts_ms": "2024-02-08T06:53:03.063Z"
}

In this message, fieldDate2 and fieldDate3 inside the array object are not converted to the date format, although fieldDate1 is converted already. Do you have any solution?

I don’t know of any existing SMTs that will do that; you’d need to write your own AFAIK. TimestampConverter works only on root-level fields, so it can handle ts_ms and fieldDate1 but it doesn’t drill into arrays. Flatten unnests objects but leaves arrays alone so it wouldn’t be able to pull fieldDate2 and fieldDate3 to the root level.
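If writing a custom SMT isn't feasible, one workaround is to do the conversion on the consumer side after reading the message from the topic. Here's a minimal sketch of that idea in Python; the field names are taken from the example message above, and the recursive-walk approach (rather than a real SMT) is my own suggestion, not something Debezium or Kafka Connect provides out of the box:

```python
from datetime import datetime, timezone

# Epoch-millisecond fields to convert, taken from the example message above.
EPOCH_MILLIS_FIELDS = {"fieldDate1", "fieldDate2", "fieldDate3"}

def to_iso8601(millis: int) -> str:
    """Format epoch milliseconds as e.g. 2024-02-08T06:53:03.063Z (UTC)."""
    dt = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"

def convert_timestamps(value):
    """Recursively walk dicts and lists, converting the named epoch fields.

    Unlike TimestampConverter, this reaches fields nested inside arrays.
    """
    if isinstance(value, dict):
        return {
            k: to_iso8601(v) if k in EPOCH_MILLIS_FIELDS and isinstance(v, int)
            else convert_timestamps(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [convert_timestamps(item) for item in value]
    return value
```

Applied to the deserialized message value above, this converts fieldDate1 at the root as well as fieldDate2 and fieldDate3 inside fieldArray, leaving everything else untouched. The same traversal logic could be ported into a custom Java SMT's `apply()` method if you need it inside Connect itself.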