ValueToKey and Drop$Value

I am using multiple SMTs with the JDBC Sink Connector, with delete.enabled=true.
Will it work to first drop the value schema using the Drop$Value transformation and then apply the ValueToKey transformation?
I am unable to delete data when the SMTs are applied in that order.
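The ordering question above can be illustrated with a toy model. This is only a sketch, not Kafka Connect code: `value_to_key`, `drop_value`, and `run_chain` are hypothetical stand-ins, and the model assumes that a ValueToKey-style step needs a non-null value to copy fields from. How the real ValueToKey treats a null value may depend on the Kafka Connect version and is not confirmed in this thread.

```python
from typing import Any, Callable, List, Optional, Tuple

# A record is modelled as a (key, value) pair; value=None stands for a tombstone.
Record = Tuple[Any, Optional[dict]]
Transform = Callable[[Record], Record]


def value_to_key(fields: List[str]) -> Transform:
    """Toy stand-in for ValueToKey: build the key from fields of the value."""
    def apply(record: Record) -> Record:
        _, value = record
        if value is None:
            # Assumption of this model: with no value there is nothing to copy.
            raise ValueError("ValueToKey needs a non-null value")
        return ({f: value[f] for f in fields}, value)
    return apply


def drop_value(record: Record) -> Record:
    """Toy stand-in for Drop$Value: null out the value and keep the key."""
    key, _ = record
    return (key, None)


def run_chain(record: Record, chain: List[Transform]) -> Record:
    """Apply the transforms in order, as a transforms list would."""
    for smt in chain:
        record = smt(record)
    return record


record: Record = ("a", {"PM": "a", "PN": "b"})

# ValueToKey first, then Drop$Value: the key is built before the value is nulled.
print(run_chain(record, [value_to_key(["PM", "PN"]), drop_value]))

# Drop$Value first, then ValueToKey: the value is already gone in this model.
try:
    run_chain(record, [drop_value, value_to_key(["PM", "PN"])])
except ValueError as err:
    print("failed:", err)
```

In this model, dropping the value first leaves nothing for the key to be built from, which is one possible reason the first ordering does not produce usable delete keys. Note also that an unconditional drop step turns every record into a tombstone, not only the ones meant as deletes.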

I tried the reverse order, ValueToKey and then Drop$Value, for tombstone messages, but the connector fails with the error “Cannot ALTER TABLE…field is not optional…”

JDBC Sink Connector record key: String = ‘a’; record value: Avro = {x:‘a’, y:‘b’, z:‘c’}
Note: I have set ‘optional’: true on all fields in the Avro schema.

Transforms: renameFields,selectFields,valueToKey,DropValue
renameFields.type: Replace$Value
rename: x:PM, y:PN
selectFields.type: Replace$Value
whiteList: PM,PN
valueToKey.type: ValueToKey
fields: PM,PN
DropValue.type: Drop$Value
schema.behavior: force_optional
delete.enabled: true
pk.mode: record_key
pk.fields: PM, PN

Connector fails with the following error: Cannot ALTER TABLE to add missing field SinkRecordField{schema=Schema{STRING}, name=‘y’, isPrimaryKey=false}, as the field is not optional and does not have a default value…
I have added the Avro schema below:

{
  "type": "record",
  "name": "contact",
  "namespace": "*.avro.generated",
  "fields": [
    {
      "name": "x",
      "type": {
        "type": "string",
        "optional": true,
        "avro.java.String": "String"
      }
    },
    {
      "name": "y",
      "type": {
        "type": "string",
        "optional": true,
        "avro.java.String": "String"
      }
    },
    {
      "name": "z",
      "type": {
        "type": "string",
        "optional": true,
        "avro.java.String": "String"
      }
    }
  ]
}

Thanks
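For reference, Avro itself does not define an "optional" attribute on a type; unknown attributes are carried as metadata only. A field is normally made nullable by declaring its type as a union that includes "null", usually with a null default. The sketch below builds such a schema for the three fields in the post. The namespace is a hypothetical placeholder (the original is elided), the helper name `nullable_string_field` is made up for illustration, and whether this change resolves the "field is not optional" error is an assumption, not something confirmed in this thread.

```python
import json


def nullable_string_field(name: str) -> dict:
    """Avro field that accepts null or a string and defaults to null."""
    return {"name": name, "type": ["null", "string"], "default": None}


schema = {
    "type": "record",
    "name": "contact",
    # Hypothetical namespace; the real one is elided in the original post.
    "namespace": "example.avro.generated",
    "fields": [nullable_string_field(n) for n in ("x", "y", "z")],
}

# Print the schema as JSON, e.g. to paste into an .avsc file.
print(json.dumps(schema, indent=2))
```

With "null" listed first in the union, the null default is valid, and converters that map Avro to Connect schemas generally treat such a field as optional.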

Could you please share your configuration? I’m not entirely clear on what you’re trying to do here.

You might also find this useful: Kafka Connect JDBC Sink deep-dive: Working with Primary Keys

JDBC Sink Connector record key: String = ‘a’; record value: Avro = {x:‘a’, y:‘b’, z:‘c’}

Connector Configuration

Transforms: renameFields,selectFields,valueToKey,DropValue
renameFields.type: Replace$Value
rename: x:PM, y:PN
selectFields.type: Replace$Value
whiteList: PM,PN
valueToKey.type: ValueToKey
fields: PM,PN
DropValue.type: Drop$Value
schema.behavior: force_optional
delete.enabled: true
pk.mode: record_key
pk.fields: PM, PN

Connector fails with the following error: Cannot ALTER TABLE to add missing field SinkRecordField{schema=Schema{STRING}, name=‘y’, isPrimaryKey=false}, as the field is not optional and does not have a default value…
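The configuration above is written in shorthand. As a rough sketch of how it might look in full, the snippet below spells it out as a Python dict that could be serialized to JSON for the Kafka Connect REST API. The fully qualified class names are what the abbreviated names are assumed to refer to (ReplaceField$Value, ValueToKey, and Confluent's Drop$Value), and the `transforms.<alias>.` prefixes and the property names `renames` and `whitelist` are filled in on that assumption. Required settings such as topics and the connection URL are left out because the post does not give them.

```python
import json

connector_config = {
    # Assumed connector class for the Confluent JDBC sink.
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    # Transforms run in the order listed here.
    "transforms": "renameFields,selectFields,valueToKey,DropValue",
    "transforms.renameFields.type":
        "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameFields.renames": "x:PM,y:PN",
    "transforms.selectFields.type":
        "org.apache.kafka.connect.transforms.ReplaceField$Value",
    # "whitelist" is the older property name; newer versions also accept "include".
    "transforms.selectFields.whitelist": "PM,PN",
    "transforms.valueToKey.type":
        "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.valueToKey.fields": "PM,PN",
    "transforms.DropValue.type": "io.confluent.connect.transforms.Drop$Value",
    "transforms.DropValue.schema.behavior": "force_optional",
    "delete.enabled": "true",
    "pk.mode": "record_key",
    "pk.fields": "PM,PN",
}

# Sanity check: every alias named in "transforms" must have a matching type key.
aliases = connector_config["transforms"].split(",")
missing = [a for a in aliases if f"transforms.{a}.type" not in connector_config]
print("aliases without a type:", missing)

print(json.dumps(connector_config, indent=2))
```

The error message names the field ‘y’ rather than ‘PN’, so it may be worth checking that the rename step is actually being applied with the full property names.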
