JDBC sink delete not working with mapped column names

Hi, I am using a Debezium source connector for SQL Server and the JDBC sink to write to another SQL Server instance. As part of the sink, I rename some columns. Deletes are not working: the sink does not seem to honour the column name mappings. The error says it is looking for a key value under the pre-mapped column name. Below are my config and the error I get.

{
  "name": "SqlSinkUP1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "serv24.dbo.tb_UserProfile",
    "connection.url": "jdbc:sqlserver://######:1433;databaseName=USS_UserSegmentation_5;user=sa;password=####",
    "dialect.name": "SqlServerDatabaseDialect",
    "transforms": "unwrap,route,RenameField,RenameKey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "USS_UserSegmentation_5.Segmentation.tb_UserGroup",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "UserProfileID:UserGroupId,UserProfileDescription:Name,OperatorID:OperatorId,Note:Description,IsOperatorVisible:IsVisible,PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,GamingServerId:GamingServerId",
    "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.RenameKey.renames": "PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,GamingServerId:GamingServerId",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "UserGroupGuid, OperatorId, GamingServerId",
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://kafka1:8081",
    "key.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://kafka1:8081"
  }
}

io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "USS_UserSegmentation_5"."Segmentation"."tb_UserGroup" to add missing field SinkRecordField{schema=Schema{INT32}, name='UserProfileID', isPrimaryKey=false}, as the field is not optional and does not have a default value
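
For anyone debugging a similar setup, a quick offline sanity check of the rename strings can help rule out simple mistakes before looking at the connector itself. The sketch below is plain Python, not Kafka Connect code: the helper names (`parse_renames`, `check_renames`, `missing_pk_fields`, `apply_renames`) are made up for illustration, and `apply_renames` is only a rough model of what a rename mapping does to a flat record, not the actual `ReplaceField` implementation. The three strings are copied from the config above.

```python
# Values copied from the connector config in this post.
VALUE_RENAMES = (
    "UserProfileID:UserGroupId,UserProfileDescription:Name,"
    "OperatorID:OperatorId,Note:Description,IsOperatorVisible:IsVisible,"
    "PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,"
    "GamingServerId:GamingServerId"
)
KEY_RENAMES = (
    "PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,"
    "GamingServerId:GamingServerId"
)
PK_FIELDS = "UserGroupGuid, OperatorId, GamingServerId"


def parse_renames(spec):
    """Split 'old:new,old2:new2' into a list of (old, new) pairs."""
    pairs = []
    for item in spec.split(","):
        old, new = item.strip().split(":", 1)
        pairs.append((old, new))
    return pairs


def check_renames(spec):
    """Return (duplicates, noops): repeated mappings and old == new mappings."""
    seen = set()
    duplicates, noops = [], []
    for old, new in parse_renames(spec):
        if (old, new) in seen:
            duplicates.append(old)
        seen.add((old, new))
        if old == new and old not in noops:
            noops.append(old)
    return duplicates, noops


def missing_pk_fields(pk_fields, key_renames):
    """List pk.fields entries that are not the target of any key rename.

    Such fields are not necessarily wrong; they would simply have to exist
    in the record key under that exact name already.
    """
    targets = {new for _, new in parse_renames(key_renames)}
    wanted = [f.strip() for f in pk_fields.split(",")]
    return [f for f in wanted if f not in targets]


def apply_renames(record, spec):
    """Model a rename on a flat dict; unmapped fields keep their name."""
    mapping = dict(parse_renames(spec))
    return {mapping.get(name, name): value for name, value in record.items()}


if __name__ == "__main__":
    print("value renames (duplicates, no-ops):", check_renames(VALUE_RENAMES))
    print("pk.fields not produced by key renames:",
          missing_pk_fields(PK_FIELDS, KEY_RENAMES))
```

Run against this config, the check reports that `OperatorID:OperatorId` is listed twice in the value renames and that `GamingServerId:GamingServerId` maps a field to itself, while all three `pk.fields` entries are covered by the key renames. Neither finding explains the `UserProfileID` error on its own, but it narrows down where to look.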

Seems like you partially solved the problem here. Please try to keep threads together if it’s the same problem.
