Hi, I am using a Debezium source connector for SQL Server and the JDBC sink connector to write to another SQL instance. As part of the sink, I map some column names. Deletes are not working: the sink does not seem to honour the column name mappings. I get an error indicating that it is looking for a key value under the pre-mapped column name. Below are my config and the error I get.
{
  "name": "SqlSinkUP1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "serv24.dbo.tb_UserProfile",
    "connection.url": "jdbc:sqlserver://######:1433;databaseName=USS_UserSegmentation_5;user=sa;password=####",
    "dialect.name": "SqlServerDatabaseDialect",
    "transforms": "unwrap,route,RenameField,RenameKey",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "USS_UserSegmentation_5.Segmentation.tb_UserGroup",
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.RenameField.renames": "UserProfileID:UserGroupId,UserProfileDescription:Name,OperatorID:OperatorId,Note:Description,IsOperatorVisible:IsVisible,PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,GamingServerId:GamingServerId",
    "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.RenameKey.renames": "PlayerGroupGUID:UserGroupGuid,OperatorID:OperatorId,GamingServerId:GamingServerId",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "UserGroupGuid, OperatorId, GamingServerId",
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://kafka1:8081",
    "key.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://kafka1:8081"
  }
}
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "USS_UserSegmentation_5"."Segmentation"."tb_UserGroup" to add missing field SinkRecordField{schema=Schema{INT32}, name='UserProfileID', isPrimaryKey=false}, as the field is not optional and does not have a default value