I want to find a way to use Single Message Transformation (SMT) in OGG KAFKA CONNECT (Oracle Golden Gate Kafka Connect) but haven’t found a way yet. Can you give me ideas about this problem? Thank you very much.
Hello there,
Here is a SMT example on Oracle source with Debezium connector. It should be similar with OGG.
Look at here for SMT document: Get started with Single Message Transforms for self-managed connectors | Confluent Documentation
{
"name": "my-source",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"advertised.host.name" : "localhost",
"database.url": "url",
"database.user" : "user",
"database.password" : "pass",
"database.pdb.name" : "pdbname",
"database.dbname" : "dbname",
"database.server.name" : "server_name",
"schema.history.internal.kafka.topic" : "table-history",
"schema.history.internal.store.only.captured.tables.ddl" : "true",
"schema.history.internal.skip.unparseable.ddl" : "true",
"log.mining.strategy" : "online_catalog",
"table.include.list" : "MY_SCHEMA.MY_TABLE",
"sanitize.field.names" : "true",
"time.precision.mode" : "connect",
"key.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable" : "true",
"value.converter.schemas.enable" : "true",
"decimal.handling.mode" : "double",
"internal.log.mining.query.logs.for.snapshot.offset" : "false",
"heartbeat.interval.ms" : "9000",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "source.errors",
"errors.deadletterqueue.topic.replication.factor": 1,
"errors.deadletterqueue.context.headers.enable": true,
"topic.prefix": "TEST",
"transforms" : "MyFilters",
"transforms.MyFilters.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.MyFilters.filter.condition": "$[?(@ && @.after && @.after.STATUS in ['Open'] && @.after.COMPANY in ['Apple','Tesla'])]",
"transforms.MyFilters.filter.type": "include",
"transforms.MyFilters.missing.or.null.behavior": "exclude",
"transforms.MyFilters.predicate": "IsMyTable",
"predicates": "IsMyTable",
"predicates.IsMyTable.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.IsMyTable.pattern": ".*MY_TABLE"
}
}