I have the following event inside my topic, the event is coming from Debezium that send CRUD events from Postgres. One of the thing I found is when I delete record from Postgres table the topic contain like follows:
rowtime: 2022/11/27 13:42:41.024 Z, key: {"id":997}, value: {"id":997,"firstname":"Abram","lastname":"Grebner","city":"Pueai Noi","country":"Thailand","phone":"722-308-9153","__table":"custo mer","__lsn":7107642472}, partition: 0 rowtime: 2022/11/27 13:43:27.171 Z, key: {"id":997}, value: <null>, partition: 0
id=997
with value: <null>
is deleted events.I can create a kTable to grouped by data for update events as follows:
CREATE TABLE lookup as select id, latest_by_offset(firstname), latest_by_offset(lastname), latest_by_offset(city) from raw group by id emit changes;
How do I create a table to NOT include deleted data (value: <null>
)
This is the connector JSON
"name": "microservice-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "xxxx",
"database.password": "xxxx",
"database.dbname" : "microservice",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "serviceprefix.public.customer",
"topic.prefix": "serviceprefix",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"schema.history.internal.kafka.topic": "schema-changes.microservice",
"schema.include.list": "public",
"plugin.name": "pgoutput",
"database.history.kafka.topic": "schema-changes.microservice",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"transforms.unwrap.add.fields" : "table,lsn",
"transforms.unwrap.delete.handling.mode" : "rewrite",
"transforms.unwrap.drop.tombstones" : "false",
"cleanup.policy" : "compact",
"delete.retention.ms" : "500",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"drop",
"transforms.unwrap.operation.header":"true"
}
}