JDBC sink ReplaceFields not working for deletes

Hi, I’m trying to sync a SQL Server table from a topic. Some of the column names in the table are different from the topic. When I am inserting data everything works fine. But as soon as I delete a record, I get an error that one of the column names (the pre-mapped name) is missing like the rename is not being honoured. But as this should be a tombstone record, I am not sure why I am getting this error???

This is my config

{
    "name": "SqlSinkUP1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "serv34.dbo.tb_UserProfile",
        "connection.url": "jdbc:sqlserver://xxx:1433;databaseName=USS_UserSegmentation_5;user=sa;password=xxx",
        "dialect.name":" SqlServerDatabaseDialect",
        "transforms": "unwrap,RenameField",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false",
        "transforms.unwrap.delete.handling.mode":"none",
        "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
        "transforms.RenameField.renames": "UserProfileID:UserGroupId,UserProfileDescription:Name,Rank:Rank,UserGroupGuid:UserGroupGuid,IsDeleted:IsDeleted,Note:Description,IsOperatorVisible:IsVisible",
        "table.name.format": "USS_UserSegmentation_5.Segmentation.tb_UserGroup",
        "insert.mode": "upsert",
        "delete.enabled":"true",
        "auto.evolve":"true",
        "pk.fields": "",
        "pk.mode": "record_key",        
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://kafka1:8081",
        "key.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://kafka1:8081"
    }
}

This is the error : Cannot ALTER TABLE “USS_UserSegmentation_5”.“Segmentation”.“tb_UserGroup” to add missing field SinkRecordField{schema=Schema{INT32}, name=‘UserProfileID’, isPrimaryKey=false}, as the field is not optional and does not have a default value

If I run this same config, but I rename the table on the DB to have the same column names, everything works fine. Is this a bug or am I doing something wrong with mapping the column names?

{
    "name": "SqlSinkUP1",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "serv35.dbo.tb_UserProfile",
        "connection.url": "jdbc:sqlserver://xxx:1433;databaseName=USS_UserSegmentation_5;user=sa;password=xxx",
        "dialect.name":" SqlServerDatabaseDialect",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones":"false",
        "transforms.unwrap.delete.handling.mode":"none",
        "table.name.format": "USS_UserSegmentation_5.Segmentation.tb_UserGroup",
        "insert.mode": "upsert",
        "delete.enabled":"true",
        "auto.evolve":"true",
        "pk.fields": "",
        "pk.mode": "record_key",        
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://kafka1:8081",
        "key.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://kafka1:8081"
    }
}

This topic was automatically closed 30 days after the last reply. New replies are no longer allowed.