Timestamp converter is not working when used with SMT

I'm using a timestamp converter together with the SMT "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
but the Unix timestamp fields in the messages on the topic are not converted to the format I want.
Here is my example Kafka connector configuration:

{
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "converters": "timestampConverter",
  "auto.create.topics.enable": "false",
  "topic.creation.enable": "true",
  "topic.creation.default.partitions": "3",
  "topic.prefix": "topic-prefix-1",
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.compression.type": "gzip",
  "topic.creation.default.file.delete.delay.ms": "432000000",
  "topic.creation.default.cleanup.policy": "delete",
  "topic.creation.default.retention.ms": "432000000",
  "timestampConverter.debug": "false",
  "timestampConverter.format.date": "YYYY-MM-dd",
  "timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss'Z'",
  "timestampConverter.format.time": "HH:mm:ss",
  "timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
  "tombstones.on.delete": "false",
  "mongodb.connection.string": "xxxxxx",
  "mongodb.name": "xxxxxx",
  "mongodb.user": "xxxxxx",
  "mongodb.password": "xxxxxx",
  "mongodb.authSource": "xxxxxx",
  "mongodb.connection.mode": "xxxxxx",
  "database.include.list": "xxxxxx",
  "name": "topic-name",
  "collection.include.list": "xxxxxx",
  "schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092",
  "schema.history.internal.kafka.topic": "compare-payment-installment-mongo-schema",
  "transforms": "unwrap, ReplaceField, RenameField, transform-name",
  "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.ReplaceField.exclude": "source",
  "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
  "transforms.unwrap.collection.expand.json.payload": "true",
  "transforms.unwrap.add.fields": "op,ts_ms",
  "transforms.unwrap.add.fields.prefix": "",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "_id:id",
  "transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
  "transforms.transform-name.field": "ts_ms",
  "transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
  "transforms.transform-name.target.type": "string",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "capture.mode": "change_streams_update_full_with_pre_image",
  "snapshot.mode": "never",
  "capture.scope": "database",
  "tasks.max": "1"
}
How can I solve this problem?

Thank you

Could you share the JSON that winds up in Kafka? Wondering if the unwrap SMT is prepending underscores, i.e., adding __ts_ms.

Thanks a lot for answering.

I have no problem with ts_ms, but some fields inside an array of objects are not converted from Unix timestamps to the date-time format that I need.

Here is an example message from Kafka:

{
  "id": "65b8b659746b00aa48cd63f7",
  "field1": "0003",
  "field2": "06",
  "field3": "0014316",
  "field4": "10",
  "fieldDate1": 1707400383000,
  "fieldArray": [
    {
      "rowId": 17,
      "fieldDate2": 1653504044000,
      "field5": "RHE6505000023",
      "fieldDate3": 1653436800000,
      "field6": "06",
      "uuid": "db16fb95-86e6-42e5-ad6c-57221ce773ad"
    }
  ],
  "op": "u",
  "ts_ms": "2024-02-08T06:53:03.063Z"
}

In this message, fieldDate2 and fieldDate3, which are inside the array of objects, are not converted to the date format, although I am already able to convert fieldDate1. Do you have any solution?

I don’t know of any existing SMTs that will do that; you’d need to write your own AFAIK. TimestampConverter works only on root-level fields, so it can handle ts_ms and fieldDate1 but it doesn’t drill into arrays. Flatten unnests objects but leaves arrays alone so it wouldn’t be able to pull fieldDate2 and fieldDate3 to the root level.
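If you do end up writing your own, the core of it is a recursive walk over the value that formats the chosen fields wherever they appear, including inside arrays. Below is a rough sketch of only that walk, not a complete SMT. It assumes the record value has already been turned into plain java.util.Map and java.util.List objects; inside a real Kafka Connect Transformation the value may well be a Struct with a schema, which would need different handling. The class name NestedTimestampFormatter and the method convert are made up for illustration, and the output pattern is copied from the transform-name config above.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Connect or Debezium.
class NestedTimestampFormatter {

    // Same pattern as transforms.transform-name.format in the question, rendered in UTC.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC);

    // Returns a copy of `node` in which every numeric field whose name is in
    // `dateFields` is replaced by a formatted string, at any nesting depth.
    static Object convert(Object node, Set<String> dateFields) {
        if (node instanceof Map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
                String key = String.valueOf(entry.getKey());
                Object value = entry.getValue();
                if (dateFields.contains(key) && value instanceof Number) {
                    // Assumption: the number is epoch milliseconds, as in the sample message.
                    long epochMillis = ((Number) value).longValue();
                    out.put(key, FORMAT.format(Instant.ofEpochMilli(epochMillis)));
                } else {
                    out.put(key, convert(value, dateFields));
                }
            }
            return out;
        }
        if (node instanceof List) {
            // Arrays are where TimestampConverter stops; here each element is visited too.
            List<Object> out = new ArrayList<>();
            for (Object item : (List<?>) node) {
                out.add(convert(item, dateFields));
            }
            return out;
        }
        // Strings, numbers that are not date fields, nulls, etc. pass through unchanged.
        return node;
    }
}
```

Calling convert(value, Set.of("fieldDate2", "fieldDate3")) on the sample message above would turn fieldDate2 into "2022-05-25T18:40:44.000Z" and fieldDate3 into "2022-05-25T00:00:00.000Z", leaving the other fields as they are. The same walk could also run on the consumer side if a custom SMT is more than you want to maintain.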
