Table for Debezium deleted event

I have the following event inside my topic, the event is coming from Debezium that send CRUD events from Postgres. One of the thing I found is when I delete record from Postgres table the topic contain like follows:

rowtime: 2022/11/27 13:42:41.024 Z, key: {"id":997}, value: {"id":997,"firstname":"Abram","lastname":"Grebner","city":"Pueai Noi","country":"Thailand","phone":"722-308-9153","__table":"custo mer","__lsn":7107642472}, partition: 0 rowtime: 2022/11/27 13:43:27.171 Z, key: {"id":997}, value: <null>, partition: 0

id=997 with value: <null> is deleted events.I can create a kTable to grouped by data for update events as follows:

CREATE TABLE lookup as select id, latest_by_offset(firstname), latest_by_offset(lastname), latest_by_offset(city) from raw group by id emit changes;

How do I create a table to NOT include deleted data (value: <null>)

This is the connector JSON


    "name": "microservice-connector",
	"config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "xxxx",
        "database.password": "xxxx",
        "database.dbname" : "microservice",
        "database.history.kafka.bootstrap.servers": "localhost:9092",
        "database.history.kafka.topic": "serviceprefix.public.customer",        
        "topic.prefix": "serviceprefix",
		"schema.history.internal.kafka.bootstrap.servers": "localhost:9092", 
        "schema.history.internal.kafka.topic": "schema-changes.microservice",
        "schema.include.list": "public",         
		"plugin.name": "pgoutput",
        "database.history.kafka.topic": "schema-changes.microservice",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "transforms.unwrap.add.fields" : "table,lsn",
		"transforms.unwrap.delete.handling.mode" : "rewrite",
	    "transforms.unwrap.drop.tombstones" : "false",
	    "cleanup.policy" : "compact",
	    "delete.retention.ms" : "500",
	     "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode":"drop",
    "transforms.unwrap.operation.header":"true"


	}
}


This topic was automatically closed after 30 days. New replies are no longer allowed.