I am using the JdbcSinkConnector library, version 2.4.0, and Kafka Connect runs in a Docker environment built from the confluentinc/cp-kafka-connect:latest image.
The source connector, "io.debezium.connector.postgresql.PostgresConnector", publishes events for inserts, updates, and deletes as expected. The sink connector applies inserts and updates correctly, but deleted rows are not removed from the target table.
How can I get the sink connector to apply deletes?
The sink connector configuration currently in use is shown below.
"delete.enabled": "true" is set, but it has no effect.
{
"name": "postgres-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://172.19.0.5:5432/postgres",
"connection.user": "postgres",
"connection.password": "1234",
"slot.name": "source",
"topics.regex": "cnnc.*",
"insert.mode": "upsert",
"auto.create": "true",
"delete.enabled": "true",
"auto.evolve": "true",
"dialect.name": "PostgreSqlDatabaseDialect",
"pk.mode": "record_key",
"pk.fields": "opr_no",
"batch.size": 1,
"transforms": "unwrap,route,TimestampConverter",
"transforms.TimestampConverter.type": "com.github.howareyouo.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd'T'HH:mm:ss'Z'",
"transforms.TimestampConverter.target.type": "Timestamp",
"transforms.TimestampConverter.fields": "udt_dt",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
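One thing that may be worth checking in this configuration: as far as I understand the Debezium 2.4 documentation, the ExtractNewRecordState transform defaults to drop.tombstones=true and delete.handling.mode=drop, while the JDBC sink's delete.enabled acts on records whose value is null and requires pk.mode=record_key. If those defaults apply to the transform version installed here, neither delete record would reach the sink. The Python sketch below only illustrates that reasoning. The helper name tombstones_reach_sink is hypothetical, and the defaults are assumptions taken from the documentation, not output from a running connector.

```python
# Assumed defaults for io.debezium.transforms.ExtractNewRecordState
# (taken from the Debezium 2.4 docs; verify against the installed version).
ASSUMED_UNWRAP_DEFAULTS = {"drop.tombstones": "true", "delete.handling.mode": "drop"}


def tombstones_reach_sink(config, alias="unwrap"):
    """Return True if the unwrap SMT is configured to keep tombstone records."""
    key = f"transforms.{alias}.drop.tombstones"
    value = str(config.get(key, ASSUMED_UNWRAP_DEFAULTS["drop.tombstones"]))
    return value.lower() == "false"


# Relevant subset of the sink configuration from the post.
posted = {
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "transforms": "unwrap,route,TimestampConverter",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
}

# Hypothetical variant that keeps tombstones so the sink can act on them.
candidate = {**posted, "transforms.unwrap.drop.tombstones": "false"}

print(tombstones_reach_sink(posted))     # False
print(tombstones_reach_sink(candidate))  # True
```

If tombstones are kept, the other transforms in the chain would also receive null-valued records, so whether they tolerate them would need to be verified separately.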
When a row is deleted, two records are published to the topic.
- The first record's payload has op set to d (delete), and a second record with the same key and an empty payload is published at the same time.
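To make the distinction between the two records concrete, here is a minimal Python sketch that labels a record value as a delete event or as an empty-payload (tombstone) record. The function name classify is hypothetical, and the sample value is a trimmed version of the delete event from this post, not a full Debezium envelope.

```python
import json


def classify(raw_value):
    """Label a record value as 'tombstone', 'delete', or 'other'."""
    if not raw_value:  # None, empty string, or empty bytes
        return "tombstone"
    envelope = json.loads(raw_value)
    if envelope is None:
        return "tombstone"
    # Works with or without the JSON converter's schema/payload wrapper.
    payload = envelope.get("payload", envelope)
    if payload is None:
        return "tombstone"
    if payload.get("op") == "d" and payload.get("after") is None:
        return "delete"
    return "other"


# Trimmed delete event: op is "d", "before" holds the old row, "after" is null.
delete_value = json.dumps({
    "payload": {
        "before": {"opr_no": "OP166", "udt_dt": 0},
        "after": None,
        "op": "d",
    }
})

print(classify(delete_value))  # delete
print(classify(None))          # tombstone
```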
1.1 Delete event record
key
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"opr_no"}],"optional":false,"name":"cnnc.public.opr.Key"},"payload":{"opr_no":"OP166"}}
payload
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "opr_no"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "udt_dt"
}
],
"optional": true,
"name": "cnnc.public.opr.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "opr_no"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"field": "udt_dt"
}
],
"optional": true,
"name": "cnnc.public.opr.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "cnnc.public.opr.Envelope",
"version": 1
},
"payload": {
"before": {
"opr_no": "OP166",
"udt_dt": 0
},
"after": null,
"source": {
"version": "2.4.0.Final",
"connector": "postgresql",
"name": "cnnc",
"ts_ms": 1699262695390,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"22498848\",\"22498904\"]",
"schema": "public",
"table": "opr",
"txId": 845,
"lsn": 22498904,
"xmin": null
},
"op": "d",
"ts_ms": 1699262695609,
"transaction": null
}
}
1.2 Record with empty payload
key
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"opr_no"}],"optional":false,"name":"cnnc.public.opr.Key"},"payload":{"opr_no":"OP166"}}
payload
(empty)