I’m encountering an issue with my Kafka Streams and KSQL setup where deletions in my source PostgreSQL database are not being reflected in one of my KSQL tables.
I’m using Debezium to capture changes from a PostgreSQL database and stream them into Kafka topics. Then, I use KSQL to process the data and create derived tables, which are eventually consumed by an Elasticsearch sink connector.
I have two connectors and corresponding KSQL queries:
First Connector and Query: Handles deletions properly.
Second Connector and Query: Does not reflect deletions in the KSQL table.
First Connector and Query (Working Deletions):
{ "name": "postgres-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "pgs.us-west-2.rds.amazonaws.com", "database.port": "5432", "database.user": "postgres", "database.password": "sffr", "database.dbname": "test_prod", "slot.name": "debezium_slot", "database.server.name": "cdc-using-debezium", "table.include.list": "public.agreement,public.agreement_config", "plugin.name": "pgoutput", "schema.include.list": "public", "topic.prefix": "cdc-using-debezium", "snapshot.mode": "initial", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081", "database.history.kafka.bootstrap.servers": "cdc-using-debezium-kafka:29092", "database.history.kafka.topic": "schema-changes.postgres", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "rewrite", "transforms.unwrap.add.fields": "table,lsn", "topic.creation.default.cleanup.policy": "compact", "topic.creation.default.replication.factor": 1, "topic.creation.default.partitions": 1 } }
KSQL Query:
CREATE TABLE enriched_agreement_table WITH (
VALUE_FORMAT = 'JSON_SR'
) AS
SELECT
a.id AS agreement_id,
LATEST_BY_OFFSET(a.display_id) AS display_id,
LATEST_BY_OFFSET(a.display_name) AS display_name,
LATEST_BY_OFFSET(CAST(ac.id AS STRING)) AS config_id,
LATEST_BY_OFFSET(ac.status) AS status,
LATEST_BY_OFFSET(ac.version) AS version,
LATEST_BY_OFFSET(ac.display_id) AS config_display_id,
LATEST_BY_OFFSET(a.agreement_config_id) AS agreement_config_id,
LATEST_BY_OFFSET(a.__DELETED) AS __DELETED
FROM
agreement_stream a
LEFT JOIN
agreement_config_stream ac
WITHIN 7 DAYS
ON
a.agreement_config_id = ac.id
GROUP BY a.id
HAVING
LATEST_BY_OFFSET(a.__DELETED) IS NULL OR LATEST_BY_OFFSET(a.__DELETED) = 'false'
EMIT CHANGES;
In this setup, when a record in the agreement table is deleted in PostgreSQL, the deletion is correctly reflected in enriched_agreement_table.
Second Connector and Query (Issue with Deletions):
Connector Configuration:
{
"name": "postgres-clauses-data-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "6",
"database.hostname": "${DATABASE_HOSTNAME}",
"database.port": "${DATABASE_PORT}",
"database.user": "${DATABASE_USER}",
"database.password": "${DATABASE_PASSWORD}",
"database.dbname": "${DATABASE_DBNAME}",
"database.server.name": "${DATABASE_SERVER_NAME}",
"table.include.list": "public.agreement,public.agreement_config,public.user,public.agreement_subtype,public.agreement_type,public.agreement_domain,public.agreement_category,public.agreement_field_data",
"plugin.name": "${PLUGIN_NAME}",
"topic.prefix": "${TOPIC_PREFIX}",
"snapshot.mode": "initial",
"key.converter.schemas.enable": "${KEY_CONVERTOR_SCHEMAS_ENABLE}",
"value.converter.schemas.enable": "${VALUE_CONVERTOR_SCHEMAS_ENABLE}",
"heartbeat.interval.ms": "10000",
"schema.include.list": "public",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "${CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL}",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "${CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL}",
"database.history.kafka.bootstrap.servers": "${CONNECT_BOOTSTRAP_SERVERS}",
"database.history.kafka.topic": "schema-changes.postgres",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "table,lsn",
"topic.creation.default.replication.factor": 1,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact"
}
}
KSQL Query:
CREATE TABLE CDC_AGREEMENT_LIST WITH (
KAFKA_TOPIC='CDC_AGREEMENT_LIST',
PARTITIONS=1,
REPLICAS=1,
VALUE_FORMAT='JSON_SR'
) AS
SELECT
A.ID AS AGREEMENT_ID,
LATEST_BY_OFFSET(A.DISPLAY_ID) AS DISPLAY_ID,
LATEST_BY_OFFSET(A.DISPLAY_NAME) AS DISPLAY_NAME,
LATEST_BY_OFFSET(A.STATUS) AS STATUS,
LATEST_BY_OFFSET(A.EXTRACTION_STATUS) AS EXTRACTION_STATUS,
LATEST_BY_OFFSET(A.SAP_NUMBER) AS SAP_NUMBER,
LATEST_BY_OFFSET(A.LATEST_SAP_PAYLOAD) AS LATEST_SAP_PAYLOAD,
LATEST_BY_OFFSET(A.LATEST_SAP_MESSAGE) AS LATEST_SAP_MESSAGE,
LATEST_BY_OFFSET(A.MESSAGE) AS MESSAGE,
LATEST_BY_OFFSET(A.CONTRACT_TYPE) AS CONTRACT_TYPE,
LATEST_BY_OFFSET(A.SALES_ORGANIZATION) AS SALES_ORGANIZATION,
LATEST_BY_OFFSET(A.DISTRIBUTION_CHANNEL) AS DISTRIBUTION_CHANNEL,
LATEST_BY_OFFSET(A.DIVISION) AS DIVISION,
LATEST_BY_OFFSET(A.SALES_GROUP) AS SALES_GROUP,
LATEST_BY_OFFSET(A.SALES_OFFICE) AS SALES_OFFICE,
LATEST_BY_OFFSET(A.PARENT_VERSION_ROW_ID) AS PARENT_VERSION_ROW_ID,
LATEST_BY_OFFSET(A.CREATED_BY_ID) AS CREATED_BY_ID,
LATEST_BY_OFFSET(A.CREATED_ON) AS CREATED_ON,
LATEST_BY_OFFSET(A.UPDATED_ON) AS UPDATED_ON,
LATEST_BY_OFFSET(A.IS_ARCHIVED) AS IS_ARCHIVED,
LATEST_BY_OFFSET(A.IS_FAVOURITE) AS IS_FAVOURITE,
LATEST_BY_OFFSET(A.APPROVAL_LEVEL) AS APPROVAL_LEVEL,
LATEST_BY_OFFSET(A.APPROVAL_STATUS_NAME) AS APPROVAL_STATUS_NAME,
LATEST_BY_OFFSET(A.APPROVAL_STATUS_COLOR) AS APPROVAL_STATUS_COLOR,
LATEST_BY_OFFSET(A.UPDATED_BY_ID) AS UPDATED_BY_ID,
LATEST_BY_OFFSET(A.OWNER_ID) AS OWNER_ID,
LATEST_BY_OFFSET(U.FIRSTNAME) AS USER_FIRSTNAME,
LATEST_BY_OFFSET(U.LASTNAME) AS USER_LASTNAME,
LATEST_BY_OFFSET(U.URL) AS USER_URL,
LATEST_BY_OFFSET(CAST(AC.ID AS STRING)) AS AGREEMENT_CONFIG_ID,
LATEST_BY_OFFSET(AC.DISPLAY_ID) AS AGREEMENT_CONFIG_DISPLAY_ID,
LATEST_BY_OFFSET(AC.VERSION) AS AGREEMENT_CONFIG_VERSION,
LATEST_BY_OFFSET(AC.STATUS) AS AGREEMENT_CONFIG_STATUS,
LATEST_BY_OFFSET(AC.CREATED_BY_ID) AS AGREEMENT_CONFIG_CREATED_BY_ID,
LATEST_BY_OFFSET(AC.AGREEMENT_TYPE_ID) AS AGREEMENT_CONFIG_AGREEMENT_TYPE_ID,
LATEST_BY_OFFSET(AC.AGREEMENT_SUBTYPE_ID) AS AGREEMENT_CONFIG_AGREEMENT_SUBTYPE_ID,
LATEST_BY_OFFSET(AC.FORM_TYPES) AS AGREEMENT_CONFIG_FORM_TYPES,
LATEST_BY_OFFSET(AC.SAP_CLASS) AS AGREEMENT_CONFIG_SAP_CLASS,
LATEST_BY_OFFSET(AC.CREATED_ON) AS AGREEMENT_CONFIG_CREATED_ON,
LATEST_BY_OFFSET(AC.UPDATED_ON) AS AGREEMENT_CONFIG_UPDATED_ON,
LATEST_BY_OFFSET(AC.UPDATED_BY_FULLNAME) AS AGREEMENT_CONFIG_UPDATED_BY_FULLNAME,
LATEST_BY_OFFSET(AC.EXTRACTION) AS AGREEMENT_CONFIG_EXTRACTION,
LATEST_BY_OFFSET(AC.SEND_TO_SAP) AS AGREEMENT_CONFIG_SEND_TO_SAP,
LATEST_BY_OFFSET(AC.AGREEMENT_CATEGORY_ID) AS AGREEMENT_CONFIG_AGREEMENT_CATEGORY_ID,
LATEST_BY_OFFSET(AC.AGREEMENT_DOMAIN_ID) AS AGREEMENT_CONFIG_AGREEMENT_DOMAIN_ID,
LATEST_BY_OFFSET(AST.NAME) AS AGREEMENT_SUBTYPE_NAME,
LATEST_BY_OFFSET(ATYPE.NAME) AS AGREEMENT_TYPE_NAME,
LATEST_BY_OFFSET(AD.NAME) AS AGREEMENT_DOMAIN_NAME,
LATEST_BY_OFFSET(ACAT.NAME) AS AGREEMENT_CATEGORY_NAME,
LATEST_BY_OFFSET(AFD.VALUE) AS FIELD_VALUE,
LATEST_BY_OFFSET(F.NAME) AS FIELD_NAME,
LATEST_BY_OFFSET(F.TYPE) AS FIELD_TYPE,
LATEST_BY_OFFSET(F.SAP_TECHNICAL_NAME) AS FIELD_SAP_TECHNICAL_NAME,
LATEST_BY_OFFSET(A.__DELETED) AS __DELETED
FROM AGREEMENT_STREAM A
LEFT OUTER JOIN USER_STREAM U WITHIN 30 DAYS ON A.CREATED_BY_ID = U.ID
LEFT OUTER JOIN AGREEMENT_CONFIG_STREAM AC WITHIN 30 DAYS ON A.AGREEMENT_CONFIG_ID = AC.ID
LEFT OUTER JOIN AGREEMENT_SUBTYPE__STREAM AST WITHIN 30 DAYS ON AC.AGREEMENT_SUBTYPE_ID = AST.ID
LEFT OUTER JOIN AGREEMENT_TYPE__STREAM ATYPE WITHIN 30 DAYS ON AC.AGREEMENT_TYPE_ID = ATYPE.ID
LEFT OUTER JOIN AGREEMENT_DOMAIN_STREAM AD WITHIN 30 DAYS ON AC.AGREEMENT_DOMAIN_ID = AD.ID
LEFT OUTER JOIN AGREEMENT_CATEGORY_STREAM ACAT WITHIN 30 DAYS ON AC.AGREEMENT_CATEGORY_ID = ACAT.ID
LEFT OUTER JOIN AGREEMENT_FIELD_DATA_STREAM AFD WITHIN 30 DAYS ON A.ID = AFD.AGREEMENT_ID
LEFT OUTER JOIN FIELD_STREAM F WITHIN 30 DAYS ON AFD.FIELD_ID = F.ID
GROUP BY A.ID
HAVING (LATEST_BY_OFFSET(A.__DELETED) IS NULL) OR (LATEST_BY_OFFSET(A.__DELETED) = 'false')
EMIT CHANGES;
In this second query, deletions in the agreement table are not being reflected in the CDC_AGREEMENT_LIST KSQL table. When a record is deleted in PostgreSQL, the __DELETED flag is not effectively removing the record from the KSQL table or updating downstream consumers like Elasticsearch.
Both queries appear to handle deletions in the same way, so why does the second one not reflect them? Both tables are created from streams.
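One way to narrow this down is to check whether delete events reach the source stream at all, and whether the derived table's topic ever receives a tombstone for the deleted key. The statements below are a minimal sketch of such a check, not something confirmed to expose the cause. They assume that __DELETED is a STRING column (as the HAVING clause implies) and reuse the stream, column, and topic names from the second query.

```sql
-- Read the source stream from the beginning so earlier delete events are included.
SET 'auto.offset.reset' = 'earliest';

-- Show delete events in the source stream together with the columns that the
-- second query uses as join keys (assumes __DELETED is a STRING).
SELECT ID, __DELETED, CREATED_BY_ID, AGREEMENT_CONFIG_ID
FROM AGREEMENT_STREAM
WHERE __DELETED = 'true'
EMIT CHANGES
LIMIT 5;

-- Inspect the raw records behind the derived table. A removed row should
-- appear as a record with a null value (tombstone) for its AGREEMENT_ID key.
PRINT 'CDC_AGREEMENT_LIST' FROM BEGINNING LIMIT 20;
```

If the first statement returns rows but no tombstone ever appears in the topic, the delete event is presumably being lost or overridden somewhere between the joins and the aggregation.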
Connector Configuration: I checked that both connectors have similar configurations, including the transforms.unwrap.delete.handling.mode set to “rewrite”, and transforms.unwrap.drop.tombstones set to “false”.
Cleanup Policy: Even with "topic.creation.default.cleanup.policy": "compact" set, as in the first connector (which works), the issue persists in the second setup.
Query Adjustments: I considered wrapping each field in a CASE statement to handle deletions explicitly, but this is impractical given the number of fields.
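For illustration, wrapping a single column would look roughly like the sketch below. It keeps the column value only when the event is not a delete, mirroring the condition in the HAVING clause, and returns NULL otherwise. This is untested; it assumes __DELETED holds the string 'false' or NULL on non-delete events, and the same wrapper would have to be repeated for every column in CDC_AGREEMENT_LIST.

```sql
-- Sketch only: emit NULL for one column on delete events.
-- A CASE without ELSE yields NULL when no branch matches.
SELECT
  A.ID AS AGREEMENT_ID,
  LATEST_BY_OFFSET(
    CASE
      WHEN A.__DELETED IS NULL OR A.__DELETED = 'false' THEN A.DISPLAY_NAME
    END
  ) AS DISPLAY_NAME,
  LATEST_BY_OFFSET(A.__DELETED) AS __DELETED
FROM AGREEMENT_STREAM A
GROUP BY A.ID
EMIT CHANGES;
```

Note that LATEST_BY_OFFSET skips NULL values by default, so this wrapper on its own may not clear the column.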