I have tried to use a timestamp converter with the SMT "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
but the unix timestamp fields in the topic messages are not converted to the format that I want.
Here is an example configuration of my Kafka connector:
{
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"converters": "timestampConverter",
"auto.create.topics.enable": "false",
"topic.creation.enable": "true",
"topic.creation.default.partitions": "3",
"topic.prefix": "topic-prefix-1",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.compression.type": "gzip",
"topic.creation.default.file.delete.delay.ms": "432000000",
"topic.creation.default.cleanup.policy": "delete",
"topic.creation.default.retention.ms": "432000000",
"timestampConverter.debug": "false",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss'Z'",
"timestampConverter.format.time": "HH:mm:ss",
"timestampConverter.type": "oryanmoshe.kafka.connect.util.TimestampConverter",
"tombstones.on.delete": "false",
"mongodb.connection.string" : "xxxxxx",
"mongodb.name": "xxxxxx",
"mongodb.user" : "xxxxxx",
"mongodb.password" : "xxxxxx",
"mongodb.authSource": "xxxxxx",
"mongodb.connection.mode": "xxxxxx",
"database.include.list" : "xxxxxx",
"name": "topic-name",
"collection.include.list": "xxxxxx",
"schema.history.internal.kafka.bootstrap.servers": "kafka-0.kafka-headless.kafka-connector:9092",
"schema.history.internal.kafka.topic": "compare-payment-installment-mongo-schema",
"transforms": "unwrap, ReplaceField, RenameField, transform-name",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.exclude": "source",
"transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.collection.expand.json.payload": "true",
"transforms.unwrap.add.fields": "op,ts_ms",
"transforms.unwrap.add.fields.prefix": "",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "_id:id",
"transforms.transform-name.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.transform-name.field": "ts_ms",
"transforms.transform-name.format": "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
"transforms.transform-name.target.type": "string",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"capture.mode": "change_streams_update_full_with_pre_image",
"snapshot.mode": "never",
"capture.scope": "database",
"tasks.max": "1"
}
How can I solve this problem?
Thank you.