Kafka Connect JDBC sink: deleted rows are not reflected in the destination DB

I am using the JdbcSinkConnector library (version 2.4.0), and Kafka Connect runs in a Docker environment built from the confluentinc/cp-kafka-connect:latest image.

With "io.debezium.connector.postgresql.PostgresConnector" as the source connector, events for inserts, updates, and deletes are published to the topic normally. In the sink connector, inserts and updates are applied correctly, but deleted rows are not reflected in the destination.
I need help.

The sink connector config currently in use is below.
"delete.enabled": "true" is set, but deletes still do not work.
{
  "name": "postgres-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://172.19.0.5:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "1234",
    "slot.name": "source",
    "topics.regex": "cnnc.*",
    "insert.mode": "upsert",
    "auto.create": "true",
    "delete.enabled": "true",
    "auto.evolve": "true",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "pk.mode": "record_key",
    "pk.fields": "opr_no",
    "batch.size": 1,
    "transforms": "unwrap,route,TimestampConverter",
    "transforms.TimestampConverter.type": "com.github.howareyouo.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "transforms.TimestampConverter.target.type": "Timestamp",
    "transforms.TimestampConverter.fields": "udt_dt",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

When a row is deleted, two records are published.

  1. The op field in the change event payload is marked d (delete), and at the same time an additional record with the same key and an empty payload (a tombstone) is published.

1.1 Delete event record
key
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"opr_no"}],"optional":false,"name":"cnnc.public.opr.Key"},"payload":{"opr_no":"OP166"}}

payload
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "opr_no"
          },
          {
            "type": "int64",
            "optional": false,
            "name": "io.debezium.time.Timestamp",
            "version": 1,
            "field": "udt_dt"
          }
        ],
        "optional": true,
        "name": "cnnc.public.opr.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "opr_no"
          },
          {
            "type": "int64",
            "optional": false,
            "name": "io.debezium.time.Timestamp",
            "version": 1,
            "field": "udt_dt"
          }
        ],
        "optional": true,
        "name": "cnnc.public.opr.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "cnnc.public.opr.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "opr_no": "OP166",
      "udt_dt": 0
    },
    "after": null,
    "source": {
      "version": "2.4.0.Final",
      "connector": "postgresql",
      "name": "cnnc",
      "ts_ms": 1699262695390,
      "snapshot": "false",
      "db": "postgres",
      "sequence": "[\"22498848\",\"22498904\"]",
      "schema": "public",
      "table": "opr",
      "txId": 845,
      "lsn": 22498904,
      "xmin": null
    },
    "op": "d",
    "ts_ms": 1699262695609,
    "transaction": null
  }
}

1.2 Tombstone record (empty payload)
key
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"opr_no"}],"optional":false,"name":"cnnc.public.opr.Key"},"payload":{"opr_no":"OP166"}}

payload
is empty (null)

Hi @yoon,

By default the io.debezium.transforms.ExtractNewRecordState SMT drops both of these records. The relevant docs page is here. There are config options for how to handle both the tombstone and the delete change event. E.g., you can convert the change event to a tombstone with "transforms.unwrap.delete.handling.mode": "none" or add "transforms.unwrap.drop.tombstones": "false" so that it propagates the tombstone. Just doing the latter seems simpler.
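
For example, based on the config you posted, the unwrap transform section of the sink connector would become (a sketch, untested against your setup):

"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",

With the tombstone reaching the sink, "delete.enabled": "true" together with "pk.mode": "record_key" (both already in your config) should translate it into a DELETE on the destination table.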

HTH,
Dave

@dtroiano
Thank you. With your help I found the answer.

Now I can do CDC for PostgreSQL.

@dtroiano

I have one more question.
I wanted DDL changes to propagate via "auto.evolve": "true", but it doesn't work.

A column was added or a column type was changed in the source Postgres, but the change is not reflected in the sink.

Do I need another option?

I have an additional question.
If there are multiple timestamp columns such as reg_dt, udt_dt, bgn_dt, end_dt, is it possible to process them with a regular expression such as "*_dt" instead of listing each one?

"transforms.TimestampConverter.type": "com.github.howareyouo.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.fields": "reg_dt,udt_dt,bgn_dt,end_dt",

Hi @yoon,

I wanted DDL changes to propagate via "auto.evolve": "true", but it doesn't work.

Check this section of the docs on auto-evolution to see if any of the limitations apply to your case. Data type changes aren’t attempted, nor is the addition of primary keys. If adding a non-primary key column isn’t working, do you see any helpful connector logging?

If there are multiple timestamp columns such as reg_dt, udt_dt, bgn_dt, end_dt, is it possible to process them with a regular expression such as "*_dt" instead of listing each one?

That converter only supports comma-delimited fields, not regex. You would need to add regex support if you need more flexibility than a list. I'm not sure if that repo is still maintained, so you could either attempt to add it there or start fresh based off of Apache Kafka's built-in TimestampConverter.
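
If a comma-delimited list is workable for you, one alternative that needs no custom code is to chain one instance of Kafka's built-in TimestampConverter per field, since the built-in SMT takes a singular field rather than a list. A sketch, with made-up transform aliases (tsReg, tsUdt, ...):

"transforms": "unwrap,route,tsReg,tsUdt,tsBgn,tsEnd",
"transforms.tsReg.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsReg.field": "reg_dt",
"transforms.tsReg.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.tsReg.target.type": "Timestamp",
"transforms.tsUdt.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.tsUdt.field": "udt_dt",
"transforms.tsUdt.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.tsUdt.target.type": "Timestamp",

and similarly for tsBgn/bgn_dt and tsEnd/end_dt. More verbose than a regex, but it stays on the stock converter.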

HTH,
Dave

Thank you for the reply.

Could you please reply to this post as well?
I need your help.
